summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2012-10-23 12:08:20 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2012-10-23 12:08:20 -0400
commitbac14cdf477151f5d3bea3450565462a66c17ee2 (patch)
tree2396dfbb8b18c90e2fcd43892a9d9d5e400cf679
parent389325099d4d8c0ce42a5a0d5395fbe3ead15af5 (diff)
downloadsqlalchemy-bac14cdf477151f5d3bea3450565462a66c17ee2.tar.gz
Added a new method :meth:`.Engine.execution_options`
to :class:`.Engine`. This method works similarly to :class:`.Connection.execution_options` in that it creates a copy of the parent object which will refer to the new set of options. The method can be used to build sharding schemes where each engine shares the same underlying pool of connections. The method has been tested against the horizontal shard recipe in the ORM as well.
-rw-r--r--doc/build/changelog/changelog_08.rst17
-rw-r--r--lib/sqlalchemy/engine/base.py130
-rw-r--r--test/engine/test_execute.py65
-rw-r--r--test/ext/test_horizontal_shard.py86
4 files changed, 259 insertions, 39 deletions
diff --git a/doc/build/changelog/changelog_08.rst b/doc/build/changelog/changelog_08.rst
index 42185d4b7..d5ac10f10 100644
--- a/doc/build/changelog/changelog_08.rst
+++ b/doc/build/changelog/changelog_08.rst
@@ -9,6 +9,23 @@
:released:
.. change::
+ :tags: sql, feature
+
+ Added a new method :meth:`.Engine.execution_options`
+ to :class:`.Engine`. This method works similarly to
+ :class:`.Connection.execution_options` in that it creates
+ a copy of the parent object which will refer to the new
+ set of options. The method can be used to build
+ sharding schemes where each engine shares the same
+ underlying pool of connections. The method
+ has been tested against the horizontal shard
+ recipe in the ORM as well.
+
+ .. seealso::
+
+ :meth:`.Engine.execution_options`
+
+ .. change::
:tags: sql, orm, bug
:tickets: 2595
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 05ca0f980..bdb1a0004 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -112,16 +112,20 @@ class Connection(Connectable):
the same underlying DBAPI connection, but also defines the given
execution options which will take effect for a call to
:meth:`execute`. As the new :class:`.Connection` references the same
- underlying resource, it is probably best to ensure that the copies
+ underlying resource, it's usually a good idea to ensure that the copies
would be discarded immediately, which is implicit if used as in::
result = connection.execution_options(stream_results=True).\\
execute(stmt)
- :meth:`.Connection.execution_options` accepts all options as those
- accepted by :meth:`.Executable.execution_options`. Additionally,
- it includes options that are applicable only to
- :class:`.Connection`.
+ Note that any key/value can be passed to :meth:`.Connection.execution_options`,
+ and it will be stored in the ``_execution_options`` dictionary of
+ the :class:`.Connnection`. It is suitable for usage by end-user
+ schemes to communicate with event listeners, for example.
+
+ The keywords that are currently recognized by SQLAlchemy itself
+ include all those listed under :meth:`.Executable.execution_options`,
+ as well as others that are specific to :class:`.Connection`.
:param autocommit: Available on: Connection, statement.
When True, a COMMIT will be invoked after execution
@@ -416,7 +420,7 @@ class Connection(Connectable):
"Cannot start a two phase transaction when a transaction "
"is already in progress.")
if xid is None:
- xid = self.engine.dialect.create_xid();
+ xid = self.engine.dialect.create_xid()
self.__transaction = TwoPhaseTransaction(self, xid)
return self.__transaction
@@ -1313,13 +1317,6 @@ class Engine(Connectable, log.Identified):
if proxy:
interfaces.ConnectionProxy._adapt_listener(self, proxy)
if execution_options:
- if 'isolation_level' in execution_options:
- raise exc.ArgumentError(
- "'isolation_level' execution option may "
- "only be specified on Connection.execution_options(). "
- "To set engine-wide isolation level, "
- "use the isolation_level argument to create_engine()."
- )
self.update_execution_options(**execution_options)
def update_execution_options(self, **opt):
@@ -1332,13 +1329,87 @@ class Engine(Connectable, log.Identified):
can be sent via the ``execution_options`` parameter
to :func:`.create_engine`.
- See :meth:`.Connection.execution_options` for more
- details on execution options.
+ .. seealso::
+
+ :meth:`.Connection.execution_options`
+
+ :meth:`.Engine.execution_options`
"""
+ if 'isolation_level' in opt:
+ raise exc.ArgumentError(
+ "'isolation_level' execution option may "
+ "only be specified on Connection.execution_options(). "
+ "To set engine-wide isolation level, "
+ "use the isolation_level argument to create_engine()."
+ )
self._execution_options = \
self._execution_options.union(opt)
+ def execution_options(self, **opt):
+ """Return a new :class:`.Engine` that will provide
+ :class:`.Connection` objects with the given execution options.
+
+ The returned :class:`.Engine` remains related to the original
+ :class:`.Engine` in that it shares the same connection pool and
+ other state:
+
+ * The :class:`.Pool` used by the new :class:`.Engine` is the
+ same instance. The :meth:`.Engine.dispose` method will replace
+ the connection pool instance for the parent engine as well
+ as this one.
+ * Event listeners are "cascaded" - meaning, the new :class:`.Engine`
+ inherits the events of the parent, and new events can be associated
+ with the new :class:`.Engine` individually.
+ * The logging configuration and logging_name is copied from the parent
+ :class:`.Engine`.
+
+ The intent of the :meth:`.Engine.execution_options` method is
+ to implement "sharding" schemes where multiple :class:`.Engine`
+ objects refer to the same connection pool, but are differentiated
+ by options that would be consumed by a custom event::
+
+ primary_engine = create_engine("mysql://")
+ shard1 = primary_engine.execution_options(shard_id="shard1")
+ shard2 = primary_engine.execution_options(shard_id="shard2")
+
+ Above, the ``shard1`` engine serves as a factory for :class:`.Connection`
+ objects that will contain the execution option ``shard_id=shard1``,
+ and ``shard2`` will produce :class:`.Connection` objects that contain
+ the execution option ``shard_id=shard2``.
+
+ An event handler can consume the above execution option to perform
+ a schema switch or other operation, given a connection. Below
+ we emit a MySQL ``use`` statement to switch databases, at the same
+ time keeping track of which database we've established using the
+ :attr:`.Connection.info` dictionary, which gives us a persistent
+ storage space that follows the DBAPI connection::
+
+ from sqlalchemy import event
+ from sqlalchemy.engine import Engine
+
+ shards = {"default": "base", shard_1": "db1", "shard_2": "db2"}
+
+ @event.listens_for(Engine, "before_cursor_execute")
+ def _switch_shard(conn, cursor, stmt, params, context, executemany):
+ shard_id = conn._execution_options.get('shard_id', "default")
+ current_shard = conn.info.get("current_shard", None)
+
+ if current_shard != shard_id:
+ cursor.execute("use %%s" %% shards[shard_id])
+ conn.info["current_shard"] = shard_id
+
+ .. seealso::
+
+ :meth:`.Connection.execution_options` - update execution options
+ on a :class:`.Connection` object.
+
+ :meth:`.Engine.update_execution_options` - update the execution
+ options for a given :class:.`Engine` in place.
+
+ """
+ return OptionEngine(self, opt)
+
@property
def name(self):
"""String name of the :class:`~sqlalchemy.engine.Dialect` in use by
@@ -1605,7 +1676,7 @@ class Engine(Connectable, log.Identified):
else:
conn = connection
if not schema:
- schema = self.dialect.default_schema_name
+ schema = self.dialect.default_schema_name
try:
return self.dialect.get_table_names(conn, schema)
finally:
@@ -1634,4 +1705,31 @@ class Engine(Connectable, log.Identified):
return self.pool.unique_connection()
+class OptionEngine(Engine):
+ def __init__(self, proxied, execution_options):
+ self._proxied = proxied
+ self.url = proxied.url
+ self.dialect = proxied.dialect
+ self.logging_name = proxied.logging_name
+ self.echo = proxied.echo
+ log.instance_logger(self, echoflag=self.echo)
+ self.dispatch = self.dispatch._join(proxied.dispatch)
+ self._execution_options = proxied._execution_options
+ self.update_execution_options(**execution_options)
+
+ def _get_pool(self):
+ return self._proxied.pool
+
+ def _set_pool(self, pool):
+ self._proxied.pool = pool
+
+ pool = property(_get_pool, _set_pool)
+
+ def _get_has_events(self):
+ return self._proxied._has_events or \
+ self.__dict__.get('_has_events', False)
+
+ def _set_has_events(self, value):
+ self.__dict__['_has_events'] = value
+ _has_events = property(_get_has_events, _set_has_events) \ No newline at end of file
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index ced72f276..69507eabe 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -322,8 +322,8 @@ class ExecuteTest(fixtures.TestBase):
@testing.requires.ad_hoc_engines
def test_engine_level_options(self):
- eng = engines.testing_engine(options={'execution_options'
- : {'foo': 'bar'}})
+ eng = engines.testing_engine(options={'execution_options':
+ {'foo': 'bar'}})
conn = eng.contextual_connect()
eq_(conn._execution_options['foo'], 'bar')
eq_(conn.execution_options(bat='hoho')._execution_options['foo'
@@ -336,6 +336,66 @@ class ExecuteTest(fixtures.TestBase):
conn = eng.contextual_connect()
eq_(conn._execution_options['foo'], 'hoho')
+ @testing.requires.ad_hoc_engines
+ def test_generative_engine_execution_options(self):
+ eng = engines.testing_engine(options={'execution_options':
+ {'base': 'x1'}})
+
+ eng1 = eng.execution_options(foo="b1")
+ eng2 = eng.execution_options(foo="b2")
+ eng1a = eng1.execution_options(bar="a1")
+ eng2a = eng2.execution_options(foo="b3", bar="a2")
+
+ eq_(eng._execution_options,
+ {'base': 'x1'})
+ eq_(eng1._execution_options,
+ {'base': 'x1', 'foo': 'b1'})
+ eq_(eng2._execution_options,
+ {'base': 'x1', 'foo': 'b2'})
+ eq_(eng1a._execution_options,
+ {'base': 'x1', 'foo': 'b1', 'bar': 'a1'})
+ eq_(eng2a._execution_options,
+ {'base': 'x1', 'foo': 'b3', 'bar': 'a2'})
+ is_(eng1a.pool, eng.pool)
+
+ # test pool is shared
+ eng2.dispose()
+ is_(eng1a.pool, eng2.pool)
+ is_(eng.pool, eng2.pool)
+
+ @testing.requires.ad_hoc_engines
+ def test_generative_engine_event_dispatch(self):
+ canary = []
+ def l1(*arg, **kw):
+ canary.append("l1")
+ def l2(*arg, **kw):
+ canary.append("l2")
+ def l3(*arg, **kw):
+ canary.append("l3")
+
+ eng = engines.testing_engine(options={'execution_options':
+ {'base': 'x1'}})
+ event.listen(eng, "before_execute", l1)
+
+ eng1 = eng.execution_options(foo="b1")
+ event.listen(eng, "before_execute", l2)
+ event.listen(eng1, "before_execute", l3)
+
+ eng.execute(select([1]))
+ eng1.execute(select([1]))
+
+ eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
+
+ @testing.requires.ad_hoc_engines
+ def test_generative_engine_event_dispatch_hasevents(self):
+ def l1(*arg, **kw):
+ pass
+ eng = create_engine(testing.db.url)
+ assert not eng._has_events
+ event.listen(eng, "before_execute", l1)
+ eng2 = eng.execution_options(foo='bar')
+ assert eng2._has_events
+
def test_unicode_test_fails_warning(self):
class MockCursor(engines.DBAPIProxyCursor):
def execute(self, stmt, params=None, **kw):
@@ -1018,6 +1078,7 @@ class EngineEventsTest(fixtures.TestBase):
def tearDown(self):
Engine.dispatch._clear()
+ Engine._has_events = False
def _assert_stmts(self, expected, received):
orig = list(received)
diff --git a/test/ext/test_horizontal_shard.py b/test/ext/test_horizontal_shard.py
index 0b9b89d2b..024510886 100644
--- a/test/ext/test_horizontal_shard.py
+++ b/test/ext/test_horizontal_shard.py
@@ -5,6 +5,7 @@ from sqlalchemy import sql, util
from sqlalchemy.orm import *
from sqlalchemy.ext.horizontal_shard import ShardedSession
from sqlalchemy.sql import operators
+from sqlalchemy import pool
from sqlalchemy.testing import fixtures
from sqlalchemy import testing
from sqlalchemy.testing.engines import testing_engine
@@ -13,19 +14,19 @@ from nose import SkipTest
# TODO: ShardTest can be turned into a base for further subclasses
-class ShardTest(fixtures.TestBase):
+
+
+
+class ShardTest(object):
__skip_if__ = (lambda: util.win32,)
+ __requires__ = 'sqlite',
+
+ schema = None
def setUp(self):
global db1, db2, db3, db4, weather_locations, weather_reports
- try:
- db1 = testing_engine('sqlite:///shard1.db', options=dict(pool_threadlocal=True))
- except ImportError:
- raise SkipTest('Requires sqlite')
- db2 = testing_engine('sqlite:///shard2.db')
- db3 = testing_engine('sqlite:///shard3.db')
- db4 = testing_engine('sqlite:///shard4.db')
+ db1, db2, db3, db4 = self._init_dbs()
meta = MetaData()
ids = Table('ids', meta,
@@ -36,13 +37,14 @@ class ShardTest(fixtures.TestBase):
c = db1.contextual_connect()
nextid = c.execute(ids.select(for_update=True)).scalar()
- c.execute(ids.update(values={ids.c.nextid : ids.c.nextid + 1}))
+ c.execute(ids.update(values={ids.c.nextid: ids.c.nextid + 1}))
return nextid
weather_locations = Table("weather_locations", meta,
Column('id', Integer, primary_key=True, default=id_generator),
Column('continent', String(30), nullable=False),
- Column('city', String(50), nullable=False)
+ Column('city', String(50), nullable=False),
+ schema=self.schema
)
weather_reports = Table(
@@ -50,10 +52,11 @@ class ShardTest(fixtures.TestBase):
meta,
Column('id', Integer, primary_key=True),
Column('location_id', Integer,
- ForeignKey('weather_locations.id')),
+ ForeignKey(weather_locations.c.id)),
Column('temperature', Float),
Column('report_time', DateTime,
default=datetime.datetime.now),
+ schema=self.schema
)
for db in (db1, db2, db3, db4):
@@ -64,13 +67,6 @@ class ShardTest(fixtures.TestBase):
self.setup_session()
self.setup_mappers()
- def tearDown(self):
- clear_mappers()
-
- for db in (db1, db2, db3, db4):
- db.connect().invalidate()
- for i in range(1,5):
- os.remove("shard%d.db" % i)
@classmethod
def setup_session(cls):
@@ -139,11 +135,12 @@ class ShardTest(fixtures.TestBase):
self.temperature = temperature
mapper(WeatherLocation, weather_locations, properties={
- 'reports':relationship(Report, backref='location'),
+ 'reports': relationship(Report, backref='location'),
'city': deferred(weather_locations.c.city),
})
mapper(Report, weather_reports)
+
def _fixture_data(self):
tokyo = WeatherLocation('Asia', 'Tokyo')
newyork = WeatherLocation('North America', 'New York')
@@ -204,7 +201,8 @@ class ShardTest(fixtures.TestBase):
event.listen(WeatherLocation, "load", load)
sess = self._fixture_data()
- tokyo = sess.query(WeatherLocation).filter_by(city="Tokyo").set_shard("asia").one()
+ tokyo = sess.query(WeatherLocation).\
+ filter_by(city="Tokyo").set_shard("asia").one()
sess.query(WeatherLocation).all()
eq_(
@@ -212,4 +210,50 @@ class ShardTest(fixtures.TestBase):
['asia', 'north_america', 'north_america',
'europe', 'europe', 'south_america',
'south_america']
- ) \ No newline at end of file
+ )
+
+class DistinctEngineShardTest(ShardTest, fixtures.TestBase):
+
+ def _init_dbs(self):
+ db1 = testing_engine('sqlite:///shard1.db',
+ options=dict(pool_threadlocal=True))
+ db2 = testing_engine('sqlite:///shard2.db')
+ db3 = testing_engine('sqlite:///shard3.db')
+ db4 = testing_engine('sqlite:///shard4.db')
+
+ return db1, db2, db3, db4
+
+ def tearDown(self):
+ clear_mappers()
+
+ for db in (db1, db2, db3, db4):
+ db.connect().invalidate()
+ for i in range(1, 5):
+ os.remove("shard%d.db" % i)
+
+class AttachedFileShardTest(ShardTest, fixtures.TestBase):
+ schema = "changeme"
+
+ def _init_dbs(self):
+ db1 = testing_engine('sqlite://', options={"execution_options":
+ {"shard_id": "shard1"}})
+ assert db1._has_events
+
+ db2 = db1.execution_options(shard_id="shard2")
+ db3 = db1.execution_options(shard_id="shard3")
+ db4 = db1.execution_options(shard_id="shard4")
+
+ import re
+ @event.listens_for(db1, "before_cursor_execute", retval=True)
+ def _switch_shard(conn, cursor, stmt, params, context, executemany):
+ shard_id = conn._execution_options['shard_id']
+ # because SQLite can't just give us a "use" statement, we have
+ # to use the schema hack to locate table names
+ if shard_id:
+ stmt = re.sub(r"\"?changeme\"?\.", shard_id + "_", stmt)
+
+ return stmt, params
+
+ return db1, db2, db3, db4
+
+