summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/build/changelog/changelog_08.rst17
-rw-r--r--lib/sqlalchemy/engine/base.py130
-rw-r--r--test/engine/test_execute.py65
-rw-r--r--test/ext/test_horizontal_shard.py86
4 files changed, 259 insertions, 39 deletions
diff --git a/doc/build/changelog/changelog_08.rst b/doc/build/changelog/changelog_08.rst
index 42185d4b7..d5ac10f10 100644
--- a/doc/build/changelog/changelog_08.rst
+++ b/doc/build/changelog/changelog_08.rst
@@ -9,6 +9,23 @@
:released:
.. change::
+ :tags: sql, feature
+
+ Added a new method :meth:`.Engine.execution_options`
+ to :class:`.Engine`. This method works similarly to
+ :class:`.Connection.execution_options` in that it creates
+ a copy of the parent object which will refer to the new
+ set of options. The method can be used to build
+ sharding schemes where each engine shares the same
+ underlying pool of connections. The method
+ has been tested against the horizontal shard
+ recipe in the ORM as well.
+
+ .. seealso::
+
+ :meth:`.Engine.execution_options`
+
+ .. change::
:tags: sql, orm, bug
:tickets: 2595
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 05ca0f980..bdb1a0004 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -112,16 +112,20 @@ class Connection(Connectable):
the same underlying DBAPI connection, but also defines the given
execution options which will take effect for a call to
:meth:`execute`. As the new :class:`.Connection` references the same
- underlying resource, it is probably best to ensure that the copies
+ underlying resource, it's usually a good idea to ensure that the copies
would be discarded immediately, which is implicit if used as in::
result = connection.execution_options(stream_results=True).\\
execute(stmt)
- :meth:`.Connection.execution_options` accepts all options as those
- accepted by :meth:`.Executable.execution_options`. Additionally,
- it includes options that are applicable only to
- :class:`.Connection`.
+ Note that any key/value can be passed to :meth:`.Connection.execution_options`,
+ and it will be stored in the ``_execution_options`` dictionary of
+ the :class:`.Connection`. It is suitable for usage by end-user
+ schemes to communicate with event listeners, for example.
+
+ The keywords that are currently recognized by SQLAlchemy itself
+ include all those listed under :meth:`.Executable.execution_options`,
+ as well as others that are specific to :class:`.Connection`.
:param autocommit: Available on: Connection, statement.
When True, a COMMIT will be invoked after execution
@@ -416,7 +420,7 @@ class Connection(Connectable):
"Cannot start a two phase transaction when a transaction "
"is already in progress.")
if xid is None:
- xid = self.engine.dialect.create_xid();
+ xid = self.engine.dialect.create_xid()
self.__transaction = TwoPhaseTransaction(self, xid)
return self.__transaction
@@ -1313,13 +1317,6 @@ class Engine(Connectable, log.Identified):
if proxy:
interfaces.ConnectionProxy._adapt_listener(self, proxy)
if execution_options:
- if 'isolation_level' in execution_options:
- raise exc.ArgumentError(
- "'isolation_level' execution option may "
- "only be specified on Connection.execution_options(). "
- "To set engine-wide isolation level, "
- "use the isolation_level argument to create_engine()."
- )
self.update_execution_options(**execution_options)
def update_execution_options(self, **opt):
@@ -1332,13 +1329,87 @@ class Engine(Connectable, log.Identified):
can be sent via the ``execution_options`` parameter
to :func:`.create_engine`.
- See :meth:`.Connection.execution_options` for more
- details on execution options.
+ .. seealso::
+
+ :meth:`.Connection.execution_options`
+
+ :meth:`.Engine.execution_options`
"""
+ if 'isolation_level' in opt:
+ raise exc.ArgumentError(
+ "'isolation_level' execution option may "
+ "only be specified on Connection.execution_options(). "
+ "To set engine-wide isolation level, "
+ "use the isolation_level argument to create_engine()."
+ )
self._execution_options = \
self._execution_options.union(opt)
+ def execution_options(self, **opt):
+ """Return a new :class:`.Engine` that will provide
+ :class:`.Connection` objects with the given execution options.
+
+ The returned :class:`.Engine` remains related to the original
+ :class:`.Engine` in that it shares the same connection pool and
+ other state:
+
+ * The :class:`.Pool` used by the new :class:`.Engine` is the
+ same instance. The :meth:`.Engine.dispose` method will replace
+ the connection pool instance for the parent engine as well
+ as this one.
+ * Event listeners are "cascaded" - meaning, the new :class:`.Engine`
+ inherits the events of the parent, and new events can be associated
+ with the new :class:`.Engine` individually.
+ * The logging configuration and logging_name is copied from the parent
+ :class:`.Engine`.
+
+ The intent of the :meth:`.Engine.execution_options` method is
+ to implement "sharding" schemes where multiple :class:`.Engine`
+ objects refer to the same connection pool, but are differentiated
+ by options that would be consumed by a custom event::
+
+ primary_engine = create_engine("mysql://")
+ shard1 = primary_engine.execution_options(shard_id="shard1")
+ shard2 = primary_engine.execution_options(shard_id="shard2")
+
+ Above, the ``shard1`` engine serves as a factory for :class:`.Connection`
+ objects that will contain the execution option ``shard_id=shard1``,
+ and ``shard2`` will produce :class:`.Connection` objects that contain
+ the execution option ``shard_id=shard2``.
+
+ An event handler can consume the above execution option to perform
+ a schema switch or other operation, given a connection. Below
+ we emit a MySQL ``use`` statement to switch databases, at the same
+ time keeping track of which database we've established using the
+ :attr:`.Connection.info` dictionary, which gives us a persistent
+ storage space that follows the DBAPI connection::
+
+ from sqlalchemy import event
+ from sqlalchemy.engine import Engine
+
+ shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"}
+
+ @event.listens_for(Engine, "before_cursor_execute")
+ def _switch_shard(conn, cursor, stmt, params, context, executemany):
+ shard_id = conn._execution_options.get('shard_id', "default")
+ current_shard = conn.info.get("current_shard", None)
+
+ if current_shard != shard_id:
+ cursor.execute("use %s" % shards[shard_id])
+ conn.info["current_shard"] = shard_id
+
+ .. seealso::
+
+ :meth:`.Connection.execution_options` - update execution options
+ on a :class:`.Connection` object.
+
+ :meth:`.Engine.update_execution_options` - update the execution
+ options for a given :class:`.Engine` in place.
+
+ """
+ return OptionEngine(self, opt)
+
@property
def name(self):
"""String name of the :class:`~sqlalchemy.engine.Dialect` in use by
@@ -1605,7 +1676,7 @@ class Engine(Connectable, log.Identified):
else:
conn = connection
if not schema:
- schema = self.dialect.default_schema_name
+ schema = self.dialect.default_schema_name
try:
return self.dialect.get_table_names(conn, schema)
finally:
@@ -1634,4 +1705,31 @@ class Engine(Connectable, log.Identified):
return self.pool.unique_connection()
+class OptionEngine(Engine):
+ def __init__(self, proxied, execution_options):
+ self._proxied = proxied
+ self.url = proxied.url
+ self.dialect = proxied.dialect
+ self.logging_name = proxied.logging_name
+ self.echo = proxied.echo
+ log.instance_logger(self, echoflag=self.echo)
+ self.dispatch = self.dispatch._join(proxied.dispatch)
+ self._execution_options = proxied._execution_options
+ self.update_execution_options(**execution_options)
+
+ def _get_pool(self):
+ return self._proxied.pool
+
+ def _set_pool(self, pool):
+ self._proxied.pool = pool
+
+ pool = property(_get_pool, _set_pool)
+
+ def _get_has_events(self):
+ return self._proxied._has_events or \
+ self.__dict__.get('_has_events', False)
+
+ def _set_has_events(self, value):
+ self.__dict__['_has_events'] = value
+ _has_events = property(_get_has_events, _set_has_events) \ No newline at end of file
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index ced72f276..69507eabe 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -322,8 +322,8 @@ class ExecuteTest(fixtures.TestBase):
@testing.requires.ad_hoc_engines
def test_engine_level_options(self):
- eng = engines.testing_engine(options={'execution_options'
- : {'foo': 'bar'}})
+ eng = engines.testing_engine(options={'execution_options':
+ {'foo': 'bar'}})
conn = eng.contextual_connect()
eq_(conn._execution_options['foo'], 'bar')
eq_(conn.execution_options(bat='hoho')._execution_options['foo'
@@ -336,6 +336,66 @@ class ExecuteTest(fixtures.TestBase):
conn = eng.contextual_connect()
eq_(conn._execution_options['foo'], 'hoho')
+ @testing.requires.ad_hoc_engines
+ def test_generative_engine_execution_options(self):
+ eng = engines.testing_engine(options={'execution_options':
+ {'base': 'x1'}})
+
+ eng1 = eng.execution_options(foo="b1")
+ eng2 = eng.execution_options(foo="b2")
+ eng1a = eng1.execution_options(bar="a1")
+ eng2a = eng2.execution_options(foo="b3", bar="a2")
+
+ eq_(eng._execution_options,
+ {'base': 'x1'})
+ eq_(eng1._execution_options,
+ {'base': 'x1', 'foo': 'b1'})
+ eq_(eng2._execution_options,
+ {'base': 'x1', 'foo': 'b2'})
+ eq_(eng1a._execution_options,
+ {'base': 'x1', 'foo': 'b1', 'bar': 'a1'})
+ eq_(eng2a._execution_options,
+ {'base': 'x1', 'foo': 'b3', 'bar': 'a2'})
+ is_(eng1a.pool, eng.pool)
+
+ # test pool is shared
+ eng2.dispose()
+ is_(eng1a.pool, eng2.pool)
+ is_(eng.pool, eng2.pool)
+
+ @testing.requires.ad_hoc_engines
+ def test_generative_engine_event_dispatch(self):
+ canary = []
+ def l1(*arg, **kw):
+ canary.append("l1")
+ def l2(*arg, **kw):
+ canary.append("l2")
+ def l3(*arg, **kw):
+ canary.append("l3")
+
+ eng = engines.testing_engine(options={'execution_options':
+ {'base': 'x1'}})
+ event.listen(eng, "before_execute", l1)
+
+ eng1 = eng.execution_options(foo="b1")
+ event.listen(eng, "before_execute", l2)
+ event.listen(eng1, "before_execute", l3)
+
+ eng.execute(select([1]))
+ eng1.execute(select([1]))
+
+ eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
+
+ @testing.requires.ad_hoc_engines
+ def test_generative_engine_event_dispatch_hasevents(self):
+ def l1(*arg, **kw):
+ pass
+ eng = create_engine(testing.db.url)
+ assert not eng._has_events
+ event.listen(eng, "before_execute", l1)
+ eng2 = eng.execution_options(foo='bar')
+ assert eng2._has_events
+
def test_unicode_test_fails_warning(self):
class MockCursor(engines.DBAPIProxyCursor):
def execute(self, stmt, params=None, **kw):
@@ -1018,6 +1078,7 @@ class EngineEventsTest(fixtures.TestBase):
def tearDown(self):
Engine.dispatch._clear()
+ Engine._has_events = False
def _assert_stmts(self, expected, received):
orig = list(received)
diff --git a/test/ext/test_horizontal_shard.py b/test/ext/test_horizontal_shard.py
index 0b9b89d2b..024510886 100644
--- a/test/ext/test_horizontal_shard.py
+++ b/test/ext/test_horizontal_shard.py
@@ -5,6 +5,7 @@ from sqlalchemy import sql, util
from sqlalchemy.orm import *
from sqlalchemy.ext.horizontal_shard import ShardedSession
from sqlalchemy.sql import operators
+from sqlalchemy import pool
from sqlalchemy.testing import fixtures
from sqlalchemy import testing
from sqlalchemy.testing.engines import testing_engine
@@ -13,19 +14,19 @@ from nose import SkipTest
# TODO: ShardTest can be turned into a base for further subclasses
-class ShardTest(fixtures.TestBase):
+
+
+
+class ShardTest(object):
__skip_if__ = (lambda: util.win32,)
+ __requires__ = 'sqlite',
+
+ schema = None
def setUp(self):
global db1, db2, db3, db4, weather_locations, weather_reports
- try:
- db1 = testing_engine('sqlite:///shard1.db', options=dict(pool_threadlocal=True))
- except ImportError:
- raise SkipTest('Requires sqlite')
- db2 = testing_engine('sqlite:///shard2.db')
- db3 = testing_engine('sqlite:///shard3.db')
- db4 = testing_engine('sqlite:///shard4.db')
+ db1, db2, db3, db4 = self._init_dbs()
meta = MetaData()
ids = Table('ids', meta,
@@ -36,13 +37,14 @@ class ShardTest(fixtures.TestBase):
c = db1.contextual_connect()
nextid = c.execute(ids.select(for_update=True)).scalar()
- c.execute(ids.update(values={ids.c.nextid : ids.c.nextid + 1}))
+ c.execute(ids.update(values={ids.c.nextid: ids.c.nextid + 1}))
return nextid
weather_locations = Table("weather_locations", meta,
Column('id', Integer, primary_key=True, default=id_generator),
Column('continent', String(30), nullable=False),
- Column('city', String(50), nullable=False)
+ Column('city', String(50), nullable=False),
+ schema=self.schema
)
weather_reports = Table(
@@ -50,10 +52,11 @@ class ShardTest(fixtures.TestBase):
meta,
Column('id', Integer, primary_key=True),
Column('location_id', Integer,
- ForeignKey('weather_locations.id')),
+ ForeignKey(weather_locations.c.id)),
Column('temperature', Float),
Column('report_time', DateTime,
default=datetime.datetime.now),
+ schema=self.schema
)
for db in (db1, db2, db3, db4):
@@ -64,13 +67,6 @@ class ShardTest(fixtures.TestBase):
self.setup_session()
self.setup_mappers()
- def tearDown(self):
- clear_mappers()
-
- for db in (db1, db2, db3, db4):
- db.connect().invalidate()
- for i in range(1,5):
- os.remove("shard%d.db" % i)
@classmethod
def setup_session(cls):
@@ -139,11 +135,12 @@ class ShardTest(fixtures.TestBase):
self.temperature = temperature
mapper(WeatherLocation, weather_locations, properties={
- 'reports':relationship(Report, backref='location'),
+ 'reports': relationship(Report, backref='location'),
'city': deferred(weather_locations.c.city),
})
mapper(Report, weather_reports)
+
def _fixture_data(self):
tokyo = WeatherLocation('Asia', 'Tokyo')
newyork = WeatherLocation('North America', 'New York')
@@ -204,7 +201,8 @@ class ShardTest(fixtures.TestBase):
event.listen(WeatherLocation, "load", load)
sess = self._fixture_data()
- tokyo = sess.query(WeatherLocation).filter_by(city="Tokyo").set_shard("asia").one()
+ tokyo = sess.query(WeatherLocation).\
+ filter_by(city="Tokyo").set_shard("asia").one()
sess.query(WeatherLocation).all()
eq_(
@@ -212,4 +210,50 @@ class ShardTest(fixtures.TestBase):
['asia', 'north_america', 'north_america',
'europe', 'europe', 'south_america',
'south_america']
- ) \ No newline at end of file
+ )
+
+class DistinctEngineShardTest(ShardTest, fixtures.TestBase):
+
+ def _init_dbs(self):
+ db1 = testing_engine('sqlite:///shard1.db',
+ options=dict(pool_threadlocal=True))
+ db2 = testing_engine('sqlite:///shard2.db')
+ db3 = testing_engine('sqlite:///shard3.db')
+ db4 = testing_engine('sqlite:///shard4.db')
+
+ return db1, db2, db3, db4
+
+ def tearDown(self):
+ clear_mappers()
+
+ for db in (db1, db2, db3, db4):
+ db.connect().invalidate()
+ for i in range(1, 5):
+ os.remove("shard%d.db" % i)
+
+class AttachedFileShardTest(ShardTest, fixtures.TestBase):
+ schema = "changeme"
+
+ def _init_dbs(self):
+ db1 = testing_engine('sqlite://', options={"execution_options":
+ {"shard_id": "shard1"}})
+ assert db1._has_events
+
+ db2 = db1.execution_options(shard_id="shard2")
+ db3 = db1.execution_options(shard_id="shard3")
+ db4 = db1.execution_options(shard_id="shard4")
+
+ import re
+ @event.listens_for(db1, "before_cursor_execute", retval=True)
+ def _switch_shard(conn, cursor, stmt, params, context, executemany):
+ shard_id = conn._execution_options['shard_id']
+ # because SQLite can't just give us a "use" statement, we have
+ # to use the schema hack to locate table names
+ if shard_id:
+ stmt = re.sub(r"\"?changeme\"?\.", shard_id + "_", stmt)
+
+ return stmt, params
+
+ return db1, db2, db3, db4
+
+