summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/sql/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/sql/base.py')
-rw-r--r--lib/sqlalchemy/sql/base.py114
1 files changed, 66 insertions, 48 deletions
diff --git a/lib/sqlalchemy/sql/base.py b/lib/sqlalchemy/sql/base.py
index 6b9b55753..45db215fe 100644
--- a/lib/sqlalchemy/sql/base.py
+++ b/lib/sqlalchemy/sql/base.py
@@ -15,8 +15,8 @@ import itertools
from .visitors import ClauseVisitor
import re
-PARSE_AUTOCOMMIT = util.symbol('PARSE_AUTOCOMMIT')
-NO_ARG = util.symbol('NO_ARG')
+PARSE_AUTOCOMMIT = util.symbol("PARSE_AUTOCOMMIT")
+NO_ARG = util.symbol("NO_ARG")
class Immutable(object):
@@ -77,7 +77,8 @@ class _DialectArgView(util.collections_abc.MutableMapping):
dialect, value_key = self._key(key)
except KeyError:
raise exc.ArgumentError(
- "Keys must be of the form <dialectname>_<argname>")
+ "Keys must be of the form <dialectname>_<argname>"
+ )
else:
self.obj.dialect_options[dialect][value_key] = value
@@ -86,15 +87,18 @@ class _DialectArgView(util.collections_abc.MutableMapping):
del self.obj.dialect_options[dialect][value_key]
def __len__(self):
- return sum(len(args._non_defaults) for args in
- self.obj.dialect_options.values())
+ return sum(
+ len(args._non_defaults)
+ for args in self.obj.dialect_options.values()
+ )
def __iter__(self):
return (
util.safe_kwarg("%s_%s" % (dialect_name, value_name))
for dialect_name in self.obj.dialect_options
- for value_name in
- self.obj.dialect_options[dialect_name]._non_defaults
+ for value_name in self.obj.dialect_options[
+ dialect_name
+ ]._non_defaults
)
@@ -187,8 +191,8 @@ class DialectKWArgs(object):
if construct_arg_dictionary is None:
raise exc.ArgumentError(
"Dialect '%s' does have keyword-argument "
- "validation and defaults enabled configured" %
- dialect_name)
+ "validation and defaults enabled configured" % dialect_name
+ )
if cls not in construct_arg_dictionary:
construct_arg_dictionary[cls] = {}
construct_arg_dictionary[cls][argument_name] = default
@@ -230,6 +234,7 @@ class DialectKWArgs(object):
if dialect_cls.construct_arguments is None:
return None
return dict(dialect_cls.construct_arguments)
+
_kw_registry = util.PopulateDict(_kw_reg_for_dialect)
def _kw_reg_for_dialect_cls(self, dialect_name):
@@ -274,11 +279,12 @@ class DialectKWArgs(object):
return
for k in kwargs:
- m = re.match('^(.+?)_(.+)$', k)
+ m = re.match("^(.+?)_(.+)$", k)
if not m:
raise TypeError(
"Additional arguments should be "
- "named <dialectname>_<argument>, got '%s'" % k)
+ "named <dialectname>_<argument>, got '%s'" % k
+ )
dialect_name, arg_name = m.group(1, 2)
try:
@@ -286,20 +292,22 @@ class DialectKWArgs(object):
except exc.NoSuchModuleError:
util.warn(
"Can't validate argument %r; can't "
- "locate any SQLAlchemy dialect named %r" %
- (k, dialect_name))
+ "locate any SQLAlchemy dialect named %r"
+ % (k, dialect_name)
+ )
self.dialect_options[dialect_name] = d = _DialectArgDict()
d._defaults.update({"*": None})
d._non_defaults[arg_name] = kwargs[k]
else:
- if "*" not in construct_arg_dictionary and \
- arg_name not in construct_arg_dictionary:
+ if (
+ "*" not in construct_arg_dictionary
+ and arg_name not in construct_arg_dictionary
+ ):
raise exc.ArgumentError(
"Argument %r is not accepted by "
- "dialect %r on behalf of %r" % (
- k,
- dialect_name, self.__class__
- ))
+ "dialect %r on behalf of %r"
+ % (k, dialect_name, self.__class__)
+ )
else:
construct_arg_dictionary[arg_name] = kwargs[k]
@@ -359,14 +367,14 @@ class Executable(Generative):
:meth:`.Query.execution_options()`
"""
- if 'isolation_level' in kw:
+ if "isolation_level" in kw:
raise exc.ArgumentError(
"'isolation_level' execution option may only be specified "
"on Connection.execution_options(), or "
"per-engine using the isolation_level "
"argument to create_engine()."
)
- if 'compiled_cache' in kw:
+ if "compiled_cache" in kw:
raise exc.ArgumentError(
"'compiled_cache' execution option may only be specified "
"on Connection.execution_options(), not per statement."
@@ -377,10 +385,12 @@ class Executable(Generative):
"""Compile and execute this :class:`.Executable`."""
e = self.bind
if e is None:
- label = getattr(self, 'description', self.__class__.__name__)
- msg = ('This %s is not directly bound to a Connection or Engine. '
- 'Use the .execute() method of a Connection or Engine '
- 'to execute this construct.' % label)
+ label = getattr(self, "description", self.__class__.__name__)
+ msg = (
+ "This %s is not directly bound to a Connection or Engine. "
+ "Use the .execute() method of a Connection or Engine "
+ "to execute this construct." % label
+ )
raise exc.UnboundExecutionError(msg)
return e._execute_clauseelement(self, multiparams, params)
@@ -434,7 +444,7 @@ class SchemaEventTarget(object):
class SchemaVisitor(ClauseVisitor):
"""Define the visiting for ``SchemaItem`` objects."""
- __traverse_options__ = {'schema_visitor': True}
+ __traverse_options__ = {"schema_visitor": True}
class ColumnCollection(util.OrderedProperties):
@@ -446,11 +456,11 @@ class ColumnCollection(util.OrderedProperties):
"""
- __slots__ = '_all_columns'
+ __slots__ = "_all_columns"
def __init__(self, *columns):
super(ColumnCollection, self).__init__()
- object.__setattr__(self, '_all_columns', [])
+ object.__setattr__(self, "_all_columns", [])
for c in columns:
self.add(c)
@@ -485,8 +495,9 @@ class ColumnCollection(util.OrderedProperties):
self._data[column.key] = column
if remove_col is not None:
- self._all_columns[:] = [column if c is remove_col
- else c for c in self._all_columns]
+ self._all_columns[:] = [
+ column if c is remove_col else c for c in self._all_columns
+ ]
else:
self._all_columns.append(column)
@@ -499,7 +510,8 @@ class ColumnCollection(util.OrderedProperties):
"""
if not column.key:
raise exc.ArgumentError(
- "Can't add unnamed column to column collection")
+ "Can't add unnamed column to column collection"
+ )
self[column.key] = column
def __delitem__(self, key):
@@ -521,10 +533,12 @@ class ColumnCollection(util.OrderedProperties):
return
if not existing.shares_lineage(value):
- util.warn('Column %r on table %r being replaced by '
- '%r, which has the same key. Consider '
- 'use_labels for select() statements.' %
- (key, getattr(existing, 'table', None), value))
+ util.warn(
+ "Column %r on table %r being replaced by "
+ "%r, which has the same key. Consider "
+ "use_labels for select() statements."
+ % (key, getattr(existing, "table", None), value)
+ )
# pop out memoized proxy_set as this
# operation may very well be occurring
@@ -540,13 +554,15 @@ class ColumnCollection(util.OrderedProperties):
def remove(self, column):
del self._data[column.key]
self._all_columns[:] = [
- c for c in self._all_columns if c is not column]
+ c for c in self._all_columns if c is not column
+ ]
def update(self, iter):
cols = list(iter)
all_col_set = set(self._all_columns)
self._all_columns.extend(
- c for label, c in cols if c not in all_col_set)
+ c for label, c in cols if c not in all_col_set
+ )
self._data.update((label, c) for label, c in cols)
def extend(self, iter):
@@ -572,12 +588,11 @@ class ColumnCollection(util.OrderedProperties):
return util.OrderedProperties.__contains__(self, other)
def __getstate__(self):
- return {'_data': self._data,
- '_all_columns': self._all_columns}
+ return {"_data": self._data, "_all_columns": self._all_columns}
def __setstate__(self, state):
- object.__setattr__(self, '_data', state['_data'])
- object.__setattr__(self, '_all_columns', state['_all_columns'])
+ object.__setattr__(self, "_data", state["_data"])
+ object.__setattr__(self, "_all_columns", state["_all_columns"])
def contains_column(self, col):
return col in set(self._all_columns)
@@ -589,7 +604,7 @@ class ColumnCollection(util.OrderedProperties):
class ImmutableColumnCollection(util.ImmutableProperties, ColumnCollection):
def __init__(self, data, all_columns):
util.ImmutableProperties.__init__(self, data)
- object.__setattr__(self, '_all_columns', all_columns)
+ object.__setattr__(self, "_all_columns", all_columns)
extend = remove = util.ImmutableProperties._immutable
@@ -622,15 +637,18 @@ def _bind_or_error(schemaitem, msg=None):
bind = schemaitem.bind
if not bind:
name = schemaitem.__class__.__name__
- label = getattr(schemaitem, 'fullname',
- getattr(schemaitem, 'name', None))
+ label = getattr(
+ schemaitem, "fullname", getattr(schemaitem, "name", None)
+ )
if label:
- item = '%s object %r' % (name, label)
+ item = "%s object %r" % (name, label)
else:
- item = '%s object' % name
+ item = "%s object" % name
if msg is None:
- msg = "%s is not bound to an Engine or Connection. "\
- "Execution can not proceed without a database to execute "\
+ msg = (
+ "%s is not bound to an Engine or Connection. "
+ "Execution can not proceed without a database to execute "
"against." % item
+ )
raise exc.UnboundExecutionError(msg)
return bind