diff options
author | Olly Cope <olly@ollycope.com> | 2018-01-29 15:08:14 +0000 |
---|---|---|
committer | Olly Cope <olly@ollycope.com> | 2018-01-29 15:08:14 +0000 |
commit | 927babde3aa62cf3c7c7a67c8ba323af78b72bb7 (patch) | |
tree | 4ef974b82153235fd415328ea9d4bea12dc2bdb3 | |
parent | 24ba3b04ee98b4846c1ecf4539117d7fda4d8450 (diff) | |
parent | c6bd09b841efdaef7cb76ddf120f25db13f9bd0a (diff) | |
download | yoyo-927babde3aa62cf3c7c7a67c8ba323af78b72bb7.tar.gz |
Merged in Artimi/yoyo (pull request #10)
-rwxr-xr-x | README.rst | 6 | ||||
-rwxr-xr-x | yoyo/__init__.py | 2 | ||||
-rw-r--r-- | yoyo/backends.py | 119 | ||||
-rwxr-xr-x | yoyo/connections.py | 7 | ||||
-rwxr-xr-x | yoyo/exceptions.py | 6 | ||||
-rwxr-xr-x | yoyo/migrations.py | 4 | ||||
-rwxr-xr-x | yoyo/scripts/main.py | 4 | ||||
-rwxr-xr-x | yoyo/scripts/migrate.py | 38 | ||||
-rw-r--r-- | yoyo/tests/conftest.py | 24 | ||||
-rw-r--r-- | yoyo/tests/test_backends.py | 84 | ||||
-rw-r--r-- | yoyo/tests/test_cli_script.py | 30 | ||||
-rw-r--r-- | yoyo/tests/test_connections.py | 4 | ||||
-rw-r--r-- | yoyo/tests/test_migrations.py | 26 |
13 files changed, 246 insertions, 108 deletions
@@ -332,10 +332,12 @@ Using yoyo from python code The following example shows how to apply migrations from inside python code:: - from yoyo import read_migrations, get_backend + from yoyo import read_migrations + from yoyo import get_backend backend = get_backend('postgres://myuser@localhost/mydatabase') migrations = read_migrations('path/to/migrations') - backend.apply_migrations(backend.to_apply(migrations)) + with backend.lock(): + backend.apply_migrations(backend.to_apply(migrations)) .. :vim:sw=4:et diff --git a/yoyo/__init__.py b/yoyo/__init__.py index 32b9765..e469c9c 100755 --- a/yoyo/__init__.py +++ b/yoyo/__init__.py @@ -24,4 +24,4 @@ from yoyo.migrations import (read_migrations, # noqa from yoyo.connections import get_backend # noqa -__version__ = '5.0.6.dev0' +__version__ = '5.1.0.dev0' diff --git a/yoyo/backends.py b/yoyo/backends.py index 9e9490a..ea062b0 100644 --- a/yoyo/backends.py +++ b/yoyo/backends.py @@ -17,6 +17,8 @@ from contextlib import contextmanager from importlib import import_module from itertools import count from logging import getLogger +import os +import time from . import exceptions, utils from .migrations import topological_sort @@ -104,11 +106,19 @@ class DatabaseBackend(object): driver_module = None connection = None - create_table_sql = """ + lock_table = '_yoyo_lock' + create_migration_table_sql = """ CREATE TABLE {table_name} ( id VARCHAR(255) NOT NULL PRIMARY KEY, ctime TIMESTAMP )""" + create_lock_table_sql = """ + CREATE TABLE {table_name} ( + locked INT DEFAULT 1, + ctime TIMESTAMP, + pid INT NOT NULL, + PRIMARY KEY (locked) + )""" list_tables_sql = "SELECT table_name FROM information_schema.tables" is_applied_sql = "SELECT COUNT(1) FROM {0.migration_table} WHERE id=?" insert_migration_sql = ("INSERT INTO {0.migration_table} (id, ctime) " @@ -125,7 +135,7 @@ class DatabaseBackend(object): self.DatabaseError = self.driver.DatabaseError self._connection = self.connect(dburi) self.migration_table = migration_table - self.create_migrations_table() + self.create_tables() self.has_transactional_ddl = self._check_transactional_ddl() def _load_driver_module(self): @@ -226,11 +236,55 @@ class DatabaseBackend(object): yield @contextmanager - def lock_migration_table(self): + def lock(self, timeout=10): """ - Lock `migrations_table` to prevent concurrent migrations. - """ - yield + Create a lock to prevent concurrent migrations. + + :param timeout: duration in seconds before raising a LockTimeout error. + """ + + pid = os.getpid() + self._insert_lock_row(pid, timeout) + try: + yield + finally: + self._delete_lock_row(pid) + + def _insert_lock_row(self, pid, timeout, poll_interval=0.5): + started = time.time() + while True: + try: + with self.transaction(): + self.execute("INSERT INTO {} (locked, ctime, pid) " + "VALUES (1, ?, ?)".format(self.lock_table), + (datetime.utcnow(), pid)) + except self.DatabaseError: + if timeout and time.time() > started + timeout: + cursor = self.execute("SELECT pid FROM {}" + .format(self.lock_table)) + row = cursor.fetchone() + if row: + raise exceptions.LockTimeout( + "Process {} has locked this database " + "(run yoyo break-lock to remove this lock)" + .format(row[0])) + else: + raise exceptions.LockTimeout( + "Database locked " + "(run yoyo break-lock to remove this lock)") + time.sleep(poll_interval) + else: + return + + def _delete_lock_row(self, pid): + with self.transaction(): + self.execute("DELETE FROM {} WHERE pid=?" + .format(self.lock_table), + (pid,)) + + def break_lock(self): + with self.transaction(): + self.execute("DELETE FROM {}" .format(self.lock_table)) def execute(self, stmt, args=tuple()): """ @@ -241,21 +295,21 @@ class DatabaseBackend(object): cursor.execute(self._with_placeholders(stmt), args) return cursor - def create_migrations_table(self): + def create_tables(self): """ - Create the migrations table if it does not already exist. + Create the migrations and lock tables if they do not already exist. """ - sql = self.create_table_sql.format(table_name=self.migration_table) - try: - with self.transaction(): - self.get_applied_migration_ids() - table_exists = True - except self.DatabaseError: - table_exists = False + statements = [ + self.create_migration_table_sql.format(table_name=self.migration_table), + self.create_lock_table_sql.format(table_name=self.lock_table) + ] - if not table_exists: - with self.transaction(): - self.execute(sql) + for stmt in statements: + try: + with self.transaction(): + self.execute(stmt) + except self.DatabaseError: + pass def _with_placeholders(self, sql): placeholder_gen = {'qmark': '?', @@ -309,12 +363,11 @@ class DatabaseBackend(object): """ if not migrations: return - with self.lock_migration_table(): - for m in migrations: - try: - self.apply_one(m, force=force) - except exceptions.BadMigration: - continue + for m in migrations: + try: + self.apply_one(m, force=force) + except exceptions.BadMigration: + continue def run_post_apply(self, migrations, force=False): """ @@ -447,17 +500,16 @@ class PostgresqlBackend(DatabaseBackend): driver_module = 'psycopg2' def connect(self, dburi): - connargs = [] + connect_args = {'dbname': dburi.database} if dburi.username is not None: - connargs.append('user=%s' % dburi.username) + connect_args['user'] = dburi.username if dburi.password is not None: - connargs.append('password=%s' % dburi.password) + connect_args['password'] = dburi.password if dburi.port is not None: - connargs.append('port=%d' % dburi.port) + connect_args['port'] = dburi.port if dburi.hostname is not None: - connargs.append('host=%s' % dburi.hostname) - connargs.append('dbname=%s' % dburi.database) - return self.driver.connect(' '.join(connargs)) + connect_args['host'] = dburi.hostname + return self.driver.connect(**connect_args) @contextmanager def disable_transactions(self): @@ -466,8 +518,3 @@ class PostgresqlBackend(DatabaseBackend): self.connection.autocommit = True yield self.connection.autocommit = saved - - @contextmanager - def lock_migration_table(self): - self.execute('LOCK TABLE {table_name} IN ACCESS EXCLUSIVE MODE'.format(table_name=self.migration_table)) - yield diff --git a/yoyo/connections.py b/yoyo/connections.py index 410c6a0..777692d 100755 --- a/yoyo/connections.py +++ b/yoyo/connections.py @@ -57,14 +57,17 @@ class DatabaseURI(_DatabaseURI): else: return hostpart - @property - def uri(self): + def __str__(self): return urlunsplit((self.scheme, self.netloc, self.database, urlencode(self.args), '')) + @property + def uri(self): + return str(self) + class BadConnectionURI(Exception): """ diff --git a/yoyo/exceptions.py b/yoyo/exceptions.py index 606c51c..03ddd8f 100755 --- a/yoyo/exceptions.py +++ b/yoyo/exceptions.py @@ -29,3 +29,9 @@ class MigrationConflict(Exception): """ The migration id conflicts with another migration """ + + +class LockTimeout(Exception): + """ + Timeout was reached while acquiring the migration lock + """ diff --git a/yoyo/migrations.py b/yoyo/migrations.py index b07f0af..e4c799f 100755 --- a/yoyo/migrations.py +++ b/yoyo/migrations.py @@ -111,8 +111,8 @@ class Migration(object): except backend.DatabaseError: exc_info = sys.exc_info() - if not (backend.has_transactional_ddl or - not self.use_transactions): + if not backend.has_transactional_ddl or \ + not self.use_transactions: # Any DDL statements that have been executed have been # committed. Go through the rollback steps to undo # these inasmuch is possible. diff --git a/yoyo/scripts/main.py b/yoyo/scripts/main.py index 4142c03..5bb7c14 100755 --- a/yoyo/scripts/main.py +++ b/yoyo/scripts/main.py @@ -262,7 +262,8 @@ def main(argv=None): if args.sources: config.set('DEFAULT', 'sources', ' '.join(args.sources)) if args.database: - config.set('DEFAULT', 'database', args.database) + # ConfigParser requires that any percent signs in the db uri be escaped. + config.set('DEFAULT', 'database', args.database.replace('%', '%%')) config.set('DEFAULT', 'migration_table', args.migration_table) config.set('DEFAULT', 'batch_mode', 'on' if args.batch_mode else 'off') config.set('DEFAULT', 'verbosity', str(args.verbosity)) @@ -277,7 +278,6 @@ def main(argv=None): argparser.error(e.args[0]) if config_is_empty and args.use_config_file and not args.batch_mode: - prompt_save_config(config, args.config or CONFIG_FILENAME) diff --git a/yoyo/scripts/migrate.py b/yoyo/scripts/migrate.py index c18d02e..2e19700 100755 --- a/yoyo/scripts/migrate.py +++ b/yoyo/scripts/migrate.py @@ -96,6 +96,12 @@ def install_argparsers(global_parser, subparsers): help="Unmark applied migrations, without rolling them back") parser_unmark.set_defaults(func=unmark, command_name='unmark') + parser_break_lock = subparsers.add_parser( + 'break_lock', + parents=[global_parser], + help="Break migration locks") + parser_break_lock.set_defaults(func=break_lock, command_name='break-lock') + def get_migrations(args, backend): @@ -160,33 +166,43 @@ def get_migrations(args, backend): def apply(args, config): backend = get_backend(args, config) - migrations = get_migrations(args, backend) - backend.apply_migrations(migrations, args.force) + with backend.lock(): + migrations = get_migrations(args, backend) + backend.apply_migrations(migrations, args.force) def reapply(args, config): backend = get_backend(args, config) - migrations = get_migrations(args, backend) - backend.rollback_migrations(migrations, args.force) - backend.apply_migrations(migrations, args.force) + with backend.lock(): + migrations = get_migrations(args, backend) + backend.rollback_migrations(migrations, args.force) + backend.apply_migrations(migrations, args.force) def rollback(args, config): backend = get_backend(args, config) - migrations = get_migrations(args, backend) - backend.rollback_migrations(migrations, args.force) + with backend.lock(): + migrations = get_migrations(args, backend) + backend.rollback_migrations(migrations, args.force) def mark(args, config): backend = get_backend(args, config) - migrations = get_migrations(args, backend) - backend.mark_migrations(migrations) + with backend.lock(): + migrations = get_migrations(args, backend) + backend.mark_migrations(migrations) def unmark(args, config): backend = get_backend(args, config) - migrations = get_migrations(args, backend) - backend.unmark_migrations(migrations) + with backend.lock(): + migrations = get_migrations(args, backend) + backend.unmark_migrations(migrations) + + +def break_lock(args, config): + backend = get_backend(args, config) + backend.break_lock() def prompt_migrations(backend, migrations, direction): diff --git a/yoyo/tests/conftest.py b/yoyo/tests/conftest.py index d57c004..bba941d 100644 --- a/yoyo/tests/conftest.py +++ b/yoyo/tests/conftest.py @@ -1,20 +1,38 @@ import pytest +from yoyo import backends from yoyo.connections import get_backend +from yoyo.tests import get_test_backends from yoyo.tests import get_test_dburis -@pytest.yield_fixture(params=get_test_dburis()) -def backend_fixture(request): +@pytest.fixture(params=get_test_dburis()) +def backend(request): """ Return all backends configured in ``test_databases.ini`` """ backend = get_backend(request.param) + with backend.transaction(): + if backend.__class__ is backends.MySQLBackend: + backend.execute('CREATE TABLE _yoyo_t ' + '(id CHAR(1) primary key) ' + 'ENGINE=InnoDB') + else: + backend.execute('CREATE TABLE _yoyo_t ' + '(id CHAR(1) primary key)') try: yield backend finally: backend.rollback() - for table in (backend.list_tables()): + for table in backend.list_tables(): + if table.startswith('_yoyo'): + with backend.transaction(): + backend.execute('DROP TABLE {}'.format(table)) + + +def pytest_configure(config): + for backend in get_test_backends(): + for table in backend.list_tables(): if table.startswith('_yoyo'): with backend.transaction(): backend.execute("DROP TABLE {}".format(table)) diff --git a/yoyo/tests/test_backends.py b/yoyo/tests/test_backends.py index d567689..019ec26 100644 --- a/yoyo/tests/test_backends.py +++ b/yoyo/tests/test_backends.py @@ -1,32 +1,16 @@ import pytest -import time from threading import Thread +import time -from yoyo import backends, read_migrations +from yoyo import backends +from yoyo import read_migrations +from yoyo import exceptions from yoyo.tests import get_test_backends from yoyo.tests import with_migrations class TestTransactionHandling(object): - @pytest.fixture(autouse=True, params=get_test_backends()) - def backend(self, request): - backend = request.param - with backend.transaction(): - if backend.__class__ is backends.MySQLBackend: - backend.execute("CREATE TABLE _yoyo_t " - "(id CHAR(1) primary key) " - "ENGINE=InnoDB") - else: - backend.execute("CREATE TABLE _yoyo_t " - "(id CHAR(1) primary key)") - yield backend - backend.rollback() - for table in (backend.list_tables()): - if table.startswith('_yoyo'): - with backend.transaction(): - backend.execute("DROP TABLE {}".format(table)) - def test_it_commits(self, backend): with backend.transaction(): backend.execute("INSERT INTO _yoyo_t values ('A')") @@ -109,22 +93,54 @@ class TestTransactionHandling(object): As far as I know this behavior is PostgreSQL specific. We can't run this test in sqlite as it does not support CREATE DATABASE. """ - from yoyo import read_migrations for backend in get_test_backends(exclude={'sqlite'}): migrations = read_migrations(tmpdir) backend.apply_migrations(migrations) backend.rollback_migrations(migrations) - - @with_migrations(a=""" - steps = [ - step("SELECT pg_sleep(1)"), - ] - """) - def test_lock_migration_table(self, tmpdir): - backend = get_test_backends(only={'postgresql'})[0] - migrations = read_migrations(tmpdir) - Thread(target = backend.apply_migrations, args = (migrations,)).start() - # give a chance to start, but wake up in the middle of applying - time.sleep(0.1) - assert backend.get_applied_migration_ids() == ['a'] + def test_lock(self, backend): + """ + Test that :meth:`~yoyo.backends.DatabaseBackend.lock` + acquires an exclusive lock + """ + if backend.uri.scheme == 'sqlite': + pytest.skip("Concurrency tests not supported for sqlite databases") + + lock_duration = 0.2 + + def do_something_with_lock(): + with backend.lock(): + time.sleep(lock_duration) + + thread = Thread(target=do_something_with_lock) + t = time.time() + thread.start() + # Give the thread time to acquire the lock, but not enough + # to complete + time.sleep(lock_duration * 0.2) + with backend.lock(): + delta = time.time() - t + assert delta >= lock_duration + + thread.join() + + def test_lock_times_out(self, backend): + + if backend.uri.scheme == 'sqlite': + pytest.skip("Concurrency tests not supported for sqlite databases") + + def do_something_with_lock(): + with backend.lock(): + time.sleep(lock_duration) + + lock_duration = 2 + thread = Thread(target=do_something_with_lock) + thread.start() + # Give the thread time to acquire the lock, but not enough + # to complete + time.sleep(lock_duration * 0.1) + with pytest.raises(exceptions.LockTimeout): + with backend.lock(timeout=lock_duration * 0.1): + assert False, "Execution should never reach this point" + + thread.join() diff --git a/yoyo/tests/test_cli_script.py b/yoyo/tests/test_cli_script.py index 6bcef48..7d5ed11 100644 --- a/yoyo/tests/test_cli_script.py +++ b/yoyo/tests/test_cli_script.py @@ -26,6 +26,7 @@ import re from mock import Mock, patch, call import frozendate +import pytest import tms from yoyo import read_migrations @@ -210,6 +211,35 @@ class TestYoyoScript(TestInteractiveScript): assert self.prompt.call_count == 0 assert self.confirm.call_count == 0 + def test_concurrent_instances_do_not_conflict(self, backend): + import threading + from functools import partial + + if backend.uri.scheme == 'sqlite': + pytest.skip("Concurrency tests not supported for sqlite databases") + + with with_migrations(m1=('import time\n' + 'step(lambda conn: time.sleep(0.1))\n' + 'step("INSERT INTO _yoyo_t VALUES (\'A\')")') + ) as tmpdir: + assert '_yoyo_t' in backend.list_tables() + backend.rollback() + backend.execute("SELECT * FROM _yoyo_t") + run_migrations = partial( + main, + ['apply', '-b', tmpdir, '--database', str(backend.uri)]) + threads = [threading.Thread(target=run_migrations) + for ix in range(20)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Exactly one instance of the migration script should have succeeded + backend.rollback() + cursor = backend.execute('SELECT COUNT(1) from _yoyo_t') + assert cursor.fetchone()[0] == 1 + class TestArgParsing(TestInteractiveScript): diff --git a/yoyo/tests/test_connections.py b/yoyo/tests/test_connections.py index f3b7440..9e5808c 100644 --- a/yoyo/tests/test_connections.py +++ b/yoyo/tests/test_connections.py @@ -77,8 +77,8 @@ def test_connections(import_module): db='northwind', foo='bar')), (backends.SQLiteBackend, 'sqlite3', call('northwind')), (backends.PostgresqlBackend, 'psycopg2', - call('user=scott password=tiger port=42 ' - 'host=db.example.org dbname=northwind')), + call(user='scott', password='tiger', port=42, + host='db.example.org', dbname='northwind')), ] diff --git a/yoyo/tests/test_migrations.py b/yoyo/tests/test_migrations.py index 376fcab..3bbd2b1 100644 --- a/yoyo/tests/test_migrations.py +++ b/yoyo/tests/test_migrations.py @@ -122,17 +122,17 @@ def test_rollbackignores_errors(tmpdir): assert cursor.fetchall() == [] -def test_migration_is_committed(backend_fixture): +def test_migration_is_committed(backend): with migrations_dir('step("CREATE TABLE _yoyo_test (id INT)")') as tmpdir: migrations = read_migrations(tmpdir) - backend_fixture.apply_migrations(migrations) + backend.apply_migrations(migrations) - backend_fixture.rollback() - rows = backend_fixture.execute("SELECT * FROM _yoyo_test").fetchall() + backend.rollback() + rows = backend.execute("SELECT * FROM _yoyo_test").fetchall() assert list(rows) == [] -def test_rollback_happens_on_step_failure(backend_fixture): +def test_rollback_happens_on_step_failure(backend): with migrations_dir(''' step("", "CREATE TABLE _yoyo_is_rolledback (i INT)"), @@ -140,22 +140,22 @@ def test_rollback_happens_on_step_failure(backend_fixture): "DROP TABLE _yoyo_test") step("invalid sql!")''') as tmpdir: migrations = read_migrations(tmpdir) - with pytest.raises(backend_fixture.DatabaseError): - backend_fixture.apply_migrations(migrations) + with pytest.raises(backend.DatabaseError): + backend.apply_migrations(migrations) # The _yoyo_test table should have either been deleted (transactional ddl) # or dropped (non-transactional-ddl) - with pytest.raises(backend_fixture.DatabaseError): - backend_fixture.execute("SELECT * FROM _yoyo_test") + with pytest.raises(backend.DatabaseError): + backend.execute("SELECT * FROM _yoyo_test") # Transactional DDL: rollback steps not executed - if backend_fixture.has_transactional_ddl: - with pytest.raises(backend_fixture.DatabaseError): - backend_fixture.execute("SELECT * FROM _yoyo_is_rolledback") + if backend.has_transactional_ddl: + with pytest.raises(backend.DatabaseError): + backend.execute("SELECT * FROM _yoyo_is_rolledback") # Non-transactional DDL: ensure the rollback steps were executed else: - cursor = backend_fixture.execute("SELECT * FROM _yoyo_is_rolledback") + cursor = backend.execute("SELECT * FROM _yoyo_is_rolledback") assert list(cursor.fetchall()) == [] |