diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-01-16 22:44:04 +0000 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-01-16 22:44:04 +0000 |
commit | abccc0624228def744b0382e84f01cf95e0d3aed (patch) | |
tree | fca7eb29b90211daa699da6d0358f81243c243d9 | |
parent | 00df05061e7a0333022d02705c21270f9de4edab (diff) | |
download | sqlalchemy-abccc0624228def744b0382e84f01cf95e0d3aed.tar.gz |
- added "statement_options()" to Query, to so options can be
passed to the resulting statement. Currently only
Select-statements have these options, and the only option
used is "stream_results", and the only dialect which knows
"stream_results" is psycopg2.
- Query.yield_per() will set the "stream_results" statement
option automatically.
- Added "statement_options()" to Selects, which set statement
specific options. These enable e.g. dialect specific options
such as whether to enable using server side cursors, etc.
- The psycopg2 now respects the statement option
"stream_results". This option overrides the connection setting
"server_side_cursors". If true, server side cursors will be
used for the statement. If false, they will not be used, even
if "server_side_cursors" is true on the
connection. [ticket:1619]
- added a "frozendict" from http://code.activestate.com/recipes/414283/,
adding more default collections as immutable class vars on
Query, Insert, Select
-rw-r--r-- | CHANGES | 20 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/psycopg2.py | 40 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/default.py | 4 | ||||
-rw-r--r-- | lib/sqlalchemy/orm/query.py | 40 | ||||
-rw-r--r-- | lib/sqlalchemy/schema.py | 3 | ||||
-rw-r--r-- | lib/sqlalchemy/sql/expression.py | 65 | ||||
-rw-r--r-- | lib/sqlalchemy/util.py | 20 | ||||
-rw-r--r-- | test/aaa_profiling/test_orm.py | 2 | ||||
-rw-r--r-- | test/dialect/test_postgresql.py | 72 | ||||
-rw-r--r-- | test/orm/test_query.py | 27 | ||||
-rw-r--r-- | test/sql/test_generative.py | 21 |
11 files changed, 267 insertions, 47 deletions
@@ -176,6 +176,15 @@ CHANGES - The version_id_col feature on mapper() will raise a warning when used with dialects that don't support "rowcount" adequately. [ticket:1569] + + - added "statement_options()" to Query, to so options can be + passed to the resulting statement. Currently only + Select-statements have these options, and the only option + used is "stream_results", and the only dialect which knows + "stream_results" is psycopg2. + + - Query.yield_per() will set the "stream_results" statement + option automatically. - Deprecated or removed: * 'allow_null_pks' flag on mapper() is deprecated. It does @@ -294,6 +303,10 @@ CHANGES instead simply not querying, or modifying the criterion as appropriate for more complex situations. [ticket:1628] + + - Added "statement_options()" to Selects, which set statement + specific options. These enable e.g. dialect specific options + such as whether to enable using server side cursors, etc. - Deprecated or removed: * "scalar" flag on select() is removed, use @@ -596,6 +609,13 @@ CHANGES - postgresql dialect can properly detect pg "devel" version strings, i.e. "8.5devel" [ticket:1636] + + - The psycopg2 now respects the statement option + "stream_results". This option overrides the connection setting + "server_side_cursors". If true, server side cursors will be + used for the statement. If false, they will not be used, even + if "server_side_cursors" is true on the + connection. [ticket:1619] - mysql - New dialects: oursql, a new native dialect, diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index a46fdbddb..7733aadcd 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -35,6 +35,15 @@ Transactions The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations. +Statement options +----------------- + +The following statement options are respected: + +* *stream_results* - Enable or disable usage of server side cursors for the SELECT-statement. + If *None* or not set, the *server_side_cursors* option of the connection is used. If + auto-commit is enabled, the option is ignored. + """ import decimal, random, re @@ -93,8 +102,9 @@ class _PGArray(ARRAY): if isinstance(self.item_type, sqltypes.String) and \ self.item_type.convert_unicode: self.item_type.convert_unicode = "force" - -# TODO: filter out 'FOR UPDATE' statements + +# When we're handed literal SQL, ensure it's a SELECT-query. Since +# 8.3, combining cursors and "FOR UPDATE" has been fine. SERVER_SIDE_CURSOR_RE = re.compile( r'\s*SELECT', re.I | re.UNICODE) @@ -102,16 +112,22 @@ SERVER_SIDE_CURSOR_RE = re.compile( class PostgreSQL_psycopg2ExecutionContext(PGExecutionContext): def create_cursor(self): # TODO: coverage for server side cursors + select.for_update() - is_server_side = \ - self.dialect.server_side_cursors and \ - not self.should_autocommit and \ - ((self.compiled and isinstance(self.compiled.statement, expression.Selectable) - and not getattr(self.compiled.statement, 'for_update', False)) \ - or \ - ( - (not self.compiled or isinstance(self.compiled.statement, expression._TextClause)) - and self.statement and SERVER_SIDE_CURSOR_RE.match(self.statement)) - ) + stream_results_option = self.statement_options.get('stream_results') + is_server_side = ( + # Enabled for this statement ... + (stream_results_option or + # ... or enabled for all statements + (self.dialect.server_side_cursors and + # ... and not explicitly disabled for this one. + (stream_results_option or stream_results_option is None)) + ) and ( + # But don't use SS-cursors when autocommit is on ... + (not self.should_autocommit and + self.compiled and isinstance(self.compiled.statement, expression.Selectable)) + or ( + # ... or if it's not even a SELECT. + (not self.compiled or isinstance(self.compiled.statement, expression._TextClause)) + and self.statement and SERVER_SIDE_CURSOR_RE.match(self.statement)))) self.__is_server_side = is_server_side if is_server_side: diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 1bdeca377..baa92b5d4 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -223,6 +223,7 @@ class DefaultDialect(base.Dialect): class DefaultExecutionContext(base.ExecutionContext): + statement_options = util.frozendict() def __init__(self, dialect, connection, compiled_sql=None, compiled_ddl=None, statement=None, parameters=None): self.dialect = dialect @@ -249,7 +250,7 @@ class DefaultExecutionContext(base.ExecutionContext): if not compiled.can_execute: raise exc.ArgumentError("Not an executable clause: %s" % compiled) - + self.processors = dict( (key, value) for key, value in ( (compiled.bind_names[bindparam], @@ -268,6 +269,7 @@ class DefaultExecutionContext(base.ExecutionContext): self.isupdate = compiled.isupdate self.isdelete = compiled.isdelete self.should_autocommit = compiled.statement._autocommit + self.statement_options = compiled.statement._statement_options if self.should_autocommit is expression.PARSE_AUTOCOMMIT: self.should_autocommit = self.should_autocommit_text(self.statement) diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 7be068019..444ad37bc 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -79,14 +79,14 @@ class Query(object): _from_obj = () _filter_aliases = None _from_obj_alias = None - _joinpath = _joinpoint = {} + _joinpath = _joinpoint = util.frozendict() + _statement_options = util.frozendict() + _params = util.frozendict() + _attributes = util.frozendict() + _with_options = () def __init__(self, entities, session=None): self.session = session - - self._with_options = [] - self._params = {} - self._attributes = {} self._polymorphic_adapters = {} self._set_entities(entities) @@ -488,9 +488,17 @@ class Query(object): collections will be cleared for a new load when encountered in a subsequent result batch. + Also note that many DBAPIs do not "stream" results, pre-buffering + all rows before making them available, including mysql-python and + psycopg2. yield_per() will also set the ``stream_results`` statement + option to ``True``, which currently is only understood by psycopg2 + and causes server side cursors to be used. + """ self._yield_per = count - + self._statement_options = self._statement_options.copy() + self._statement_options['stream_results'] = True + def get(self, ident): """Return an instance of the object based on the given identifier, or None if not found. @@ -658,7 +666,7 @@ class Query(object): # most MapperOptions write to the '_attributes' dictionary, # so copy that as well self._attributes = self._attributes.copy() - opts = list(util.flatten_iterator(args)) + opts = tuple(util.flatten_iterator(args)) self._with_options = self._with_options + opts if conditional: for opt in opts: @@ -668,6 +676,21 @@ class Query(object): opt.process_query(self) @_generative() + def statement_options(self, **kwargs): + """ Set non-SQL options for the resulting statement, such as dialect-specific options. + + The only option currently understood is ``stream_results=True``, + only used by Psycopg2 to enable "server side cursors". This option + only has a useful effect if used in conjunction with :meth:`~sqlalchemy.orm.query.Query.yield_per()`, + which currently sets ``stream_results`` to ``True`` automatically. + + """ + _statement_options = self._statement_options.copy() + for key, value in kwargs.items(): + _statement_options[key] = value + self._statement_options = _statement_options + + @_generative() def with_lockmode(self, mode): """Return a new Query object with the specified locking mode.""" @@ -1915,7 +1938,7 @@ class Query(object): context.adapter = sql_util.ColumnAdapter(inner, equivs) - statement = sql.select([inner] + context.secondary_columns, for_update=for_update, use_labels=labels) + statement = sql.select([inner] + context.secondary_columns, for_update=for_update, use_labels=labels, statement_options=self._statement_options) from_clause = inner for eager_join in eager_joins: @@ -1947,6 +1970,7 @@ class Query(object): for_update=for_update, correlate=False, order_by=context.order_by, + statement_options=self._statement_options, **self._select_args ) diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py index 7c9fa58fe..e40b6f592 100644 --- a/lib/sqlalchemy/schema.py +++ b/lib/sqlalchemy/schema.py @@ -2027,10 +2027,9 @@ class SchemaVisitor(visitors.ClauseVisitor): __traverse_options__ = {'schema_visitor':True} -class DDLElement(expression.ClauseElement): +class DDLElement(expression._Executable, expression.ClauseElement): """Base class for DDL expression constructs.""" - supports_execution = True _autocommit = True target = None diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py index 742746cbe..2dc13ee82 100644 --- a/lib/sqlalchemy/sql/expression.py +++ b/lib/sqlalchemy/sql/expression.py @@ -2178,8 +2178,13 @@ class _TypeClause(ClauseElement): def __init__(self, type): self.type = type +class _Executable(object): + """Mark a ClauseElement as supporting execution.""" -class _TextClause(ClauseElement): + supports_execution = True + _statement_options = util.frozendict() + +class _TextClause(_Executable, ClauseElement): """Represent a literal SQL text fragment. Public constructor is the :func:`text()` function. @@ -2189,7 +2194,6 @@ class _TextClause(ClauseElement): __visit_name__ = 'textclause' _bind_params_regex = re.compile(r'(?<![:\w\x5c]):(\w+)(?!:)', re.UNICODE) - supports_execution = True @property def _select_iterable(self): @@ -2198,11 +2202,14 @@ class _TextClause(ClauseElement): _hide_froms = [] def __init__(self, text = "", bind=None, - bindparams=None, typemap=None, autocommit=PARSE_AUTOCOMMIT): + bindparams=None, typemap=None, autocommit=PARSE_AUTOCOMMIT, statement_options=None): self._bind = bind self.bindparams = {} self.typemap = typemap self._autocommit = autocommit + self._statement_options = statement_options + if self._statement_options is None: + self._statement_options = {} if typemap is not None: for key in typemap.keys(): typemap[key] = sqltypes.to_instance(typemap[key]) @@ -2792,6 +2799,7 @@ class Alias(FromClause): self.supports_execution = baseselectable.supports_execution if self.supports_execution: self._autocommit = baseselectable._autocommit + self._statement_options = baseselectable._statement_options self.element = selectable if alias is None: if self.original.named_with_column: @@ -2845,6 +2853,7 @@ class Alias(FromClause): def bind(self): return self.element.bind + class _Grouping(ColumnElement): """Represent a grouping within a column expression""" @@ -3159,11 +3168,9 @@ def _generative(fn, *args, **kw): fn(self, *args[1:], **kw) return self -class _SelectBaseMixin(object): +class _SelectBaseMixin(_Executable): """Base class for :class:`Select` and ``CompoundSelects``.""" - supports_execution = True - def __init__(self, use_labels=False, for_update=False, @@ -3172,13 +3179,17 @@ class _SelectBaseMixin(object): order_by=None, group_by=None, bind=None, - autocommit=False): + autocommit=False, + statement_options=None): self.use_labels = use_labels self.for_update = for_update self._autocommit = autocommit self._limit = limit self._offset = offset self._bind = bind + self._statement_options = statement_options + if self._statement_options is None: + self._statement_options = dict() self._order_by_clause = ClauseList(*util.to_list(order_by) or []) self._group_by_clause = ClauseList(*util.to_list(group_by) or []) @@ -3290,6 +3301,18 @@ class _SelectBaseMixin(object): def _from_objects(self): return [self] + @_generative + def statement_options(self, **kwargs): + """ Set non-SQL options for the statement, such as dialect-specific options. + + The options available are covered in the respective dialect's section. + + """ + _statement_options = self._statement_options.copy() + for key, value in kwargs.items(): + _statement_options[key] = value + self._statement_options = _statement_options + class _ScalarSelect(_Grouping): _from_objects = [] @@ -3411,7 +3434,9 @@ class Select(_SelectBaseMixin, FromClause): """ __visit_name__ = 'select' - + + _prefixes = () + def __init__(self, columns, whereclause=None, @@ -3468,9 +3493,7 @@ class Select(_SelectBaseMixin, FromClause): self._having = None if prefixes: - self._prefixes = [_literal_as_text(p) for p in prefixes] - else: - self._prefixes = [] + self._prefixes = tuple([_literal_as_text(p) for p in prefixes]) _SelectBaseMixin.__init__(self, **kwargs) @@ -3624,7 +3647,7 @@ class Select(_SelectBaseMixin, FromClause): """ clause = _literal_as_text(clause) - self._prefixes = self._prefixes + [clause] + self._prefixes = self._prefixes + (clause,) @_generative def select_from(self, fromclause): @@ -3682,7 +3705,7 @@ class Select(_SelectBaseMixin, FromClause): """ clause = _literal_as_text(clause) - self._prefixes = self._prefixes.union([clause]) + self._prefixes = self._prefixes + (clause,) def append_whereclause(self, whereclause): """append the given expression to this select() construct's WHERE criterion. @@ -3803,12 +3826,11 @@ class Select(_SelectBaseMixin, FromClause): self._bind = bind bind = property(bind, _set_bind) -class _UpdateBase(ClauseElement): +class _UpdateBase(_Executable, ClauseElement): """Form the base for ``INSERT``, ``UPDATE``, and ``DELETE`` statements.""" __visit_name__ = 'update_base' - supports_execution = True _autocommit = True def _generate(self): @@ -3924,7 +3946,9 @@ class Insert(_ValuesBase): """ __visit_name__ = 'insert' - + + _prefixes = () + def __init__(self, table, values=None, @@ -3939,9 +3963,7 @@ class Insert(_ValuesBase): self.inline = inline self._returning = returning if prefixes: - self._prefixes = [_literal_as_text(p) for p in prefixes] - else: - self._prefixes = [] + self._prefixes = tuple([_literal_as_text(p) for p in prefixes]) self.kwargs = self._process_deprecated_kw(kwargs) @@ -3964,7 +3986,7 @@ class Insert(_ValuesBase): """ clause = _literal_as_text(clause) - self._prefixes = self._prefixes + [clause] + self._prefixes = self._prefixes + (clause,) class Update(_ValuesBase): """Represent an Update construct. @@ -4061,9 +4083,8 @@ class Delete(_UpdateBase): # TODO: coverage self._whereclause = clone(self._whereclause) -class _IdentifiedClause(ClauseElement): +class _IdentifiedClause(_Executable, ClauseElement): __visit_name__ = 'identified' - supports_execution = True _autocommit = False quote = None diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py index c3ae25589..bd988dd20 100644 --- a/lib/sqlalchemy/util.py +++ b/lib/sqlalchemy/util.py @@ -138,7 +138,25 @@ except ImportError: return 'defaultdict(%s, %s)' % (self.default_factory, dict.__repr__(self)) - +class frozendict(dict): + def _blocked_attribute(obj): + raise AttributeError, "A frozendict cannot be modified." + _blocked_attribute = property(_blocked_attribute) + + __delitem__ = __setitem__ = clear = _blocked_attribute + pop = popitem = setdefault = update = _blocked_attribute + + def __new__(cls, *args): + new = dict.__new__(cls) + dict.__init__(new, *args) + return new + + def __init__(self, *args): + pass + + def __repr__(self): + return "frozendict(%s)" % dict.__repr__(self) + def to_list(x, default=None): if x is None: return default diff --git a/test/aaa_profiling/test_orm.py b/test/aaa_profiling/test_orm.py index 327f3226f..a95086509 100644 --- a/test/aaa_profiling/test_orm.py +++ b/test/aaa_profiling/test_orm.py @@ -72,7 +72,7 @@ class MergeTest(_base.MappedTest): p3 = go() - + @testing.fails_on_everything_except('sqlite') @testing.resolve_artifact_names def test_merge_load(self): sess = sessionmaker()() diff --git a/test/dialect/test_postgresql.py b/test/dialect/test_postgresql.py index 9833042fe..5d6bcaf6d 100644 --- a/test/dialect/test_postgresql.py +++ b/test/dialect/test_postgresql.py @@ -1448,6 +1448,78 @@ class ServerSideCursorsTest(TestBase, AssertsExecutionResults): result = ss_engine.execute(select([1])) assert result.cursor.name + + def test_uses_ss_when_explicitly_enabled(self): + engine = engines.testing_engine(options={'server_side_cursors':False}) + result = engine.execute(text("select 1")) + # It should be off globally ... + assert not result.cursor.name + + s = select([1]).statement_options(stream_results=True) + result = engine.execute(s) + # ... but enabled for this one. + assert result.cursor.name + + def test_ss_explicitly_disabled(self): + s = select([1]).statement_options(stream_results=False) + result = ss_engine.execute(s) + assert not result.cursor.name + + def test_aliases_and_ss(self): + engine = engines.testing_engine(options={'server_side_cursors':False}) + s1 = select([1]).statement_options(stream_results=True).alias() + result = engine.execute(s1) + assert result.cursor.name + + # s1's options shouldn't affect s2 when s2 is used as a from_obj. + s2 = select([1], from_obj=s1) + result = engine.execute(s2) + assert not result.cursor.name + + def test_for_update_and_ss(self): + s1 = select([1], for_update=True) + result = ss_engine.execute(s1) + assert result.cursor.name + + result = ss_engine.execute('SELECT 1 FOR UPDATE') + assert result.cursor.name + + def test_orm_queries_with_ss(self): + metadata = MetaData(testing.db) + class Foo(object): pass + footable = Table('foobar', metadata, + Column('id', Integer, primary_key=True), + ) + mapper(Foo, footable) + metadata.create_all() + try: + sess = create_session() + + engine = engines.testing_engine(options={'server_side_cursors':False}) + result = engine.execute(sess.query(Foo).statement) + assert not result.cursor.name, result.cursor.name + result.close() + + q = sess.query(Foo).statement_options(stream_results=True) + result = engine.execute(q.statement) + assert result.cursor.name + result.close() + + result = sess.query(Foo).statement_options(stream_results=True).subquery().execute() + assert result.cursor.name + result.close() + finally: + metadata.drop_all() + + def test_text_with_ss(self): + engine = engines.testing_engine(options={'server_side_cursors':False}) + s = text('select 42') + result = engine.execute(s) + assert not result.cursor.name + s = text('select 42', statement_options=dict(stream_results=True)) + result = engine.execute(s) + assert result.cursor.name + def test_roundtrip(self): test_table = Table('test_table', MetaData(ss_engine), diff --git a/test/orm/test_query.py b/test/orm/test_query.py index ee9f1853b..cd89f4d92 100644 --- a/test/orm/test_query.py +++ b/test/orm/test_query.py @@ -3746,3 +3746,30 @@ class UpdateDeleteTest(_base.MappedTest): assert not (john in sess or jack in sess or jill in sess or jane in sess) eq_(sess.query(User).count(), 0) + +class StatementOptionsTest(QueryTest): + """ Make sure a Query's statement_options are passed on to the + resulting statement. """ + + def test_query_with_statement_option(self): + sess = create_session(bind=testing.db, autocommit=False) + + q1 = sess.query(User) + assert q1._statement_options == dict() + q2 = q1.statement_options(foo='bar', stream_results=True) + # q1's options should be unchanged. + assert q1._statement_options == dict() + # q2 should have them set. + assert q2._statement_options == dict(foo='bar', stream_results=True) + q3 = q2.statement_options(foo='not bar', answer=42) + assert q2._statement_options == dict(foo='bar', stream_results=True) + + q3_options = dict(foo='not bar', stream_results=True, answer=42) + assert q3._statement_options == q3_options + assert q3.statement._statement_options == q3_options + assert q3._compile_context().statement._statement_options == q3_options + assert q3.subquery().original._statement_options == q3_options + + # TODO: Test that statement options are passed on to + # updates/deletes, but currently there are no such options + # applicable for them. diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index 66c6b6c45..a9a1f59dd 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -785,6 +785,27 @@ class SelectTest(TestBase, AssertsCompiledSQL): self.assert_compile(select_copy, "SELECT FOOBER table1.col1, table1.col2, table1.col3 FROM table1") self.assert_compile(s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1") + def test_statement_options(self): + s = select().statement_options(foo='bar') + s2 = s.statement_options(bar='baz') + s3 = s.statement_options(foo='not bar') + # The original select should not be modified. + assert s._statement_options == dict(foo='bar') + # s2 should have its statement_options based on s, though. + assert s2._statement_options == dict(foo='bar', bar='baz') + assert s3._statement_options == dict(foo='not bar') + + def test_statement_options_in_kwargs(self): + s = select(statement_options=dict(foo='bar')) + s2 = s.statement_options(bar='baz') + # The original select should not be modified. + assert s._statement_options == dict(foo='bar') + # s2 should have its statement_options based on s, though. + assert s2._statement_options == dict(foo='bar', bar='baz') + + def test_statement_options_in_text(self): + s = text('select 42', statement_options=dict(foo='bar')) + assert s._statement_options == dict(foo='bar') class InsertTest(TestBase, AssertsCompiledSQL): """Tests the generative capability of Insert""" |