summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2010-01-16 22:44:04 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2010-01-16 22:44:04 +0000
commitabccc0624228def744b0382e84f01cf95e0d3aed (patch)
treefca7eb29b90211daa699da6d0358f81243c243d9 /lib/sqlalchemy
parent00df05061e7a0333022d02705c21270f9de4edab (diff)
downloadsqlalchemy-abccc0624228def744b0382e84f01cf95e0d3aed.tar.gz
- added "statement_options()" to Query, to so options can be
passed to the resulting statement. Currently only Select-statements have these options, and the only option used is "stream_results", and the only dialect which knows "stream_results" is psycopg2. - Query.yield_per() will set the "stream_results" statement option automatically. - Added "statement_options()" to Selects, which set statement specific options. These enable e.g. dialect specific options such as whether to enable using server side cursors, etc. - The psycopg2 now respects the statement option "stream_results". This option overrides the connection setting "server_side_cursors". If true, server side cursors will be used for the statement. If false, they will not be used, even if "server_side_cursors" is true on the connection. [ticket:1619] - added a "frozendict" from http://code.activestate.com/recipes/414283/, adding more default collections as immutable class vars on Query, Insert, Select
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--lib/sqlalchemy/dialects/postgresql/psycopg2.py40
-rw-r--r--lib/sqlalchemy/engine/default.py4
-rw-r--r--lib/sqlalchemy/orm/query.py40
-rw-r--r--lib/sqlalchemy/schema.py3
-rw-r--r--lib/sqlalchemy/sql/expression.py65
-rw-r--r--lib/sqlalchemy/util.py20
6 files changed, 126 insertions, 46 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
index a46fdbddb..7733aadcd 100644
--- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@ -35,6 +35,15 @@ Transactions
The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations.
+Statement options
+-----------------
+
+The following statement options are respected:
+
+* *stream_results* - Enable or disable usage of server side cursors for the SELECT-statement.
+ If *None* or not set, the *server_side_cursors* option of the connection is used. If
+ auto-commit is enabled, the option is ignored.
+
"""
import decimal, random, re
@@ -93,8 +102,9 @@ class _PGArray(ARRAY):
if isinstance(self.item_type, sqltypes.String) and \
self.item_type.convert_unicode:
self.item_type.convert_unicode = "force"
-
-# TODO: filter out 'FOR UPDATE' statements
+
+# When we're handed literal SQL, ensure it's a SELECT-query. Since
+# 8.3, combining cursors and "FOR UPDATE" has been fine.
SERVER_SIDE_CURSOR_RE = re.compile(
r'\s*SELECT',
re.I | re.UNICODE)
@@ -102,16 +112,22 @@ SERVER_SIDE_CURSOR_RE = re.compile(
class PostgreSQL_psycopg2ExecutionContext(PGExecutionContext):
def create_cursor(self):
# TODO: coverage for server side cursors + select.for_update()
- is_server_side = \
- self.dialect.server_side_cursors and \
- not self.should_autocommit and \
- ((self.compiled and isinstance(self.compiled.statement, expression.Selectable)
- and not getattr(self.compiled.statement, 'for_update', False)) \
- or \
- (
- (not self.compiled or isinstance(self.compiled.statement, expression._TextClause))
- and self.statement and SERVER_SIDE_CURSOR_RE.match(self.statement))
- )
+ stream_results_option = self.statement_options.get('stream_results')
+ is_server_side = (
+ # Enabled for this statement ...
+ (stream_results_option or
+ # ... or enabled for all statements
+ (self.dialect.server_side_cursors and
+ # ... and not explicitly disabled for this one.
+ (stream_results_option or stream_results_option is None))
+ ) and (
+ # But don't use SS-cursors when autocommit is on ...
+ (not self.should_autocommit and
+ self.compiled and isinstance(self.compiled.statement, expression.Selectable))
+ or (
+ # ... or if it's not even a SELECT.
+ (not self.compiled or isinstance(self.compiled.statement, expression._TextClause))
+ and self.statement and SERVER_SIDE_CURSOR_RE.match(self.statement))))
self.__is_server_side = is_server_side
if is_server_side:
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 1bdeca377..baa92b5d4 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -223,6 +223,7 @@ class DefaultDialect(base.Dialect):
class DefaultExecutionContext(base.ExecutionContext):
+ statement_options = util.frozendict()
def __init__(self, dialect, connection, compiled_sql=None, compiled_ddl=None, statement=None, parameters=None):
self.dialect = dialect
@@ -249,7 +250,7 @@ class DefaultExecutionContext(base.ExecutionContext):
if not compiled.can_execute:
raise exc.ArgumentError("Not an executable clause: %s" % compiled)
-
+
self.processors = dict(
(key, value) for key, value in
( (compiled.bind_names[bindparam],
@@ -268,6 +269,7 @@ class DefaultExecutionContext(base.ExecutionContext):
self.isupdate = compiled.isupdate
self.isdelete = compiled.isdelete
self.should_autocommit = compiled.statement._autocommit
+ self.statement_options = compiled.statement._statement_options
if self.should_autocommit is expression.PARSE_AUTOCOMMIT:
self.should_autocommit = self.should_autocommit_text(self.statement)
diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py
index 7be068019..444ad37bc 100644
--- a/lib/sqlalchemy/orm/query.py
+++ b/lib/sqlalchemy/orm/query.py
@@ -79,14 +79,14 @@ class Query(object):
_from_obj = ()
_filter_aliases = None
_from_obj_alias = None
- _joinpath = _joinpoint = {}
+ _joinpath = _joinpoint = util.frozendict()
+ _statement_options = util.frozendict()
+ _params = util.frozendict()
+ _attributes = util.frozendict()
+ _with_options = ()
def __init__(self, entities, session=None):
self.session = session
-
- self._with_options = []
- self._params = {}
- self._attributes = {}
self._polymorphic_adapters = {}
self._set_entities(entities)
@@ -488,9 +488,17 @@ class Query(object):
collections will be cleared for a new load when encountered in a
subsequent result batch.
+ Also note that many DBAPIs do not "stream" results, pre-buffering
+ all rows before making them available, including mysql-python and
+ psycopg2. yield_per() will also set the ``stream_results`` statement
+ option to ``True``, which currently is only understood by psycopg2
+ and causes server side cursors to be used.
+
"""
self._yield_per = count
-
+ self._statement_options = self._statement_options.copy()
+ self._statement_options['stream_results'] = True
+
def get(self, ident):
"""Return an instance of the object based on the given identifier, or None if not found.
@@ -658,7 +666,7 @@ class Query(object):
# most MapperOptions write to the '_attributes' dictionary,
# so copy that as well
self._attributes = self._attributes.copy()
- opts = list(util.flatten_iterator(args))
+ opts = tuple(util.flatten_iterator(args))
self._with_options = self._with_options + opts
if conditional:
for opt in opts:
@@ -668,6 +676,21 @@ class Query(object):
opt.process_query(self)
@_generative()
+ def statement_options(self, **kwargs):
+ """ Set non-SQL options for the resulting statement, such as dialect-specific options.
+
+ The only option currently understood is ``stream_results=True``,
+ only used by Psycopg2 to enable "server side cursors". This option
+ only has a useful effect if used in conjunction with :meth:`~sqlalchemy.orm.query.Query.yield_per()`,
+ which currently sets ``stream_results`` to ``True`` automatically.
+
+ """
+ _statement_options = self._statement_options.copy()
+ for key, value in kwargs.items():
+ _statement_options[key] = value
+ self._statement_options = _statement_options
+
+ @_generative()
def with_lockmode(self, mode):
"""Return a new Query object with the specified locking mode."""
@@ -1915,7 +1938,7 @@ class Query(object):
context.adapter = sql_util.ColumnAdapter(inner, equivs)
- statement = sql.select([inner] + context.secondary_columns, for_update=for_update, use_labels=labels)
+ statement = sql.select([inner] + context.secondary_columns, for_update=for_update, use_labels=labels, statement_options=self._statement_options)
from_clause = inner
for eager_join in eager_joins:
@@ -1947,6 +1970,7 @@ class Query(object):
for_update=for_update,
correlate=False,
order_by=context.order_by,
+ statement_options=self._statement_options,
**self._select_args
)
diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py
index 7c9fa58fe..e40b6f592 100644
--- a/lib/sqlalchemy/schema.py
+++ b/lib/sqlalchemy/schema.py
@@ -2027,10 +2027,9 @@ class SchemaVisitor(visitors.ClauseVisitor):
__traverse_options__ = {'schema_visitor':True}
-class DDLElement(expression.ClauseElement):
+class DDLElement(expression._Executable, expression.ClauseElement):
"""Base class for DDL expression constructs."""
- supports_execution = True
_autocommit = True
target = None
diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py
index 742746cbe..2dc13ee82 100644
--- a/lib/sqlalchemy/sql/expression.py
+++ b/lib/sqlalchemy/sql/expression.py
@@ -2178,8 +2178,13 @@ class _TypeClause(ClauseElement):
def __init__(self, type):
self.type = type
+class _Executable(object):
+ """Mark a ClauseElement as supporting execution."""
-class _TextClause(ClauseElement):
+ supports_execution = True
+ _statement_options = util.frozendict()
+
+class _TextClause(_Executable, ClauseElement):
"""Represent a literal SQL text fragment.
Public constructor is the :func:`text()` function.
@@ -2189,7 +2194,6 @@ class _TextClause(ClauseElement):
__visit_name__ = 'textclause'
_bind_params_regex = re.compile(r'(?<![:\w\x5c]):(\w+)(?!:)', re.UNICODE)
- supports_execution = True
@property
def _select_iterable(self):
@@ -2198,11 +2202,14 @@ class _TextClause(ClauseElement):
_hide_froms = []
def __init__(self, text = "", bind=None,
- bindparams=None, typemap=None, autocommit=PARSE_AUTOCOMMIT):
+ bindparams=None, typemap=None, autocommit=PARSE_AUTOCOMMIT, statement_options=None):
self._bind = bind
self.bindparams = {}
self.typemap = typemap
self._autocommit = autocommit
+ self._statement_options = statement_options
+ if self._statement_options is None:
+ self._statement_options = {}
if typemap is not None:
for key in typemap.keys():
typemap[key] = sqltypes.to_instance(typemap[key])
@@ -2792,6 +2799,7 @@ class Alias(FromClause):
self.supports_execution = baseselectable.supports_execution
if self.supports_execution:
self._autocommit = baseselectable._autocommit
+ self._statement_options = baseselectable._statement_options
self.element = selectable
if alias is None:
if self.original.named_with_column:
@@ -2845,6 +2853,7 @@ class Alias(FromClause):
def bind(self):
return self.element.bind
+
class _Grouping(ColumnElement):
"""Represent a grouping within a column expression"""
@@ -3159,11 +3168,9 @@ def _generative(fn, *args, **kw):
fn(self, *args[1:], **kw)
return self
-class _SelectBaseMixin(object):
+class _SelectBaseMixin(_Executable):
"""Base class for :class:`Select` and ``CompoundSelects``."""
- supports_execution = True
-
def __init__(self,
use_labels=False,
for_update=False,
@@ -3172,13 +3179,17 @@ class _SelectBaseMixin(object):
order_by=None,
group_by=None,
bind=None,
- autocommit=False):
+ autocommit=False,
+ statement_options=None):
self.use_labels = use_labels
self.for_update = for_update
self._autocommit = autocommit
self._limit = limit
self._offset = offset
self._bind = bind
+ self._statement_options = statement_options
+ if self._statement_options is None:
+ self._statement_options = dict()
self._order_by_clause = ClauseList(*util.to_list(order_by) or [])
self._group_by_clause = ClauseList(*util.to_list(group_by) or [])
@@ -3290,6 +3301,18 @@ class _SelectBaseMixin(object):
def _from_objects(self):
return [self]
+ @_generative
+ def statement_options(self, **kwargs):
+ """ Set non-SQL options for the statement, such as dialect-specific options.
+
+ The options available are covered in the respective dialect's section.
+
+ """
+ _statement_options = self._statement_options.copy()
+ for key, value in kwargs.items():
+ _statement_options[key] = value
+ self._statement_options = _statement_options
+
class _ScalarSelect(_Grouping):
_from_objects = []
@@ -3411,7 +3434,9 @@ class Select(_SelectBaseMixin, FromClause):
"""
__visit_name__ = 'select'
-
+
+ _prefixes = ()
+
def __init__(self,
columns,
whereclause=None,
@@ -3468,9 +3493,7 @@ class Select(_SelectBaseMixin, FromClause):
self._having = None
if prefixes:
- self._prefixes = [_literal_as_text(p) for p in prefixes]
- else:
- self._prefixes = []
+ self._prefixes = tuple([_literal_as_text(p) for p in prefixes])
_SelectBaseMixin.__init__(self, **kwargs)
@@ -3624,7 +3647,7 @@ class Select(_SelectBaseMixin, FromClause):
"""
clause = _literal_as_text(clause)
- self._prefixes = self._prefixes + [clause]
+ self._prefixes = self._prefixes + (clause,)
@_generative
def select_from(self, fromclause):
@@ -3682,7 +3705,7 @@ class Select(_SelectBaseMixin, FromClause):
"""
clause = _literal_as_text(clause)
- self._prefixes = self._prefixes.union([clause])
+ self._prefixes = self._prefixes + (clause,)
def append_whereclause(self, whereclause):
"""append the given expression to this select() construct's WHERE criterion.
@@ -3803,12 +3826,11 @@ class Select(_SelectBaseMixin, FromClause):
self._bind = bind
bind = property(bind, _set_bind)
-class _UpdateBase(ClauseElement):
+class _UpdateBase(_Executable, ClauseElement):
"""Form the base for ``INSERT``, ``UPDATE``, and ``DELETE`` statements."""
__visit_name__ = 'update_base'
- supports_execution = True
_autocommit = True
def _generate(self):
@@ -3924,7 +3946,9 @@ class Insert(_ValuesBase):
"""
__visit_name__ = 'insert'
-
+
+ _prefixes = ()
+
def __init__(self,
table,
values=None,
@@ -3939,9 +3963,7 @@ class Insert(_ValuesBase):
self.inline = inline
self._returning = returning
if prefixes:
- self._prefixes = [_literal_as_text(p) for p in prefixes]
- else:
- self._prefixes = []
+ self._prefixes = tuple([_literal_as_text(p) for p in prefixes])
self.kwargs = self._process_deprecated_kw(kwargs)
@@ -3964,7 +3986,7 @@ class Insert(_ValuesBase):
"""
clause = _literal_as_text(clause)
- self._prefixes = self._prefixes + [clause]
+ self._prefixes = self._prefixes + (clause,)
class Update(_ValuesBase):
"""Represent an Update construct.
@@ -4061,9 +4083,8 @@ class Delete(_UpdateBase):
# TODO: coverage
self._whereclause = clone(self._whereclause)
-class _IdentifiedClause(ClauseElement):
+class _IdentifiedClause(_Executable, ClauseElement):
__visit_name__ = 'identified'
- supports_execution = True
_autocommit = False
quote = None
diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py
index c3ae25589..bd988dd20 100644
--- a/lib/sqlalchemy/util.py
+++ b/lib/sqlalchemy/util.py
@@ -138,7 +138,25 @@ except ImportError:
return 'defaultdict(%s, %s)' % (self.default_factory,
dict.__repr__(self))
-
+class frozendict(dict):
+ def _blocked_attribute(obj):
+ raise AttributeError, "A frozendict cannot be modified."
+ _blocked_attribute = property(_blocked_attribute)
+
+ __delitem__ = __setitem__ = clear = _blocked_attribute
+ pop = popitem = setdefault = update = _blocked_attribute
+
+ def __new__(cls, *args):
+ new = dict.__new__(cls)
+ dict.__init__(new, *args)
+ return new
+
+ def __init__(self, *args):
+ pass
+
+ def __repr__(self):
+ return "frozendict(%s)" % dict.__repr__(self)
+
def to_list(x, default=None):
if x is None:
return default