diff --git a/luigi/db_task_history.py b/luigi/db_task_history.py index 6847c71f6d..187f21e7ad 100644 --- a/luigi/db_task_history.py +++ b/luigi/db_task_history.py @@ -49,10 +49,19 @@ import sqlalchemy.orm import sqlalchemy.orm.collections from sqlalchemy.engine import reflection + Base = sqlalchemy.ext.declarative.declarative_base() logger = logging.getLogger('luigi-interface') +if sqlalchemy.__version__.startswith('2'): + logger.warning('SQLAlchemy 2.x is not tested with luigi.db_task_history.DbTaskHistory') + from sqlalchemy import text + +else: + def text(sql): + return sql + class DbTaskHistory(task_history.TaskHistory): """ @@ -258,25 +267,26 @@ def _upgrade_schema(engine): # Upgrade 1. Add task_id column and index to tasks if 'task_id' not in [x['name'] for x in inspector.get_columns('tasks')]: logger.warning('Upgrading DbTaskHistory schema: Adding tasks.task_id') - conn.execute('ALTER TABLE tasks ADD COLUMN task_id VARCHAR(200)') - conn.execute('CREATE INDEX ix_task_id ON tasks (task_id)') + conn.execute(text('ALTER TABLE tasks ADD COLUMN task_id VARCHAR(200)')) + conn.execute(text('CREATE INDEX ix_task_id ON tasks (task_id)')) # Upgrade 2. Alter value column to be TEXT, note that this is idempotent so no if-guard if 'mysql' in engine.dialect.name: - conn.execute('ALTER TABLE task_parameters MODIFY COLUMN value TEXT') + conn.execute(text('ALTER TABLE task_parameters MODIFY COLUMN value TEXT')) elif 'oracle' in engine.dialect.name: - conn.execute('ALTER TABLE task_parameters MODIFY value TEXT') + conn.execute(text('ALTER TABLE task_parameters MODIFY value TEXT')) elif 'mssql' in engine.dialect.name: - conn.execute('ALTER TABLE task_parameters ALTER COLUMN value TEXT') + conn.execute(text('ALTER TABLE task_parameters ALTER COLUMN value TEXT')) elif 'postgresql' in engine.dialect.name: if str([x for x in inspector.get_columns('task_parameters') if x['name'] == 'value'][0]['type']) != 'TEXT': - conn.execute('ALTER TABLE task_parameters ALTER COLUMN value TYPE TEXT') + conn.execute(text('ALTER TABLE task_parameters ALTER COLUMN value TYPE TEXT')) elif 'sqlite' in engine.dialect.name: # SQLite does not support changing column types. A database file will need # to be used to pickup this migration change. - for i in conn.execute('PRAGMA table_info(task_parameters);').fetchall(): - if i['name'] == 'value' and i['type'] != 'TEXT': + for i in conn.execute(text('PRAGMA table_info(task_parameters);')).fetchall(): + x = i._asdict() + if x['name'] == 'value' and x['type'] != 'TEXT': logger.warning( 'SQLite can not change column types. Please use a new database ' 'to pickup column type changes.' diff --git a/tox.ini b/tox.ini index 2cf6e69e99..abb1092115 100644 --- a/tox.ini +++ b/tox.ini @@ -33,7 +33,7 @@ deps = boto3>=1.11.0 pyhive[presto]==0.6.1 s3transfer>=0.3,<4.0 - sqlalchemy<1.4 + sqlalchemy<2.1 elasticsearch>=1.0.0,<2.0.0 psutil<4.0 cdh,hdp: hdfs>=2.0.4,<3.0.0