summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOlly Cope <olly@ollycope.com>2018-01-29 15:08:14 +0000
committerOlly Cope <olly@ollycope.com>2018-01-29 15:08:14 +0000
commit927babde3aa62cf3c7c7a67c8ba323af78b72bb7 (patch)
tree4ef974b82153235fd415328ea9d4bea12dc2bdb3
parent24ba3b04ee98b4846c1ecf4539117d7fda4d8450 (diff)
parentc6bd09b841efdaef7cb76ddf120f25db13f9bd0a (diff)
downloadyoyo-927babde3aa62cf3c7c7a67c8ba323af78b72bb7.tar.gz
Merged in Artimi/yoyo (pull request #10)
-rwxr-xr-xREADME.rst6
-rwxr-xr-xyoyo/__init__.py2
-rw-r--r--yoyo/backends.py119
-rwxr-xr-xyoyo/connections.py7
-rwxr-xr-xyoyo/exceptions.py6
-rwxr-xr-xyoyo/migrations.py4
-rwxr-xr-xyoyo/scripts/main.py4
-rwxr-xr-xyoyo/scripts/migrate.py38
-rw-r--r--yoyo/tests/conftest.py24
-rw-r--r--yoyo/tests/test_backends.py84
-rw-r--r--yoyo/tests/test_cli_script.py30
-rw-r--r--yoyo/tests/test_connections.py4
-rw-r--r--yoyo/tests/test_migrations.py26
13 files changed, 246 insertions, 108 deletions
diff --git a/README.rst b/README.rst
index 14d62c7..cb9e189 100755
--- a/README.rst
+++ b/README.rst
@@ -332,10 +332,12 @@ Using yoyo from python code
The following example shows how to apply migrations from inside python code::
- from yoyo import read_migrations, get_backend
+ from yoyo import read_migrations
+ from yoyo import get_backend
backend = get_backend('postgres://myuser@localhost/mydatabase')
migrations = read_migrations('path/to/migrations')
- backend.apply_migrations(backend.to_apply(migrations))
+ with backend.lock():
+ backend.apply_migrations(backend.to_apply(migrations))
.. :vim:sw=4:et
diff --git a/yoyo/__init__.py b/yoyo/__init__.py
index 32b9765..e469c9c 100755
--- a/yoyo/__init__.py
+++ b/yoyo/__init__.py
@@ -24,4 +24,4 @@ from yoyo.migrations import (read_migrations, # noqa
from yoyo.connections import get_backend # noqa
-__version__ = '5.0.6.dev0'
+__version__ = '5.1.0.dev0'
diff --git a/yoyo/backends.py b/yoyo/backends.py
index 9e9490a..ea062b0 100644
--- a/yoyo/backends.py
+++ b/yoyo/backends.py
@@ -17,6 +17,8 @@ from contextlib import contextmanager
from importlib import import_module
from itertools import count
from logging import getLogger
+import os
+import time
from . import exceptions, utils
from .migrations import topological_sort
@@ -104,11 +106,19 @@ class DatabaseBackend(object):
driver_module = None
connection = None
- create_table_sql = """
+ lock_table = '_yoyo_lock'
+ create_migration_table_sql = """
CREATE TABLE {table_name} (
id VARCHAR(255) NOT NULL PRIMARY KEY,
ctime TIMESTAMP
)"""
+ create_lock_table_sql = """
+ CREATE TABLE {table_name} (
+ locked INT DEFAULT 1,
+ ctime TIMESTAMP,
+ pid INT NOT NULL,
+ PRIMARY KEY (locked)
+ )"""
list_tables_sql = "SELECT table_name FROM information_schema.tables"
is_applied_sql = "SELECT COUNT(1) FROM {0.migration_table} WHERE id=?"
insert_migration_sql = ("INSERT INTO {0.migration_table} (id, ctime) "
@@ -125,7 +135,7 @@ class DatabaseBackend(object):
self.DatabaseError = self.driver.DatabaseError
self._connection = self.connect(dburi)
self.migration_table = migration_table
- self.create_migrations_table()
+ self.create_tables()
self.has_transactional_ddl = self._check_transactional_ddl()
def _load_driver_module(self):
@@ -226,11 +236,55 @@ class DatabaseBackend(object):
yield
@contextmanager
- def lock_migration_table(self):
+ def lock(self, timeout=10):
"""
- Lock `migrations_table` to prevent concurrent migrations.
- """
- yield
+ Create a lock to prevent concurrent migrations.
+
+ :param timeout: duration in seconds before raising a LockTimeout error.
+ """
+
+ pid = os.getpid()
+ self._insert_lock_row(pid, timeout)
+ try:
+ yield
+ finally:
+ self._delete_lock_row(pid)
+
+ def _insert_lock_row(self, pid, timeout, poll_interval=0.5):
+ started = time.time()
+ while True:
+ try:
+ with self.transaction():
+ self.execute("INSERT INTO {} (locked, ctime, pid) "
+ "VALUES (1, ?, ?)".format(self.lock_table),
+ (datetime.utcnow(), pid))
+ except self.DatabaseError:
+ if timeout and time.time() > started + timeout:
+ cursor = self.execute("SELECT pid FROM {}"
+ .format(self.lock_table))
+ row = cursor.fetchone()
+ if row:
+ raise exceptions.LockTimeout(
+ "Process {} has locked this database "
+ "(run yoyo break-lock to remove this lock)"
+ .format(row[0]))
+ else:
+ raise exceptions.LockTimeout(
+ "Database locked "
+ "(run yoyo break-lock to remove this lock)")
+ time.sleep(poll_interval)
+ else:
+ return
+
+ def _delete_lock_row(self, pid):
+ with self.transaction():
+ self.execute("DELETE FROM {} WHERE pid=?"
+ .format(self.lock_table),
+ (pid,))
+
+ def break_lock(self):
+ with self.transaction():
+ self.execute("DELETE FROM {}" .format(self.lock_table))
def execute(self, stmt, args=tuple()):
"""
@@ -241,21 +295,21 @@ class DatabaseBackend(object):
cursor.execute(self._with_placeholders(stmt), args)
return cursor
- def create_migrations_table(self):
+ def create_tables(self):
"""
- Create the migrations table if it does not already exist.
+ Create the migrations and lock tables if they do not already exist.
"""
- sql = self.create_table_sql.format(table_name=self.migration_table)
- try:
- with self.transaction():
- self.get_applied_migration_ids()
- table_exists = True
- except self.DatabaseError:
- table_exists = False
+ statements = [
+ self.create_migration_table_sql.format(table_name=self.migration_table),
+ self.create_lock_table_sql.format(table_name=self.lock_table)
+ ]
- if not table_exists:
- with self.transaction():
- self.execute(sql)
+ for stmt in statements:
+ try:
+ with self.transaction():
+ self.execute(stmt)
+ except self.DatabaseError:
+ pass
def _with_placeholders(self, sql):
placeholder_gen = {'qmark': '?',
@@ -309,12 +363,11 @@ class DatabaseBackend(object):
"""
if not migrations:
return
- with self.lock_migration_table():
- for m in migrations:
- try:
- self.apply_one(m, force=force)
- except exceptions.BadMigration:
- continue
+ for m in migrations:
+ try:
+ self.apply_one(m, force=force)
+ except exceptions.BadMigration:
+ continue
def run_post_apply(self, migrations, force=False):
"""
@@ -447,17 +500,16 @@ class PostgresqlBackend(DatabaseBackend):
driver_module = 'psycopg2'
def connect(self, dburi):
- connargs = []
+ connect_args = {'dbname': dburi.database}
if dburi.username is not None:
- connargs.append('user=%s' % dburi.username)
+ connect_args['user'] = dburi.username
if dburi.password is not None:
- connargs.append('password=%s' % dburi.password)
+ connect_args['password'] = dburi.password
if dburi.port is not None:
- connargs.append('port=%d' % dburi.port)
+ connect_args['port'] = dburi.port
if dburi.hostname is not None:
- connargs.append('host=%s' % dburi.hostname)
- connargs.append('dbname=%s' % dburi.database)
- return self.driver.connect(' '.join(connargs))
+ connect_args['host'] = dburi.hostname
+ return self.driver.connect(**connect_args)
@contextmanager
def disable_transactions(self):
@@ -466,8 +518,3 @@ class PostgresqlBackend(DatabaseBackend):
self.connection.autocommit = True
yield
self.connection.autocommit = saved
-
- @contextmanager
- def lock_migration_table(self):
- self.execute('LOCK TABLE {table_name} IN ACCESS EXCLUSIVE MODE'.format(table_name=self.migration_table))
- yield
diff --git a/yoyo/connections.py b/yoyo/connections.py
index 410c6a0..777692d 100755
--- a/yoyo/connections.py
+++ b/yoyo/connections.py
@@ -57,14 +57,17 @@ class DatabaseURI(_DatabaseURI):
else:
return hostpart
- @property
- def uri(self):
+ def __str__(self):
return urlunsplit((self.scheme,
self.netloc,
self.database,
urlencode(self.args),
''))
+ @property
+ def uri(self):
+ return str(self)
+
class BadConnectionURI(Exception):
"""
diff --git a/yoyo/exceptions.py b/yoyo/exceptions.py
index 606c51c..03ddd8f 100755
--- a/yoyo/exceptions.py
+++ b/yoyo/exceptions.py
@@ -29,3 +29,9 @@ class MigrationConflict(Exception):
"""
The migration id conflicts with another migration
"""
+
+
+class LockTimeout(Exception):
+ """
+ Timeout was reached while acquiring the migration lock
+ """
diff --git a/yoyo/migrations.py b/yoyo/migrations.py
index b07f0af..e4c799f 100755
--- a/yoyo/migrations.py
+++ b/yoyo/migrations.py
@@ -111,8 +111,8 @@ class Migration(object):
except backend.DatabaseError:
exc_info = sys.exc_info()
- if not (backend.has_transactional_ddl or
- not self.use_transactions):
+ if not backend.has_transactional_ddl or \
+ not self.use_transactions:
# Any DDL statements that have been executed have been
# committed. Go through the rollback steps to undo
# these inasmuch is possible.
diff --git a/yoyo/scripts/main.py b/yoyo/scripts/main.py
index 4142c03..5bb7c14 100755
--- a/yoyo/scripts/main.py
+++ b/yoyo/scripts/main.py
@@ -262,7 +262,8 @@ def main(argv=None):
if args.sources:
config.set('DEFAULT', 'sources', ' '.join(args.sources))
if args.database:
- config.set('DEFAULT', 'database', args.database)
+ # ConfigParser requires that any percent signs in the db uri be escaped.
+ config.set('DEFAULT', 'database', args.database.replace('%', '%%'))
config.set('DEFAULT', 'migration_table', args.migration_table)
config.set('DEFAULT', 'batch_mode', 'on' if args.batch_mode else 'off')
config.set('DEFAULT', 'verbosity', str(args.verbosity))
@@ -277,7 +278,6 @@ def main(argv=None):
argparser.error(e.args[0])
if config_is_empty and args.use_config_file and not args.batch_mode:
-
prompt_save_config(config, args.config or CONFIG_FILENAME)
diff --git a/yoyo/scripts/migrate.py b/yoyo/scripts/migrate.py
index c18d02e..2e19700 100755
--- a/yoyo/scripts/migrate.py
+++ b/yoyo/scripts/migrate.py
@@ -96,6 +96,12 @@ def install_argparsers(global_parser, subparsers):
help="Unmark applied migrations, without rolling them back")
parser_unmark.set_defaults(func=unmark, command_name='unmark')
+ parser_break_lock = subparsers.add_parser(
+ 'break_lock',
+ parents=[global_parser],
+ help="Break migration locks")
+ parser_break_lock.set_defaults(func=break_lock, command_name='break-lock')
+
def get_migrations(args, backend):
@@ -160,33 +166,43 @@ def get_migrations(args, backend):
def apply(args, config):
backend = get_backend(args, config)
- migrations = get_migrations(args, backend)
- backend.apply_migrations(migrations, args.force)
+ with backend.lock():
+ migrations = get_migrations(args, backend)
+ backend.apply_migrations(migrations, args.force)
def reapply(args, config):
backend = get_backend(args, config)
- migrations = get_migrations(args, backend)
- backend.rollback_migrations(migrations, args.force)
- backend.apply_migrations(migrations, args.force)
+ with backend.lock():
+ migrations = get_migrations(args, backend)
+ backend.rollback_migrations(migrations, args.force)
+ backend.apply_migrations(migrations, args.force)
def rollback(args, config):
backend = get_backend(args, config)
- migrations = get_migrations(args, backend)
- backend.rollback_migrations(migrations, args.force)
+ with backend.lock():
+ migrations = get_migrations(args, backend)
+ backend.rollback_migrations(migrations, args.force)
def mark(args, config):
backend = get_backend(args, config)
- migrations = get_migrations(args, backend)
- backend.mark_migrations(migrations)
+ with backend.lock():
+ migrations = get_migrations(args, backend)
+ backend.mark_migrations(migrations)
def unmark(args, config):
backend = get_backend(args, config)
- migrations = get_migrations(args, backend)
- backend.unmark_migrations(migrations)
+ with backend.lock():
+ migrations = get_migrations(args, backend)
+ backend.unmark_migrations(migrations)
+
+
+def break_lock(args, config):
+ backend = get_backend(args, config)
+ backend.break_lock()
def prompt_migrations(backend, migrations, direction):
diff --git a/yoyo/tests/conftest.py b/yoyo/tests/conftest.py
index d57c004..bba941d 100644
--- a/yoyo/tests/conftest.py
+++ b/yoyo/tests/conftest.py
@@ -1,20 +1,38 @@
import pytest
+from yoyo import backends
from yoyo.connections import get_backend
+from yoyo.tests import get_test_backends
from yoyo.tests import get_test_dburis
-@pytest.yield_fixture(params=get_test_dburis())
-def backend_fixture(request):
+@pytest.fixture(params=get_test_dburis())
+def backend(request):
"""
Return all backends configured in ``test_databases.ini``
"""
backend = get_backend(request.param)
+ with backend.transaction():
+ if backend.__class__ is backends.MySQLBackend:
+ backend.execute('CREATE TABLE _yoyo_t '
+ '(id CHAR(1) primary key) '
+ 'ENGINE=InnoDB')
+ else:
+ backend.execute('CREATE TABLE _yoyo_t '
+ '(id CHAR(1) primary key)')
try:
yield backend
finally:
backend.rollback()
- for table in (backend.list_tables()):
+ for table in backend.list_tables():
+ if table.startswith('_yoyo'):
+ with backend.transaction():
+ backend.execute('DROP TABLE {}'.format(table))
+
+
+def pytest_configure(config):
+ for backend in get_test_backends():
+ for table in backend.list_tables():
if table.startswith('_yoyo'):
with backend.transaction():
backend.execute("DROP TABLE {}".format(table))
diff --git a/yoyo/tests/test_backends.py b/yoyo/tests/test_backends.py
index d567689..019ec26 100644
--- a/yoyo/tests/test_backends.py
+++ b/yoyo/tests/test_backends.py
@@ -1,32 +1,16 @@
import pytest
-import time
from threading import Thread
+import time
-from yoyo import backends, read_migrations
+from yoyo import backends
+from yoyo import read_migrations
+from yoyo import exceptions
from yoyo.tests import get_test_backends
from yoyo.tests import with_migrations
class TestTransactionHandling(object):
- @pytest.fixture(autouse=True, params=get_test_backends())
- def backend(self, request):
- backend = request.param
- with backend.transaction():
- if backend.__class__ is backends.MySQLBackend:
- backend.execute("CREATE TABLE _yoyo_t "
- "(id CHAR(1) primary key) "
- "ENGINE=InnoDB")
- else:
- backend.execute("CREATE TABLE _yoyo_t "
- "(id CHAR(1) primary key)")
- yield backend
- backend.rollback()
- for table in (backend.list_tables()):
- if table.startswith('_yoyo'):
- with backend.transaction():
- backend.execute("DROP TABLE {}".format(table))
-
def test_it_commits(self, backend):
with backend.transaction():
backend.execute("INSERT INTO _yoyo_t values ('A')")
@@ -109,22 +93,54 @@ class TestTransactionHandling(object):
As far as I know this behavior is PostgreSQL specific. We can't run
this test in sqlite as it does not support CREATE DATABASE.
"""
- from yoyo import read_migrations
for backend in get_test_backends(exclude={'sqlite'}):
migrations = read_migrations(tmpdir)
backend.apply_migrations(migrations)
backend.rollback_migrations(migrations)
-
- @with_migrations(a="""
- steps = [
- step("SELECT pg_sleep(1)"),
- ]
- """)
- def test_lock_migration_table(self, tmpdir):
- backend = get_test_backends(only={'postgresql'})[0]
- migrations = read_migrations(tmpdir)
- Thread(target = backend.apply_migrations, args = (migrations,)).start()
- # give a chance to start, but wake up in the middle of applying
- time.sleep(0.1)
- assert backend.get_applied_migration_ids() == ['a']
+ def test_lock(self, backend):
+ """
+ Test that :meth:`~yoyo.backends.DatabaseBackend.lock`
+ acquires an exclusive lock
+ """
+ if backend.uri.scheme == 'sqlite':
+ pytest.skip("Concurrency tests not supported for sqlite databases")
+
+ lock_duration = 0.2
+
+ def do_something_with_lock():
+ with backend.lock():
+ time.sleep(lock_duration)
+
+ thread = Thread(target=do_something_with_lock)
+ t = time.time()
+ thread.start()
+ # Give the thread time to acquire the lock, but not enough
+ # to complete
+ time.sleep(lock_duration * 0.2)
+ with backend.lock():
+ delta = time.time() - t
+ assert delta >= lock_duration
+
+ thread.join()
+
+ def test_lock_times_out(self, backend):
+
+ if backend.uri.scheme == 'sqlite':
+ pytest.skip("Concurrency tests not supported for sqlite databases")
+
+ def do_something_with_lock():
+ with backend.lock():
+ time.sleep(lock_duration)
+
+ lock_duration = 2
+ thread = Thread(target=do_something_with_lock)
+ thread.start()
+ # Give the thread time to acquire the lock, but not enough
+ # to complete
+ time.sleep(lock_duration * 0.1)
+ with pytest.raises(exceptions.LockTimeout):
+ with backend.lock(timeout=lock_duration * 0.1):
+ assert False, "Execution should never reach this point"
+
+ thread.join()
diff --git a/yoyo/tests/test_cli_script.py b/yoyo/tests/test_cli_script.py
index 6bcef48..7d5ed11 100644
--- a/yoyo/tests/test_cli_script.py
+++ b/yoyo/tests/test_cli_script.py
@@ -26,6 +26,7 @@ import re
from mock import Mock, patch, call
import frozendate
+import pytest
import tms
from yoyo import read_migrations
@@ -210,6 +211,35 @@ class TestYoyoScript(TestInteractiveScript):
assert self.prompt.call_count == 0
assert self.confirm.call_count == 0
+ def test_concurrent_instances_do_not_conflict(self, backend):
+ import threading
+ from functools import partial
+
+ if backend.uri.scheme == 'sqlite':
+ pytest.skip("Concurrency tests not supported for sqlite databases")
+
+ with with_migrations(m1=('import time\n'
+ 'step(lambda conn: time.sleep(0.1))\n'
+ 'step("INSERT INTO _yoyo_t VALUES (\'A\')")')
+ ) as tmpdir:
+ assert '_yoyo_t' in backend.list_tables()
+ backend.rollback()
+ backend.execute("SELECT * FROM _yoyo_t")
+ run_migrations = partial(
+ main,
+ ['apply', '-b', tmpdir, '--database', str(backend.uri)])
+ threads = [threading.Thread(target=run_migrations)
+ for ix in range(20)]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+ # Exactly one instance of the migration script should have succeeded
+ backend.rollback()
+ cursor = backend.execute('SELECT COUNT(1) from _yoyo_t')
+ assert cursor.fetchone()[0] == 1
+
class TestArgParsing(TestInteractiveScript):
diff --git a/yoyo/tests/test_connections.py b/yoyo/tests/test_connections.py
index f3b7440..9e5808c 100644
--- a/yoyo/tests/test_connections.py
+++ b/yoyo/tests/test_connections.py
@@ -77,8 +77,8 @@ def test_connections(import_module):
db='northwind', foo='bar')),
(backends.SQLiteBackend, 'sqlite3', call('northwind')),
(backends.PostgresqlBackend, 'psycopg2',
- call('user=scott password=tiger port=42 '
- 'host=db.example.org dbname=northwind')),
+ call(user='scott', password='tiger', port=42,
+ host='db.example.org', dbname='northwind')),
]
diff --git a/yoyo/tests/test_migrations.py b/yoyo/tests/test_migrations.py
index 376fcab..3bbd2b1 100644
--- a/yoyo/tests/test_migrations.py
+++ b/yoyo/tests/test_migrations.py
@@ -122,17 +122,17 @@ def test_rollbackignores_errors(tmpdir):
assert cursor.fetchall() == []
-def test_migration_is_committed(backend_fixture):
+def test_migration_is_committed(backend):
with migrations_dir('step("CREATE TABLE _yoyo_test (id INT)")') as tmpdir:
migrations = read_migrations(tmpdir)
- backend_fixture.apply_migrations(migrations)
+ backend.apply_migrations(migrations)
- backend_fixture.rollback()
- rows = backend_fixture.execute("SELECT * FROM _yoyo_test").fetchall()
+ backend.rollback()
+ rows = backend.execute("SELECT * FROM _yoyo_test").fetchall()
assert list(rows) == []
-def test_rollback_happens_on_step_failure(backend_fixture):
+def test_rollback_happens_on_step_failure(backend):
with migrations_dir('''
step("",
"CREATE TABLE _yoyo_is_rolledback (i INT)"),
@@ -140,22 +140,22 @@ def test_rollback_happens_on_step_failure(backend_fixture):
"DROP TABLE _yoyo_test")
step("invalid sql!")''') as tmpdir:
migrations = read_migrations(tmpdir)
- with pytest.raises(backend_fixture.DatabaseError):
- backend_fixture.apply_migrations(migrations)
+ with pytest.raises(backend.DatabaseError):
+ backend.apply_migrations(migrations)
# The _yoyo_test table should have either been deleted (transactional ddl)
# or dropped (non-transactional-ddl)
- with pytest.raises(backend_fixture.DatabaseError):
- backend_fixture.execute("SELECT * FROM _yoyo_test")
+ with pytest.raises(backend.DatabaseError):
+ backend.execute("SELECT * FROM _yoyo_test")
# Transactional DDL: rollback steps not executed
- if backend_fixture.has_transactional_ddl:
- with pytest.raises(backend_fixture.DatabaseError):
- backend_fixture.execute("SELECT * FROM _yoyo_is_rolledback")
+ if backend.has_transactional_ddl:
+ with pytest.raises(backend.DatabaseError):
+ backend.execute("SELECT * FROM _yoyo_is_rolledback")
# Non-transactional DDL: ensure the rollback steps were executed
else:
- cursor = backend_fixture.execute("SELECT * FROM _yoyo_is_rolledback")
+ cursor = backend.execute("SELECT * FROM _yoyo_is_rolledback")
assert list(cursor.fetchall()) == []