summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/base/test_dependency.py197
-rw-r--r--test/base/test_except.py122
-rw-r--r--test/base/test_utils.py63
-rw-r--r--test/ex/test_examples.py33
-rw-r--r--test/ext/test_associationproxy.py263
-rw-r--r--test/ext/test_declarative.py2328
-rw-r--r--test/ext/test_horizontal_shard.py91
-rw-r--r--test/ext/test_orderinglist.py15
-rw-r--r--test/ext/test_serializer.py169
-rw-r--r--test/ext/test_sqlsoup.py378
10 files changed, 1937 insertions, 1722 deletions
diff --git a/test/base/test_dependency.py b/test/base/test_dependency.py
index c24c3e5e8..aa4410576 100644
--- a/test/base/test_dependency.py
+++ b/test/base/test_dependency.py
@@ -3,20 +3,18 @@ from sqlalchemy.test import TestBase
from sqlalchemy.test.testing import assert_raises, eq_
from sqlalchemy import exc, util
+
class DependencySortTest(TestBase):
+
def assert_sort(self, tuples, allitems=None):
-
if allitems is None:
allitems = self._nodes_from_tuples(tuples)
else:
allitems = self._nodes_from_tuples(tuples).union(allitems)
-
result = list(topological.sort(tuples, allitems))
-
deps = util.defaultdict(set)
for parent, child in tuples:
deps[parent].add(child)
-
assert len(result)
for i, node in enumerate(result):
for n in result[i:]:
@@ -27,7 +25,7 @@ class DependencySortTest(TestBase):
for tup in tups:
s.update(tup)
return s
-
+
def test_sort_one(self):
rootnode = 'root'
node2 = 'node2'
@@ -46,8 +44,8 @@ class DependencySortTest(TestBase):
(rootnode, node3),
(rootnode, node4),
(node4, subnode3),
- (node4, subnode4)
- ]
+ (node4, subnode4),
+ ]
self.assert_sort(tuples)
def test_sort_two(self):
@@ -58,13 +56,8 @@ class DependencySortTest(TestBase):
node5 = 'node5'
node6 = 'node6'
node7 = 'node7'
- tuples = [
- (node1, node2),
- (node3, node4),
- (node4, node5),
- (node5, node6),
- (node6, node2)
- ]
+ tuples = [(node1, node2), (node3, node4), (node4, node5),
+ (node5, node6), (node6, node2)]
self.assert_sort(tuples, [node7])
def test_sort_three(self):
@@ -72,12 +65,8 @@ class DependencySortTest(TestBase):
node2 = 'itemkeyowrds'
node3 = 'items'
node4 = 'hoho'
- tuples = [
- (node1, node2),
- (node4, node1),
- (node1, node3),
- (node3, node2)
- ]
+ tuples = [(node1, node2), (node4, node1), (node1, node3),
+ (node3, node2)]
self.assert_sort(tuples)
def test_raise_on_cycle_one(self):
@@ -92,70 +81,71 @@ class DependencySortTest(TestBase):
(node1, node2),
(node2, node3),
(node3, node1),
- (node4, node1)
- ]
+ (node4, node1),
+ ]
allitems = self._nodes_from_tuples(tuples)
- assert_raises(exc.CircularDependencyError, list, topological.sort(tuples, allitems))
+ assert_raises(exc.CircularDependencyError, list,
+ topological.sort(tuples, allitems))
# TODO: test find_cycles
def test_raise_on_cycle_two(self):
- # this condition was arising from ticket:362
- # and was not treated properly by topological sort
+
+ # this condition was arising from ticket:362 and was not treated
+ # properly by topological sort
+
node1 = 'node1'
node2 = 'node2'
node3 = 'node3'
node4 = 'node4'
- tuples = [
- (node1, node2),
- (node3, node1),
- (node2, node4),
- (node3, node2),
- (node2, node3)
- ]
+ tuples = [(node1, node2), (node3, node1), (node2, node4),
+ (node3, node2), (node2, node3)]
allitems = self._nodes_from_tuples(tuples)
- assert_raises(exc.CircularDependencyError, list, topological.sort(tuples, allitems))
+ assert_raises(exc.CircularDependencyError, list,
+ topological.sort(tuples, allitems))
# TODO: test find_cycles
def test_raise_on_cycle_three(self):
- question, issue, providerservice, answer, provider = "Question", "Issue", "ProviderService", "Answer", "Provider"
-
- tuples = [(question, issue), (providerservice, issue), (provider, question),
- (question, provider), (providerservice, question),
- (provider, providerservice), (question, answer), (issue, question)]
-
+ question, issue, providerservice, answer, provider = \
+ 'Question', 'Issue', 'ProviderService', 'Answer', 'Provider'
+ tuples = [
+ (question, issue),
+ (providerservice, issue),
+ (provider, question),
+ (question, provider),
+ (providerservice, question),
+ (provider, providerservice),
+ (question, answer),
+ (issue, question),
+ ]
allitems = self._nodes_from_tuples(tuples)
- assert_raises(exc.CircularDependencyError, list, topological.sort(tuples, allitems))
-
+ assert_raises(exc.CircularDependencyError, list,
+ topological.sort(tuples, allitems))
+
# TODO: test find_cycles
-
+
def test_large_sort(self):
tuples = [(i, i + 1) for i in range(0, 1500, 2)]
self.assert_sort(tuples)
def test_ticket_1380(self):
+
# ticket:1380 regression: would raise a KeyError
+
tuples = [(id(i), i) for i in range(3)]
self.assert_sort(tuples)
-
+
def test_find_cycle_one(self):
node1 = 'node1'
node2 = 'node2'
node3 = 'node3'
node4 = 'node4'
- tuples = [
- (node1, node2),
- (node3, node1),
- (node2, node4),
- (node3, node2),
- (node2, node3)
- ]
-
- eq_(
- topological.find_cycles(tuples, self._nodes_from_tuples(tuples)),
- set([node1, node2, node3])
- )
+ tuples = [(node1, node2), (node3, node1), (node2, node4),
+ (node3, node2), (node2, node3)]
+ eq_(topological.find_cycles(tuples,
+ self._nodes_from_tuples(tuples)), set([node1, node2,
+ node3]))
def test_find_multiple_cycles_one(self):
node1 = 'node1'
@@ -167,33 +157,41 @@ class DependencySortTest(TestBase):
node7 = 'node7'
node8 = 'node8'
node9 = 'node9'
- tuples = [
- # cycle 1
+ tuples = [ # cycle 1 cycle 2 cycle 3 cycle 4, but only if cycle
+ # 1 nodes are present
(node1, node2),
(node2, node4),
(node4, node1),
-
- # cycle 2
(node9, node9),
-
- # cycle 3
(node7, node5),
(node5, node7),
-
- # cycle 4, but only if cycle 1 nodes are present
(node1, node6),
(node6, node8),
(node8, node4),
-
(node3, node1),
(node3, node2),
- ]
-
- allnodes = set([node1, node2, node3, node4, node5, node6, node7, node8, node9])
- eq_(
- topological.find_cycles(tuples, allnodes),
- set(['node8', 'node1', 'node2', 'node5', 'node4', 'node7', 'node6', 'node9'])
- )
+ ]
+ allnodes = set([
+ node1,
+ node2,
+ node3,
+ node4,
+ node5,
+ node6,
+ node7,
+ node8,
+ node9,
+ ])
+ eq_(topological.find_cycles(tuples, allnodes), set([
+ 'node8',
+ 'node1',
+ 'node2',
+ 'node5',
+ 'node4',
+ 'node7',
+ 'node6',
+ 'node9',
+ ]))
def test_find_multiple_cycles_two(self):
node1 = 'node1'
@@ -202,24 +200,25 @@ class DependencySortTest(TestBase):
node4 = 'node4'
node5 = 'node5'
node6 = 'node6'
- tuples = [
- # cycle 1
+ tuples = [ # cycle 1 cycle 2
(node1, node2),
(node2, node4),
(node4, node1),
-
- # cycle 2
(node1, node6),
(node6, node2),
(node2, node4),
(node4, node1),
- ]
-
- allnodes = set([node1, node2, node3, node4, node5, node6])
- eq_(
- topological.find_cycles(tuples, allnodes),
- set(['node1', 'node2', 'node4'])
- )
+ ]
+ allnodes = set([
+ node1,
+ node2,
+ node3,
+ node4,
+ node5,
+ node6,
+ ])
+ eq_(topological.find_cycles(tuples, allnodes), set(['node1',
+ 'node2', 'node4']))
def test_find_multiple_cycles_three(self):
node1 = 'node1'
@@ -228,33 +227,23 @@ class DependencySortTest(TestBase):
node4 = 'node4'
node5 = 'node5'
node6 = 'node6'
- tuples = [
-
- # cycle 1
+ tuples = [ # cycle 1 cycle 2 cycle3 cycle4
(node1, node2),
(node2, node1),
-
- # cycle 2
(node2, node3),
(node3, node2),
-
- # cycle3
(node2, node4),
(node4, node2),
-
- # cycle4
(node2, node5),
(node5, node6),
- (node6, node2)
- ]
-
- allnodes = set([node1, node2, node3, node4, node5, node6])
- eq_(
- topological.find_cycles(tuples, allnodes),
- allnodes
- )
-
-
-
-
-
+ (node6, node2),
+ ]
+ allnodes = set([
+ node1,
+ node2,
+ node3,
+ node4,
+ node5,
+ node6,
+ ])
+ eq_(topological.find_cycles(tuples, allnodes), allnodes)
diff --git a/test/base/test_except.py b/test/base/test_except.py
index fbe0a05de..09a9eb7b4 100644
--- a/test/base/test_except.py
+++ b/test/base/test_except.py
@@ -1,92 +1,125 @@
"""Tests exceptions and DB-API exception wrapping."""
+
+
from sqlalchemy import exc as sa_exceptions
from sqlalchemy.test import TestBase
-# Py3K
-#StandardError = BaseException
-# Py2K
+# Py3K StandardError = BaseException Py2K
+
from exceptions import StandardError, KeyboardInterrupt, SystemExit
+
# end Py2K
+
class Error(StandardError):
- """This class will be old-style on <= 2.4 and new-style on >= 2.5."""
+ """This class will be old-style on <= 2.4 and new-style on >=
+ 2.5."""
+
+
class DatabaseError(Error):
pass
+
+
class OperationalError(DatabaseError):
pass
+
+
class ProgrammingError(DatabaseError):
def __str__(self):
- return "<%s>" % self.bogus
+ return '<%s>' % self.bogus
+
+
class OutOfSpec(DatabaseError):
pass
class WrapTest(TestBase):
+
def test_db_error_normal(self):
try:
- raise sa_exceptions.DBAPIError.instance(
- '', [], OperationalError())
+ raise sa_exceptions.DBAPIError.instance('', [],
+ OperationalError())
except sa_exceptions.DBAPIError:
self.assert_(True)
-
+
def test_tostring(self):
try:
- raise sa_exceptions.DBAPIError.instance(
- 'this is a message', None, OperationalError())
+ raise sa_exceptions.DBAPIError.instance('this is a message'
+ , None, OperationalError())
except sa_exceptions.DBAPIError, exc:
- assert str(exc) == "(OperationalError) 'this is a message' None"
+ assert str(exc) \
+ == "(OperationalError) 'this is a message' None"
def test_tostring_large_dict(self):
try:
- raise sa_exceptions.DBAPIError.instance(
- 'this is a message', {'a':1, 'b':2, 'c':3, 'd':4, 'e':5, 'f':6, 'g':7, 'h':8, 'i':9, 'j':10, 'k':11}, OperationalError())
+ raise sa_exceptions.DBAPIError.instance('this is a message'
+ ,
+ {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6, 'g': 7, 'h':
+ 8, 'i': 9, 'j': 10, 'k': 11,
+ }, OperationalError())
except sa_exceptions.DBAPIError, exc:
- assert str(exc).startswith("(OperationalError) 'this is a message' {")
+ assert str(exc).startswith("(OperationalError) 'this is a "
+ "message' {")
def test_tostring_large_list(self):
try:
- raise sa_exceptions.DBAPIError.instance(
- 'this is a message', [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], OperationalError())
+ raise sa_exceptions.DBAPIError.instance('this is a message',
+ [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,], OperationalError())
except sa_exceptions.DBAPIError, exc:
- assert str(exc).startswith("(OperationalError) 'this is a message' [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]")
+ assert str(exc).startswith("(OperationalError) 'this is a "
+ "message' [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]")
def test_tostring_large_executemany(self):
try:
- raise sa_exceptions.DBAPIError.instance(
- 'this is a message', [{1:1},{1:1},{1:1},{1:1},{1:1},{1:1},{1:1},{1:1},{1:1},{1:1},], OperationalError())
+ raise sa_exceptions.DBAPIError.instance('this is a message',
+ [{1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1},
+ {1: 1}, {1:1}, {1: 1}, {1: 1},],
+ OperationalError())
except sa_exceptions.DBAPIError, exc:
- assert str(exc) == "(OperationalError) 'this is a message' [{1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1}]", str(exc)
-
+ assert str(exc) \
+ == "(OperationalError) 'this is a message' [{1: 1}, "\
+ "{1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: "\
+ "1}, {1: 1}, {1: 1}]", str(exc)
try:
- raise sa_exceptions.DBAPIError.instance(
- 'this is a message', [{1:1},{1:1},{1:1},{1:1},{1:1},{1:1},{1:1},{1:1},{1:1},{1:1},{1:1},], OperationalError())
+ raise sa_exceptions.DBAPIError.instance('this is a message', [
+ {1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1}, {1: 1},
+ {1:1}, {1: 1}, {1: 1}, {1: 1},
+ ], OperationalError())
except sa_exceptions.DBAPIError, exc:
- assert str(exc) == "(OperationalError) 'this is a message' [{1: 1}, {1: 1}] ... and a total of 11 bound parameter sets"
-
+ assert str(exc) \
+ == "(OperationalError) 'this is a message' [{1: 1}, "\
+ "{1: 1}] ... and a total of 11 bound parameter sets"
try:
- raise sa_exceptions.DBAPIError.instance(
- 'this is a message', [(1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,)], OperationalError())
+ raise sa_exceptions.DBAPIError.instance('this is a message',
+ [
+ (1, ), (1, ), (1, ), (1, ), (1, ), (1, ), (1, ), (1, ), (1, ),
+ (1, ),
+ ], OperationalError())
except sa_exceptions.DBAPIError, exc:
- assert str(exc) == "(OperationalError) 'this is a message' [(1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,)]"
-
+ assert str(exc) \
+ == "(OperationalError) 'this is a message' [(1,), "\
+ "(1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,)]"
try:
- raise sa_exceptions.DBAPIError.instance(
- 'this is a message', [(1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,), ], OperationalError())
+ raise sa_exceptions.DBAPIError.instance('this is a message', [
+ (1, ), (1, ), (1, ), (1, ), (1, ), (1, ), (1, ), (1, ), (1, ),
+ (1, ), (1, ),
+ ], OperationalError())
except sa_exceptions.DBAPIError, exc:
- assert str(exc) == "(OperationalError) 'this is a message' [(1,), (1,)] ... and a total of 11 bound parameter sets"
-
+ assert str(exc) \
+ == "(OperationalError) 'this is a message' [(1,), "\
+ "(1,)] ... and a total of 11 bound parameter sets"
+
def test_db_error_busted_dbapi(self):
try:
- raise sa_exceptions.DBAPIError.instance(
- '', [], ProgrammingError())
+ raise sa_exceptions.DBAPIError.instance('', [],
+ ProgrammingError())
except sa_exceptions.DBAPIError, e:
self.assert_(True)
self.assert_('Error in str() of DB-API' in e.args[0])
def test_db_error_noncompliant_dbapi(self):
try:
- raise sa_exceptions.DBAPIError.instance(
- '', [], OutOfSpec())
+ raise sa_exceptions.DBAPIError.instance('', [], OutOfSpec())
except sa_exceptions.DBAPIError, e:
self.assert_(e.__class__ is sa_exceptions.DBAPIError)
except OutOfSpec:
@@ -94,9 +127,10 @@ class WrapTest(TestBase):
# Make sure the DatabaseError recognition logic is limited to
# subclasses of sqlalchemy.exceptions.DBAPIError
+
try:
- raise sa_exceptions.DBAPIError.instance(
- '', [], sa_exceptions.ArgumentError())
+ raise sa_exceptions.DBAPIError.instance('', [],
+ sa_exceptions.ArgumentError())
except sa_exceptions.DBAPIError, e:
self.assert_(e.__class__ is sa_exceptions.DBAPIError)
except sa_exceptions.ArgumentError:
@@ -104,8 +138,8 @@ class WrapTest(TestBase):
def test_db_error_keyboard_interrupt(self):
try:
- raise sa_exceptions.DBAPIError.instance(
- '', [], KeyboardInterrupt())
+ raise sa_exceptions.DBAPIError.instance('', [],
+ KeyboardInterrupt())
except sa_exceptions.DBAPIError:
self.assert_(False)
except KeyboardInterrupt:
@@ -113,11 +147,9 @@ class WrapTest(TestBase):
def test_db_error_system_exit(self):
try:
- raise sa_exceptions.DBAPIError.instance(
- '', [], SystemExit())
+ raise sa_exceptions.DBAPIError.instance('', [],
+ SystemExit())
except sa_exceptions.DBAPIError:
self.assert_(False)
except SystemExit:
self.assert_(True)
-
-
diff --git a/test/base/test_utils.py b/test/base/test_utils.py
index 68ccc6ba2..d083a8458 100644
--- a/test/base/test_utils.py
+++ b/test/base/test_utils.py
@@ -4,6 +4,8 @@ from sqlalchemy import util, sql, exc
from sqlalchemy.test import TestBase
from sqlalchemy.test.testing import eq_, is_, ne_
from sqlalchemy.test.util import gc_collect, picklers
+from sqlalchemy.util import classproperty
+
class OrderedDictTest(TestBase):
def test_odict(self):
@@ -53,7 +55,8 @@ class OrderedDictTest(TestBase):
eq_(o.values(), [1, 2, 3, 4, 5, 6])
def test_odict_constructor(self):
- o = util.OrderedDict([('name', 'jbe'), ('fullname', 'jonathan'), ('password', '')])
+ o = util.OrderedDict([('name', 'jbe'), ('fullname', 'jonathan'
+ ), ('password', '')])
eq_(o.keys(), ['name', 'fullname', 'password'])
def test_odict_copy(self):
@@ -196,19 +199,23 @@ class ImmutableSubclass(str):
pass
class FlattenIteratorTest(TestBase):
+
def test_flatten(self):
- assert list(util.flatten_iterator([[1,2,3], [4,5,6], 7, 8])) == [1,2,3,4,5,6,7,8]
+ assert list(util.flatten_iterator([[1, 2, 3], [4, 5, 6], 7,
+ 8])) == [1, 2, 3, 4, 5, 6, 7, 8]
def test_str_with_iter(self):
- """ensure that a str object with an __iter__ method (like in PyPy) is not interpreted
- as an iterable.
+ """ensure that a str object with an __iter__ method (like in
+ PyPy) is not interpreted as an iterable.
"""
class IterString(str):
def __iter__(self):
- return iter(self + "")
+ return iter(self + '')
- assert list(util.flatten_iterator([IterString("asdf"), [IterString("x"), IterString("y")]])) == ["asdf", "x", "y"]
+ assert list(util.flatten_iterator([IterString('asdf'),
+ [IterString('x'), IterString('y')]])) == ['asdf',
+ 'x', 'y']
class HashOverride(object):
def __init__(self, value=None):
@@ -230,6 +237,7 @@ class EqOverride(object):
return self.value != other.value
else:
return True
+
class HashEqOverride(object):
def __init__(self, value=None):
self.value = value
@@ -381,7 +389,8 @@ class OrderedIdentitySetTest(TestBase):
elem = object
eq_ = self.assert_eq
- a, b, c, d, e, f, g = elem(), elem(), elem(), elem(), elem(), elem(), elem()
+ a, b, c, d, e, f, g = \
+ elem(), elem(), elem(), elem(), elem(), elem(), elem()
s1 = util.OrderedIdentitySet([a, b, c])
s2 = util.OrderedIdentitySet([d, e, f])
@@ -977,13 +986,11 @@ class AsInterfaceTest(TestBase):
def test_dict(self):
obj = {}
-
assert_raises(TypeError, util.as_interface, obj,
- cls=self.Something)
- assert_raises(TypeError, util.as_interface, obj,
- methods=('foo'))
+ cls=self.Something)
+ assert_raises(TypeError, util.as_interface, obj, methods='foo')
assert_raises(TypeError, util.as_interface, obj,
- cls=self.Something, required=('foo'))
+ cls=self.Something, required='foo')
def assertAdapted(obj, *methods):
assert isinstance(obj, type)
@@ -994,34 +1001,29 @@ class AsInterfaceTest(TestBase):
assert not found
fn = lambda self: 123
-
obj = {'foo': fn, 'bar': fn}
-
res = util.as_interface(obj, cls=self.Something)
assertAdapted(res, 'foo', 'bar')
-
- res = util.as_interface(obj, cls=self.Something, required=self.Something)
+ res = util.as_interface(obj, cls=self.Something,
+ required=self.Something)
assertAdapted(res, 'foo', 'bar')
-
- res = util.as_interface(obj, cls=self.Something, required=('foo',))
+ res = util.as_interface(obj, cls=self.Something, required=('foo'
+ , ))
assertAdapted(res, 'foo', 'bar')
-
res = util.as_interface(obj, methods=('foo', 'bar'))
assertAdapted(res, 'foo', 'bar')
-
res = util.as_interface(obj, methods=('foo', 'bar', 'baz'))
assertAdapted(res, 'foo', 'bar')
-
- res = util.as_interface(obj, methods=('foo', 'bar'), required=('foo',))
+ res = util.as_interface(obj, methods=('foo', 'bar'),
+ required=('foo', ))
assertAdapted(res, 'foo', 'bar')
-
- assert_raises(TypeError, util.as_interface, obj, methods=('foo',))
-
- assert_raises(TypeError, util.as_interface, obj,
- methods=('foo', 'bar', 'baz'), required=('baz',))
-
+ assert_raises(TypeError, util.as_interface, obj, methods=('foo'
+ , ))
+ assert_raises(TypeError, util.as_interface, obj, methods=('foo'
+ , 'bar', 'baz'), required=('baz', ))
obj = {'foo': 123}
- assert_raises(TypeError, util.as_interface, obj, cls=self.Something)
+ assert_raises(TypeError, util.as_interface, obj,
+ cls=self.Something)
class TestClassHierarchy(TestBase):
@@ -1064,9 +1066,6 @@ class TestClassHierarchy(TestBase):
class TestClassProperty(TestBase):
def test_simple(self):
-
- from sqlalchemy.util import classproperty
-
class A(object):
something = {'foo':1}
diff --git a/test/ex/test_examples.py b/test/ex/test_examples.py
index e7ae33cc8..9c68f1825 100644
--- a/test/ex/test_examples.py
+++ b/test/ex/test_examples.py
@@ -1,5 +1,6 @@
from sqlalchemy.test import *
-import os, re
+import os
+import re
def find_py_files(dirs):
@@ -11,41 +12,47 @@ def find_py_files(dirs):
dirs.remove(r)
except ValueError:
pass
-
pyfiles = [fn for fn in files if fn.endswith('.py')]
if not pyfiles:
continue
# Find the root of the packages.
+
packroot = root
while 1:
- if not os.path.exists(os.path.join(packroot, '__init__.py')):
+ if not os.path.exists(os.path.join(packroot,
+ '__init__.py')):
break
packroot = os.path.dirname(packroot)
-
for fn in pyfiles:
- yield os.path.join(root[len(packroot)+1:], fn)
+ yield os.path.join(root[len(packroot) + 1:], fn)
+
def filename_to_module_name(fn):
if os.path.basename(fn) == '__init__.py':
fn = os.path.dirname(fn)
return re.sub('\.py$', '', fn.replace(os.sep, '.'))
+
def find_modules(*args):
- for fn in find_py_files(args or ('./examples',)):
+ for fn in find_py_files(args or ('./examples', )):
yield filename_to_module_name(fn)
+
def check_import(module):
__import__(module)
class ExamplesTest(TestBase):
- # TODO: ensure examples are actually run regardless of
- # check for "__main__", perhaps standardizing the format of all examples.
- # ensure that examples with external dependencies are not run if those dependencies are
- # not present (i.e. elementtree, postgis)
+
+ # TODO: ensure examples are actually run regardless of check for
+ # "__main__", perhaps standardizing the format of all examples.
+ # ensure that examples with external dependencies are not run if
+ # those dependencies are not present (i.e. elementtree, postgis)
+
def test_examples(self):
pass
- #for module in find_modules():
- # check_import.description = module
- # yield check_import, module
+
+
+ # for module in find_modules(): check_import.description =
+ # module yield check_import, module
diff --git a/test/ext/test_associationproxy.py b/test/ext/test_associationproxy.py
index df51c322b..d22e45796 100644
--- a/test/ext/test_associationproxy.py
+++ b/test/ext/test_associationproxy.py
@@ -610,8 +610,9 @@ class CustomObjectTest(_CollectionOperations):
p = self.roundtrip(p)
self.assert_(len(list(p.children)) == 1)
- # We didn't provide an alternate _AssociationList implementation for
- # our ObjectCollection, so indexing will fail.
+ # We didn't provide an alternate _AssociationList implementation
+ # for our ObjectCollection, so indexing will fail.
+
try:
v = p.children[1]
self.fail()
@@ -635,10 +636,22 @@ class ProxyFactoryTest(ListTest):
Column('name', String(128)))
class CustomProxy(_AssociationList):
- def __init__(self, lazy_collection, creator, value_attr, parent):
+ def __init__(
+ self,
+ lazy_collection,
+ creator,
+ value_attr,
+ parent,
+ ):
getter, setter = parent._default_getset(lazy_collection)
- _AssociationList.__init__(self, lazy_collection, creator, getter, setter, parent)
-
+ _AssociationList.__init__(
+ self,
+ lazy_collection,
+ creator,
+ getter,
+ setter,
+ parent,
+ )
class Parent(object):
children = association_proxy('_children', 'name',
@@ -897,102 +910,94 @@ class LazyLoadTest(TestBase):
class ReconstitutionTest(TestBase):
+
def setup(self):
metadata = MetaData(testing.db)
- parents = Table('parents', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('name', String(30)))
- children = Table('children', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('parent_id', Integer, ForeignKey('parents.id')),
- Column('name', String(30)))
+ parents = Table('parents', metadata, Column('id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True), Column('name',
+ String(30)))
+ children = Table('children', metadata, Column('id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('parent_id', Integer,
+ ForeignKey('parents.id')), Column('name',
+ String(30)))
metadata.create_all()
parents.insert().execute(name='p1')
-
-
self.metadata = metadata
self.parents = parents
self.children = children
-
+
def teardown(self):
self.metadata.drop_all()
clear_mappers()
def test_weak_identity_map(self):
- mapper(Parent, self.parents, properties=dict(children=relationship(Child)))
+ mapper(Parent, self.parents,
+ properties=dict(children=relationship(Child)))
mapper(Child, self.children)
-
session = create_session(weak_identity_map=True)
def add_child(parent_name, child_name):
- parent = (session.query(Parent).
- filter_by(name=parent_name)).one()
+ parent = \
+ session.query(Parent).filter_by(name=parent_name).one()
parent.kids.append(child_name)
-
add_child('p1', 'c1')
gc_collect()
add_child('p1', 'c2')
-
session.flush()
p = session.query(Parent).filter_by(name='p1').one()
assert set(p.kids) == set(['c1', 'c2']), p.kids
def test_copy(self):
- mapper(Parent, self.parents, properties=dict(children=relationship(Child)))
+ mapper(Parent, self.parents,
+ properties=dict(children=relationship(Child)))
mapper(Child, self.children)
-
p = Parent('p1')
p.kids.extend(['c1', 'c2'])
p_copy = copy.copy(p)
del p
gc_collect()
-
assert set(p_copy.kids) == set(['c1', 'c2']), p.kids
def test_pickle_list(self):
- mapper(Parent, self.parents, properties=dict(children=relationship(Child)))
+ mapper(Parent, self.parents,
+ properties=dict(children=relationship(Child)))
mapper(Child, self.children)
-
p = Parent('p1')
p.kids.extend(['c1', 'c2'])
-
r1 = pickle.loads(pickle.dumps(p))
assert r1.kids == ['c1', 'c2']
-
r2 = pickle.loads(pickle.dumps(p.kids))
assert r2 == ['c1', 'c2']
def test_pickle_set(self):
- mapper(Parent, self.parents, properties=dict(children=relationship(Child, collection_class=set)))
+ mapper(Parent, self.parents,
+ properties=dict(children=relationship(Child,
+ collection_class=set)))
mapper(Child, self.children)
-
p = Parent('p1')
p.kids.update(['c1', 'c2'])
-
r1 = pickle.loads(pickle.dumps(p))
assert r1.kids == set(['c1', 'c2'])
-
r2 = pickle.loads(pickle.dumps(p.kids))
assert r2 == set(['c1', 'c2'])
def test_pickle_dict(self):
- mapper(Parent, self.parents, properties=dict(
- children=relationship(KVChild, collection_class=collections.mapped_collection(PickleKeyFunc('name')))
- ))
+ mapper(Parent, self.parents,
+ properties=dict(children=relationship(KVChild,
+ collection_class=
+ collections.mapped_collection(PickleKeyFunc('name')))))
mapper(KVChild, self.children)
-
p = Parent('p1')
- p.kids.update({'c1':'v1', 'c2':'v2'})
- assert p.kids == {'c1':'c1', 'c2':'c2'}
-
+ p.kids.update({'c1': 'v1', 'c2': 'v2'})
+ assert p.kids == {'c1': 'c1', 'c2': 'c2'}
r1 = pickle.loads(pickle.dumps(p))
- assert r1.kids == {'c1':'c1', 'c2':'c2'}
-
+ assert r1.kids == {'c1': 'c1', 'c2': 'c2'}
r2 = pickle.loads(pickle.dumps(p.kids))
- assert r2 == {'c1':'c1', 'c2':'c2'}
+ assert r2 == {'c1': 'c1', 'c2': 'c2'}
class PickleKeyFunc(object):
def __init__(self, name):
@@ -1002,40 +1007,37 @@ class PickleKeyFunc(object):
return getattr(obj, self.name)
class ComparatorTest(_base.MappedTest):
+
run_inserts = 'once'
run_deletes = None
run_setup_mappers = 'once'
+ run_setup_classes = 'once'
@classmethod
def define_tables(cls, metadata):
-
- Table(
- 'userkeywords', metadata,
- Column('keyword_id', Integer, ForeignKey('keywords.id'),
- primary_key=True),
- Column('user_id', Integer, ForeignKey('users.id')))
-
- Table(
- 'users', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('name', String(64)))
-
- Table(
- 'keywords', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('keyword', String(64)))
+ Table('userkeywords', metadata, Column('keyword_id', Integer,
+ ForeignKey('keywords.id'), primary_key=True),
+ Column('user_id', Integer, ForeignKey('users.id')))
+ Table('users', metadata, Column('id', Integer,
+ primary_key=True, test_needs_autoincrement=True),
+ Column('name', String(64)))
+ Table('keywords', metadata, Column('id', Integer,
+ primary_key=True, test_needs_autoincrement=True),
+ Column('keyword', String(64)))
@classmethod
def setup_classes(cls):
class User(_base.ComparableEntity):
def __init__(self, name):
self.name = name
+
keywords = association_proxy('user_keywords', 'keyword',
- creator=lambda k: UserKeyword(keyword=k))
+ creator=lambda k: UserKeyword(keyword=k))
class Keyword(_base.ComparableEntity):
def __init__(self, keyword):
self.keyword = keyword
+
user = association_proxy('user_keyword', 'user')
class UserKeyword(_base.ComparableEntity):
@@ -1046,32 +1048,35 @@ class ComparatorTest(_base.MappedTest):
@classmethod
@testing.resolve_artifact_names
def setup_mappers(cls):
-
mapper(User, users)
- mapper(Keyword, keywords, properties={
- 'user_keyword': relationship(UserKeyword, uselist=False)
- })
- mapper(UserKeyword, userkeywords, properties={
- 'user': relationship(User, backref='user_keywords'),
- 'keyword': relationship(Keyword),
- })
+ mapper(Keyword, keywords, properties={'user_keyword'
+ : relationship(UserKeyword, uselist=False)})
+ mapper(UserKeyword, userkeywords, properties={'user'
+ : relationship(User, backref='user_keywords'), 'keyword'
+ : relationship(Keyword)})
@classmethod
@testing.resolve_artifact_names
def insert_data(cls):
session = sessionmaker()()
- words = ('quick', 'brown', 'fox', 'jumped', 'over', 'the', 'lazy')
+ words = (
+ 'quick',
+ 'brown',
+ 'fox',
+ 'jumped',
+ 'over',
+ 'the',
+ 'lazy',
+ )
for ii in range(4):
user = User('user%d' % ii)
session.add(user)
- for jj in words[ii:ii+3]:
+ for jj in words[ii:ii + 3]:
user.keywords.append(Keyword(jj))
-
orphan = Keyword('orphan')
orphan.user_keyword = UserKeyword(keyword=orphan, user=None)
session.add(orphan)
session.commit()
-
cls.u = user
cls.kw = user.keywords[0]
cls.session = session
@@ -1081,88 +1086,94 @@ class ComparatorTest(_base.MappedTest):
@testing.resolve_artifact_names
def test_filter_any_kwarg(self):
- self._equivalent(
- self.session.query(User).\
- filter(User.keywords.any(keyword='jumped')),
- self.session.query(User).\
- filter(User.user_keywords.any(
- UserKeyword.keyword.has(keyword='jumped'))))
+ self._equivalent(self.session.query(User).
+ filter(User.keywords.any(keyword='jumped'
+ )),
+ self.session.query(User).filter(
+ User.user_keywords.any(
+ UserKeyword.keyword.has(keyword='jumped'
+ ))))
@testing.resolve_artifact_names
def test_filter_has_kwarg(self):
- self._equivalent(
- self.session.query(Keyword).\
- filter(Keyword.user.has(name='user2')),
- self.session.query(Keyword).\
- filter(Keyword.user_keyword.has(
- UserKeyword.user.has(name='user2'))))
+ self._equivalent(self.session.query(Keyword).
+ filter(Keyword.user.has(name='user2'
+ )),
+ self.session.query(Keyword).
+ filter(Keyword.user_keyword.has(
+ UserKeyword.user.has(name='user2'
+ ))))
@testing.resolve_artifact_names
def test_filter_any_criterion(self):
- self._equivalent(
- self.session.query(User).\
- filter(User.keywords.any(Keyword.keyword == 'jumped')),
- self.session.query(User).\
- filter(User.user_keywords.any(
- UserKeyword.keyword.has(Keyword.keyword == 'jumped'))))
+ self._equivalent(self.session.query(User).
+ filter(User.keywords.any(Keyword.keyword
+ == 'jumped')),
+ self.session.query(User).
+ filter(User.user_keywords.any(
+ UserKeyword.keyword.has(Keyword.keyword
+ == 'jumped'))))
@testing.resolve_artifact_names
def test_filter_has_criterion(self):
- self._equivalent(
- self.session.query(Keyword).\
- filter(Keyword.user.has(User.name == 'user2')),
- self.session.query(Keyword).\
- filter(Keyword.user_keyword.has(
- UserKeyword.user.has(User.name == 'user2'))))
-
+ self._equivalent(self.session.query(Keyword).
+ filter(Keyword.user.has(User.name
+ == 'user2')),
+ self.session.query(Keyword).
+ filter(Keyword.user_keyword.has(
+ UserKeyword.user.has(User.name
+ == 'user2'))))
+
@testing.resolve_artifact_names
def test_filter_contains(self):
- self._equivalent(
- self.session.query(User).\
- filter(User.keywords.contains(self.kw)),
- self.session.query(User).\
- filter(User.user_keywords.any(keyword=self.kw)))
-
+ self._equivalent(self.session.query(User).
+ filter(User.keywords.contains(self.kw)),
+ self.session.query(User).
+ filter(User.user_keywords.any(keyword=self.kw)))
+
@testing.resolve_artifact_names
def test_filter_eq(self):
- self._equivalent(
- self.session.query(Keyword).\
- filter(Keyword.user == self.u),
- self.session.query(Keyword).\
- filter(Keyword.user_keyword.has(user=self.u)))
-
+ self._equivalent(self.session.query(Keyword).filter(Keyword.user
+ == self.u),
+ self.session.query(Keyword).
+ filter(Keyword.user_keyword.has(user=self.u)))
+
@testing.resolve_artifact_names
def test_filter_ne(self):
- self._equivalent(
- self.session.query(Keyword).\
- filter(Keyword.user != self.u),
- self.session.query(Keyword).\
- filter(not_(Keyword.user_keyword.has(user=self.u))))
+ self._equivalent(self.session.query(Keyword).filter(Keyword.user
+ != self.u),
+ self.session.query(Keyword).
+ filter(not_(Keyword.user_keyword.has(user=self.u))))
@testing.resolve_artifact_names
def test_filter_eq_null(self):
- self._equivalent(
- self.session.query(Keyword).\
- filter(Keyword.user == None),
- self.session.query(Keyword).\
- filter(Keyword.user_keyword.has(UserKeyword.user == None)))
+ self._equivalent(self.session.query(Keyword).filter(Keyword.user
+ == None),
+ self.session.query(Keyword).
+ filter(Keyword.user_keyword.has(UserKeyword.user
+ == None)))
@testing.resolve_artifact_names
def test_filter_scalar_contains_fails(self):
- assert_raises(exceptions.InvalidRequestError, lambda: Keyword.user.contains(self.u))
-
+ assert_raises(exceptions.InvalidRequestError, lambda : \
+ Keyword.user.contains(self.u))
+
@testing.resolve_artifact_names
def test_filter_scalar_any_fails(self):
- assert_raises(exceptions.InvalidRequestError, lambda: Keyword.user.any(name='user2'))
+ assert_raises(exceptions.InvalidRequestError, lambda : \
+ Keyword.user.any(name='user2'))
@testing.resolve_artifact_names
def test_filter_collection_has_fails(self):
- assert_raises(exceptions.InvalidRequestError, lambda: User.keywords.has(keyword='quick'))
+ assert_raises(exceptions.InvalidRequestError, lambda : \
+ User.keywords.has(keyword='quick'))
@testing.resolve_artifact_names
def test_filter_collection_eq_fails(self):
- assert_raises(exceptions.InvalidRequestError, lambda: User.keywords == self.kw)
+ assert_raises(exceptions.InvalidRequestError, lambda : \
+ User.keywords == self.kw)
@testing.resolve_artifact_names
def test_filter_collection_ne_fails(self):
- assert_raises(exceptions.InvalidRequestError, lambda: User.keywords != self.kw)
+ assert_raises(exceptions.InvalidRequestError, lambda : \
+ User.keywords != self.kw)
diff --git a/test/ext/test_declarative.py b/test/ext/test_declarative.py
index 5e5c38073..4da826d38 100644
--- a/test/ext/test_declarative.py
+++ b/test/ext/test_declarative.py
@@ -1,19 +1,18 @@
-from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
+from sqlalchemy.test.testing import eq_, assert_raises, \
+ assert_raises_message
from sqlalchemy.ext import declarative as decl
from sqlalchemy import exc
import sqlalchemy as sa
from sqlalchemy.test import testing
-from sqlalchemy import MetaData, Integer, String, ForeignKey, ForeignKeyConstraint, asc, Index
+from sqlalchemy import MetaData, Integer, String, ForeignKey, \
+ ForeignKeyConstraint, asc, Index
from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import relationship, create_session, class_mapper, \
- joinedload, compile_mappers, backref, \
- clear_mappers, polymorphic_union, \
- deferred, column_property
-
+ joinedload, compile_mappers, backref, clear_mappers, \
+ polymorphic_union, deferred, column_property
from sqlalchemy.test.testing import eq_
from sqlalchemy.util import classproperty
-
from test.orm._base import ComparableEntity, MappedTest
class DeclarativeTestBase(testing.TestBase, testing.AssertsExecutionResults):
@@ -72,25 +71,33 @@ class DeclarativeTest(DeclarativeTestBase):
def go():
class User(Base):
id = Column('id', Integer, primary_key=True)
- assert_raises_message(sa.exc.InvalidRequestError, "does not have a __table__", go)
+
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'does not have a __table__', go)
def test_cant_add_columns(self):
- t = Table('t', Base.metadata, Column('id', Integer, primary_key=True), Column('data', String))
+ t = Table('t', Base.metadata, Column('id', Integer,
+ primary_key=True), Column('data', String))
+
def go():
class User(Base):
__table__ = t
foo = Column(Integer, primary_key=True)
+
# can't specify new columns not already in the table
- assert_raises_message(sa.exc.ArgumentError,
- "Can't add additional column 'foo' when specifying __table__",
- go)
+
+ assert_raises_message(sa.exc.ArgumentError,
+ "Can't add additional column 'foo' when "
+ "specifying __table__", go)
# regular re-mapping works tho
+
class Bar(Base):
__table__ = t
some_data = t.c.data
-
- assert class_mapper(Bar).get_property('some_data').columns[0] is t.c.data
+
+ assert class_mapper(Bar).get_property('some_data').columns[0] \
+ is t.c.data
def test_difficult_class(self):
"""test no getattr() errors with a customized class"""
@@ -158,177 +165,215 @@ class DeclarativeTest(DeclarativeTestBase):
def test_string_dependency_resolution(self):
from sqlalchemy.sql import desc
-
+
class User(Base, ComparableEntity):
+
__tablename__ = 'users'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column(String(50))
- addresses = relationship("Address", order_by="desc(Address.email)",
- primaryjoin="User.id==Address.user_id", foreign_keys="[Address.user_id]",
- backref=backref('user', primaryjoin="User.id==Address.user_id", foreign_keys="[Address.user_id]")
- )
+ addresses = relationship('Address',
+ order_by='desc(Address.email)',
+ primaryjoin='User.id==Address.user_id',
+ foreign_keys='[Address.user_id]',
+ backref=backref('user',
+ primaryjoin='User.id==Address.user_id',
+ foreign_keys='[Address.user_id]'))
class Address(Base, ComparableEntity):
+
__tablename__ = 'addresses'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
email = Column(String(50))
user_id = Column(Integer) # note no foreign key
-
+
Base.metadata.create_all()
-
sess = create_session()
- u1 = User(name='ed', addresses=[Address(email='abc'), Address(email='def'), Address(email='xyz')])
+ u1 = User(name='ed', addresses=[Address(email='abc'),
+ Address(email='def'), Address(email='xyz')])
sess.add(u1)
sess.flush()
sess.expunge_all()
eq_(sess.query(User).filter(User.name == 'ed').one(),
- User(name='ed', addresses=[Address(email='xyz'), Address(email='def'), Address(email='abc')])
- )
-
+ User(name='ed', addresses=[Address(email='xyz'),
+ Address(email='def'), Address(email='abc')]))
+
class Foo(Base, ComparableEntity):
+
__tablename__ = 'foo'
id = Column(Integer, primary_key=True)
- rel = relationship("User", primaryjoin="User.addresses==Foo.id")
- assert_raises_message(exc.InvalidRequestError, "'addresses' is not an instance of ColumnProperty", compile_mappers)
+ rel = relationship('User',
+ primaryjoin='User.addresses==Foo.id')
+
+ assert_raises_message(exc.InvalidRequestError,
+ "'addresses' is not an instance of "
+ "ColumnProperty", compile_mappers)
def test_string_dependency_resolution_two(self):
+
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column(String(50))
-
+
class Bar(Base, ComparableEntity):
__tablename__ = 'bar'
id = Column(Integer, primary_key=True)
- rel = relationship("User", primaryjoin="User.id==Bar.__table__.id")
- assert_raises_message(exc.InvalidRequestError, "does not have a mapped column named '__table__'", compile_mappers)
+ rel = relationship('User',
+ primaryjoin='User.id==Bar.__table__.id')
+
+ assert_raises_message(exc.InvalidRequestError,
+ "does not have a mapped column named "
+ "'__table__'", compile_mappers)
def test_string_dependency_resolution_no_magic(self):
"""test that full tinkery expressions work as written"""
-
+
class User(Base, ComparableEntity):
+
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
- addresses = relationship("Address",
- primaryjoin="User.id==Address.user_id.prop.columns[0]")
-
+ addresses = relationship('Address',
+ primaryjoin='User.id==Address.user_id.prop.columns['
+ '0]')
+
class Address(Base, ComparableEntity):
+
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'))
-
+
compile_mappers()
- eq_(
- str(User.addresses.prop.primaryjoin), "users.id = addresses.user_id"
- )
+ eq_(str(User.addresses.prop.primaryjoin),
+ 'users.id = addresses.user_id')
def test_string_dependency_resolution_in_backref(self):
+
class User(Base, ComparableEntity):
+
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50))
- addresses = relationship("Address",
- primaryjoin="User.id==Address.user_id",
- backref="user"
- )
+ addresses = relationship('Address',
+ primaryjoin='User.id==Address.user_id',
+ backref='user')
class Address(Base, ComparableEntity):
+
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
email = Column(String(50))
- user_id = Column(Integer, ForeignKey('users.id'))
+ user_id = Column(Integer, ForeignKey('users.id'))
compile_mappers()
- eq_(str(User.addresses.property.primaryjoin), str(Address.user.property.primaryjoin))
+ eq_(str(User.addresses.property.primaryjoin),
+ str(Address.user.property.primaryjoin))
def test_string_dependency_resolution_tables(self):
+
class User(Base, ComparableEntity):
+
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50))
-
- props = relationship("Prop",
- secondary="user_to_prop",
- primaryjoin="User.id==user_to_prop.c.user_id",
- secondaryjoin="user_to_prop.c.prop_id==Prop.id",
- backref="users")
+ props = relationship('Prop', secondary='user_to_prop',
+ primaryjoin='User.id==user_to_prop.c.u'
+ 'ser_id',
+ secondaryjoin='user_to_prop.c.prop_id='
+ '=Prop.id', backref='users')
class Prop(Base, ComparableEntity):
+
__tablename__ = 'props'
id = Column(Integer, primary_key=True)
name = Column(String(50))
-
- user_to_prop = Table('user_to_prop', Base.metadata,
- Column('user_id', Integer, ForeignKey('users.id')),
- Column('prop_id', Integer, ForeignKey('props.id')),
- )
+ user_to_prop = Table('user_to_prop', Base.metadata,
+ Column('user_id', Integer,
+ ForeignKey('users.id')), Column('prop_id',
+ Integer, ForeignKey('props.id')))
compile_mappers()
- assert class_mapper(User).get_property("props").secondary is user_to_prop
+ assert class_mapper(User).get_property('props').secondary \
+ is user_to_prop
def test_uncompiled_attributes_in_relationship(self):
+
class Address(Base, ComparableEntity):
+
__tablename__ = 'addresses'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
email = Column(String(50))
user_id = Column(Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
+
__tablename__ = 'users'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column(String(50))
- addresses = relationship("Address", order_by=Address.email,
- foreign_keys=Address.user_id,
- remote_side=Address.user_id,
- )
-
+ addresses = relationship('Address', order_by=Address.email,
+ foreign_keys=Address.user_id,
+ remote_side=Address.user_id)
+
# get the mapper for User. User mapper will compile,
# "addresses" relationship will call upon Address.user_id for
# its clause element. Address.user_id is a _CompileOnAttr,
# which then calls class_mapper(Address). But ! We're already
- # "in compilation", but class_mapper(Address) needs to initialize
- # regardless, or COA's assertion fails
- # and things generally go downhill from there.
+ # "in compilation", but class_mapper(Address) needs to
+ # initialize regardless, or COA's assertion fails and things
+ # generally go downhill from there.
+
class_mapper(User)
-
Base.metadata.create_all()
-
sess = create_session()
- u1 = User(name='ed', addresses=[Address(email='abc'), Address(email='xyz'), Address(email='def')])
+ u1 = User(name='ed', addresses=[Address(email='abc'),
+ Address(email='xyz'), Address(email='def')])
sess.add(u1)
sess.flush()
sess.expunge_all()
eq_(sess.query(User).filter(User.name == 'ed').one(),
- User(name='ed', addresses=[Address(email='abc'), Address(email='def'), Address(email='xyz')])
- )
+ User(name='ed', addresses=[Address(email='abc'),
+ Address(email='def'), Address(email='xyz')]))
def test_nice_dependency_error(self):
+
class User(Base):
+
__tablename__ = 'users'
id = Column('id', Integer, primary_key=True)
- addresses = relationship("Address")
+ addresses = relationship('Address')
class Address(Base):
- __tablename__ = 'addresses'
+ __tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
foo = sa.orm.column_property(User.id == 5)
- # this used to raise an error when accessing User.id but that's no longer the case
- # since we got rid of _CompileOnAttr.
+ # this used to raise an error when accessing User.id but that's
+ # no longer the case since we got rid of _CompileOnAttr.
+
assert_raises(sa.exc.ArgumentError, compile_mappers)
-
+
def test_nice_dependency_error_works_with_hasattr(self):
+
class User(Base):
+
__tablename__ = 'users'
id = Column('id', Integer, primary_key=True)
- addresses = relationship("Addresss")
+ addresses = relationship('Addresss')
# hasattr() on a compile-loaded attribute
+
hasattr(User.addresses, 'property')
+
# the exeption is preserved
- assert_raises_message(sa.exc.InvalidRequestError,
- r"suppressed within a hasattr\(\)", compile_mappers)
+
+ assert_raises_message(sa.exc.InvalidRequestError,
+ r"suppressed within a hasattr\(\)",
+ compile_mappers)
def test_custom_base(self):
class MyBase(object):
@@ -339,34 +384,39 @@ class DeclarativeTest(DeclarativeTestBase):
assert Base().foobar() == "foobar"
def test_uses_get_on_class_col_fk(self):
+
# test [ticket:1492]
-
- class Master(Base):
- __tablename__ = 'master'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
- class Detail(Base):
- __tablename__ = 'detail'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
- master_id = Column(None, ForeignKey(Master.id))
- master = relationship(Master)
+ class Master(Base):
+
+ __tablename__ = 'master'
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
+
+ class Detail(Base):
+
+ __tablename__ = 'detail'
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
+ master_id = Column(None, ForeignKey(Master.id))
+ master = relationship(Master)
Base.metadata.create_all()
-
compile_mappers()
- assert class_mapper(Detail).get_property('master').strategy.use_get
-
+ assert class_mapper(Detail).get_property('master'
+ ).strategy.use_get
m1 = Master()
d1 = Detail(master=m1)
sess = create_session()
sess.add(d1)
sess.flush()
sess.expunge_all()
-
d1 = sess.query(Detail).first()
m1 = sess.query(Master).first()
+
def go():
assert d1.master
+
self.assert_sql_count(testing.db, go, 0)
def test_index_doesnt_compile(self):
@@ -390,147 +440,146 @@ class DeclarativeTest(DeclarativeTestBase):
Base.metadata.create_all()
def test_add_prop(self):
+
class User(Base, ComparableEntity):
+
__tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
User.name = Column('name', String(50))
- User.addresses = relationship("Address", backref="user")
+ User.addresses = relationship('Address', backref='user')
class Address(Base, ComparableEntity):
+
__tablename__ = 'addresses'
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
Address.email = Column(String(50), key='_email')
- Address.user_id = Column('user_id', Integer, ForeignKey('users.id'),
- key='_user_id')
-
+ Address.user_id = Column('user_id', Integer,
+ ForeignKey('users.id'), key='_user_id')
Base.metadata.create_all()
-
eq_(Address.__table__.c['id'].name, 'id')
eq_(Address.__table__.c['_email'].name, 'email')
eq_(Address.__table__.c['_user_id'].name, 'user_id')
-
- u1 = User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])
+ u1 = User(name='u1', addresses=[Address(email='one'),
+ Address(email='two')])
sess = create_session()
sess.add(u1)
sess.flush()
sess.expunge_all()
-
- eq_(sess.query(User).all(), [User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])])
-
+ eq_(sess.query(User).all(), [User(name='u1',
+ addresses=[Address(email='one'), Address(email='two')])])
a1 = sess.query(Address).filter(Address.email == 'two').one()
eq_(a1, Address(email='two'))
eq_(a1.user, User(name='u1'))
def test_eager_order_by(self):
+
class Address(Base, ComparableEntity):
- __tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'addresses'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
- addresses = relationship("Address", order_by=Address.email)
+ addresses = relationship('Address', order_by=Address.email)
Base.metadata.create_all()
- u1 = User(name='u1', addresses=[
- Address(email='two'),
- Address(email='one'),
- ])
+ u1 = User(name='u1', addresses=[Address(email='two'),
+ Address(email='one')])
sess = create_session()
sess.add(u1)
sess.flush()
sess.expunge_all()
- eq_(sess.query(User).options(joinedload(User.addresses)).all(), [User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])])
+ eq_(sess.query(User).options(joinedload(User.addresses)).all(),
+ [User(name='u1', addresses=[Address(email='one'),
+ Address(email='two')])])
def test_order_by_multi(self):
+
class Address(Base, ComparableEntity):
- __tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'addresses'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
- addresses = relationship("Address", order_by=(Address.email, Address.id))
+ addresses = relationship('Address',
+ order_by=(Address.email, Address.id))
Base.metadata.create_all()
- u1 = User(name='u1', addresses=[
- Address(email='two'),
- Address(email='one'),
- ])
+ u1 = User(name='u1', addresses=[Address(email='two'),
+ Address(email='one')])
sess = create_session()
sess.add(u1)
sess.flush()
sess.expunge_all()
u = sess.query(User).filter(User.name == 'u1').one()
a = u.addresses
-
+
def test_as_declarative(self):
+
class User(ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
- addresses = relationship("Address", backref="user")
+ addresses = relationship('Address', backref='user')
class Address(ComparableEntity):
- __tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'addresses'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey('users.id'))
-
+
reg = {}
decl.instrument_declarative(User, reg, Base.metadata)
decl.instrument_declarative(Address, reg, Base.metadata)
Base.metadata.create_all()
-
- u1 = User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])
+ u1 = User(name='u1', addresses=[Address(email='one'),
+ Address(email='two')])
sess = create_session()
sess.add(u1)
sess.flush()
sess.expunge_all()
+ eq_(sess.query(User).all(), [User(name='u1',
+ addresses=[Address(email='one'), Address(email='two')])])
- eq_(sess.query(User).all(), [User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])])
-
@testing.uses_deprecated()
def test_custom_mapper(self):
+
class MyExt(sa.orm.MapperExtension):
+
def create_instance(self):
- return "CHECK"
+ return 'CHECK'
def mymapper(cls, tbl, **kwargs):
kwargs['extension'] = MyExt()
return sa.orm.mapper(cls, tbl, **kwargs)
from sqlalchemy.orm.mapper import Mapper
+
class MyMapper(Mapper):
+
def __init__(self, *args, **kwargs):
kwargs['extension'] = MyExt()
Mapper.__init__(self, *args, **kwargs)
@@ -539,109 +588,120 @@ class DeclarativeTest(DeclarativeTestBase):
ss = scoping.ScopedSession(create_session)
ss.extension = MyExt()
ss_mapper = ss.mapper
-
- for mapperfunc in (mymapper, MyMapper, ss_mapper):
+ for mapperfunc in mymapper, MyMapper, ss_mapper:
base = decl.declarative_base()
+
class Foo(base):
+
__tablename__ = 'foo'
__mapper_cls__ = mapperfunc
id = Column(Integer, primary_key=True)
- eq_(Foo.__mapper__.compile().extension.create_instance(), 'CHECK')
+ eq_(Foo.__mapper__.compile().extension.create_instance(),
+ 'CHECK')
base = decl.declarative_base(mapper=mapperfunc)
+
class Foo(base):
+
__tablename__ = 'foo'
id = Column(Integer, primary_key=True)
- eq_(Foo.__mapper__.compile().extension.create_instance(), 'CHECK')
+ eq_(Foo.__mapper__.compile().extension.create_instance(),
+ 'CHECK')
@testing.emits_warning('Ignoring declarative-like tuple value of '
'attribute id')
def test_oops(self):
+
def define():
+
class User(Base, ComparableEntity):
- __tablename__ = 'users'
+ __tablename__ = 'users'
id = Column('id', Integer, primary_key=True),
name = Column('name', String(50))
+
assert False
- assert_raises_message(
- sa.exc.ArgumentError,
- "Mapper Mapper|User|users could not assemble any primary key",
- define)
+
+ assert_raises_message(sa.exc.ArgumentError,
+ 'Mapper Mapper|User|users could not '
+ 'assemble any primary key', define)
def test_table_args(self):
-
+
def err():
class Foo(Base):
+
__tablename__ = 'foo'
- __table_args__ = (ForeignKeyConstraint(['id'], ['foo.id']),)
+ __table_args__ = ForeignKeyConstraint(['id'], ['foo.id'
+ ]),
id = Column('id', Integer, primary_key=True)
-
- assert_raises_message(sa.exc.ArgumentError, "Tuple form of __table_args__ is ", err)
-
+
+ assert_raises_message(sa.exc.ArgumentError,
+ 'Tuple form of __table_args__ is ', err)
+
class Foo(Base):
+
__tablename__ = 'foo'
- __table_args__ = {'mysql_engine':'InnoDB'}
+ __table_args__ = {'mysql_engine': 'InnoDB'}
id = Column('id', Integer, primary_key=True)
-
+
assert Foo.__table__.kwargs['mysql_engine'] == 'InnoDB'
class Bar(Base):
+
__tablename__ = 'bar'
- __table_args__ = (ForeignKeyConstraint(['id'], ['foo.id']), {'mysql_engine':'InnoDB'})
+ __table_args__ = ForeignKeyConstraint(['id'], ['foo.id']), \
+ {'mysql_engine': 'InnoDB'}
id = Column('id', Integer, primary_key=True)
-
+
assert Bar.__table__.c.id.references(Foo.__table__.c.id)
assert Bar.__table__.kwargs['mysql_engine'] == 'InnoDB'
-
+
def test_expression(self):
+
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
- addresses = relationship("Address", backref="user")
+ addresses = relationship('Address', backref='user')
class Address(Base, ComparableEntity):
- __tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'addresses'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey('users.id'))
- User.address_count = sa.orm.column_property(
- sa.select([sa.func.count(Address.id)]).
- where(Address.user_id == User.id).as_scalar())
-
+ User.address_count = \
+ sa.orm.column_property(sa.select([sa.func.count(Address.id)]).
+ where(Address.user_id
+ == User.id).as_scalar())
Base.metadata.create_all()
-
- u1 = User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])
+ u1 = User(name='u1', addresses=[Address(email='one'),
+ Address(email='two')])
sess = create_session()
sess.add(u1)
sess.flush()
sess.expunge_all()
-
- eq_(sess.query(User).all(),
- [User(name='u1', address_count=2, addresses=[
- Address(email='one'),
- Address(email='two')])])
+ eq_(sess.query(User).all(), [User(name='u1', address_count=2,
+ addresses=[Address(email='one'), Address(email='two')])])
def test_column(self):
+
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
User.a = Column('a', String(10))
User.b = Column(String(10))
-
Base.metadata.create_all()
-
u1 = User(name='u1', a='a', b='b')
eq_(u1.a, 'a')
eq_(User.a.get_history(u1), (['a'], (), ()))
@@ -649,67 +709,70 @@ class DeclarativeTest(DeclarativeTestBase):
sess.add(u1)
sess.flush()
sess.expunge_all()
-
- eq_(sess.query(User).all(),
- [User(name='u1', a='a', b='b')])
+ eq_(sess.query(User).all(), [User(name='u1', a='a', b='b')])
def test_column_properties(self):
+
class Address(Base, ComparableEntity):
+
__tablename__ = 'addresses'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
email = Column(String(50))
user_id = Column(Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
- adr_count = sa.orm.column_property(
- sa.select([sa.func.count(Address.id)], Address.user_id == id).
- as_scalar())
+ adr_count = \
+ sa.orm.column_property(sa.select([sa.func.count(Address.id)],
+ Address.user_id == id).as_scalar())
addresses = relationship(Address)
Base.metadata.create_all()
-
- u1 = User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])
+ u1 = User(name='u1', addresses=[Address(email='one'),
+ Address(email='two')])
sess = create_session()
sess.add(u1)
sess.flush()
sess.expunge_all()
-
- eq_(sess.query(User).all(),
- [User(name='u1', adr_count=2, addresses=[
- Address(email='one'),
- Address(email='two')])])
+ eq_(sess.query(User).all(), [User(name='u1', adr_count=2,
+ addresses=[Address(email='one'), Address(email='two')])])
def test_column_properties_2(self):
+
class Address(Base, ComparableEntity):
+
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
email = Column(String(50))
user_id = Column(Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
- __tablename__ = 'users'
+ __tablename__ = 'users'
id = Column('id', Integer, primary_key=True)
name = Column('name', String(50))
- # this is not "valid" but we want to test that Address.id doesnt
- # get stuck into user's table
+
+ # this is not "valid" but we want to test that Address.id
+ # doesnt get stuck into user's table
+
adr_count = Address.id
eq_(set(User.__table__.c.keys()), set(['id', 'name']))
- eq_(set(Address.__table__.c.keys()), set(['id', 'email', 'user_id']))
+ eq_(set(Address.__table__.c.keys()), set(['id', 'email',
+ 'user_id']))
def test_deferred(self):
+
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = sa.orm.deferred(Column(String(50)))
Base.metadata.create_all()
@@ -717,194 +780,209 @@ class DeclarativeTest(DeclarativeTestBase):
sess.add(User(name='u1'))
sess.flush()
sess.expunge_all()
-
u1 = sess.query(User).filter(User.name == 'u1').one()
assert 'name' not in u1.__dict__
+
def go():
eq_(u1.name, 'u1')
+
self.assert_sql_count(testing.db, go, 1)
def test_synonym_inline(self):
+
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
_name = Column('name', String(50))
+
def _set_name(self, name):
- self._name = "SOMENAME " + name
+ self._name = 'SOMENAME ' + name
+
def _get_name(self):
return self._name
+
name = sa.orm.synonym('_name',
- descriptor=property(_get_name, _set_name))
+ descriptor=property(_get_name,
+ _set_name))
Base.metadata.create_all()
-
sess = create_session()
u1 = User(name='someuser')
- eq_(u1.name, "SOMENAME someuser")
+ eq_(u1.name, 'SOMENAME someuser')
sess.add(u1)
sess.flush()
- eq_(sess.query(User).filter(User.name == "SOMENAME someuser").one(), u1)
-
+ eq_(sess.query(User).filter(User.name == 'SOMENAME someuser'
+ ).one(), u1)
+
def test_synonym_no_descriptor(self):
from sqlalchemy.orm.properties import ColumnProperty
-
+
class CustomCompare(ColumnProperty.Comparator):
+
__hash__ = None
+
def __eq__(self, other):
return self.__clause_element__() == other + ' FOO'
-
+
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
_name = Column('name', String(50))
- name = sa.orm.synonym('_name', comparator_factory=CustomCompare)
-
- Base.metadata.create_all()
+ name = sa.orm.synonym('_name',
+ comparator_factory=CustomCompare)
+ Base.metadata.create_all()
sess = create_session()
u1 = User(name='someuser FOO')
sess.add(u1)
sess.flush()
- eq_(sess.query(User).filter(User.name == "someuser").one(), u1)
-
+ eq_(sess.query(User).filter(User.name == 'someuser').one(), u1)
+
def test_synonym_added(self):
+
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
_name = Column('name', String(50))
+
def _set_name(self, name):
- self._name = "SOMENAME " + name
+ self._name = 'SOMENAME ' + name
+
def _get_name(self):
return self._name
+
name = property(_get_name, _set_name)
- User.name = sa.orm.synonym('_name', descriptor=User.name)
+ User.name = sa.orm.synonym('_name', descriptor=User.name)
Base.metadata.create_all()
-
sess = create_session()
u1 = User(name='someuser')
- eq_(u1.name, "SOMENAME someuser")
+ eq_(u1.name, 'SOMENAME someuser')
sess.add(u1)
sess.flush()
- eq_(sess.query(User).filter(User.name == "SOMENAME someuser").one(), u1)
+ eq_(sess.query(User).filter(User.name == 'SOMENAME someuser'
+ ).one(), u1)
def test_reentrant_compile_via_foreignkey(self):
+
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
- addresses = relationship("Address", backref="user")
+ addresses = relationship('Address', backref='user')
class Address(Base, ComparableEntity):
- __tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'addresses'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey(User.id))
- # previous versions would force a re-entrant mapper compile
- # via the User.id inside the ForeignKey but this is no
- # longer the case
- sa.orm.compile_mappers()
+ # previous versions would force a re-entrant mapper compile via
+ # the User.id inside the ForeignKey but this is no longer the
+ # case
- eq_(str(Address.user_id.property.columns[0].foreign_keys[0]), "ForeignKey('users.id')")
-
+ sa.orm.compile_mappers()
+ eq_(str(Address.user_id.property.columns[0].foreign_keys[0]),
+ "ForeignKey('users.id')")
Base.metadata.create_all()
- u1 = User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])
+ u1 = User(name='u1', addresses=[Address(email='one'),
+ Address(email='two')])
sess = create_session()
sess.add(u1)
sess.flush()
sess.expunge_all()
-
- eq_(sess.query(User).all(), [User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])])
+ eq_(sess.query(User).all(), [User(name='u1',
+ addresses=[Address(email='one'), Address(email='two')])])
def test_relationship_reference(self):
+
class Address(Base, ComparableEntity):
- __tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'addresses'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
- addresses = relationship("Address", backref="user",
- primaryjoin=id == Address.user_id)
-
- User.address_count = sa.orm.column_property(
- sa.select([sa.func.count(Address.id)]).
- where(Address.user_id == User.id).as_scalar())
+ addresses = relationship('Address', backref='user',
+ primaryjoin=id == Address.user_id)
+ User.address_count = \
+ sa.orm.column_property(sa.select([sa.func.count(Address.id)]).
+ where(Address.user_id
+ == User.id).as_scalar())
Base.metadata.create_all()
-
- u1 = User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])
+ u1 = User(name='u1', addresses=[Address(email='one'),
+ Address(email='two')])
sess = create_session()
sess.add(u1)
sess.flush()
sess.expunge_all()
-
- eq_(sess.query(User).all(),
- [User(name='u1', address_count=2, addresses=[
- Address(email='one'),
- Address(email='two')])])
+ eq_(sess.query(User).all(), [User(name='u1', address_count=2,
+ addresses=[Address(email='one'), Address(email='two')])])
def test_pk_with_fk_init(self):
+
class Bar(Base):
- __tablename__ = 'bar'
- id = sa.Column(sa.Integer, sa.ForeignKey("foo.id"), primary_key=True)
+ __tablename__ = 'bar'
+ id = sa.Column(sa.Integer, sa.ForeignKey('foo.id'),
+ primary_key=True)
ex = sa.Column(sa.Integer, primary_key=True)
class Foo(Base):
- __tablename__ = 'foo'
+ __tablename__ = 'foo'
id = sa.Column(sa.Integer, primary_key=True)
bars = sa.orm.relationship(Bar)
-
+
assert Bar.__mapper__.primary_key[0] is Bar.__table__.c.id
assert Bar.__mapper__.primary_key[1] is Bar.__table__.c.ex
-
def test_with_explicit_autoloaded(self):
meta = MetaData(testing.db)
- t1 = Table('t1', meta,
- Column('id', String(50), primary_key=True, test_needs_autoincrement=True),
+ t1 = Table('t1', meta, Column('id', String(50),
+ primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
meta.create_all()
try:
+
class MyObj(Base):
+
__table__ = Table('t1', Base.metadata, autoload=True)
sess = create_session()
- m = MyObj(id="someid", data="somedata")
+ m = MyObj(id='someid', data='somedata')
sess.add(m)
sess.flush()
-
- eq_(t1.select().execute().fetchall(), [('someid', 'somedata')])
+ eq_(t1.select().execute().fetchall(), [('someid', 'somedata'
+ )])
finally:
meta.drop_all()
def test_synonym_for(self):
+
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
@decl.synonym_for('name')
@@ -913,374 +991,424 @@ class DeclarativeTest(DeclarativeTestBase):
return self.name
Base.metadata.create_all()
-
sess = create_session()
u1 = User(name='someuser')
- eq_(u1.name, "someuser")
+ eq_(u1.name, 'someuser')
eq_(u1.namesyn, 'someuser')
sess.add(u1)
sess.flush()
-
rt = sess.query(User).filter(User.namesyn == 'someuser').one()
eq_(rt, u1)
def test_comparable_using(self):
+
class NameComparator(sa.orm.PropComparator):
+
@property
def upperself(self):
cls = self.prop.parent.class_
col = getattr(cls, 'name')
return sa.func.upper(col)
- def operate(self, op, other, **kw):
+ def operate(
+ self,
+ op,
+ other,
+ **kw
+ ):
return op(self.upperself, other, **kw)
class User(Base, ComparableEntity):
- __tablename__ = 'users'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
@decl.comparable_using(NameComparator)
@property
def uc_name(self):
- return self.name is not None and self.name.upper() or None
+ return self.name is not None and self.name.upper() \
+ or None
Base.metadata.create_all()
-
sess = create_session()
u1 = User(name='someuser')
- eq_(u1.name, "someuser", u1.name)
+ eq_(u1.name, 'someuser', u1.name)
eq_(u1.uc_name, 'SOMEUSER', u1.uc_name)
sess.add(u1)
sess.flush()
sess.expunge_all()
-
rt = sess.query(User).filter(User.uc_name == 'SOMEUSER').one()
eq_(rt, u1)
sess.expunge_all()
-
- rt = sess.query(User).filter(User.uc_name.startswith('SOMEUSE')).one()
+ rt = sess.query(User).filter(User.uc_name.startswith('SOMEUSE'
+ )).one()
eq_(rt, u1)
-
class DeclarativeInheritanceTest(DeclarativeTestBase):
-
+
def test_we_must_copy_mapper_args(self):
+
class Person(Base):
+
__tablename__ = 'people'
id = Column(Integer, primary_key=True)
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on': discriminator,'polymorphic_identity':'person'}
-
+ __mapper_args__ = {'polymorphic_on': discriminator,
+ 'polymorphic_identity': 'person'}
+
class Engineer(Person):
+
primary_language = Column(String(50))
assert 'inherits' not in Person.__mapper_args__
assert class_mapper(Engineer).polymorphic_on is None
-
-
+
def test_custom_join_condition(self):
+
class Foo(Base):
+
__tablename__ = 'foo'
id = Column('id', Integer, primary_key=True)
class Bar(Foo):
+
__tablename__ = 'bar'
id = Column('id', Integer, primary_key=True)
foo_id = Column('foo_id', Integer)
- __mapper_args__ = {'inherit_condition':foo_id==Foo.id}
-
+ __mapper_args__ = {'inherit_condition': foo_id == Foo.id}
+
# compile succeeds because inherit_condition is honored
+
compile_mappers()
-
+
def test_joined(self):
+
class Company(Base, ComparableEntity):
+
__tablename__ = 'companies'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
- employees = relationship("Person")
+ employees = relationship('Person')
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
company_id = Column('company_id', Integer,
ForeignKey('companies.id'))
name = Column('name', String(50))
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
class Engineer(Person):
+
__tablename__ = 'engineers'
- __mapper_args__ = {'polymorphic_identity':'engineer'}
- id = Column('id', Integer, ForeignKey('people.id'), primary_key=True)
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
+ id = Column('id', Integer, ForeignKey('people.id'),
+ primary_key=True)
primary_language = Column('primary_language', String(50))
class Manager(Person):
+
__tablename__ = 'managers'
- __mapper_args__ = {'polymorphic_identity':'manager'}
- id = Column('id', Integer, ForeignKey('people.id'), primary_key=True)
+ __mapper_args__ = {'polymorphic_identity': 'manager'}
+ id = Column('id', Integer, ForeignKey('people.id'),
+ primary_key=True)
golf_swing = Column('golf_swing', String(50))
Base.metadata.create_all()
-
sess = create_session()
-
- c1 = Company(name="MegaCorp, Inc.", employees=[
- Engineer(name="dilbert", primary_language="java"),
- Engineer(name="wally", primary_language="c++"),
- Manager(name="dogbert", golf_swing="fore!")
- ])
-
- c2 = Company(name="Elbonia, Inc.", employees=[
- Engineer(name="vlad", primary_language="cobol")
- ])
-
+ c1 = Company(name='MegaCorp, Inc.',
+ employees=[Engineer(name='dilbert',
+ primary_language='java'), Engineer(name='wally',
+ primary_language='c++'), Manager(name='dogbert',
+ golf_swing='fore!')])
+ c2 = Company(name='Elbonia, Inc.',
+ employees=[Engineer(name='vlad',
+ primary_language='cobol')])
sess.add(c1)
sess.add(c2)
sess.flush()
sess.expunge_all()
+ eq_(sess.query(Company).filter(Company.employees.of_type(Engineer).
+ any(Engineer.primary_language
+ == 'cobol')).first(), c2)
- eq_((sess.query(Company).
- filter(Company.employees.of_type(Engineer).
- any(Engineer.primary_language == 'cobol')).first()),
- c2)
+ # ensure that the Manager mapper was compiled with the Person id
+ # column as higher priority. this ensures that "id" will get
+ # loaded from the Person row and not the possibly non-present
+ # Manager row
+
+ assert Manager.id.property.columns == [Person.__table__.c.id,
+ Manager.__table__.c.id]
+
+ # assert that the "id" column is available without a second
+ # load. this would be the symptom of the previous step not being
+ # correct.
- # ensure that the Manager mapper was compiled
- # with the Person id column as higher priority.
- # this ensures that "id" will get loaded from the Person row
- # and not the possibly non-present Manager row
- assert Manager.id.property.columns == [Person.__table__.c.id, Manager.__table__.c.id]
-
- # assert that the "id" column is available without a second load.
- # this would be the symptom of the previous step not being correct.
sess.expunge_all()
+
def go():
- assert sess.query(Manager).filter(Manager.name=='dogbert').one().id
+ assert sess.query(Manager).filter(Manager.name == 'dogbert'
+ ).one().id
+
self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
+
def go():
- assert sess.query(Person).filter(Manager.name=='dogbert').one().id
+ assert sess.query(Person).filter(Manager.name == 'dogbert'
+ ).one().id
+
self.assert_sql_count(testing.db, go, 1)
-
+
def test_add_subcol_after_the_fact(self):
+
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
class Engineer(Person):
+
__tablename__ = 'engineers'
- __mapper_args__ = {'polymorphic_identity':'engineer'}
- id = Column('id', Integer, ForeignKey('people.id'), primary_key=True)
-
- Engineer.primary_language = Column('primary_language', String(50))
-
- Base.metadata.create_all()
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
+ id = Column('id', Integer, ForeignKey('people.id'),
+ primary_key=True)
+ Engineer.primary_language = Column('primary_language',
+ String(50))
+ Base.metadata.create_all()
sess = create_session()
e1 = Engineer(primary_language='java', name='dilbert')
sess.add(e1)
sess.flush()
sess.expunge_all()
+ eq_(sess.query(Person).first(), Engineer(primary_language='java'
+ , name='dilbert'))
- eq_(sess.query(Person).first(),
- Engineer(primary_language='java', name='dilbert')
- )
-
def test_add_parentcol_after_the_fact(self):
+
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
class Engineer(Person):
+
__tablename__ = 'engineers'
- __mapper_args__ = {'polymorphic_identity':'engineer'}
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
primary_language = Column(String(50))
- id = Column('id', Integer, ForeignKey('people.id'), primary_key=True)
-
+ id = Column('id', Integer, ForeignKey('people.id'),
+ primary_key=True)
+
Person.name = Column('name', String(50))
-
Base.metadata.create_all()
-
sess = create_session()
e1 = Engineer(primary_language='java', name='dilbert')
sess.add(e1)
sess.flush()
sess.expunge_all()
-
- eq_(sess.query(Person).first(),
- Engineer(primary_language='java', name='dilbert')
- )
+ eq_(sess.query(Person).first(),
+ Engineer(primary_language='java', name='dilbert'))
def test_add_sub_parentcol_after_the_fact(self):
+
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
class Engineer(Person):
+
__tablename__ = 'engineers'
- __mapper_args__ = {'polymorphic_identity':'engineer'}
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
primary_language = Column(String(50))
- id = Column('id', Integer, ForeignKey('people.id'), primary_key=True)
-
+ id = Column('id', Integer, ForeignKey('people.id'),
+ primary_key=True)
+
class Admin(Engineer):
+
__tablename__ = 'admins'
- __mapper_args__ = {'polymorphic_identity':'admin'}
+ __mapper_args__ = {'polymorphic_identity': 'admin'}
workstation = Column(String(50))
- id = Column('id', Integer, ForeignKey('engineers.id'), primary_key=True)
-
- Person.name = Column('name', String(50))
+ id = Column('id', Integer, ForeignKey('engineers.id'),
+ primary_key=True)
+ Person.name = Column('name', String(50))
Base.metadata.create_all()
-
sess = create_session()
- e1 = Admin(primary_language='java', name='dilbert', workstation='foo')
+ e1 = Admin(primary_language='java', name='dilbert',
+ workstation='foo')
sess.add(e1)
sess.flush()
sess.expunge_all()
+ eq_(sess.query(Person).first(), Admin(primary_language='java',
+ name='dilbert', workstation='foo'))
- eq_(sess.query(Person).first(),
- Admin(primary_language='java', name='dilbert', workstation='foo')
- )
-
def test_subclass_mixin(self):
+
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
id = Column('id', Integer, primary_key=True)
name = Column('name', String(50))
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
-
+ __mapper_args__ = {'polymorphic_on': discriminator}
+
class MyMixin(object):
+
pass
-
+
class Engineer(MyMixin, Person):
+
__tablename__ = 'engineers'
- __mapper_args__ = {'polymorphic_identity':'engineer'}
- id = Column('id', Integer, ForeignKey('people.id'), primary_key=True)
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
+ id = Column('id', Integer, ForeignKey('people.id'),
+ primary_key=True)
primary_language = Column('primary_language', String(50))
-
+
assert class_mapper(Engineer).inherits is class_mapper(Person)
-
+
def test_with_undefined_foreignkey(self):
+
class Parent(Base):
- __tablename__ = 'parent'
- id = Column('id', Integer, primary_key=True)
- tp = Column('type', String(50))
- __mapper_args__ = dict(polymorphic_on = tp)
+
+ __tablename__ = 'parent'
+ id = Column('id', Integer, primary_key=True)
+ tp = Column('type', String(50))
+ __mapper_args__ = dict(polymorphic_on=tp)
class Child1(Parent):
- __tablename__ = 'child1'
- id = Column('id', Integer, ForeignKey('parent.id'), primary_key=True)
- related_child2 = Column('c2', Integer, ForeignKey('child2.id'))
- __mapper_args__ = dict(polymorphic_identity = 'child1')
- # no exception is raised by the ForeignKey to "child2" even though
- # child2 doesn't exist yet
+ __tablename__ = 'child1'
+ id = Column('id', Integer, ForeignKey('parent.id'),
+ primary_key=True)
+ related_child2 = Column('c2', Integer,
+ ForeignKey('child2.id'))
+ __mapper_args__ = dict(polymorphic_identity='child1')
+
+ # no exception is raised by the ForeignKey to "child2" even
+ # though child2 doesn't exist yet
class Child2(Parent):
- __tablename__ = 'child2'
- id = Column('id', Integer, ForeignKey('parent.id'), primary_key=True)
- related_child1 = Column('c1', Integer)
- __mapper_args__ = dict(polymorphic_identity = 'child2')
+
+ __tablename__ = 'child2'
+ id = Column('id', Integer, ForeignKey('parent.id'),
+ primary_key=True)
+ related_child1 = Column('c1', Integer)
+ __mapper_args__ = dict(polymorphic_identity='child2')
sa.orm.compile_mappers() # no exceptions here
def test_single_colsonbase(self):
- """test single inheritance where all the columns are on the base class."""
-
+ """test single inheritance where all the columns are on the base
+ class."""
+
class Company(Base, ComparableEntity):
+
__tablename__ = 'companies'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
- employees = relationship("Person")
+ employees = relationship('Person')
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
company_id = Column('company_id', Integer,
ForeignKey('companies.id'))
name = Column('name', String(50))
discriminator = Column('type', String(50))
primary_language = Column('primary_language', String(50))
golf_swing = Column('golf_swing', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
class Engineer(Person):
- __mapper_args__ = {'polymorphic_identity':'engineer'}
+
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
class Manager(Person):
- __mapper_args__ = {'polymorphic_identity':'manager'}
- Base.metadata.create_all()
+ __mapper_args__ = {'polymorphic_identity': 'manager'}
+ Base.metadata.create_all()
sess = create_session()
- c1 = Company(name="MegaCorp, Inc.", employees=[
- Engineer(name="dilbert", primary_language="java"),
- Engineer(name="wally", primary_language="c++"),
- Manager(name="dogbert", golf_swing="fore!")
- ])
-
- c2 = Company(name="Elbonia, Inc.", employees=[
- Engineer(name="vlad", primary_language="cobol")
- ])
-
+ c1 = Company(name='MegaCorp, Inc.',
+ employees=[Engineer(name='dilbert',
+ primary_language='java'), Engineer(name='wally',
+ primary_language='c++'), Manager(name='dogbert',
+ golf_swing='fore!')])
+ c2 = Company(name='Elbonia, Inc.',
+ employees=[Engineer(name='vlad',
+ primary_language='cobol')])
sess.add(c1)
sess.add(c2)
sess.flush()
sess.expunge_all()
-
- eq_((sess.query(Person).
- filter(Engineer.primary_language == 'cobol').first()),
- Engineer(name='vlad'))
- eq_((sess.query(Company).
- filter(Company.employees.of_type(Engineer).
- any(Engineer.primary_language == 'cobol')).first()),
- c2)
+ eq_(sess.query(Person).filter(Engineer.primary_language
+ == 'cobol').first(), Engineer(name='vlad'))
+ eq_(sess.query(Company).filter(Company.employees.of_type(Engineer).
+ any(Engineer.primary_language
+ == 'cobol')).first(), c2)
def test_single_colsonsub(self):
- """test single inheritance where the columns are local to their class.
+ """test single inheritance where the columns are local to their
+ class.
this is a newer usage.
"""
class Company(Base, ComparableEntity):
+
__tablename__ = 'companies'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
- employees = relationship("Person")
+ employees = relationship('Person')
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
- company_id = Column(Integer,
- ForeignKey('companies.id'))
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
+ company_id = Column(Integer, ForeignKey('companies.id'))
name = Column(String(50))
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
class Engineer(Person):
- __mapper_args__ = {'polymorphic_identity':'engineer'}
+
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
primary_language = Column(String(50))
class Manager(Person):
- __mapper_args__ = {'polymorphic_identity':'manager'}
+
+ __mapper_args__ = {'polymorphic_identity': 'manager'}
golf_swing = Column(String(50))
- # we have here a situation that is somewhat unique.
- # the Person class is mapped to the "people" table, but it
- # was mapped when the table did not include the "primary_language"
- # or "golf_swing" columns. declarative will also manipulate
- # the exclude_properties collection so that sibling classes
- # don't cross-pollinate.
+ # we have here a situation that is somewhat unique. the Person
+ # class is mapped to the "people" table, but it was mapped when
+ # the table did not include the "primary_language" or
+ # "golf_swing" columns. declarative will also manipulate the
+ # exclude_properties collection so that sibling classes don't
+ # cross-pollinate.
assert Person.__table__.c.company_id is not None
assert Person.__table__.c.golf_swing is not None
@@ -1291,59 +1419,57 @@ class DeclarativeInheritanceTest(DeclarativeTestBase):
assert not hasattr(Person, 'golf_swing')
assert not hasattr(Engineer, 'golf_swing')
assert not hasattr(Manager, 'primary_language')
-
Base.metadata.create_all()
-
sess = create_session()
-
- e1 = Engineer(name="dilbert", primary_language="java")
- e2 = Engineer(name="wally", primary_language="c++")
- m1 = Manager(name="dogbert", golf_swing="fore!")
- c1 = Company(name="MegaCorp, Inc.", employees=[e1, e2, m1])
-
- e3 =Engineer(name="vlad", primary_language="cobol")
- c2 = Company(name="Elbonia, Inc.", employees=[e3])
+ e1 = Engineer(name='dilbert', primary_language='java')
+ e2 = Engineer(name='wally', primary_language='c++')
+ m1 = Manager(name='dogbert', golf_swing='fore!')
+ c1 = Company(name='MegaCorp, Inc.', employees=[e1, e2, m1])
+ e3 = Engineer(name='vlad', primary_language='cobol')
+ c2 = Company(name='Elbonia, Inc.', employees=[e3])
sess.add(c1)
sess.add(c2)
sess.flush()
sess.expunge_all()
-
- eq_((sess.query(Person).
- filter(Engineer.primary_language == 'cobol').first()),
- Engineer(name='vlad'))
- eq_((sess.query(Company).
- filter(Company.employees.of_type(Engineer).
- any(Engineer.primary_language == 'cobol')).first()),
- c2)
-
- eq_(
- sess.query(Engineer).filter_by(primary_language='cobol').one(),
- Engineer(name="vlad", primary_language="cobol")
- )
+ eq_(sess.query(Person).filter(Engineer.primary_language
+ == 'cobol').first(), Engineer(name='vlad'))
+ eq_(sess.query(Company).filter(Company.employees.of_type(Engineer).
+ any(Engineer.primary_language
+ == 'cobol')).first(), c2)
+ eq_(sess.query(Engineer).filter_by(primary_language='cobol'
+ ).one(), Engineer(name='vlad', primary_language='cobol'))
def test_joined_from_single(self):
+
class Company(Base, ComparableEntity):
+
__tablename__ = 'companies'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column('name', String(50))
- employees = relationship("Person")
-
+ employees = relationship('Person')
+
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
company_id = Column(Integer, ForeignKey('companies.id'))
name = Column(String(50))
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
class Manager(Person):
- __mapper_args__ = {'polymorphic_identity':'manager'}
+
+ __mapper_args__ = {'polymorphic_identity': 'manager'}
golf_swing = Column(String(50))
class Engineer(Person):
+
__tablename__ = 'engineers'
- __mapper_args__ = {'polymorphic_identity':'engineer'}
- id = Column(Integer, ForeignKey('people.id'), primary_key=True)
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
+ id = Column(Integer, ForeignKey('people.id'),
+ primary_key=True)
primary_language = Column(String(50))
assert Person.__table__.c.golf_swing is not None
@@ -1355,132 +1481,128 @@ class DeclarativeInheritanceTest(DeclarativeTestBase):
assert not hasattr(Person, 'golf_swing')
assert not hasattr(Engineer, 'golf_swing')
assert not hasattr(Manager, 'primary_language')
-
Base.metadata.create_all()
-
sess = create_session()
-
- e1 = Engineer(name="dilbert", primary_language="java")
- e2 = Engineer(name="wally", primary_language="c++")
- m1 = Manager(name="dogbert", golf_swing="fore!")
- c1 = Company(name="MegaCorp, Inc.", employees=[e1, e2, m1])
- e3 =Engineer(name="vlad", primary_language="cobol")
- c2 = Company(name="Elbonia, Inc.", employees=[e3])
+ e1 = Engineer(name='dilbert', primary_language='java')
+ e2 = Engineer(name='wally', primary_language='c++')
+ m1 = Manager(name='dogbert', golf_swing='fore!')
+ c1 = Company(name='MegaCorp, Inc.', employees=[e1, e2, m1])
+ e3 = Engineer(name='vlad', primary_language='cobol')
+ c2 = Company(name='Elbonia, Inc.', employees=[e3])
sess.add(c1)
sess.add(c2)
sess.flush()
sess.expunge_all()
-
- eq_((sess.query(Person).with_polymorphic(Engineer).
- filter(Engineer.primary_language == 'cobol').first()),
- Engineer(name='vlad'))
- eq_((sess.query(Company).
- filter(Company.employees.of_type(Engineer).
- any(Engineer.primary_language == 'cobol')).first()),
- c2)
-
- eq_(
- sess.query(Engineer).filter_by(primary_language='cobol').one(),
- Engineer(name="vlad", primary_language="cobol")
- )
+ eq_(sess.query(Person).with_polymorphic(Engineer).
+ filter(Engineer.primary_language
+ == 'cobol').first(), Engineer(name='vlad'))
+ eq_(sess.query(Company).filter(Company.employees.of_type(Engineer).
+ any(Engineer.primary_language
+ == 'cobol')).first(), c2)
+ eq_(sess.query(Engineer).filter_by(primary_language='cobol'
+ ).one(), Engineer(name='vlad', primary_language='cobol'))
def test_add_deferred(self):
+
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
Person.name = deferred(Column(String(10)))
-
Base.metadata.create_all()
sess = create_session()
p = Person(name='ratbert')
-
sess.add(p)
sess.flush()
sess.expunge_all()
- eq_(
- sess.query(Person).all(),
- [
- Person(name='ratbert')
- ]
- )
+ eq_(sess.query(Person).all(), [Person(name='ratbert')])
sess.expunge_all()
-
- person = sess.query(Person).filter(Person.name == 'ratbert').one()
+ person = sess.query(Person).filter(Person.name == 'ratbert'
+ ).one()
assert 'name' not in person.__dict__
def test_single_fksonsub(self):
- """test single inheritance with a foreign key-holding column on a subclass.
-
+ """test single inheritance with a foreign key-holding column on
+ a subclass.
+
"""
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column(String(50))
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
class Engineer(Person):
- __mapper_args__ = {'polymorphic_identity':'engineer'}
- primary_language_id = Column(Integer, ForeignKey('languages.id'))
- primary_language = relationship("Language")
-
+
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
+ primary_language_id = Column(Integer,
+ ForeignKey('languages.id'))
+ primary_language = relationship('Language')
+
class Language(Base, ComparableEntity):
+
__tablename__ = 'languages'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column(String(50))
assert not hasattr(Person, 'primary_language_id')
-
Base.metadata.create_all()
-
sess = create_session()
-
- java, cpp, cobol = Language(name='java'),Language(name='cpp'), Language(name='cobol')
- e1 = Engineer(name="dilbert", primary_language=java)
- e2 = Engineer(name="wally", primary_language=cpp)
- e3 =Engineer(name="vlad", primary_language=cobol)
+ java, cpp, cobol = Language(name='java'), Language(name='cpp'), \
+ Language(name='cobol')
+ e1 = Engineer(name='dilbert', primary_language=java)
+ e2 = Engineer(name='wally', primary_language=cpp)
+ e3 = Engineer(name='vlad', primary_language=cobol)
sess.add_all([e1, e2, e3])
sess.flush()
sess.expunge_all()
+ eq_(sess.query(Person).filter(Engineer.primary_language.has(
+ Language.name
+ == 'cobol')).first(), Engineer(name='vlad',
+ primary_language=Language(name='cobol')))
+ eq_(sess.query(Engineer).filter(Engineer.primary_language.has(
+ Language.name
+ == 'cobol')).one(), Engineer(name='vlad',
+ primary_language=Language(name='cobol')))
+ eq_(sess.query(Person).join(Engineer.primary_language).order_by(
+ Language.name).all(),
+ [Engineer(name='vlad',
+ primary_language=Language(name='cobol')),
+ Engineer(name='wally', primary_language=Language(name='cpp'
+ )), Engineer(name='dilbert',
+ primary_language=Language(name='java'))])
- eq_((sess.query(Person).
- filter(Engineer.primary_language.has(Language.name=='cobol')).first()),
- Engineer(name='vlad', primary_language=Language(name='cobol')))
-
- eq_(
- sess.query(Engineer).filter(Engineer.primary_language.has(Language.name=='cobol')).one(),
- Engineer(name="vlad", primary_language=Language(name='cobol'))
- )
-
- eq_(
- sess.query(Person).join(Engineer.primary_language).order_by(Language.name).all(),
- [
- Engineer(name='vlad', primary_language=Language(name='cobol')),
- Engineer(name='wally', primary_language=Language(name='cpp')),
- Engineer(name='dilbert', primary_language=Language(name='java')),
- ]
- )
-
def test_single_three_levels(self):
+
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
id = Column(Integer, primary_key=True)
name = Column(String(50))
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
class Engineer(Person):
- __mapper_args__ = {'polymorphic_identity':'engineer'}
+
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
primary_language = Column(String(50))
class JuniorEngineer(Engineer):
- __mapper_args__ = {'polymorphic_identity':'junior_engineer'}
+
+ __mapper_args__ = \
+ {'polymorphic_identity': 'junior_engineer'}
nerf_gun = Column(String(50))
class Manager(Person):
- __mapper_args__ = {'polymorphic_identity':'manager'}
+
+ __mapper_args__ = {'polymorphic_identity': 'manager'}
golf_swing = Column(String(50))
assert JuniorEngineer.nerf_gun
@@ -1492,277 +1614,289 @@ class DeclarativeInheritanceTest(DeclarativeTestBase):
assert not hasattr(Engineer, 'nerf_gun')
assert not hasattr(Manager, 'nerf_gun')
assert not hasattr(Manager, 'primary_language')
-
+
def test_single_detects_conflict(self):
+
class Person(Base):
+
__tablename__ = 'people'
id = Column(Integer, primary_key=True)
name = Column(String(50))
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
class Engineer(Person):
- __mapper_args__ = {'polymorphic_identity':'engineer'}
+
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
primary_language = Column(String(50))
-
+
# test sibling col conflict
+
def go():
+
class Manager(Person):
- __mapper_args__ = {'polymorphic_identity':'manager'}
+
+ __mapper_args__ = {'polymorphic_identity': 'manager'}
golf_swing = Column(String(50))
primary_language = Column(String(50))
+
assert_raises(sa.exc.ArgumentError, go)
# test parent col conflict
+
def go():
+
class Salesman(Person):
- __mapper_args__ = {'polymorphic_identity':'manager'}
+
+ __mapper_args__ = {'polymorphic_identity': 'manager'}
name = Column(String(50))
+
assert_raises(sa.exc.ArgumentError, go)
-
def test_single_no_special_cols(self):
+
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
id = Column('id', Integer, primary_key=True)
name = Column('name', String(50))
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
def go():
+
class Engineer(Person):
- __mapper_args__ = {'polymorphic_identity':'engineer'}
- primary_language = Column('primary_language', String(50))
+
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
+ primary_language = Column('primary_language',
+ String(50))
foo_bar = Column(Integer, primary_key=True)
- assert_raises_message(sa.exc.ArgumentError, "place primary key", go)
-
+
+ assert_raises_message(sa.exc.ArgumentError, 'place primary key'
+ , go)
+
def test_single_no_table_args(self):
+
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
id = Column('id', Integer, primary_key=True)
name = Column('name', String(50))
discriminator = Column('type', String(50))
- __mapper_args__ = {'polymorphic_on':discriminator}
+ __mapper_args__ = {'polymorphic_on': discriminator}
def go():
+
class Engineer(Person):
- __mapper_args__ = {'polymorphic_identity':'engineer'}
- primary_language = Column('primary_language', String(50))
+
+ __mapper_args__ = {'polymorphic_identity': 'engineer'}
+ primary_language = Column('primary_language',
+ String(50))
+
# this should be on the Person class, as this is single
# table inheritance, which is why we test that this
# throws an exception!
- __table_args__ = {'mysql_engine':'InnoDB'}
- assert_raises_message(sa.exc.ArgumentError, "place __table_args__", go)
-
+
+ __table_args__ = {'mysql_engine': 'InnoDB'}
+
+ assert_raises_message(sa.exc.ArgumentError,
+ 'place __table_args__', go)
+
def test_concrete(self):
- engineers = Table('engineers', Base.metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('name', String(50)),
- Column('primary_language', String(50))
- )
- managers = Table('managers', Base.metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('name', String(50)),
- Column('golf_swing', String(50))
- )
-
- punion = polymorphic_union({
- 'engineer':engineers,
- 'manager':managers
- }, 'type', 'punion')
+ engineers = Table('engineers', Base.metadata, Column('id',
+ Integer, primary_key=True,
+ test_needs_autoincrement=True), Column('name'
+ , String(50)), Column('primary_language',
+ String(50)))
+ managers = Table('managers', Base.metadata, Column('id',
+ Integer, primary_key=True,
+ test_needs_autoincrement=True), Column('name',
+ String(50)), Column('golf_swing', String(50)))
+ punion = polymorphic_union({'engineer': engineers, 'manager'
+ : managers}, 'type', 'punion')
class Person(Base, ComparableEntity):
+
__table__ = punion
- __mapper_args__ = {'polymorphic_on':punion.c.type}
+ __mapper_args__ = {'polymorphic_on': punion.c.type}
class Engineer(Person):
+
__table__ = engineers
- __mapper_args__ = {'polymorphic_identity':'engineer', 'concrete':True}
+ __mapper_args__ = {'polymorphic_identity': 'engineer',
+ 'concrete': True}
class Manager(Person):
+
__table__ = managers
- __mapper_args__ = {'polymorphic_identity':'manager', 'concrete':True}
-
+ __mapper_args__ = {'polymorphic_identity': 'manager',
+ 'concrete': True}
+
Base.metadata.create_all()
sess = create_session()
-
- e1 = Engineer(name="dilbert", primary_language="java")
- e2 = Engineer(name="wally", primary_language="c++")
- m1 = Manager(name="dogbert", golf_swing="fore!")
- e3 = Engineer(name="vlad", primary_language="cobol")
-
+ e1 = Engineer(name='dilbert', primary_language='java')
+ e2 = Engineer(name='wally', primary_language='c++')
+ m1 = Manager(name='dogbert', golf_swing='fore!')
+ e3 = Engineer(name='vlad', primary_language='cobol')
sess.add_all([e1, e2, m1, e3])
sess.flush()
sess.expunge_all()
- eq_(
- sess.query(Person).order_by(Person.name).all(),
- [
- Engineer(name='dilbert'), Manager(name='dogbert'),
- Engineer(name='vlad'), Engineer(name='wally')
- ]
- )
-
+ eq_(sess.query(Person).order_by(Person.name).all(),
+ [Engineer(name='dilbert'), Manager(name='dogbert'),
+ Engineer(name='vlad'), Engineer(name='wally')])
+
def test_concrete_inline_non_polymorphic(self):
"""test the example from the declarative docs."""
-
+
class Person(Base, ComparableEntity):
+
__tablename__ = 'people'
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
name = Column(String(50))
class Engineer(Person):
+
__tablename__ = 'engineers'
- __mapper_args__ = {'concrete':True}
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ __mapper_args__ = {'concrete': True}
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
primary_language = Column(String(50))
name = Column(String(50))
class Manager(Person):
+
__tablename__ = 'manager'
- __mapper_args__ = {'concrete':True}
- id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
+ __mapper_args__ = {'concrete': True}
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
golf_swing = Column(String(50))
name = Column(String(50))
Base.metadata.create_all()
sess = create_session()
-
- e1 = Engineer(name="dilbert", primary_language="java")
- e2 = Engineer(name="wally", primary_language="c++")
- m1 = Manager(name="dogbert", golf_swing="fore!")
- e3 = Engineer(name="vlad", primary_language="cobol")
-
+ e1 = Engineer(name='dilbert', primary_language='java')
+ e2 = Engineer(name='wally', primary_language='c++')
+ m1 = Manager(name='dogbert', golf_swing='fore!')
+ e3 = Engineer(name='vlad', primary_language='cobol')
sess.add_all([e1, e2, m1, e3])
sess.flush()
sess.expunge_all()
- eq_(
- sess.query(Engineer).order_by(Engineer.name).all(),
- [
- Engineer(name='dilbert'),
- Engineer(name='vlad'), Engineer(name='wally')
- ]
- )
- eq_(
- sess.query(Manager).all(),
- [
- Manager(name='dogbert'),
- ]
- )
-
-
+ eq_(sess.query(Engineer).order_by(Engineer.name).all(),
+ [Engineer(name='dilbert'), Engineer(name='vlad'),
+ Engineer(name='wally')])
+ eq_(sess.query(Manager).all(), [Manager(name='dogbert')])
+
def _produce_test(inline, stringbased):
+
class ExplicitJoinTest(MappedTest):
-
+
@classmethod
def define_tables(cls, metadata):
global User, Address
Base = decl.declarative_base(metadata=metadata)
class User(Base, ComparableEntity):
+
__tablename__ = 'users'
id = Column(Integer, primary_key=True,
- test_needs_autoincrement=True)
+ test_needs_autoincrement=True)
name = Column(String(50))
-
+
class Address(Base, ComparableEntity):
+
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True,
- test_needs_autoincrement=True)
+ test_needs_autoincrement=True)
email = Column(String(50))
user_id = Column(Integer, ForeignKey('users.id'))
if inline:
if stringbased:
- user = relationship("User",
- primaryjoin="User.id==Address.user_id",
- backref="addresses")
+ user = relationship('User',
+ primaryjoin='User.id==Address.user_id',
+ backref='addresses')
else:
- user = relationship(User,
- primaryjoin=User.id==user_id,
- backref="addresses")
-
+ user = relationship(User, primaryjoin=User.id
+ == user_id, backref='addresses')
+
if not inline:
compile_mappers()
if stringbased:
- Address.user = relationship("User",
- primaryjoin="User.id==Address.user_id",
- backref="addresses")
+ Address.user = relationship('User',
+ primaryjoin='User.id==Address.user_id',
+ backref='addresses')
else:
- Address.user = relationship(User,
- primaryjoin=User.id==Address.user_id,
- backref="addresses")
+ Address.user = relationship(User,
+ primaryjoin=User.id == Address.user_id,
+ backref='addresses')
@classmethod
def insert_data(cls):
- params = [dict(zip(('id', 'name'), column_values))
- for column_values in
- [(7, 'jack'),
- (8, 'ed'),
- (9, 'fred'),
- (10, 'chuck')]
- ]
+ params = [dict(zip(('id', 'name'), column_values))
+ for column_values in [(7, 'jack'), (8, 'ed'), (9,
+ 'fred'), (10, 'chuck')]]
User.__table__.insert().execute(params)
-
- Address.__table__.insert().execute(
- [dict(zip(('id', 'user_id', 'email'), column_values))
- for column_values in
- [(1, 7, "jack@bean.com"),
- (2, 8, "ed@wood.com"),
- (3, 8, "ed@bettyboop.com"),
- (4, 8, "ed@lala.com"),
- (5, 9, "fred@fred.com")]
- ]
- )
-
+ Address.__table__.insert().execute([dict(zip(('id',
+ 'user_id', 'email'), column_values))
+ for column_values in [(1, 7, 'jack@bean.com'), (2,
+ 8, 'ed@wood.com'), (3, 8, 'ed@bettyboop.com'), (4,
+ 8, 'ed@lala.com'), (5, 9, 'fred@fred.com')]])
+
def test_aliased_join(self):
- # this query will screw up if the aliasing
- # enabled in query.join() gets applied to the right half of the
- # join condition inside the any().
- # the join condition inside of any() comes from the
- # "primaryjoin" of the relationship,
- # and should not be annotated with _orm_adapt.
- # PropertyLoader.Comparator will annotate
- # the left side with _orm_adapt, though.
+
+ # this query will screw up if the aliasing enabled in
+ # query.join() gets applied to the right half of the join
+ # condition inside the any(). the join condition inside of
+ # any() comes from the "primaryjoin" of the relationship,
+ # and should not be annotated with _orm_adapt.
+ # PropertyLoader.Comparator will annotate the left side with
+ # _orm_adapt, though.
+
sess = create_session()
- eq_(
- sess.query(User).join(User.addresses, aliased=True).
- filter(Address.email=='ed@wood.com').
- filter(User.addresses.any(Address.email=='jack@bean.com')).all(),
- []
- )
-
- ExplicitJoinTest.__name__ = "ExplicitJoinTest%s%s" % \
- (inline and 'Inline' or 'Separate',
- stringbased and 'String' or 'Literal')
+ eq_(sess.query(User).join(User.addresses,
+ aliased=True).filter(Address.email == 'ed@wood.com'
+ ).filter(User.addresses.any(Address.email
+ == 'jack@bean.com')).all(), [])
+
+ ExplicitJoinTest.__name__ = 'ExplicitJoinTest%s%s' % (inline
+ and 'Inline' or 'Separate', stringbased and 'String'
+ or 'Literal')
return ExplicitJoinTest
-for inline in (True, False):
- for stringbased in (True, False):
+for inline in True, False:
+ for stringbased in True, False:
testclass = _produce_test(inline, stringbased)
- exec("%s = testclass" % testclass.__name__)
+ exec '%s = testclass' % testclass.__name__
del testclass
-
+
class DeclarativeReflectionTest(testing.TestBase):
+
@classmethod
def setup_class(cls):
global reflection_metadata
reflection_metadata = MetaData(testing.db)
-
- Table('users', reflection_metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('name', String(50)),
- test_needs_fk=True)
- Table('addresses', reflection_metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('email', String(50)),
- Column('user_id', Integer, ForeignKey('users.id')),
- test_needs_fk=True)
- Table('imhandles', reflection_metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('user_id', Integer),
- Column('network', String(50)),
- Column('handle', String(50)),
- test_needs_fk=True)
-
+ Table('users', reflection_metadata, Column('id', Integer,
+ primary_key=True, test_needs_autoincrement=True),
+ Column('name', String(50)), test_needs_fk=True)
+ Table(
+ 'addresses',
+ reflection_metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('email', String(50)),
+ Column('user_id', Integer, ForeignKey('users.id')),
+ test_needs_fk=True,
+ )
+ Table(
+ 'imhandles',
+ reflection_metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('user_id', Integer),
+ Column('network', String(50)),
+ Column('handle', String(50)),
+ test_needs_fk=True,
+ )
reflection_metadata.create_all()
def setup(self):
@@ -1781,33 +1915,30 @@ class DeclarativeReflectionTest(testing.TestBase):
meta = MetaData(testing.db)
class User(Base, ComparableEntity):
+
__tablename__ = 'users'
__autoload__ = True
if testing.against('oracle', 'firebird'):
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
- addresses = relationship("Address", backref="user")
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
+ addresses = relationship('Address', backref='user')
class Address(Base, ComparableEntity):
+
__tablename__ = 'addresses'
__autoload__ = True
-
if testing.against('oracle', 'firebird'):
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
-
- u1 = User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
+
+ u1 = User(name='u1', addresses=[Address(email='one'),
+ Address(email='two')])
sess = create_session()
sess.add(u1)
sess.flush()
sess.expunge_all()
-
- eq_(sess.query(User).all(), [User(name='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])])
-
+ eq_(sess.query(User).all(), [User(name='u1',
+ addresses=[Address(email='one'), Address(email='two')])])
a1 = sess.query(Address).filter(Address.email == 'two').one()
eq_(a1, Address(email='two'))
eq_(a1.user, User(name='u1'))
@@ -1816,218 +1947,226 @@ class DeclarativeReflectionTest(testing.TestBase):
meta = MetaData(testing.db)
class User(Base, ComparableEntity):
+
__tablename__ = 'users'
__autoload__ = True
if testing.against('oracle', 'firebird'):
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
nom = Column('name', String(50), key='nom')
- addresses = relationship("Address", backref="user")
+ addresses = relationship('Address', backref='user')
class Address(Base, ComparableEntity):
+
__tablename__ = 'addresses'
__autoload__ = True
if testing.against('oracle', 'firebird'):
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
- u1 = User(nom='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])
+ u1 = User(nom='u1', addresses=[Address(email='one'),
+ Address(email='two')])
sess = create_session()
sess.add(u1)
sess.flush()
sess.expunge_all()
-
- eq_(sess.query(User).all(), [User(nom='u1', addresses=[
- Address(email='one'),
- Address(email='two'),
- ])])
-
+ eq_(sess.query(User).all(), [User(nom='u1',
+ addresses=[Address(email='one'), Address(email='two')])])
a1 = sess.query(Address).filter(Address.email == 'two').one()
eq_(a1, Address(email='two'))
eq_(a1.user, User(nom='u1'))
-
assert_raises(TypeError, User, name='u3')
def test_supplied_fk(self):
meta = MetaData(testing.db)
class IMHandle(Base, ComparableEntity):
+
__tablename__ = 'imhandles'
__autoload__ = True
if testing.against('oracle', 'firebird'):
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
+ user_id = Column('user_id', Integer, ForeignKey('users.id'))
- user_id = Column('user_id', Integer,
- ForeignKey('users.id'))
class User(Base, ComparableEntity):
+
__tablename__ = 'users'
__autoload__ = True
if testing.against('oracle', 'firebird'):
- id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
- handles = relationship("IMHandle", backref="user")
+ id = Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True)
+ handles = relationship('IMHandle', backref='user')
- u1 = User(name='u1', handles=[
- IMHandle(network='blabber', handle='foo'),
- IMHandle(network='lol', handle='zomg')
- ])
+ u1 = User(name='u1', handles=[IMHandle(network='blabber',
+ handle='foo'), IMHandle(network='lol', handle='zomg'
+ )])
sess = create_session()
sess.add(u1)
sess.flush()
sess.expunge_all()
-
- eq_(sess.query(User).all(), [User(name='u1', handles=[
- IMHandle(network='blabber', handle='foo'),
- IMHandle(network='lol', handle='zomg')
- ])])
-
- a1 = sess.query(IMHandle).filter(IMHandle.handle == 'zomg').one()
+ eq_(sess.query(User).all(), [User(name='u1',
+ handles=[IMHandle(network='blabber', handle='foo'),
+ IMHandle(network='lol', handle='zomg')])])
+ a1 = sess.query(IMHandle).filter(IMHandle.handle == 'zomg'
+ ).one()
eq_(a1, IMHandle(network='lol', handle='zomg'))
eq_(a1.user, User(name='u1'))
class DeclarativeMixinTest(DeclarativeTestBase):
-
+
def test_simple(self):
class MyMixin(object):
- id = Column(Integer, primary_key=True,
- test_needs_autoincrement=True)
+
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
def foo(self):
- return 'bar'+str(self.id)
+ return 'bar' + str(self.id)
- class MyModel(Base,MyMixin):
- __tablename__='test'
+ class MyModel(Base, MyMixin):
+
+ __tablename__ = 'test'
name = Column(String(100), nullable=False, index=True)
Base.metadata.create_all()
-
session = create_session()
session.add(MyModel(name='testing'))
session.flush()
session.expunge_all()
-
obj = session.query(MyModel).one()
- eq_(obj.id,1)
- eq_(obj.name,'testing')
- eq_(obj.foo(),'bar1')
+ eq_(obj.id, 1)
+ eq_(obj.name, 'testing')
+ eq_(obj.foo(), 'bar1')
-
def test_unique_column(self):
-
+
class MyMixin(object):
+
id = Column(Integer, primary_key=True)
value = Column(String, unique=True)
class MyModel(Base, MyMixin):
+
__tablename__ = 'test'
-
+
assert MyModel.__table__.c.value.unique
-
+
def test_hierarchical_bases(self):
class MyMixinParent:
- id = Column(Integer, primary_key=True,
- test_needs_autoincrement=True)
+
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
def foo(self):
- return 'bar'+str(self.id)
+ return 'bar' + str(self.id)
class MyMixin(MyMixinParent):
+
baz = Column(String(100), nullable=False, index=True)
- class MyModel(Base,MyMixin):
- __tablename__='test'
+ class MyModel(Base, MyMixin):
+
+ __tablename__ = 'test'
name = Column(String(100), nullable=False, index=True)
Base.metadata.create_all()
-
session = create_session()
session.add(MyModel(name='testing', baz='fu'))
session.flush()
session.expunge_all()
-
obj = session.query(MyModel).one()
- eq_(obj.id,1)
- eq_(obj.name,'testing')
- eq_(obj.foo(),'bar1')
- eq_(obj.baz,'fu')
-
+ eq_(obj.id, 1)
+ eq_(obj.name, 'testing')
+ eq_(obj.foo(), 'bar1')
+ eq_(obj.baz, 'fu')
+
def test_not_allowed(self):
+
class MyMixin:
foo = Column(Integer, ForeignKey('bar.id'))
-
+
def go():
class MyModel(Base, MyMixin):
__tablename__ = 'foo'
-
+
assert_raises(sa.exc.InvalidRequestError, go)
-
+
class MyRelMixin:
- foo = relationship("Bar")
+ foo = relationship('Bar')
+
def go():
class MyModel(Base, MyRelMixin):
+
__tablename__ = 'foo'
+
assert_raises(sa.exc.InvalidRequestError, go)
-
+
class MyDefMixin:
foo = deferred(Column('foo', String))
+
def go():
class MyModel(Base, MyDefMixin):
__tablename__ = 'foo'
+
assert_raises(sa.exc.InvalidRequestError, go)
class MyCPropMixin:
foo = column_property(Column('foo', String))
+
def go():
class MyModel(Base, MyCPropMixin):
__tablename__ = 'foo'
+
assert_raises(sa.exc.InvalidRequestError, go)
-
+
def test_table_name_inherited(self):
-
+
class MyMixin:
@classproperty
def __tablename__(cls):
return cls.__name__.lower()
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True)
- class MyModel(Base,MyMixin):
+ class MyModel(Base, MyMixin):
pass
- eq_(MyModel.__table__.name,'mymodel')
-
+ eq_(MyModel.__table__.name, 'mymodel')
+
def test_table_name_not_inherited(self):
-
+
class MyMixin:
@classproperty
def __tablename__(cls):
return cls.__name__.lower()
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True)
- class MyModel(Base,MyMixin):
+ class MyModel(Base, MyMixin):
__tablename__ = 'overridden'
- eq_(MyModel.__table__.name,'overridden')
-
+ eq_(MyModel.__table__.name, 'overridden')
+
def test_table_name_inheritance_order(self):
-
+
class MyMixin1:
@classproperty
def __tablename__(cls):
- return cls.__name__.lower()+'1'
+ return cls.__name__.lower() + '1'
class MyMixin2:
@classproperty
def __tablename__(cls):
- return cls.__name__.lower()+'2'
-
- class MyModel(Base,MyMixin1,MyMixin2):
- id = Column(Integer, primary_key=True)
+ return cls.__name__.lower() + '2'
+
+ class MyModel(Base, MyMixin1, MyMixin2):
+ id = Column(Integer, primary_key=True)
+
+ eq_(MyModel.__table__.name, 'mymodel1')
- eq_(MyModel.__table__.name,'mymodel1')
-
def test_table_name_dependent_on_subclass(self):
+
class MyHistoryMixin:
@classproperty
def __tablename__(cls):
@@ -2036,72 +2175,73 @@ class DeclarativeMixinTest(DeclarativeTestBase):
class MyModel(Base, MyHistoryMixin):
parent_name = 'foo'
id = Column(Integer, primary_key=True)
-
+
eq_(MyModel.__table__.name, 'foo_changelog')
-
+
def test_table_args_inherited(self):
-
+
class MyMixin:
- __table_args__ = {'mysql_engine':'InnoDB'}
+ __table_args__ = {'mysql_engine': 'InnoDB'}
- class MyModel(Base,MyMixin):
- __tablename__='test'
- id = Column(Integer, primary_key=True)
+ class MyModel(Base, MyMixin):
+ __tablename__ = 'test'
+ id = Column(Integer, primary_key=True)
+
+ eq_(MyModel.__table__.kwargs, {'mysql_engine': 'InnoDB'})
- eq_(MyModel.__table__.kwargs,{'mysql_engine': 'InnoDB'})
-
def test_table_args_inherited_descriptor(self):
-
+
class MyMixin:
@classproperty
def __table_args__(cls):
- return {'info':cls.__name__}
+ return {'info': cls.__name__}
+
+ class MyModel(Base, MyMixin):
+ __tablename__ = 'test'
+ id = Column(Integer, primary_key=True)
- class MyModel(Base,MyMixin):
- __tablename__='test'
- id = Column(Integer, primary_key=True)
+ eq_(MyModel.__table__.info, 'MyModel')
- eq_(MyModel.__table__.info,'MyModel')
-
def test_table_args_inherited_single_table_inheritance(self):
-
+
class MyMixin:
- __table_args__ = {'mysql_engine':'InnoDB'}
+ __table_args__ = {'mysql_engine': 'InnoDB'}
- class General(Base,MyMixin):
- __tablename__='test'
- id = Column(Integer, primary_key=True)
+ class General(Base, MyMixin):
+ __tablename__ = 'test'
+ id = Column(Integer, primary_key=True)
type_ = Column(String(50))
- __mapper__args = {'polymorphic_on':type_}
+ __mapper__args = {'polymorphic_on': type_}
class Specific(General):
- __mapper_args__ = {'polymorphic_identity':'specific'}
+ __mapper_args__ = {'polymorphic_identity': 'specific'}
assert Specific.__table__ is General.__table__
- eq_(General.__table__.kwargs,{'mysql_engine': 'InnoDB'})
-
+ eq_(General.__table__.kwargs, {'mysql_engine': 'InnoDB'})
+
def test_table_args_overridden(self):
-
+
class MyMixin:
- __table_args__ = {'mysql_engine':'Foo'}
+ __table_args__ = {'mysql_engine': 'Foo'}
+
+ class MyModel(Base, MyMixin):
+ __tablename__ = 'test'
+ __table_args__ = {'mysql_engine': 'InnoDB'}
+ id = Column(Integer, primary_key=True)
- class MyModel(Base,MyMixin):
- __tablename__='test'
- __table_args__ = {'mysql_engine':'InnoDB'}
- id = Column(Integer, primary_key=True)
+ eq_(MyModel.__table__.kwargs, {'mysql_engine': 'InnoDB'})
- eq_(MyModel.__table__.kwargs,{'mysql_engine': 'InnoDB'})
-
def test_mapper_args_classproperty(self):
+
class ComputedMapperArgs:
@classproperty
def __mapper_args__(cls):
- if cls.__name__=='Person':
- return {'polymorphic_on':cls.discriminator}
+ if cls.__name__ == 'Person':
+ return {'polymorphic_on': cls.discriminator}
else:
- return {'polymorphic_identity':cls.__name__}
+ return {'polymorphic_identity': cls.__name__}
- class Person(Base,ComputedMapperArgs):
+ class Person(Base, ComputedMapperArgs):
__tablename__ = 'people'
id = Column(Integer, primary_key=True)
discriminator = Column('type', String(50))
@@ -2110,24 +2250,25 @@ class DeclarativeMixinTest(DeclarativeTestBase):
pass
compile_mappers()
-
- assert class_mapper(Person).polymorphic_on is Person.__table__.c.type
+ assert class_mapper(Person).polymorphic_on \
+ is Person.__table__.c.type
eq_(class_mapper(Engineer).polymorphic_identity, 'Engineer')
def test_mapper_args_classproperty_two(self):
- # same as test_mapper_args_classproperty, but
- # we repeat ComputedMapperArgs on both classes
- # for no apparent reason.
-
+
+ # same as test_mapper_args_classproperty, but we repeat
+ # ComputedMapperArgs on both classes for no apparent reason.
+
class ComputedMapperArgs:
@classproperty
def __mapper_args__(cls):
- if cls.__name__=='Person':
- return {'polymorphic_on':cls.discriminator}
+ if cls.__name__ == 'Person':
+ return {'polymorphic_on': cls.discriminator}
else:
- return {'polymorphic_identity':cls.__name__}
+ return {'polymorphic_identity': cls.__name__}
+
+ class Person(Base, ComputedMapperArgs):
- class Person(Base,ComputedMapperArgs):
__tablename__ = 'people'
id = Column(Integer, primary_key=True)
discriminator = Column('type', String(50))
@@ -2136,20 +2277,23 @@ class DeclarativeMixinTest(DeclarativeTestBase):
pass
compile_mappers()
-
- assert class_mapper(Person).polymorphic_on is Person.__table__.c.type
+ assert class_mapper(Person).polymorphic_on \
+ is Person.__table__.c.type
eq_(class_mapper(Engineer).polymorphic_identity, 'Engineer')
-
+
def test_table_args_composite(self):
class MyMixin1:
- __table_args__ = {'info':{'baz':'bob'}}
+
+ __table_args__ = {'info': {'baz': 'bob'}}
class MyMixin2:
- __table_args__ = {'info':{'foo':'bar'}}
- class MyModel(Base,MyMixin1,MyMixin2):
- __tablename__='test'
+ __table_args__ = {'info': {'foo': 'bar'}}
+
+ class MyModel(Base, MyMixin1, MyMixin2):
+
+ __tablename__ = 'test'
@classproperty
def __table_args__(self):
@@ -2158,80 +2302,88 @@ class DeclarativeMixinTest(DeclarativeTestBase):
info.update(MyMixin1.__table_args__['info'])
info.update(MyMixin2.__table_args__['info'])
return args
-
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True)
+
+ eq_(MyModel.__table__.info, {'foo': 'bar', 'baz': 'bob'})
- eq_(MyModel.__table__.info,{
- 'foo': 'bar',
- 'baz': 'bob',
- })
-
def test_mapper_args_inherited(self):
-
+
class MyMixin:
- __mapper_args__ = {'always_refresh':True}
- class MyModel(Base,MyMixin):
- __tablename__='test'
- id = Column(Integer, primary_key=True)
+ __mapper_args__ = {'always_refresh': True}
+
+ class MyModel(Base, MyMixin):
+
+ __tablename__ = 'test'
+ id = Column(Integer, primary_key=True)
+
+ eq_(MyModel.__mapper__.always_refresh, True)
- eq_(MyModel.__mapper__.always_refresh,True)
-
-
def test_mapper_args_inherited_descriptor(self):
-
+
class MyMixin:
+
@classproperty
def __mapper_args__(cls):
+
# tenuous, but illustrates the problem!
- if cls.__name__=='MyModel':
+
+ if cls.__name__ == 'MyModel':
return dict(always_refresh=True)
else:
return dict(always_refresh=False)
- class MyModel(Base,MyMixin):
- __tablename__='test'
- id = Column(Integer, primary_key=True)
+ class MyModel(Base, MyMixin):
+
+ __tablename__ = 'test'
+ id = Column(Integer, primary_key=True)
+
+ eq_(MyModel.__mapper__.always_refresh, True)
- eq_(MyModel.__mapper__.always_refresh,True)
-
def test_mapper_args_polymorphic_on_inherited(self):
class MyMixin:
+
type_ = Column(String(50))
- __mapper_args__= {'polymorphic_on':type_}
+ __mapper_args__ = {'polymorphic_on': type_}
- class MyModel(Base,MyMixin):
- __tablename__='test'
- id = Column(Integer, primary_key=True)
+ class MyModel(Base, MyMixin):
+
+ __tablename__ = 'test'
+ id = Column(Integer, primary_key=True)
col = MyModel.__mapper__.polymorphic_on
- eq_(col.name,'type_')
+ eq_(col.name, 'type_')
assert col.table is not None
def test_mapper_args_overridden(self):
-
+
class MyMixin:
- __mapper_args__=dict(always_refresh=True)
- class MyModel(Base,MyMixin):
- __tablename__='test'
- __mapper_args__=dict(always_refresh=False)
- id = Column(Integer, primary_key=True)
+ __mapper_args__ = dict(always_refresh=True)
+
+ class MyModel(Base, MyMixin):
+
+ __tablename__ = 'test'
+ __mapper_args__ = dict(always_refresh=False)
+ id = Column(Integer, primary_key=True)
- eq_(MyModel.__mapper__.always_refresh,False)
+ eq_(MyModel.__mapper__.always_refresh, False)
def test_mapper_args_composite(self):
class MyMixin1:
+
type_ = Column(String(50))
- __mapper_args__ = {'polymorphic_on':type_}
+ __mapper_args__ = {'polymorphic_on': type_}
class MyMixin2:
- __mapper_args__ = {'always_refresh':True}
- class MyModel(Base,MyMixin1,MyMixin2):
- __tablename__='test'
+ __mapper_args__ = {'always_refresh': True}
+
+ class MyModel(Base, MyMixin1, MyMixin2):
+
+ __tablename__ = 'test'
@classproperty
def __mapper_args__(self):
@@ -2239,99 +2391,107 @@ class DeclarativeMixinTest(DeclarativeTestBase):
args.update(MyMixin1.__mapper_args__)
args.update(MyMixin2.__mapper_args__)
return args
-
- id = Column(Integer, primary_key=True)
-
+ id = Column(Integer, primary_key=True)
+
col = MyModel.__mapper__.polymorphic_on
- eq_(col.name,'type_')
+ eq_(col.name, 'type_')
assert col.table is not None
-
- eq_(MyModel.__mapper__.always_refresh,True)
+ eq_(MyModel.__mapper__.always_refresh, True)
def test_single_table_no_propagation(self):
class IdColumn:
+
id = Column(Integer, primary_key=True)
class Generic(Base, IdColumn):
+
__tablename__ = 'base'
discriminator = Column('type', String(50))
- __mapper_args__= dict(polymorphic_on=discriminator)
+ __mapper_args__ = dict(polymorphic_on=discriminator)
value = Column(Integer())
class Specific(Generic):
+
__mapper_args__ = dict(polymorphic_identity='specific')
assert Specific.__table__ is Generic.__table__
- eq_(Generic.__table__.c.keys(),['id', 'type', 'value'])
- assert class_mapper(Specific).polymorphic_on is \
- Generic.__table__.c.type
+ eq_(Generic.__table__.c.keys(), ['id', 'type', 'value'])
+ assert class_mapper(Specific).polymorphic_on \
+ is Generic.__table__.c.type
eq_(class_mapper(Specific).polymorphic_identity, 'specific')
def test_joined_table_propagation(self):
class CommonMixin:
-
+
@classproperty
def __tablename__(cls):
return cls.__name__.lower()
-
- __table_args__ = {'mysql_engine':'InnoDB'}
-
- timestamp = Column(Integer)
+ __table_args__ = {'mysql_engine': 'InnoDB'}
+ timestamp = Column(Integer)
id = Column(Integer, primary_key=True)
-
+
class Generic(Base, CommonMixin):
+
discriminator = Column('python_type', String(50))
- __mapper_args__= dict(polymorphic_on=discriminator)
+ __mapper_args__ = dict(polymorphic_on=discriminator)
class Specific(Generic):
+
__mapper_args__ = dict(polymorphic_identity='specific')
- id = Column(Integer, ForeignKey('generic.id'), primary_key=True)
- eq_(Generic.__table__.name,'generic')
- eq_(Specific.__table__.name,'specific')
- eq_(Generic.__table__.c.keys(),['timestamp', 'id', 'python_type'])
- eq_(Specific.__table__.c.keys(),['timestamp', 'id'])
- eq_(Generic.__table__.kwargs,{'mysql_engine': 'InnoDB'})
- eq_(Specific.__table__.kwargs,{'mysql_engine': 'InnoDB'})
-
+ id = Column(Integer, ForeignKey('generic.id'),
+ primary_key=True)
+
+ eq_(Generic.__table__.name, 'generic')
+ eq_(Specific.__table__.name, 'specific')
+ eq_(Generic.__table__.c.keys(), ['timestamp', 'id',
+ 'python_type'])
+ eq_(Specific.__table__.c.keys(), ['timestamp', 'id'])
+ eq_(Generic.__table__.kwargs, {'mysql_engine': 'InnoDB'})
+ eq_(Specific.__table__.kwargs, {'mysql_engine': 'InnoDB'})
+
def test_some_propagation(self):
-
+
class CommonMixin:
+
@classproperty
def __tablename__(cls):
return cls.__name__.lower()
- __table_args__ = {'mysql_engine':'InnoDB'}
- timestamp = Column(Integer)
+ __table_args__ = {'mysql_engine': 'InnoDB'}
+ timestamp = Column(Integer)
class BaseType(Base, CommonMixin):
+
discriminator = Column('type', String(50))
- __mapper_args__= dict(polymorphic_on=discriminator)
- id = Column(Integer, primary_key=True)
- value = Column(Integer())
+ __mapper_args__ = dict(polymorphic_on=discriminator)
+ id = Column(Integer, primary_key=True)
+ value = Column(Integer())
class Single(BaseType):
+
__tablename__ = None
__mapper_args__ = dict(polymorphic_identity='type1')
class Joined(BaseType):
- __mapper_args__ = dict(polymorphic_identity='type2')
- id = Column(Integer, ForeignKey('basetype.id'), primary_key=True)
- eq_(BaseType.__table__.name,'basetype')
- eq_(BaseType.__table__.c.keys(),
- ['timestamp', 'type', 'id', 'value', ])
- eq_(BaseType.__table__.kwargs,{'mysql_engine': 'InnoDB'})
+ __mapper_args__ = dict(polymorphic_identity='type2')
+ id = Column(Integer, ForeignKey('basetype.id'),
+ primary_key=True)
+ eq_(BaseType.__table__.name, 'basetype')
+ eq_(BaseType.__table__.c.keys(), ['timestamp', 'type', 'id',
+ 'value'])
+ eq_(BaseType.__table__.kwargs, {'mysql_engine': 'InnoDB'})
assert Single.__table__ is BaseType.__table__
+ eq_(Joined.__table__.name, 'joined')
+ eq_(Joined.__table__.c.keys(), ['timestamp', 'id'])
+ eq_(Joined.__table__.kwargs, {'mysql_engine': 'InnoDB'})
- eq_(Joined.__table__.name,'joined')
- eq_(Joined.__table__.c.keys(),['timestamp','id'])
- eq_(Joined.__table__.kwargs,{'mysql_engine': 'InnoDB'})
-
def test_non_propagating_mixin(self):
class NoJoinedTableNameMixin:
+
@classproperty
def __tablename__(cls):
if decl.has_inherited_table(cls):
@@ -2339,113 +2499,118 @@ class DeclarativeMixinTest(DeclarativeTestBase):
return cls.__name__.lower()
class BaseType(Base, NoJoinedTableNameMixin):
+
discriminator = Column('type', String(50))
- __mapper_args__= dict(polymorphic_on=discriminator)
- id = Column(Integer, primary_key=True)
- value = Column(Integer())
+ __mapper_args__ = dict(polymorphic_on=discriminator)
+ id = Column(Integer, primary_key=True)
+ value = Column(Integer())
class Specific(BaseType):
- __mapper_args__ = dict(polymorphic_identity='specific')
- eq_(BaseType.__table__.name,'basetype')
- eq_(BaseType.__table__.c.keys(),['type', 'id', 'value'])
+ __mapper_args__ = dict(polymorphic_identity='specific')
+ eq_(BaseType.__table__.name, 'basetype')
+ eq_(BaseType.__table__.c.keys(), ['type', 'id', 'value'])
assert Specific.__table__ is BaseType.__table__
- assert class_mapper(Specific).polymorphic_on is\
- BaseType.__table__.c.type
+ assert class_mapper(Specific).polymorphic_on \
+ is BaseType.__table__.c.type
eq_(class_mapper(Specific).polymorphic_identity, 'specific')
def test_non_propagating_mixin_used_for_joined(self):
class TableNameMixin:
+
@classproperty
def __tablename__(cls):
- if (decl.has_inherited_table(cls) and
- TableNameMixin not in cls.__bases__):
+ if decl.has_inherited_table(cls) and TableNameMixin \
+ not in cls.__bases__:
return None
return cls.__name__.lower()
class BaseType(Base, TableNameMixin):
+
discriminator = Column('type', String(50))
- __mapper_args__= dict(polymorphic_on=discriminator)
- id = Column(Integer, primary_key=True)
- value = Column(Integer())
+ __mapper_args__ = dict(polymorphic_on=discriminator)
+ id = Column(Integer, primary_key=True)
+ value = Column(Integer())
class Specific(BaseType, TableNameMixin):
+
__mapper_args__ = dict(polymorphic_identity='specific')
- id = Column(Integer, ForeignKey('basetype.id'), primary_key=True)
-
- eq_(BaseType.__table__.name,'basetype')
- eq_(BaseType.__table__.c.keys(),['type', 'id', 'value'])
- eq_(Specific.__table__.name,'specific')
- eq_(Specific.__table__.c.keys(),['id'])
+ id = Column(Integer, ForeignKey('basetype.id'),
+ primary_key=True)
+
+ eq_(BaseType.__table__.name, 'basetype')
+ eq_(BaseType.__table__.c.keys(), ['type', 'id', 'value'])
+ eq_(Specific.__table__.name, 'specific')
+ eq_(Specific.__table__.c.keys(), ['id'])
def test_single_back_propagate(self):
class ColumnMixin:
- timestamp = Column(Integer)
+
+ timestamp = Column(Integer)
class BaseType(Base):
+
__tablename__ = 'foo'
discriminator = Column('type', String(50))
- __mapper_args__= dict(polymorphic_on=discriminator)
- id = Column(Integer, primary_key=True)
+ __mapper_args__ = dict(polymorphic_on=discriminator)
+ id = Column(Integer, primary_key=True)
+
+ class Specific(BaseType, ColumnMixin):
- class Specific(BaseType,ColumnMixin):
__mapper_args__ = dict(polymorphic_identity='specific')
-
- eq_(BaseType.__table__.c.keys(),['type', 'id', 'timestamp'])
+
+ eq_(BaseType.__table__.c.keys(), ['type', 'id', 'timestamp'])
def test_table_in_model_and_same_column_in_mixin(self):
-
+
class ColumnMixin:
+
data = Column(Integer)
-
+
class Model(Base, ColumnMixin):
- __table__ = Table(
- 'foo',
- Base.metadata,
- Column('data', Integer),
- Column('id', Integer, primary_key=True)
- )
-
+
+ __table__ = Table('foo', Base.metadata, Column('data',
+ Integer), Column('id', Integer,
+ primary_key=True))
+
model_col = Model.__table__.c.data
mixin_col = ColumnMixin.data
assert model_col is not mixin_col
eq_(model_col.name, 'data')
assert model_col.type.__class__ is mixin_col.type.__class__
-
+
def test_table_in_model_and_different_named_column_in_mixin(self):
-
+
class ColumnMixin:
+
tada = Column(Integer)
-
+
def go():
+
class Model(Base, ColumnMixin):
- __table__ = Table(
- 'foo',
- Base.metadata,
- Column('data', Integer),
- Column('id', Integer, primary_key=True)
- )
-
- assert_raises_message(
- sa.exc.ArgumentError,
- "Can't add additional column 'tada' when specifying __table__",
- go)
-
+
+ __table__ = Table('foo', Base.metadata, Column('data',
+ Integer), Column('id', Integer,
+ primary_key=True))
+
+ assert_raises_message(sa.exc.ArgumentError,
+ "Can't add additional column 'tada' when "
+ "specifying __table__", go)
+
def test_table_in_model_overrides_different_typed_column_in_mixin(self):
-
+
class ColumnMixin:
+
data = Column(String)
-
+
class Model(Base, ColumnMixin):
- __table__ = Table(
- 'foo',
- Base.metadata,
- Column('data', Integer),
- Column('id', Integer, primary_key=True)
- )
+
+ __table__ = Table('foo', Base.metadata, Column('data',
+ Integer), Column('id', Integer,
+ primary_key=True))
model_col = Model.__table__.c.data
mixin_col = ColumnMixin.data
@@ -2454,189 +2619,190 @@ class DeclarativeMixinTest(DeclarativeTestBase):
assert model_col.type.__class__ is Integer
def test_mixin_column_ordering(self):
-
+
class Foo(object):
+
col1 = Column(Integer)
col3 = Column(Integer)
-
+
class Bar(object):
+
col2 = Column(Integer)
col4 = Column(Integer)
-
+
class Model(Base, Foo, Bar):
+
id = Column(Integer, primary_key=True)
__tablename__ = 'model'
- eq_(Model.__table__.c.keys(),
- ['col1', 'col3', 'col2', 'col4', 'id'])
+ eq_(Model.__table__.c.keys(), ['col1', 'col3', 'col2', 'col4',
+ 'id'])
class DeclarativeMixinPropertyTest(DeclarativeTestBase):
+
def test_column_property(self):
+
class MyMixin(object):
+
@classproperty
def prop_hoho(cls):
return column_property(Column('prop', String(50)))
- class MyModel(Base,MyMixin):
+ class MyModel(Base, MyMixin):
+
__tablename__ = 'test'
id = Column(Integer, primary_key=True,
- test_needs_autoincrement=True)
+ test_needs_autoincrement=True)
+
+ class MyOtherModel(Base, MyMixin):
- class MyOtherModel(Base,MyMixin):
__tablename__ = 'othertest'
id = Column(Integer, primary_key=True,
- test_needs_autoincrement=True)
-
- assert MyModel.__table__.c.prop is not None
- assert MyOtherModel.__table__.c.prop is not None
- assert MyModel.__table__.c.prop is not MyOtherModel.__table__.c.prop
-
- assert MyModel.prop_hoho.property.columns == \
- [MyModel.__table__.c.prop]
- assert MyOtherModel.prop_hoho.property.columns == \
- [MyOtherModel.__table__.c.prop]
- assert MyModel.prop_hoho.property is not \
- MyOtherModel.prop_hoho.property
-
+ test_needs_autoincrement=True)
+
+ assert MyModel.__table__.c.prop is not None
+ assert MyOtherModel.__table__.c.prop is not None
+ assert MyModel.__table__.c.prop \
+ is not MyOtherModel.__table__.c.prop
+ assert MyModel.prop_hoho.property.columns \
+ == [MyModel.__table__.c.prop]
+ assert MyOtherModel.prop_hoho.property.columns \
+ == [MyOtherModel.__table__.c.prop]
+ assert MyModel.prop_hoho.property \
+ is not MyOtherModel.prop_hoho.property
Base.metadata.create_all()
sess = create_session()
m1, m2 = MyModel(prop_hoho='foo'), MyOtherModel(prop_hoho='bar')
sess.add_all([m1, m2])
sess.flush()
- eq_(
- sess.query(MyModel).filter(MyModel.prop_hoho=='foo').one(),
- m1
- )
- eq_(
- sess.query(MyOtherModel).\
- filter(MyOtherModel.prop_hoho=='bar').one(),
- m2
- )
-
+ eq_(sess.query(MyModel).filter(MyModel.prop_hoho == 'foo'
+ ).one(), m1)
+ eq_(sess.query(MyOtherModel).filter(MyOtherModel.prop_hoho
+ == 'bar').one(), m2)
+
def test_doc(self):
"""test documentation transfer.
- the documentation situation with @classproperty is
- problematic. at least see if mapped subclasses
- get the doc.
+ the documentation situation with @classproperty is problematic.
+ at least see if mapped subclasses get the doc.
"""
class MyMixin(object):
+
@classproperty
def type_(cls):
"""this is a document."""
+
return Column(String(50))
@classproperty
def t2(cls):
"""this is another document."""
+
return column_property(Column(String(50)))
-
- class MyModel(Base,MyMixin):
- __tablename__='test'
- id = Column(Integer, primary_key=True)
+
+ class MyModel(Base, MyMixin):
+
+ __tablename__ = 'test'
+ id = Column(Integer, primary_key=True)
+
compile_mappers()
- eq_(
- MyModel.type_.__doc__,
- 'this is a document.'
- )
- eq_(
- MyModel.t2.__doc__,
- 'this is another document.'
- )
-
+ eq_(MyModel.type_.__doc__, """this is a document.""")
+ eq_(MyModel.t2.__doc__, """this is another document.""")
+
def test_column_in_mapper_args(self):
+
class MyMixin(object):
+
@classproperty
def type_(cls):
return Column(String(50))
+ __mapper_args__ = {'polymorphic_on': type_}
- __mapper_args__= {'polymorphic_on':type_}
+ class MyModel(Base, MyMixin):
+
+ __tablename__ = 'test'
+ id = Column(Integer, primary_key=True)
- class MyModel(Base,MyMixin):
- __tablename__='test'
- id = Column(Integer, primary_key=True)
-
compile_mappers()
col = MyModel.__mapper__.polymorphic_on
- eq_(col.name,'type_')
+ eq_(col.name, 'type_')
assert col.table is not None
-
+
def test_deferred(self):
+
class MyMixin(object):
+
@classproperty
def data(cls):
return deferred(Column('data', String(50)))
-
- class MyModel(Base,MyMixin):
- __tablename__='test'
- id = Column(Integer, primary_key=True,
- test_needs_autoincrement=True)
-
+
+ class MyModel(Base, MyMixin):
+
+ __tablename__ = 'test'
+ id = Column(Integer, primary_key=True,
+ test_needs_autoincrement=True)
+
Base.metadata.create_all()
sess = create_session()
sess.add_all([MyModel(data='d1'), MyModel(data='d2')])
sess.flush()
sess.expunge_all()
-
d1, d2 = sess.query(MyModel).order_by(MyModel.data)
assert 'data' not in d1.__dict__
assert d1.data == 'd1'
assert 'data' in d1.__dict__
-
+
def _test_relationship(self, usestring):
+
class RefTargetMixin(object):
+
@classproperty
def target_id(cls):
return Column('target_id', ForeignKey('target.id'))
-
if usestring:
+
@classproperty
def target(cls):
- return relationship("Target",
- primaryjoin="Target.id==%s.target_id" % cls.__name__
- )
+ return relationship('Target',
+ primaryjoin='Target.id==%s.target_id'
+ % cls.__name__)
else:
+
@classproperty
def target(cls):
- return relationship("Target")
-
+ return relationship('Target')
+
class Foo(Base, RefTargetMixin):
+
__tablename__ = 'foo'
id = Column(Integer, primary_key=True,
- test_needs_autoincrement=True)
-
+ test_needs_autoincrement=True)
+
class Bar(Base, RefTargetMixin):
+
__tablename__ = 'bar'
id = Column(Integer, primary_key=True,
- test_needs_autoincrement=True)
-
+ test_needs_autoincrement=True)
+
class Target(Base):
+
__tablename__ = 'target'
id = Column(Integer, primary_key=True,
- test_needs_autoincrement=True)
-
+ test_needs_autoincrement=True)
+
Base.metadata.create_all()
sess = create_session()
t1, t2 = Target(), Target()
f1, f2, b1 = Foo(target=t1), Foo(target=t2), Bar(target=t1)
sess.add_all([f1, f2, b1])
sess.flush()
-
- eq_(
- sess.query(Foo).filter(Foo.target==t2).one(),
- f2
- )
- eq_(
- sess.query(Bar).filter(Bar.target==t2).first(),
- None
- )
+ eq_(sess.query(Foo).filter(Foo.target == t2).one(), f2)
+ eq_(sess.query(Bar).filter(Bar.target == t2).first(), None)
sess.expire_all()
eq_(f1.target, t1)
-
-
-
+
def test_relationship(self):
self._test_relationship(False)
diff --git a/test/ext/test_horizontal_shard.py b/test/ext/test_horizontal_shard.py
index b21768d1f..66583fb0d 100644
--- a/test/ext/test_horizontal_shard.py
+++ b/test/ext/test_horizontal_shard.py
@@ -40,12 +40,16 @@ class ShardTest(TestBase):
Column('city', String(50), nullable=False)
)
- weather_reports = Table("weather_reports", meta,
+ weather_reports = Table(
+ 'weather_reports',
+ meta,
Column('id', Integer, primary_key=True),
- Column('location_id', Integer, ForeignKey('weather_locations.id')),
+ Column('location_id', Integer,
+ ForeignKey('weather_locations.id')),
Column('temperature', Float),
- Column('report_time', DateTime, default=datetime.datetime.now),
- )
+ Column('report_time', DateTime,
+ default=datetime.datetime.now),
+ )
for db in (db1, db2, db3, db4):
meta.create_all(db)
@@ -65,13 +69,12 @@ class ShardTest(TestBase):
@classmethod
def setup_session(cls):
global create_session
-
shard_lookup = {
- 'North America':'north_america',
- 'Asia':'asia',
- 'Europe':'europe',
- 'South America':'south_america'
- }
+ 'North America': 'north_america',
+ 'Asia': 'asia',
+ 'Europe': 'europe',
+ 'South America': 'south_america',
+ }
def shard_chooser(mapper, instance, clause=None):
if isinstance(instance, WeatherLocation):
@@ -85,29 +88,35 @@ class ShardTest(TestBase):
def query_chooser(query):
ids = []
+
class FindContinent(sql.ClauseVisitor):
+
def visit_binary(self, binary):
- if binary.left.shares_lineage(weather_locations.c.continent):
+ if binary.left.shares_lineage(
+ weather_locations.c.continent):
if binary.operator == operators.eq:
ids.append(shard_lookup[binary.right.value])
elif binary.operator == operators.in_op:
for bind in binary.right.clauses:
ids.append(shard_lookup[bind.value])
+
FindContinent().traverse(query._criterion)
if len(ids) == 0:
- return ['north_america', 'asia', 'europe', 'south_america']
+ return ['north_america', 'asia', 'europe',
+ 'south_america']
else:
return ids
- create_session = sessionmaker(class_=ShardedSession, autoflush=True, autocommit=False)
-
+ create_session = sessionmaker(class_=ShardedSession,
+ autoflush=True, autocommit=False)
create_session.configure(shards={
- 'north_america':db1,
- 'asia':db2,
- 'europe':db3,
- 'south_america':db4
- }, shard_chooser=shard_chooser, id_chooser=id_chooser, query_chooser=query_chooser)
+ 'north_america': db1,
+ 'asia': db2,
+ 'europe': db3,
+ 'south_america': db4,
+ }, shard_chooser=shard_chooser, id_chooser=id_chooser,
+ query_chooser=query_chooser)
@classmethod
@@ -138,31 +147,41 @@ class ShardTest(TestBase):
dublin = WeatherLocation('Europe', 'Dublin')
brasilia = WeatherLocation('South America', 'Brasila')
quito = WeatherLocation('South America', 'Quito')
-
tokyo.reports.append(Report(80.0))
newyork.reports.append(Report(75))
quito.reports.append(Report(85))
-
sess = create_session()
- for c in [tokyo, newyork, toronto, london, dublin, brasilia, quito]:
+ for c in [
+ tokyo,
+ newyork,
+ toronto,
+ london,
+ dublin,
+ brasilia,
+ quito,
+ ]:
sess.add(c)
sess.commit()
- tokyo.city # reload 'city' attribute on tokyo
+ tokyo.city # reload 'city' attribute on tokyo
sess.expunge_all()
-
- eq_(db2.execute(weather_locations.select()).fetchall(), [(1, 'Asia', 'Tokyo')])
- eq_(db1.execute(weather_locations.select()).fetchall(), [(2, 'North America', 'New York'), (3, 'North America', 'Toronto')])
- eq_(sess.execute(weather_locations.select(), shard_id='asia').fetchall(), [(1, 'Asia', 'Tokyo')])
-
+ eq_(db2.execute(weather_locations.select()).fetchall(), [(1,
+ 'Asia', 'Tokyo')])
+ eq_(db1.execute(weather_locations.select()).fetchall(), [(2,
+ 'North America', 'New York'), (3, 'North America', 'Toronto'
+ )])
+ eq_(sess.execute(weather_locations.select(), shard_id='asia'
+ ).fetchall(), [(1, 'Asia', 'Tokyo')])
t = sess.query(WeatherLocation).get(tokyo.id)
eq_(t.city, tokyo.city)
eq_(t.reports[0].temperature, 80.0)
-
- north_american_cities = sess.query(WeatherLocation).filter(WeatherLocation.continent == 'North America')
- eq_(set([c.city for c in north_american_cities]), set(['New York', 'Toronto']))
-
- asia_and_europe = sess.query(WeatherLocation).filter(WeatherLocation.continent.in_(['Europe', 'Asia']))
- eq_(set([c.city for c in asia_and_europe]), set(['Tokyo', 'London', 'Dublin']))
-
-
+ north_american_cities = \
+ sess.query(WeatherLocation).filter(WeatherLocation.continent
+ == 'North America')
+ eq_(set([c.city for c in north_american_cities]),
+ set(['New York', 'Toronto']))
+ asia_and_europe = \
+ sess.query(WeatherLocation).filter(
+ WeatherLocation.continent.in_(['Europe', 'Asia']))
+ eq_(set([c.city for c in asia_and_europe]), set(['Tokyo',
+ 'London', 'Dublin']))
diff --git a/test/ext/test_orderinglist.py b/test/ext/test_orderinglist.py
index 96c1c90cf..559aefd1d 100644
--- a/test/ext/test_orderinglist.py
+++ b/test/ext/test_orderinglist.py
@@ -46,8 +46,8 @@ class OrderingListTest(TestBase):
metadata.clear()
def _setup(self, test_collection_class):
- """Build a relationship situation using the given test_collection_class
- factory"""
+ """Build a relationship situation using the given
+ test_collection_class factory"""
global metadata, slides_table, bullets_table, Slide, Bullet
@@ -382,7 +382,16 @@ class OrderingListTest(TestBase):
fibbed.insert(4, Pos())
fibbed.insert(6, Pos())
- for li, pos in (0,1), (1,2), (2,3), (3,5), (4,8), (5,13), (6,21), (7,34):
+ for li, pos in (
+ (0, 1),
+ (1, 2),
+ (2, 3),
+ (3, 5),
+ (4, 8),
+ (5, 13),
+ (6, 21),
+ (7, 34),
+ ):
self.assert_(fibbed[li].position == pos)
alpha_factory = ordering_list('position',
diff --git a/test/ext/test_serializer.py b/test/ext/test_serializer.py
index 4e26e5b9a..45f55e1c9 100644
--- a/test/ext/test_serializer.py
+++ b/test/ext/test_serializer.py
@@ -3,12 +3,13 @@ from sqlalchemy.ext import serializer
from sqlalchemy import exc
import sqlalchemy as sa
from sqlalchemy.test import testing
-from sqlalchemy import MetaData, Integer, String, ForeignKey, select, desc, func, util
+from sqlalchemy import MetaData, Integer, String, ForeignKey, select, \
+ desc, func, util
from sqlalchemy.test.schema import Table
from sqlalchemy.test.schema import Column
-from sqlalchemy.orm import relationship, sessionmaker, scoped_session, class_mapper, mapper, joinedload, compile_mappers, aliased
+from sqlalchemy.orm import relationship, sessionmaker, scoped_session, \
+ class_mapper, mapper, joinedload, compile_mappers, aliased
from sqlalchemy.test.testing import eq_
-
from test.orm._base import ComparableEntity, MappedTest
@@ -18,125 +19,127 @@ class User(ComparableEntity):
class Address(ComparableEntity):
pass
+
class SerializeTest(MappedTest):
+
run_setup_mappers = 'once'
run_inserts = 'once'
run_deletes = None
-
+
@classmethod
def define_tables(cls, metadata):
global users, addresses
- users = Table('users', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(50))
- )
- addresses = Table('addresses', metadata,
- Column('id', Integer, primary_key=True),
- Column('email', String(50)),
- Column('user_id', Integer, ForeignKey('users.id')),
- )
+ users = Table('users', metadata, Column('id', Integer,
+ primary_key=True), Column('name', String(50)))
+ addresses = Table('addresses', metadata, Column('id', Integer,
+ primary_key=True), Column('email',
+ String(50)), Column('user_id', Integer,
+ ForeignKey('users.id')))
@classmethod
def setup_mappers(cls):
global Session
Session = scoped_session(sessionmaker())
-
- mapper(User, users, properties={
- 'addresses':relationship(Address, backref='user', order_by=addresses.c.id)
- })
+ mapper(User, users, properties={'addresses'
+ : relationship(Address, backref='user',
+ order_by=addresses.c.id)})
mapper(Address, addresses)
-
compile_mappers()
-
+
@classmethod
def insert_data(cls):
- params = [dict(zip(('id', 'name'), column_values)) for column_values in
- [(7, 'jack'),
- (8, 'ed'),
- (9, 'fred'),
- (10, 'chuck')]
- ]
+ params = [dict(zip(('id', 'name'), column_values))
+ for column_values in [(7, 'jack'), (8, 'ed'), (9,
+ 'fred'), (10, 'chuck')]]
users.insert().execute(params)
-
- addresses.insert().execute(
- [dict(zip(('id', 'user_id', 'email'), column_values)) for column_values in
- [(1, 7, "jack@bean.com"),
- (2, 8, "ed@wood.com"),
- (3, 8, "ed@bettyboop.com"),
- (4, 8, "ed@lala.com"),
- (5, 9, "fred@fred.com")]
- ]
- )
-
+ addresses.insert().execute([dict(zip(('id', 'user_id', 'email'
+ ), column_values))
+ for column_values in [(1, 7,
+ 'jack@bean.com'), (2, 8,
+ 'ed@wood.com'), (3, 8,
+ 'ed@bettyboop.com'), (4, 8,
+ 'ed@lala.com'), (5, 9,
+ 'fred@fred.com')]])
+
def test_tables(self):
- assert serializer.loads(serializer.dumps(users, -1), users.metadata, Session) is users
+ assert serializer.loads(serializer.dumps(users, -1),
+ users.metadata, Session) is users
def test_columns(self):
- assert serializer.loads(serializer.dumps(users.c.name, -1), users.metadata, Session) is users.c.name
-
+ assert serializer.loads(serializer.dumps(users.c.name, -1),
+ users.metadata, Session) is users.c.name
+
def test_mapper(self):
user_mapper = class_mapper(User)
- assert serializer.loads(serializer.dumps(user_mapper, -1), None, None) is user_mapper
-
+ assert serializer.loads(serializer.dumps(user_mapper, -1),
+ None, None) is user_mapper
+
def test_attribute(self):
- assert serializer.loads(serializer.dumps(User.name, -1), None, None) is User.name
-
+ assert serializer.loads(serializer.dumps(User.name, -1), None,
+ None) is User.name
+
def test_expression(self):
-
- expr = select([users]).select_from(users.join(addresses)).limit(5)
- re_expr = serializer.loads(serializer.dumps(expr, -1), users.metadata, None)
- eq_(
- str(expr),
- str(re_expr)
- )
-
+ expr = \
+ select([users]).select_from(users.join(addresses)).limit(5)
+ re_expr = serializer.loads(serializer.dumps(expr, -1),
+ users.metadata, None)
+ eq_(str(expr), str(re_expr))
assert re_expr.bind is testing.db
- eq_(
- re_expr.execute().fetchall(),
- [(7, u'jack'), (8, u'ed'), (8, u'ed'), (8, u'ed'), (9, u'fred')]
- )
-
+ eq_(re_expr.execute().fetchall(), [(7, u'jack'), (8, u'ed'),
+ (8, u'ed'), (8, u'ed'), (9, u'fred')])
+
def test_query(self):
- q = Session.query(User).filter(User.name=='ed').options(joinedload(User.addresses))
- eq_(q.all(), [User(name='ed', addresses=[Address(id=2), Address(id=3), Address(id=4)])])
-
- q2 = serializer.loads(serializer.dumps(q, -1), users.metadata, Session)
+ q = Session.query(User).filter(User.name == 'ed'
+ ).options(joinedload(User.addresses))
+ eq_(q.all(), [User(name='ed', addresses=[Address(id=2),
+ Address(id=3), Address(id=4)])])
+ q2 = serializer.loads(serializer.dumps(q, -1), users.metadata,
+ Session)
+
def go():
- eq_(q2.all(), [User(name='ed', addresses=[Address(id=2), Address(id=3), Address(id=4)])])
- self.assert_sql_count(testing.db, go, 1)
-
- eq_(q2.join(User.addresses).filter(Address.email=='ed@bettyboop.com').value(func.count('*')), 1)
+ eq_(q2.all(), [User(name='ed', addresses=[Address(id=2),
+ Address(id=3), Address(id=4)])])
+ self.assert_sql_count(testing.db, go, 1)
+ eq_(q2.join(User.addresses).filter(Address.email
+ == 'ed@bettyboop.com').value(func.count('*')), 1)
u1 = Session.query(User).get(8)
-
- q = Session.query(Address).filter(Address.user==u1).order_by(desc(Address.email))
- q2 = serializer.loads(serializer.dumps(q, -1), users.metadata, Session)
-
- eq_(q2.all(), [Address(email='ed@wood.com'), Address(email='ed@lala.com'), Address(email='ed@bettyboop.com')])
-
- q = Session.query(User).join(User.addresses).filter(Address.email.like('%fred%'))
- q2 = serializer.loads(serializer.dumps(q, -1), users.metadata, Session)
+ q = Session.query(Address).filter(Address.user
+ == u1).order_by(desc(Address.email))
+ q2 = serializer.loads(serializer.dumps(q, -1), users.metadata,
+ Session)
+ eq_(q2.all(), [Address(email='ed@wood.com'),
+ Address(email='ed@lala.com'),
+ Address(email='ed@bettyboop.com')])
+ q = \
+ Session.query(User).join(User.addresses).\
+ filter(Address.email.like('%fred%'))
+ q2 = serializer.loads(serializer.dumps(q, -1), users.metadata,
+ Session)
eq_(q2.all(), [User(name='fred')])
-
eq_(list(q2.values(User.id, User.name)), [(9, u'fred')])
- @testing.exclude('sqlite', '<=', (3, 5, 9), 'id comparison failing on the buildbot')
+ @testing.exclude('sqlite', '<=', (3, 5, 9),
+ 'id comparison failing on the buildbot')
def test_aliases(self):
u7, u8, u9, u10 = Session.query(User).order_by(User.id).all()
-
ualias = aliased(User)
- q = Session.query(User, ualias).join((ualias, User.id < ualias.id)).filter(User.id<9).order_by(User.id, ualias.id)
- eq_(list(q.all()), [(u7, u8), (u7, u9), (u7, u10), (u8, u9), (u8, u10)])
-
- q2 = serializer.loads(serializer.dumps(q, -1), users.metadata, Session)
-
- eq_(list(q2.all()), [(u7, u8), (u7, u9), (u7, u10), (u8, u9), (u8, u10)])
+ q = Session.query(User, ualias).join((ualias, User.id
+ < ualias.id)).filter(User.id < 9).order_by(User.id,
+ ualias.id)
+ eq_(list(q.all()), [(u7, u8), (u7, u9), (u7, u10), (u8, u9),
+ (u8, u10)])
+ q2 = serializer.loads(serializer.dumps(q, -1), users.metadata,
+ Session)
+ eq_(list(q2.all()), [(u7, u8), (u7, u9), (u7, u10), (u8, u9),
+ (u8, u10)])
def test_any(self):
- r = User.addresses.any(Address.email=='x')
+ r = User.addresses.any(Address.email == 'x')
ser = serializer.dumps(r, -1)
x = serializer.loads(ser, users.metadata)
eq_(str(r), str(x))
-
+
+
if __name__ == '__main__':
testing.main()
diff --git a/test/ext/test_sqlsoup.py b/test/ext/test_sqlsoup.py
index 9216af96f..7fe8ab178 100644
--- a/test/ext/test_sqlsoup.py
+++ b/test/ext/test_sqlsoup.py
@@ -1,15 +1,14 @@
-#!coding:utf-8
-
from sqlalchemy.ext import sqlsoup
from sqlalchemy.test.testing import TestBase, eq_, assert_raises
-from sqlalchemy import create_engine, or_, desc, select, func, exc, Table,\
- util
+from sqlalchemy import create_engine, or_, desc, select, func, exc, \
+ Table, util
from sqlalchemy.orm import scoped_session, sessionmaker
import datetime
class SQLSoupTest(TestBase):
- __requires__ = ('sqlite', )
+
+ __requires__ = 'sqlite',
@classmethod
def setup_class(cls):
@@ -17,205 +16,198 @@ class SQLSoupTest(TestBase):
engine = create_engine('sqlite://')
for sql in _ddl:
engine.execute(sql)
-
+
@classmethod
def teardown_class(cls):
engine.dispose()
-
+
def setup(self):
for sql in _data:
engine.execute(sql)
-
+
def teardown(self):
sqlsoup.Session.remove()
for sql in _teardown:
engine.execute(sql)
-
+
def test_bad_names(self):
db = sqlsoup.SqlSoup(engine)
-# print db.bad_names.c.id
+
+ # print db.bad_names.c.id
+
print db.bad_names.c.query
-
-
+
def test_load(self):
db = sqlsoup.SqlSoup(engine)
MappedUsers = db.users
users = db.users.all()
users.sort()
- eq_(
- users,
- [
- MappedUsers(
- name=u'Joe Student',
- email=u'student@example.edu',
- password=u'student',classname=None,admin=0),
- MappedUsers(
- name=u'Bhargan Basepair',
- email=u'basepair@example.edu',
- password=u'basepair',classname=None,admin=1)
- ]
- )
-
+ eq_(users, [MappedUsers(name=u'Joe Student',
+ email=u'student@example.edu', password=u'student',
+ classname=None, admin=0),
+ MappedUsers(name=u'Bhargan Basepair',
+ email=u'basepair@example.edu', password=u'basepair',
+ classname=None, admin=1)])
+
def test_order_by(self):
db = sqlsoup.SqlSoup(engine)
MappedUsers = db.users
users = db.users.order_by(db.users.name).all()
- eq_(
- users,
- [
- MappedUsers(
- name=u'Bhargan Basepair',
- email=u'basepair@example.edu',
- password=u'basepair',classname=None,admin=1),
- MappedUsers(
- name=u'Joe Student',
- email=u'student@example.edu',
- password=u'student',classname=None,admin=0),
- ]
- )
-
+ eq_(users, [MappedUsers(name=u'Bhargan Basepair',
+ email=u'basepair@example.edu', password=u'basepair',
+ classname=None, admin=1), MappedUsers(name=u'Joe Student',
+ email=u'student@example.edu', password=u'student',
+ classname=None, admin=0)])
+
def test_whereclause(self):
db = sqlsoup.SqlSoup(engine)
MappedUsers = db.users
-
- where = or_(db.users.name=='Bhargan Basepair', db.users.email=='student@example.edu')
- users = db.users.filter(where).order_by(desc(db.users.name)).all()
- eq_(
- users,
- [MappedUsers(name=u'Joe Student',email=u'student@example.edu',password=u'student',classname=None,admin=0), MappedUsers(name=u'Bhargan Basepair',email=u'basepair@example.edu',password=u'basepair',classname=None,admin=1)]
- )
-
+ where = or_(db.users.name == 'Bhargan Basepair', db.users.email
+ == 'student@example.edu')
+ users = \
+ db.users.filter(where).order_by(desc(db.users.name)).all()
+ eq_(users, [MappedUsers(name=u'Joe Student',
+ email=u'student@example.edu', password=u'student',
+ classname=None, admin=0),
+ MappedUsers(name=u'Bhargan Basepair',
+ email=u'basepair@example.edu', password=u'basepair',
+ classname=None, admin=1)])
+
def test_first(self):
db = sqlsoup.SqlSoup(engine)
MappedUsers = db.users
-
- user = db.users.filter(db.users.name=='Bhargan Basepair').one()
- eq_(user,
- MappedUsers(name=u'Bhargan Basepair',email=u'basepair@example.edu',password=u'basepair',classname=None,admin=1)
- )
+ user = db.users.filter(db.users.name == 'Bhargan Basepair'
+ ).one()
+ eq_(user, MappedUsers(name=u'Bhargan Basepair',
+ email=u'basepair@example.edu', password=u'basepair',
+ classname=None, admin=1))
db.rollback()
-
user = db.users.get('Bhargan Basepair')
- eq_(user, MappedUsers(name=u'Bhargan Basepair',email=u'basepair@example.edu',password=u'basepair',classname=None,admin=1))
+ eq_(user, MappedUsers(name=u'Bhargan Basepair',
+ email=u'basepair@example.edu', password=u'basepair',
+ classname=None, admin=1))
db.rollback()
-
user = db.users.filter_by(name='Bhargan Basepair').one()
- eq_(user, MappedUsers(name=u'Bhargan Basepair',email=u'basepair@example.edu',password=u'basepair',classname=None,admin=1))
+ eq_(user, MappedUsers(name=u'Bhargan Basepair',
+ email=u'basepair@example.edu', password=u'basepair',
+ classname=None, admin=1))
db.rollback()
def test_crud(self):
+
# note we're testing autoflush here too...
+
db = sqlsoup.SqlSoup(engine)
MappedLoans = db.loans
user = db.users.filter_by(name='Bhargan Basepair').one()
- book_id = db.books.filter_by(title='Regional Variation in Moss').first().id
- loan_insert = db.loans.insert(book_id=book_id, user_name=user.name)
-
- loan = db.loans.filter_by(book_id=2, user_name='Bhargan Basepair').one()
+ book_id = db.books.filter_by(title='Regional Variation in Moss'
+ ).first().id
+ loan_insert = db.loans.insert(book_id=book_id,
+ user_name=user.name)
+ loan = db.loans.filter_by(book_id=2,
+ user_name='Bhargan Basepair').one()
eq_(loan, loan_insert)
- l2 = MappedLoans(book_id=2,user_name=u'Bhargan Basepair',loan_date=loan.loan_date)
+ l2 = MappedLoans(book_id=2, user_name=u'Bhargan Basepair',
+ loan_date=loan.loan_date)
eq_(loan, l2)
db.expunge(l2)
-
db.delete(loan)
- loan = db.loans.filter_by(book_id=2, user_name='Bhargan Basepair').first()
+ loan = db.loans.filter_by(book_id=2,
+ user_name='Bhargan Basepair').first()
assert loan is None
-
+
def test_cls_crud(self):
db = sqlsoup.SqlSoup(engine)
MappedUsers = db.users
- db.users.filter_by(name='Bhargan Basepair').update(dict(name='Some New Name'))
- u1= db.users.filter_by(name='Some New Name').one()
- eq_(
- u1,
- MappedUsers(name=u'Some New Name',
- email=u'basepair@example.edu',
- password=u'basepair',classname=None,admin=1)
- )
-
+ db.users.filter_by(name='Bhargan Basepair'
+ ).update(dict(name='Some New Name'))
+ u1 = db.users.filter_by(name='Some New Name').one()
+ eq_(u1, MappedUsers(name=u'Some New Name',
+ email=u'basepair@example.edu', password=u'basepair',
+ classname=None, admin=1))
+
def test_map_table(self):
db = sqlsoup.SqlSoup(engine)
users = Table('users', db._metadata, autoload=True)
MappedUsers = db.map(users)
-
users = MappedUsers.order_by(db.users.name).all()
- eq_(
- users,
- [
- MappedUsers(
- name=u'Bhargan Basepair',
- email=u'basepair@example.edu',
- password=u'basepair',classname=None,admin=1),
- MappedUsers(
- name=u'Joe Student',
- email=u'student@example.edu',
- password=u'student',classname=None,admin=0),
- ]
- )
-
+ eq_(users, [MappedUsers(name=u'Bhargan Basepair',
+ email=u'basepair@example.edu', password=u'basepair',
+ classname=None, admin=1), MappedUsers(name=u'Joe Student',
+ email=u'student@example.edu', password=u'student',
+ classname=None, admin=0)])
+
def test_mapped_join(self):
db = sqlsoup.SqlSoup(engine)
-
join1 = MappedJoin = db.join(db.users, db.loans, isouter=True)
mj = join1.filter_by(name='Joe Student').all()
- eq_(
- mj,
- [MappedJoin(name=u'Joe Student',email=u'student@example.edu',password=u'student',classname=None,admin=0,book_id=1,user_name=u'Joe Student',loan_date=datetime.datetime(2006, 7, 12, 0, 0))]
- )
-
+ eq_(mj, [MappedJoin(
+ name=u'Joe Student',
+ email=u'student@example.edu',
+ password=u'student',
+ classname=None,
+ admin=0,
+ book_id=1,
+ user_name=u'Joe Student',
+ loan_date=datetime.datetime(2006, 7, 12, 0, 0),
+ )])
db.rollback()
-
join2 = MappedJoin = db.join(join1, db.books)
mj = join2.all()
- eq_(mj, [MappedJoin(name=u'Joe Student',email=u'student@example.edu',password=u'student',classname=None,admin=0,book_id=1,user_name=u'Joe Student',loan_date=datetime.datetime(2006, 7, 12, 0, 0),id=1,title=u'Mustards I Have Known',published_year=u'1989',authors=u'Jones')])
-
- eq_(
- db.with_labels(join1).c.keys(),
- [u'users_name', u'users_email', u'users_password',
- u'users_classname', u'users_admin',
- u'loans_book_id', u'loans_user_name',
- u'loans_loan_date']
- )
-
+ eq_(mj, [MappedJoin(
+ name=u'Joe Student',
+ email=u'student@example.edu',
+ password=u'student',
+ classname=None,
+ admin=0,
+ book_id=1,
+ user_name=u'Joe Student',
+ loan_date=datetime.datetime(2006, 7, 12, 0, 0),
+ id=1,
+ title=u'Mustards I Have Known',
+ published_year=u'1989',
+ authors=u'Jones',
+ )])
+ eq_(db.with_labels(join1).c.keys(), [
+ u'users_name',
+ u'users_email',
+ u'users_password',
+ u'users_classname',
+ u'users_admin',
+ u'loans_book_id',
+ u'loans_user_name',
+ u'loans_loan_date',
+ ])
labeled_loans = db.with_labels(db.loans)
- eq_(
- db.join(db.users, labeled_loans, isouter=True).c.keys(),
- [u'name', u'email', u'password', u'classname',
- u'admin', u'loans_book_id', u'loans_user_name', u'loans_loan_date']
- )
-
+ eq_(db.join(db.users, labeled_loans, isouter=True).c.keys(), [
+ u'name',
+ u'email',
+ u'password',
+ u'classname',
+ u'admin',
+ u'loans_book_id',
+ u'loans_user_name',
+ u'loans_loan_date',
+ ])
+
def test_relations(self):
db = sqlsoup.SqlSoup(engine)
db.users.relate('loans', db.loans)
-
MappedLoans = db.loans
MappedUsers = db.users
-
- eq_(
- db.users.get('Joe Student').loans,
- [MappedLoans(
- book_id=1,
- user_name=u'Joe Student',
- loan_date=datetime.datetime(2006, 7, 12, 0, 0))
- ]
- )
+ eq_(db.users.get('Joe Student').loans, [MappedLoans(book_id=1,
+ user_name=u'Joe Student', loan_date=datetime.datetime(2006,
+ 7, 12, 0, 0))])
db.rollback()
-
- eq_(
- db.users.filter(~db.users.loans.any()).all(),
- [MappedUsers(
- name=u'Bhargan Basepair',
- email='basepair@example.edu',
- password=u'basepair',
- classname=None,admin=1)
- ]
- )
+ eq_(db.users.filter(~db.users.loans.any()).all(),
+ [MappedUsers(name=u'Bhargan Basepair',
+ email='basepair@example.edu', password=u'basepair',
+ classname=None, admin=1)])
db.rollback()
-
del db._cache['users']
- db.users.relate('loans', db.loans,
- order_by=db.loans.loan_date, cascade='all, delete-orphan')
-
+ db.users.relate('loans', db.loans, order_by=db.loans.loan_date,
+ cascade='all, delete-orphan')
+
def test_explicit_session(self):
Session = scoped_session(sessionmaker())
db = sqlsoup.SqlSoup(engine, session=Session)
@@ -223,62 +215,46 @@ class SQLSoupTest(TestBase):
MappedUsers = db.users
sess = Session()
assert db.users._query.session is db.users.session is sess
-
row = db.users.insert(name='new name', email='new email')
assert row in sess
finally:
sess.rollback()
sess.close()
-
+
def test_selectable(self):
db = sqlsoup.SqlSoup(engine)
MappedBooks = db.books
b = db.books._table
-
- s = select(
- [b.c.published_year, func.count('*').label('n')],
- from_obj=[b], group_by=[b.c.published_year])
-
+ s = select([b.c.published_year, func.count('*').label('n')],
+ from_obj=[b], group_by=[b.c.published_year])
s = s.alias('years_with_count')
years_with_count = db.map(s, primary_key=[s.c.published_year])
-
- eq_(
- years_with_count.filter_by(published_year='1989').all(),
- [MappedBooks(published_year=u'1989',n=1)]
- )
-
+ eq_(years_with_count.filter_by(published_year='1989').all(),
+ [MappedBooks(published_year=u'1989', n=1)])
+
def test_raw_sql(self):
db = sqlsoup.SqlSoup(engine)
rp = db.execute('select name, email from users order by name')
- eq_(
- rp.fetchall(),
- [('Bhargan Basepair', 'basepair@example.edu'),
- ('Joe Student', 'student@example.edu')]
- )
-
- # test that execute() shares the same transactional
- # context as the session
+ eq_(rp.fetchall(), [('Bhargan Basepair', 'basepair@example.edu'
+ ), ('Joe Student', 'student@example.edu')])
+
+ # test that execute() shares the same transactional context as
+ # the session
+
db.execute("update users set email='foo bar'")
- eq_(
- db.execute("select distinct email from users").fetchall(),
- [('foo bar',)]
- )
+ eq_(db.execute('select distinct email from users').fetchall(),
+ [('foo bar', )])
db.rollback()
- eq_(
- db.execute("select distinct email from users").fetchall(),
- [(u'basepair@example.edu',), (u'student@example.edu',)]
- )
-
+ eq_(db.execute('select distinct email from users').fetchall(),
+ [(u'basepair@example.edu', ), (u'student@example.edu', )])
+
def test_connection(self):
db = sqlsoup.SqlSoup(engine)
conn = db.connection()
rp = conn.execute('select name, email from users order by name')
- eq_(
- rp.fetchall(),
- [('Bhargan Basepair', 'basepair@example.edu'),
- ('Joe Student', 'student@example.edu')]
- )
-
+ eq_(rp.fetchall(), [('Bhargan Basepair', 'basepair@example.edu'
+ ), ('Joe Student', 'student@example.edu')])
+
def test_entity(self):
db = sqlsoup.SqlSoup(engine)
tablename = 'loans'
@@ -287,40 +263,40 @@ class SQLSoupTest(TestBase):
def test_entity_with_different_base(self):
class subclass(object):
pass
+
db = sqlsoup.SqlSoup(engine, base=subclass)
assert issubclass(db.entity('loans'), subclass)
-
+
def test_filter_by_order_by(self):
db = sqlsoup.SqlSoup(engine)
MappedUsers = db.users
- users = db.users.filter_by(classname=None).order_by(db.users.name).all()
- eq_(
- users,
- [MappedUsers(name=u'Bhargan Basepair',email=u'basepair@example.edu',password=u'basepair',classname=None,admin=1), MappedUsers(name=u'Joe Student',email=u'student@example.edu',password=u'student',classname=None,admin=0)]
- )
-
+ users = \
+ db.users.filter_by(classname=None).order_by(db.users.name).all()
+ eq_(users, [MappedUsers(name=u'Bhargan Basepair',
+ email=u'basepair@example.edu', password=u'basepair',
+ classname=None, admin=1), MappedUsers(name=u'Joe Student',
+ email=u'student@example.edu', password=u'student',
+ classname=None, admin=0)])
+
def test_no_pk(self):
db = sqlsoup.SqlSoup(engine)
assert_raises(sqlsoup.PKNotFoundError, getattr, db, 'nopk')
-
+
def test_nosuchtable(self):
db = sqlsoup.SqlSoup(engine)
assert_raises(exc.NoSuchTableError, getattr, db, 'nosuchtable')
-
+
def test_dont_persist_alias(self):
db = sqlsoup.SqlSoup(engine)
MappedBooks = db.books
b = db.books._table
-
- s = select(
- [b.c.published_year, func.count('*').label('n')],
- from_obj=[b], group_by=[b.c.published_year])
-
+ s = select([b.c.published_year, func.count('*').label('n')],
+ from_obj=[b], group_by=[b.c.published_year])
s = s.alias('years_with_count')
years_with_count = db.map(s, primary_key=[s.c.published_year])
-
- assert_raises(exc.InvalidRequestError, years_with_count.insert, published_year='2007', n=1)
-
+ assert_raises(exc.InvalidRequestError, years_with_count.insert,
+ published_year='2007', n=1)
+
def test_clear(self):
db = sqlsoup.SqlSoup(engine)
eq_(db.loans.count(), 1)
@@ -328,8 +304,10 @@ class SQLSoupTest(TestBase):
db.expunge_all()
db.flush()
eq_(db.loans.count(), 1)
-
-_ddl = u"""
+
+
+_ddl = \
+ u"""
CREATE TABLE books (
id integer PRIMARY KEY, -- auto-increments in sqlite
title text NOT NULL,
@@ -360,10 +338,10 @@ CREATE TABLE bad_names (
id int primary key,
query varchar(100)
)
-""".split(';')
-
-
-_data = """
+""".split(';'
+ )
+_data = \
+ """
insert into users(name, email, password, admin)
values('Bhargan Basepair', 'basepair@example.edu', 'basepair', 1);
insert into users(name, email, password, admin)
@@ -380,11 +358,13 @@ values (
(select name from users where name like 'Joe%'),
'2006-07-12 0:0:0')
;
-""".split(";")
-
-_teardown = """
+""".split(';'
+ )
+_teardown = \
+ """
delete from loans;
delete from books;
delete from users;
delete from nopk;
-""".split(";")
+""".split(';'
+ )