summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES20
-rw-r--r--lib/sqlalchemy/dialects/postgresql/psycopg2.py40
-rw-r--r--lib/sqlalchemy/engine/default.py4
-rw-r--r--lib/sqlalchemy/orm/query.py40
-rw-r--r--lib/sqlalchemy/schema.py3
-rw-r--r--lib/sqlalchemy/sql/expression.py65
-rw-r--r--lib/sqlalchemy/util.py20
-rw-r--r--test/aaa_profiling/test_orm.py2
-rw-r--r--test/dialect/test_postgresql.py72
-rw-r--r--test/orm/test_query.py27
-rw-r--r--test/sql/test_generative.py21
11 files changed, 267 insertions, 47 deletions
diff --git a/CHANGES b/CHANGES
index 68d497ca7..aa7001a82 100644
--- a/CHANGES
+++ b/CHANGES
@@ -176,6 +176,15 @@ CHANGES
- The version_id_col feature on mapper() will raise a warning when
used with dialects that don't support "rowcount" adequately.
[ticket:1569]
+
+ - added "statement_options()" to Query, to so options can be
+ passed to the resulting statement. Currently only
+ Select-statements have these options, and the only option
+ used is "stream_results", and the only dialect which knows
+ "stream_results" is psycopg2.
+
+ - Query.yield_per() will set the "stream_results" statement
+ option automatically.
- Deprecated or removed:
* 'allow_null_pks' flag on mapper() is deprecated. It does
@@ -294,6 +303,10 @@ CHANGES
instead simply not querying, or modifying the criterion
as appropriate for more complex situations.
[ticket:1628]
+
+ - Added "statement_options()" to Selects, which set statement
+ specific options. These enable e.g. dialect specific options
+ such as whether to enable using server side cursors, etc.
- Deprecated or removed:
* "scalar" flag on select() is removed, use
@@ -596,6 +609,13 @@ CHANGES
- postgresql dialect can properly detect pg "devel" version
strings, i.e. "8.5devel" [ticket:1636]
+
+ - The psycopg2 now respects the statement option
+ "stream_results". This option overrides the connection setting
+ "server_side_cursors". If true, server side cursors will be
+ used for the statement. If false, they will not be used, even
+ if "server_side_cursors" is true on the
+ connection. [ticket:1619]
- mysql
- New dialects: oursql, a new native dialect,
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
index a46fdbddb..7733aadcd 100644
--- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@ -35,6 +35,15 @@ Transactions
The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations.
+Statement options
+-----------------
+
+The following statement options are respected:
+
+* *stream_results* - Enable or disable usage of server side cursors for the SELECT-statement.
+ If *None* or not set, the *server_side_cursors* option of the connection is used. If
+ auto-commit is enabled, the option is ignored.
+
"""
import decimal, random, re
@@ -93,8 +102,9 @@ class _PGArray(ARRAY):
if isinstance(self.item_type, sqltypes.String) and \
self.item_type.convert_unicode:
self.item_type.convert_unicode = "force"
-
-# TODO: filter out 'FOR UPDATE' statements
+
+# When we're handed literal SQL, ensure it's a SELECT-query. Since
+# 8.3, combining cursors and "FOR UPDATE" has been fine.
SERVER_SIDE_CURSOR_RE = re.compile(
r'\s*SELECT',
re.I | re.UNICODE)
@@ -102,16 +112,22 @@ SERVER_SIDE_CURSOR_RE = re.compile(
class PostgreSQL_psycopg2ExecutionContext(PGExecutionContext):
def create_cursor(self):
# TODO: coverage for server side cursors + select.for_update()
- is_server_side = \
- self.dialect.server_side_cursors and \
- not self.should_autocommit and \
- ((self.compiled and isinstance(self.compiled.statement, expression.Selectable)
- and not getattr(self.compiled.statement, 'for_update', False)) \
- or \
- (
- (not self.compiled or isinstance(self.compiled.statement, expression._TextClause))
- and self.statement and SERVER_SIDE_CURSOR_RE.match(self.statement))
- )
+ stream_results_option = self.statement_options.get('stream_results')
+ is_server_side = (
+ # Enabled for this statement ...
+ (stream_results_option or
+ # ... or enabled for all statements
+ (self.dialect.server_side_cursors and
+ # ... and not explicitly disabled for this one.
+ (stream_results_option or stream_results_option is None))
+ ) and (
+ # But don't use SS-cursors when autocommit is on ...
+ (not self.should_autocommit and
+ self.compiled and isinstance(self.compiled.statement, expression.Selectable))
+ or (
+ # ... or if it's not even a SELECT.
+ (not self.compiled or isinstance(self.compiled.statement, expression._TextClause))
+ and self.statement and SERVER_SIDE_CURSOR_RE.match(self.statement))))
self.__is_server_side = is_server_side
if is_server_side:
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 1bdeca377..baa92b5d4 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -223,6 +223,7 @@ class DefaultDialect(base.Dialect):
class DefaultExecutionContext(base.ExecutionContext):
+ statement_options = util.frozendict()
def __init__(self, dialect, connection, compiled_sql=None, compiled_ddl=None, statement=None, parameters=None):
self.dialect = dialect
@@ -249,7 +250,7 @@ class DefaultExecutionContext(base.ExecutionContext):
if not compiled.can_execute:
raise exc.ArgumentError("Not an executable clause: %s" % compiled)
-
+
self.processors = dict(
(key, value) for key, value in
( (compiled.bind_names[bindparam],
@@ -268,6 +269,7 @@ class DefaultExecutionContext(base.ExecutionContext):
self.isupdate = compiled.isupdate
self.isdelete = compiled.isdelete
self.should_autocommit = compiled.statement._autocommit
+ self.statement_options = compiled.statement._statement_options
if self.should_autocommit is expression.PARSE_AUTOCOMMIT:
self.should_autocommit = self.should_autocommit_text(self.statement)
diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py
index 7be068019..444ad37bc 100644
--- a/lib/sqlalchemy/orm/query.py
+++ b/lib/sqlalchemy/orm/query.py
@@ -79,14 +79,14 @@ class Query(object):
_from_obj = ()
_filter_aliases = None
_from_obj_alias = None
- _joinpath = _joinpoint = {}
+ _joinpath = _joinpoint = util.frozendict()
+ _statement_options = util.frozendict()
+ _params = util.frozendict()
+ _attributes = util.frozendict()
+ _with_options = ()
def __init__(self, entities, session=None):
self.session = session
-
- self._with_options = []
- self._params = {}
- self._attributes = {}
self._polymorphic_adapters = {}
self._set_entities(entities)
@@ -488,9 +488,17 @@ class Query(object):
collections will be cleared for a new load when encountered in a
subsequent result batch.
+ Also note that many DBAPIs do not "stream" results, pre-buffering
+ all rows before making them available, including mysql-python and
+ psycopg2. yield_per() will also set the ``stream_results`` statement
+ option to ``True``, which currently is only understood by psycopg2
+ and causes server side cursors to be used.
+
"""
self._yield_per = count
-
+ self._statement_options = self._statement_options.copy()
+ self._statement_options['stream_results'] = True
+
def get(self, ident):
"""Return an instance of the object based on the given identifier, or None if not found.
@@ -658,7 +666,7 @@ class Query(object):
# most MapperOptions write to the '_attributes' dictionary,
# so copy that as well
self._attributes = self._attributes.copy()
- opts = list(util.flatten_iterator(args))
+ opts = tuple(util.flatten_iterator(args))
self._with_options = self._with_options + opts
if conditional:
for opt in opts:
@@ -668,6 +676,21 @@ class Query(object):
opt.process_query(self)
@_generative()
+ def statement_options(self, **kwargs):
+ """ Set non-SQL options for the resulting statement, such as dialect-specific options.
+
+ The only option currently understood is ``stream_results=True``,
+ only used by Psycopg2 to enable "server side cursors". This option
+ only has a useful effect if used in conjunction with :meth:`~sqlalchemy.orm.query.Query.yield_per()`,
+ which currently sets ``stream_results`` to ``True`` automatically.
+
+ """
+ _statement_options = self._statement_options.copy()
+ for key, value in kwargs.items():
+ _statement_options[key] = value
+ self._statement_options = _statement_options
+
+ @_generative()
def with_lockmode(self, mode):
"""Return a new Query object with the specified locking mode."""
@@ -1915,7 +1938,7 @@ class Query(object):
context.adapter = sql_util.ColumnAdapter(inner, equivs)
- statement = sql.select([inner] + context.secondary_columns, for_update=for_update, use_labels=labels)
+ statement = sql.select([inner] + context.secondary_columns, for_update=for_update, use_labels=labels, statement_options=self._statement_options)
from_clause = inner
for eager_join in eager_joins:
@@ -1947,6 +1970,7 @@ class Query(object):
for_update=for_update,
correlate=False,
order_by=context.order_by,
+ statement_options=self._statement_options,
**self._select_args
)
diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py
index 7c9fa58fe..e40b6f592 100644
--- a/lib/sqlalchemy/schema.py
+++ b/lib/sqlalchemy/schema.py
@@ -2027,10 +2027,9 @@ class SchemaVisitor(visitors.ClauseVisitor):
__traverse_options__ = {'schema_visitor':True}
-class DDLElement(expression.ClauseElement):
+class DDLElement(expression._Executable, expression.ClauseElement):
"""Base class for DDL expression constructs."""
- supports_execution = True
_autocommit = True
target = None
diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py
index 742746cbe..2dc13ee82 100644
--- a/lib/sqlalchemy/sql/expression.py
+++ b/lib/sqlalchemy/sql/expression.py
@@ -2178,8 +2178,13 @@ class _TypeClause(ClauseElement):
def __init__(self, type):
self.type = type
+class _Executable(object):
+ """Mark a ClauseElement as supporting execution."""
-class _TextClause(ClauseElement):
+ supports_execution = True
+ _statement_options = util.frozendict()
+
+class _TextClause(_Executable, ClauseElement):
"""Represent a literal SQL text fragment.
Public constructor is the :func:`text()` function.
@@ -2189,7 +2194,6 @@ class _TextClause(ClauseElement):
__visit_name__ = 'textclause'
_bind_params_regex = re.compile(r'(?<![:\w\x5c]):(\w+)(?!:)', re.UNICODE)
- supports_execution = True
@property
def _select_iterable(self):
@@ -2198,11 +2202,14 @@ class _TextClause(ClauseElement):
_hide_froms = []
def __init__(self, text = "", bind=None,
- bindparams=None, typemap=None, autocommit=PARSE_AUTOCOMMIT):
+ bindparams=None, typemap=None, autocommit=PARSE_AUTOCOMMIT, statement_options=None):
self._bind = bind
self.bindparams = {}
self.typemap = typemap
self._autocommit = autocommit
+ self._statement_options = statement_options
+ if self._statement_options is None:
+ self._statement_options = {}
if typemap is not None:
for key in typemap.keys():
typemap[key] = sqltypes.to_instance(typemap[key])
@@ -2792,6 +2799,7 @@ class Alias(FromClause):
self.supports_execution = baseselectable.supports_execution
if self.supports_execution:
self._autocommit = baseselectable._autocommit
+ self._statement_options = baseselectable._statement_options
self.element = selectable
if alias is None:
if self.original.named_with_column:
@@ -2845,6 +2853,7 @@ class Alias(FromClause):
def bind(self):
return self.element.bind
+
class _Grouping(ColumnElement):
"""Represent a grouping within a column expression"""
@@ -3159,11 +3168,9 @@ def _generative(fn, *args, **kw):
fn(self, *args[1:], **kw)
return self
-class _SelectBaseMixin(object):
+class _SelectBaseMixin(_Executable):
"""Base class for :class:`Select` and ``CompoundSelects``."""
- supports_execution = True
-
def __init__(self,
use_labels=False,
for_update=False,
@@ -3172,13 +3179,17 @@ class _SelectBaseMixin(object):
order_by=None,
group_by=None,
bind=None,
- autocommit=False):
+ autocommit=False,
+ statement_options=None):
self.use_labels = use_labels
self.for_update = for_update
self._autocommit = autocommit
self._limit = limit
self._offset = offset
self._bind = bind
+ self._statement_options = statement_options
+ if self._statement_options is None:
+ self._statement_options = dict()
self._order_by_clause = ClauseList(*util.to_list(order_by) or [])
self._group_by_clause = ClauseList(*util.to_list(group_by) or [])
@@ -3290,6 +3301,18 @@ class _SelectBaseMixin(object):
def _from_objects(self):
return [self]
+ @_generative
+ def statement_options(self, **kwargs):
+ """ Set non-SQL options for the statement, such as dialect-specific options.
+
+ The options available are covered in the respective dialect's section.
+
+ """
+ _statement_options = self._statement_options.copy()
+ for key, value in kwargs.items():
+ _statement_options[key] = value
+ self._statement_options = _statement_options
+
class _ScalarSelect(_Grouping):
_from_objects = []
@@ -3411,7 +3434,9 @@ class Select(_SelectBaseMixin, FromClause):
"""
__visit_name__ = 'select'
-
+
+ _prefixes = ()
+
def __init__(self,
columns,
whereclause=None,
@@ -3468,9 +3493,7 @@ class Select(_SelectBaseMixin, FromClause):
self._having = None
if prefixes:
- self._prefixes = [_literal_as_text(p) for p in prefixes]
- else:
- self._prefixes = []
+ self._prefixes = tuple([_literal_as_text(p) for p in prefixes])
_SelectBaseMixin.__init__(self, **kwargs)
@@ -3624,7 +3647,7 @@ class Select(_SelectBaseMixin, FromClause):
"""
clause = _literal_as_text(clause)
- self._prefixes = self._prefixes + [clause]
+ self._prefixes = self._prefixes + (clause,)
@_generative
def select_from(self, fromclause):
@@ -3682,7 +3705,7 @@ class Select(_SelectBaseMixin, FromClause):
"""
clause = _literal_as_text(clause)
- self._prefixes = self._prefixes.union([clause])
+ self._prefixes = self._prefixes + (clause,)
def append_whereclause(self, whereclause):
"""append the given expression to this select() construct's WHERE criterion.
@@ -3803,12 +3826,11 @@ class Select(_SelectBaseMixin, FromClause):
self._bind = bind
bind = property(bind, _set_bind)
-class _UpdateBase(ClauseElement):
+class _UpdateBase(_Executable, ClauseElement):
"""Form the base for ``INSERT``, ``UPDATE``, and ``DELETE`` statements."""
__visit_name__ = 'update_base'
- supports_execution = True
_autocommit = True
def _generate(self):
@@ -3924,7 +3946,9 @@ class Insert(_ValuesBase):
"""
__visit_name__ = 'insert'
-
+
+ _prefixes = ()
+
def __init__(self,
table,
values=None,
@@ -3939,9 +3963,7 @@ class Insert(_ValuesBase):
self.inline = inline
self._returning = returning
if prefixes:
- self._prefixes = [_literal_as_text(p) for p in prefixes]
- else:
- self._prefixes = []
+ self._prefixes = tuple([_literal_as_text(p) for p in prefixes])
self.kwargs = self._process_deprecated_kw(kwargs)
@@ -3964,7 +3986,7 @@ class Insert(_ValuesBase):
"""
clause = _literal_as_text(clause)
- self._prefixes = self._prefixes + [clause]
+ self._prefixes = self._prefixes + (clause,)
class Update(_ValuesBase):
"""Represent an Update construct.
@@ -4061,9 +4083,8 @@ class Delete(_UpdateBase):
# TODO: coverage
self._whereclause = clone(self._whereclause)
-class _IdentifiedClause(ClauseElement):
+class _IdentifiedClause(_Executable, ClauseElement):
__visit_name__ = 'identified'
- supports_execution = True
_autocommit = False
quote = None
diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py
index c3ae25589..bd988dd20 100644
--- a/lib/sqlalchemy/util.py
+++ b/lib/sqlalchemy/util.py
@@ -138,7 +138,25 @@ except ImportError:
return 'defaultdict(%s, %s)' % (self.default_factory,
dict.__repr__(self))
-
+class frozendict(dict):
+ def _blocked_attribute(obj):
+ raise AttributeError, "A frozendict cannot be modified."
+ _blocked_attribute = property(_blocked_attribute)
+
+ __delitem__ = __setitem__ = clear = _blocked_attribute
+ pop = popitem = setdefault = update = _blocked_attribute
+
+ def __new__(cls, *args):
+ new = dict.__new__(cls)
+ dict.__init__(new, *args)
+ return new
+
+ def __init__(self, *args):
+ pass
+
+ def __repr__(self):
+ return "frozendict(%s)" % dict.__repr__(self)
+
def to_list(x, default=None):
if x is None:
return default
diff --git a/test/aaa_profiling/test_orm.py b/test/aaa_profiling/test_orm.py
index 327f3226f..a95086509 100644
--- a/test/aaa_profiling/test_orm.py
+++ b/test/aaa_profiling/test_orm.py
@@ -72,7 +72,7 @@ class MergeTest(_base.MappedTest):
p3 = go()
-
+ @testing.fails_on_everything_except('sqlite')
@testing.resolve_artifact_names
def test_merge_load(self):
sess = sessionmaker()()
diff --git a/test/dialect/test_postgresql.py b/test/dialect/test_postgresql.py
index 9833042fe..5d6bcaf6d 100644
--- a/test/dialect/test_postgresql.py
+++ b/test/dialect/test_postgresql.py
@@ -1448,6 +1448,78 @@ class ServerSideCursorsTest(TestBase, AssertsExecutionResults):
result = ss_engine.execute(select([1]))
assert result.cursor.name
+
+ def test_uses_ss_when_explicitly_enabled(self):
+ engine = engines.testing_engine(options={'server_side_cursors':False})
+ result = engine.execute(text("select 1"))
+ # It should be off globally ...
+ assert not result.cursor.name
+
+ s = select([1]).statement_options(stream_results=True)
+ result = engine.execute(s)
+ # ... but enabled for this one.
+ assert result.cursor.name
+
+ def test_ss_explicitly_disabled(self):
+ s = select([1]).statement_options(stream_results=False)
+ result = ss_engine.execute(s)
+ assert not result.cursor.name
+
+ def test_aliases_and_ss(self):
+ engine = engines.testing_engine(options={'server_side_cursors':False})
+ s1 = select([1]).statement_options(stream_results=True).alias()
+ result = engine.execute(s1)
+ assert result.cursor.name
+
+ # s1's options shouldn't affect s2 when s2 is used as a from_obj.
+ s2 = select([1], from_obj=s1)
+ result = engine.execute(s2)
+ assert not result.cursor.name
+
+ def test_for_update_and_ss(self):
+ s1 = select([1], for_update=True)
+ result = ss_engine.execute(s1)
+ assert result.cursor.name
+
+ result = ss_engine.execute('SELECT 1 FOR UPDATE')
+ assert result.cursor.name
+
+ def test_orm_queries_with_ss(self):
+ metadata = MetaData(testing.db)
+ class Foo(object): pass
+ footable = Table('foobar', metadata,
+ Column('id', Integer, primary_key=True),
+ )
+ mapper(Foo, footable)
+ metadata.create_all()
+ try:
+ sess = create_session()
+
+ engine = engines.testing_engine(options={'server_side_cursors':False})
+ result = engine.execute(sess.query(Foo).statement)
+ assert not result.cursor.name, result.cursor.name
+ result.close()
+
+ q = sess.query(Foo).statement_options(stream_results=True)
+ result = engine.execute(q.statement)
+ assert result.cursor.name
+ result.close()
+
+ result = sess.query(Foo).statement_options(stream_results=True).subquery().execute()
+ assert result.cursor.name
+ result.close()
+ finally:
+ metadata.drop_all()
+
+ def test_text_with_ss(self):
+ engine = engines.testing_engine(options={'server_side_cursors':False})
+ s = text('select 42')
+ result = engine.execute(s)
+ assert not result.cursor.name
+ s = text('select 42', statement_options=dict(stream_results=True))
+ result = engine.execute(s)
+ assert result.cursor.name
+
def test_roundtrip(self):
test_table = Table('test_table', MetaData(ss_engine),
diff --git a/test/orm/test_query.py b/test/orm/test_query.py
index ee9f1853b..cd89f4d92 100644
--- a/test/orm/test_query.py
+++ b/test/orm/test_query.py
@@ -3746,3 +3746,30 @@ class UpdateDeleteTest(_base.MappedTest):
assert not (john in sess or jack in sess or jill in sess or jane in sess)
eq_(sess.query(User).count(), 0)
+
+class StatementOptionsTest(QueryTest):
+ """ Make sure a Query's statement_options are passed on to the
+ resulting statement. """
+
+ def test_query_with_statement_option(self):
+ sess = create_session(bind=testing.db, autocommit=False)
+
+ q1 = sess.query(User)
+ assert q1._statement_options == dict()
+ q2 = q1.statement_options(foo='bar', stream_results=True)
+ # q1's options should be unchanged.
+ assert q1._statement_options == dict()
+ # q2 should have them set.
+ assert q2._statement_options == dict(foo='bar', stream_results=True)
+ q3 = q2.statement_options(foo='not bar', answer=42)
+ assert q2._statement_options == dict(foo='bar', stream_results=True)
+
+ q3_options = dict(foo='not bar', stream_results=True, answer=42)
+ assert q3._statement_options == q3_options
+ assert q3.statement._statement_options == q3_options
+ assert q3._compile_context().statement._statement_options == q3_options
+ assert q3.subquery().original._statement_options == q3_options
+
+ # TODO: Test that statement options are passed on to
+ # updates/deletes, but currently there are no such options
+ # applicable for them.
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py
index 66c6b6c45..a9a1f59dd 100644
--- a/test/sql/test_generative.py
+++ b/test/sql/test_generative.py
@@ -785,6 +785,27 @@ class SelectTest(TestBase, AssertsCompiledSQL):
self.assert_compile(select_copy, "SELECT FOOBER table1.col1, table1.col2, table1.col3 FROM table1")
self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1")
+ def test_statement_options(self):
+ s = select().statement_options(foo='bar')
+ s2 = s.statement_options(bar='baz')
+ s3 = s.statement_options(foo='not bar')
+ # The original select should not be modified.
+ assert s._statement_options == dict(foo='bar')
+ # s2 should have its statement_options based on s, though.
+ assert s2._statement_options == dict(foo='bar', bar='baz')
+ assert s3._statement_options == dict(foo='not bar')
+
+ def test_statement_options_in_kwargs(self):
+ s = select(statement_options=dict(foo='bar'))
+ s2 = s.statement_options(bar='baz')
+ # The original select should not be modified.
+ assert s._statement_options == dict(foo='bar')
+ # s2 should have its statement_options based on s, though.
+ assert s2._statement_options == dict(foo='bar', bar='baz')
+
+ def test_statement_options_in_text(self):
+ s = text('select 42', statement_options=dict(foo='bar'))
+ assert s._statement_options == dict(foo='bar')
class InsertTest(TestBase, AssertsCompiledSQL):
"""Tests the generative capability of Insert"""