diff options
-rw-r--r-- | doc/build/changelog/changelog_08.rst | 17 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 130 | ||||
-rw-r--r-- | test/engine/test_execute.py | 65 | ||||
-rw-r--r-- | test/ext/test_horizontal_shard.py | 86 |
4 files changed, 259 insertions, 39 deletions
diff --git a/doc/build/changelog/changelog_08.rst b/doc/build/changelog/changelog_08.rst index 42185d4b7..d5ac10f10 100644 --- a/doc/build/changelog/changelog_08.rst +++ b/doc/build/changelog/changelog_08.rst @@ -9,6 +9,23 @@ :released: .. change:: + :tags: sql, feature + + Added a new method :meth:`.Engine.execution_options` + to :class:`.Engine`. This method works similarly to + :meth:`.Connection.execution_options` in that it creates + a copy of the parent object which will refer to the new + set of options. The method can be used to build + sharding schemes where each engine shares the same + underlying pool of connections. The method + has been tested against the horizontal shard + recipe in the ORM as well. + + .. seealso:: + + :meth:`.Engine.execution_options` + + .. change:: :tags: sql, orm, bug :tickets: 2595 diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 05ca0f980..bdb1a0004 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -112,16 +112,20 @@ class Connection(Connectable): the same underlying DBAPI connection, but also defines the given execution options which will take effect for a call to :meth:`execute`. As the new :class:`.Connection` references the same - underlying resource, it is probably best to ensure that the copies + underlying resource, it's usually a good idea to ensure that the copies would be discarded immediately, which is implicit if used as in:: result = connection.execution_options(stream_results=True).\\ execute(stmt) - :meth:`.Connection.execution_options` accepts all options as those - accepted by :meth:`.Executable.execution_options`. Additionally, - it includes options that are applicable only to - :class:`.Connection`. + Note that any key/value can be passed to :meth:`.Connection.execution_options`, + and it will be stored in the ``_execution_options`` dictionary of + the :class:`.Connection`. 
It is suitable for usage by end-user + schemes to communicate with event listeners, for example. + + The keywords that are currently recognized by SQLAlchemy itself + include all those listed under :meth:`.Executable.execution_options`, + as well as others that are specific to :class:`.Connection`. :param autocommit: Available on: Connection, statement. When True, a COMMIT will be invoked after execution @@ -416,7 +420,7 @@ class Connection(Connectable): "Cannot start a two phase transaction when a transaction " "is already in progress.") if xid is None: - xid = self.engine.dialect.create_xid(); + xid = self.engine.dialect.create_xid() self.__transaction = TwoPhaseTransaction(self, xid) return self.__transaction @@ -1313,13 +1317,6 @@ class Engine(Connectable, log.Identified): if proxy: interfaces.ConnectionProxy._adapt_listener(self, proxy) if execution_options: - if 'isolation_level' in execution_options: - raise exc.ArgumentError( - "'isolation_level' execution option may " - "only be specified on Connection.execution_options(). " - "To set engine-wide isolation level, " - "use the isolation_level argument to create_engine()." - ) self.update_execution_options(**execution_options) def update_execution_options(self, **opt): @@ -1332,13 +1329,87 @@ class Engine(Connectable, log.Identified): can be sent via the ``execution_options`` parameter to :func:`.create_engine`. - See :meth:`.Connection.execution_options` for more - details on execution options. + .. seealso:: + + :meth:`.Connection.execution_options` + + :meth:`.Engine.execution_options` """ + if 'isolation_level' in opt: + raise exc.ArgumentError( + "'isolation_level' execution option may " + "only be specified on Connection.execution_options(). " + "To set engine-wide isolation level, " + "use the isolation_level argument to create_engine()." 
+ ) self._execution_options = \ self._execution_options.union(opt) + def execution_options(self, **opt): + """Return a new :class:`.Engine` that will provide + :class:`.Connection` objects with the given execution options. + + The returned :class:`.Engine` remains related to the original + :class:`.Engine` in that it shares the same connection pool and + other state: + + * The :class:`.Pool` used by the new :class:`.Engine` is the + same instance. The :meth:`.Engine.dispose` method will replace + the connection pool instance for the parent engine as well + as this one. + * Event listeners are "cascaded" - meaning, the new :class:`.Engine` + inherits the events of the parent, and new events can be associated + with the new :class:`.Engine` individually. + * The logging configuration and logging_name is copied from the parent + :class:`.Engine`. + + The intent of the :meth:`.Engine.execution_options` method is + to implement "sharding" schemes where multiple :class:`.Engine` + objects refer to the same connection pool, but are differentiated + by options that would be consumed by a custom event:: + + primary_engine = create_engine("mysql://") + shard1 = primary_engine.execution_options(shard_id="shard1") + shard2 = primary_engine.execution_options(shard_id="shard2") + + Above, the ``shard1`` engine serves as a factory for :class:`.Connection` + objects that will contain the execution option ``shard_id=shard1``, + and ``shard2`` will produce :class:`.Connection` objects that contain + the execution option ``shard_id=shard2``. + + An event handler can consume the above execution option to perform + a schema switch or other operation, given a connection. 
Below + we emit a MySQL ``use`` statement to switch databases, at the same + time keeping track of which database we've established using the + :attr:`.Connection.info` dictionary, which gives us a persistent + storage space that follows the DBAPI connection:: + + from sqlalchemy import event + from sqlalchemy.engine import Engine + + shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"} + + @event.listens_for(Engine, "before_cursor_execute") + def _switch_shard(conn, cursor, stmt, params, context, executemany): + shard_id = conn._execution_options.get('shard_id', "default") + current_shard = conn.info.get("current_shard", None) + + if current_shard != shard_id: + cursor.execute("use %s" % shards[shard_id]) + conn.info["current_shard"] = shard_id + + .. seealso:: + + :meth:`.Connection.execution_options` - update execution options + on a :class:`.Connection` object. + + :meth:`.Engine.update_execution_options` - update the execution + options for a given :class:`.Engine` in place. 
+ + """ + return OptionEngine(self, opt) + @property def name(self): """String name of the :class:`~sqlalchemy.engine.Dialect` in use by @@ -1605,7 +1676,7 @@ class Engine(Connectable, log.Identified): else: conn = connection if not schema: - schema = self.dialect.default_schema_name + schema = self.dialect.default_schema_name try: return self.dialect.get_table_names(conn, schema) finally: @@ -1634,4 +1705,31 @@ class Engine(Connectable, log.Identified): return self.pool.unique_connection() +class OptionEngine(Engine): + def __init__(self, proxied, execution_options): + self._proxied = proxied + self.url = proxied.url + self.dialect = proxied.dialect + self.logging_name = proxied.logging_name + self.echo = proxied.echo + log.instance_logger(self, echoflag=self.echo) + self.dispatch = self.dispatch._join(proxied.dispatch) + self._execution_options = proxied._execution_options + self.update_execution_options(**execution_options) + + def _get_pool(self): + return self._proxied.pool + + def _set_pool(self, pool): + self._proxied.pool = pool + + pool = property(_get_pool, _set_pool) + + def _get_has_events(self): + return self._proxied._has_events or \ + self.__dict__.get('_has_events', False) + + def _set_has_events(self, value): + self.__dict__['_has_events'] = value + _has_events = property(_get_has_events, _set_has_events)
\ No newline at end of file diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index ced72f276..69507eabe 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -322,8 +322,8 @@ class ExecuteTest(fixtures.TestBase): @testing.requires.ad_hoc_engines def test_engine_level_options(self): - eng = engines.testing_engine(options={'execution_options' - : {'foo': 'bar'}}) + eng = engines.testing_engine(options={'execution_options': + {'foo': 'bar'}}) conn = eng.contextual_connect() eq_(conn._execution_options['foo'], 'bar') eq_(conn.execution_options(bat='hoho')._execution_options['foo' @@ -336,6 +336,66 @@ class ExecuteTest(fixtures.TestBase): conn = eng.contextual_connect() eq_(conn._execution_options['foo'], 'hoho') + @testing.requires.ad_hoc_engines + def test_generative_engine_execution_options(self): + eng = engines.testing_engine(options={'execution_options': + {'base': 'x1'}}) + + eng1 = eng.execution_options(foo="b1") + eng2 = eng.execution_options(foo="b2") + eng1a = eng1.execution_options(bar="a1") + eng2a = eng2.execution_options(foo="b3", bar="a2") + + eq_(eng._execution_options, + {'base': 'x1'}) + eq_(eng1._execution_options, + {'base': 'x1', 'foo': 'b1'}) + eq_(eng2._execution_options, + {'base': 'x1', 'foo': 'b2'}) + eq_(eng1a._execution_options, + {'base': 'x1', 'foo': 'b1', 'bar': 'a1'}) + eq_(eng2a._execution_options, + {'base': 'x1', 'foo': 'b3', 'bar': 'a2'}) + is_(eng1a.pool, eng.pool) + + # test pool is shared + eng2.dispose() + is_(eng1a.pool, eng2.pool) + is_(eng.pool, eng2.pool) + + @testing.requires.ad_hoc_engines + def test_generative_engine_event_dispatch(self): + canary = [] + def l1(*arg, **kw): + canary.append("l1") + def l2(*arg, **kw): + canary.append("l2") + def l3(*arg, **kw): + canary.append("l3") + + eng = engines.testing_engine(options={'execution_options': + {'base': 'x1'}}) + event.listen(eng, "before_execute", l1) + + eng1 = eng.execution_options(foo="b1") + event.listen(eng, 
"before_execute", l2) + event.listen(eng1, "before_execute", l3) + + eng.execute(select([1])) + eng1.execute(select([1])) + + eq_(canary, ["l1", "l2", "l3", "l1", "l2"]) + + @testing.requires.ad_hoc_engines + def test_generative_engine_event_dispatch_hasevents(self): + def l1(*arg, **kw): + pass + eng = create_engine(testing.db.url) + assert not eng._has_events + event.listen(eng, "before_execute", l1) + eng2 = eng.execution_options(foo='bar') + assert eng2._has_events + def test_unicode_test_fails_warning(self): class MockCursor(engines.DBAPIProxyCursor): def execute(self, stmt, params=None, **kw): @@ -1018,6 +1078,7 @@ class EngineEventsTest(fixtures.TestBase): def tearDown(self): Engine.dispatch._clear() + Engine._has_events = False def _assert_stmts(self, expected, received): orig = list(received) diff --git a/test/ext/test_horizontal_shard.py b/test/ext/test_horizontal_shard.py index 0b9b89d2b..024510886 100644 --- a/test/ext/test_horizontal_shard.py +++ b/test/ext/test_horizontal_shard.py @@ -5,6 +5,7 @@ from sqlalchemy import sql, util from sqlalchemy.orm import * from sqlalchemy.ext.horizontal_shard import ShardedSession from sqlalchemy.sql import operators +from sqlalchemy import pool from sqlalchemy.testing import fixtures from sqlalchemy import testing from sqlalchemy.testing.engines import testing_engine @@ -13,19 +14,19 @@ from nose import SkipTest # TODO: ShardTest can be turned into a base for further subclasses -class ShardTest(fixtures.TestBase): + + + +class ShardTest(object): __skip_if__ = (lambda: util.win32,) + __requires__ = 'sqlite', + + schema = None def setUp(self): global db1, db2, db3, db4, weather_locations, weather_reports - try: - db1 = testing_engine('sqlite:///shard1.db', options=dict(pool_threadlocal=True)) - except ImportError: - raise SkipTest('Requires sqlite') - db2 = testing_engine('sqlite:///shard2.db') - db3 = testing_engine('sqlite:///shard3.db') - db4 = testing_engine('sqlite:///shard4.db') + db1, db2, db3, db4 = 
self._init_dbs() meta = MetaData() ids = Table('ids', meta, @@ -36,13 +37,14 @@ class ShardTest(fixtures.TestBase): c = db1.contextual_connect() nextid = c.execute(ids.select(for_update=True)).scalar() - c.execute(ids.update(values={ids.c.nextid : ids.c.nextid + 1})) + c.execute(ids.update(values={ids.c.nextid: ids.c.nextid + 1})) return nextid weather_locations = Table("weather_locations", meta, Column('id', Integer, primary_key=True, default=id_generator), Column('continent', String(30), nullable=False), - Column('city', String(50), nullable=False) + Column('city', String(50), nullable=False), + schema=self.schema ) weather_reports = Table( @@ -50,10 +52,11 @@ class ShardTest(fixtures.TestBase): meta, Column('id', Integer, primary_key=True), Column('location_id', Integer, - ForeignKey('weather_locations.id')), + ForeignKey(weather_locations.c.id)), Column('temperature', Float), Column('report_time', DateTime, default=datetime.datetime.now), + schema=self.schema ) for db in (db1, db2, db3, db4): @@ -64,13 +67,6 @@ class ShardTest(fixtures.TestBase): self.setup_session() self.setup_mappers() - def tearDown(self): - clear_mappers() - - for db in (db1, db2, db3, db4): - db.connect().invalidate() - for i in range(1,5): - os.remove("shard%d.db" % i) @classmethod def setup_session(cls): @@ -139,11 +135,12 @@ class ShardTest(fixtures.TestBase): self.temperature = temperature mapper(WeatherLocation, weather_locations, properties={ - 'reports':relationship(Report, backref='location'), + 'reports': relationship(Report, backref='location'), 'city': deferred(weather_locations.c.city), }) mapper(Report, weather_reports) + def _fixture_data(self): tokyo = WeatherLocation('Asia', 'Tokyo') newyork = WeatherLocation('North America', 'New York') @@ -204,7 +201,8 @@ class ShardTest(fixtures.TestBase): event.listen(WeatherLocation, "load", load) sess = self._fixture_data() - tokyo = sess.query(WeatherLocation).filter_by(city="Tokyo").set_shard("asia").one() + tokyo = 
sess.query(WeatherLocation).\ + filter_by(city="Tokyo").set_shard("asia").one() sess.query(WeatherLocation).all() eq_( @@ -212,4 +210,50 @@ class ShardTest(fixtures.TestBase): ['asia', 'north_america', 'north_america', 'europe', 'europe', 'south_america', 'south_america'] - )
\ No newline at end of file + ) + +class DistinctEngineShardTest(ShardTest, fixtures.TestBase): + + def _init_dbs(self): + db1 = testing_engine('sqlite:///shard1.db', + options=dict(pool_threadlocal=True)) + db2 = testing_engine('sqlite:///shard2.db') + db3 = testing_engine('sqlite:///shard3.db') + db4 = testing_engine('sqlite:///shard4.db') + + return db1, db2, db3, db4 + + def tearDown(self): + clear_mappers() + + for db in (db1, db2, db3, db4): + db.connect().invalidate() + for i in range(1, 5): + os.remove("shard%d.db" % i) + +class AttachedFileShardTest(ShardTest, fixtures.TestBase): + schema = "changeme" + + def _init_dbs(self): + db1 = testing_engine('sqlite://', options={"execution_options": + {"shard_id": "shard1"}}) + assert db1._has_events + + db2 = db1.execution_options(shard_id="shard2") + db3 = db1.execution_options(shard_id="shard3") + db4 = db1.execution_options(shard_id="shard4") + + import re + @event.listens_for(db1, "before_cursor_execute", retval=True) + def _switch_shard(conn, cursor, stmt, params, context, executemany): + shard_id = conn._execution_options['shard_id'] + # because SQLite can't just give us a "use" statement, we have + # to use the schema hack to locate table names + if shard_id: + stmt = re.sub(r"\"?changeme\"?\.", shard_id + "_", stmt) + + return stmt, params + + return db1, db2, db3, db4 + + |