summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/engine/_base.py145
-rw-r--r--test/orm/_base.py2
-rw-r--r--test/orm/_fixtures.py71
-rw-r--r--test/sql/_base.py4
-rw-r--r--test/sql/defaults.py302
5 files changed, 311 insertions, 213 deletions
diff --git a/test/engine/_base.py b/test/engine/_base.py
index 9ffdb5e1a..c215b2e96 100644
--- a/test/engine/_base.py
+++ b/test/engine/_base.py
@@ -1,4 +1,147 @@
-from testlib import testing
+from testlib import sa, testing
+from testlib.testing import adict
+
+
+class TablesTest(testing.TestBase):
+ """An integration test that creates and uses tables."""
+
+ # 'once', 'each', None
+ run_setup_bind = 'once'
+
+ # 'once', 'each', None
+ run_define_tables = 'once'
+
+ # 'once', 'each', None
+ run_inserts = 'each'
+
+ # 'foreach', None
+ run_deletes = 'each'
+
+ # 'once', 'each', None
+ run_dispose_bind = None
+
+ _artifact_registries = ('tables', 'other_artifacts')
+
+ bind = None
+ metadata = None
+ tables = None
+ other_artifacts = None
+
+ def setUpAll(self):
+ if self.run_setup_bind is None:
+ assert self.bind is not None
+ assert self.run_deletes in (None, 'each')
+ if self.run_inserts == 'once':
+ assert self.run_deletes is None
+
+ cls = self.__class__
+ if cls.tables is None:
+ cls.tables = adict()
+ if cls.other_artifacts is None:
+ cls.other_artifacts = adict()
+
+ if self.bind is None:
+ setattr(type(self), 'bind', self.setup_bind())
+
+ if self.metadata is None:
+ setattr(type(self), 'metadata', sa.MetaData())
+
+ if self.metadata.bind is None:
+ self.metadata.bind = self.bind
+
+ if self.run_define_tables:
+ self.define_tables(self.metadata)
+ self.metadata.create_all()
+ self.tables.update(self.metadata.tables)
+
+ if self.run_inserts:
+ self._load_fixtures()
+ self.insert_data()
+
+ def setUp(self):
+ if self._sa_first_test:
+ return
+
+ cls = self.__class__
+
+ if self.setup_bind == 'each':
+ setattr(cls, 'bind', self.setup_bind())
+
+ if self.run_define_tables == 'each':
+ self.tables.clear()
+ self.metadata.drop_all()
+ self.metadata.clear()
+ self.define_tables(self.metadata)
+ self.metadata.create_all()
+ self.tables.update(self.metadata.tables)
+
+ if self.run_inserts == 'each':
+ self._load_fixtures()
+ self.insert_data()
+
+ def tearDown(self):
+ # no need to run deletes if tables are recreated on setup
+ if self.run_define_tables != 'each' and self.run_deletes:
+ for table in self.metadata.table_iterator(reverse=True):
+ try:
+ table.delete().execute().close()
+ except sa.exc.DBAPIError, ex:
+ print >> sys.stderr, "Error emptying table %s: %r" % (
+ table, ex)
+
+ if self.run_dispose_bind == 'each':
+ self.dispose_bind(self.bind)
+
+ def tearDownAll(self):
+ self.metadata.drop_all()
+
+ if self.dispose_bind:
+ self.dispose_bind(self.bind)
+
+ self.metadata.bind = None
+
+ if self.run_setup_bind is not None:
+ self.bind = None
+
+ def setup_bind(self):
+ return testing.db
+
+ def dispose_bind(self, bind):
+ if hasattr(bind, 'dispose'):
+ bind.dispose()
+ elif hasattr(bind, 'close'):
+ bind.close()
+
+ def define_tables(self, metadata):
+ raise NotImplementedError()
+
+ def fixtures(self):
+ return {}
+
+ def insert_data(self):
+ pass
+
+ def sql_count_(self, count, fn):
+ self.assert_sql_count(self.bind, fn, count)
+
+ def sql_eq_(self, callable_, statements, with_sequences=None):
+ self.assert_sql(self.bind,
+ callable_, statements, with_sequences)
+
+ def _load_fixtures(self):
+ headers, rows = {}, {}
+ for table, data in self.fixtures().iteritems():
+ if isinstance(table, basestring):
+ table = self.tables[table]
+ headers[table] = data[0]
+ rows[table] = data[1:]
+ for table in self.metadata.table_iterator(reverse=False):
+ if table not in headers:
+ continue
+ table.bind.execute(
+ table.insert(),
+ [dict(zip(headers[table], column_values))
+ for column_values in rows[table]])
class AltEngineTest(testing.TestBase):
diff --git a/test/orm/_base.py b/test/orm/_base.py
index b952952d7..62285affb 100644
--- a/test/orm/_base.py
+++ b/test/orm/_base.py
@@ -199,7 +199,7 @@ class MappedTest(ORMTest):
if self.run_setup_mappers == 'each':
sa.orm.clear_mappers()
-
+
# no need to run deletes if tables are recreated on setup
if self.run_define_tables != 'each' and self.run_deletes:
for table in self.metadata.table_iterator(reverse=True):
diff --git a/test/orm/_fixtures.py b/test/orm/_fixtures.py
index 4ded57d98..6be6c7bd0 100644
--- a/test/orm/_fixtures.py
+++ b/test/orm/_fixtures.py
@@ -160,79 +160,30 @@ def _load_fixtures():
def run_inserts_for(table, bind=None):
table.info[('fixture', 'loader')](bind)
-
class Base(_base.ComparableEntity):
pass
-_recursion_stack = set()
-class ZBase(_base.BasicEntity):
- def __ne__(self, other):
- return not self.__eq__(other)
-
- def __eq__(self, other):
- """'passively' compare this object to another.
-
- only look at attributes that are present on the source object.
-
- """
- if self in _recursion_stack:
- return True
- _recursion_stack.add(self)
- try:
- # pick the entity thats not SA persisted as the source
- try:
- state = attributes.instance_state(self)
- key = state.key
- except (KeyError, AttributeError):
- key = None
- if other is None:
- a = self
- b = other
- elif key is not None:
- a = other
- b = self
- else:
- a = self
- b = other
-
- for attr in a.__dict__.keys():
- if attr[0] == '_':
- continue
- value = getattr(a, attr)
- #print "looking at attr:", attr, "start value:", value
- if hasattr(value, '__iter__') and not isinstance(value, basestring):
- try:
- # catch AttributeError so that lazy loaders trigger
- battr = getattr(b, attr)
- except AttributeError:
- #print "b class does not have attribute named '%s'" % attr
- #raise
- return False
-
- if list(value) == list(battr):
- continue
- else:
- return False
- else:
- if value is not None:
- if value != getattr(b, attr, None):
- #print "2. Attribute named '%s' does not match that of b" % attr
- return False
- else:
- return True
- finally:
- _recursion_stack.remove(self)
class User(Base):
pass
+
+
class Order(Base):
pass
+
+
class Item(Base):
pass
+
+
class Keyword(Base):
pass
+
+
class Address(Base):
pass
+
+
class Dingaling(Base):
pass
@@ -253,7 +204,7 @@ class FixtureTest(_base.MappedTest):
run_setup_mappers = 'each'
run_inserts = 'each'
run_deletes = 'each'
-
+
metadata = fixture_metadata
fixture_classes = dict(User=User,
Order=Order,
diff --git a/test/sql/_base.py b/test/sql/_base.py
new file mode 100644
index 000000000..c1a107eeb
--- /dev/null
+++ b/test/sql/_base.py
@@ -0,0 +1,4 @@
+from engine import _base as engine_base
+
+
+TablesTest = engine_base.TablesTest
diff --git a/test/sql/defaults.py b/test/sql/defaults.py
index c496dfca8..fbea5888e 100644
--- a/test/sql/defaults.py
+++ b/test/sql/defaults.py
@@ -1,14 +1,14 @@
import testenv; testenv.configure_for_tests()
import datetime
-from sqlalchemy import *
-from sqlalchemy import exc, schema, util
-from sqlalchemy.orm import mapper, create_session
+from sqlalchemy import Sequence, Column, func
from testlib import sa, testing
+from testlib.sa import MetaData, Table, Integer, String, ForeignKey
from testlib.testing import eq_
-from testlib import *
+from testlib.compat import set
+from sql import _base
-class DefaultTest(TestBase):
+class DefaultTest(testing.TestBase):
def setUpAll(self):
global t, f, f2, ts, currenttime, metadata, default_generator
@@ -23,12 +23,12 @@ class DefaultTest(TestBase):
def myupdate_with_ctx(ctx):
conn = ctx.connection
- return conn.execute(select([text('13')])).scalar()
+ return conn.execute(sa.select([sa.text('13')])).scalar()
def mydefault_using_connection(ctx):
conn = ctx.connection
try:
- return conn.execute(select([text('12')])).scalar()
+ return conn.execute(sa.select([sa.text('12')])).scalar()
finally:
# ensure a "close()" on this connection does nothing,
# since its a "branched" connection
@@ -40,32 +40,32 @@ class DefaultTest(TestBase):
# select "count(1)" returns different results on different DBs also
# correct for "current_date" compatible as column default, value
# differences
- currenttime = func.current_date(type_=Date, bind=db)
+ currenttime = func.current_date(type_=sa.Date, bind=db)
if is_oracle:
- ts = db.scalar(select([func.trunc(func.sysdate(), literal_column("'DAY'"), type_=Date).label('today')]))
+ ts = db.scalar(sa.select([func.trunc(func.sysdate(), sa.literal_column("'DAY'"), type_=sa.Date).label('today')]))
assert isinstance(ts, datetime.date) and not isinstance(ts, datetime.datetime)
- f = select([func.length('abcdef')], bind=db).scalar()
- f2 = select([func.length('abcdefghijk')], bind=db).scalar()
+ f = sa.select([func.length('abcdef')], bind=db).scalar()
+ f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar()
# TODO: engine propigation across nested functions not working
- currenttime = func.trunc(currenttime, literal_column("'DAY'"), bind=db, type_=Date)
+ currenttime = func.trunc(currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date)
def1 = currenttime
- def2 = func.trunc(text("sysdate"), literal_column("'DAY'"), type_=Date)
+ def2 = func.trunc(sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date)
- deftype = Date
+ deftype = sa.Date
elif use_function_defaults:
- f = select([func.length('abcdef')], bind=db).scalar()
- f2 = select([func.length('abcdefghijk')], bind=db).scalar()
+ f = sa.select([func.length('abcdef')], bind=db).scalar()
+ f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar()
def1 = currenttime
if testing.against('maxdb'):
- def2 = text("curdate")
+ def2 = sa.text("curdate")
else:
- def2 = text("current_date")
- deftype = Date
+ def2 = sa.text("current_date")
+ deftype = sa.Date
ts = db.func.current_date().scalar()
else:
- f = select([func.length('abcdef')], bind=db).scalar()
- f2 = select([func.length('abcdefghijk')], bind=db).scalar()
+ f = len('abcdef')
+ f2 = len('abcdefghijk')
def1 = def2 = "3"
ts = 3
deftype = Integer
@@ -94,12 +94,12 @@ class DefaultTest(TestBase):
server_default=def2),
# preexecute + update timestamp
- Column('col6', Date,
+ Column('col6', sa.Date,
default=currenttime,
onupdate=currenttime),
- Column('boolcol1', Boolean, default=True),
- Column('boolcol2', Boolean, default=False),
+ Column('boolcol1', sa.Boolean, default=True),
+ Column('boolcol2', sa.Boolean, default=False),
# python function which uses ExecutionContext
Column('col7', Integer,
@@ -107,7 +107,7 @@ class DefaultTest(TestBase):
onupdate=myupdate_with_ctx),
# python builtin
- Column('col8', Date,
+ Column('col8', sa.Date,
default=datetime.date.today,
onupdate=datetime.date.today),
# combo
@@ -123,7 +123,7 @@ class DefaultTest(TestBase):
default_generator['x'] = 50
t.delete().execute()
- def test_bad_argsignature(self):
+ def test_bad_arg_signature(self):
ex_msg = \
"ColumnDefault Python function takes zero or one positional arguments"
@@ -138,13 +138,11 @@ class DefaultTest(TestBase):
fn4 = FN4()
for fn in fn1, fn2, fn3, fn4:
- try:
- c = ColumnDefault(fn)
- assert False, str(fn)
- except exc.ArgumentError, e:
- assert str(e) == ex_msg
+ self.assertRaisesMessage(sa.exc.ArgumentError,
+ ex_msg,
+ sa.ColumnDefault, fn)
- def test_argsignature(self):
+ def test_arg_signature(self):
def fn1(): pass
def fn2(): pass
def fn3(x=1): pass
@@ -166,17 +164,18 @@ class DefaultTest(TestBase):
fn8 = FN8()
for fn in fn1, fn2, fn3, fn4, fn5, fn6, fn7, fn8:
- c = ColumnDefault(fn)
+ c = sa.ColumnDefault(fn)
- def teststandalone(self):
+ @testing.fails_on('firebird') # 'Data type unknown'
+ def test_standalone(self):
c = testing.db.engine.contextual_connect()
x = c.execute(t.c.col1.default)
y = t.c.col2.default.execute()
z = c.execute(t.c.col3.default)
- self.assert_(50 <= x <= 57)
- self.assert_(y == 'imthedefault')
- self.assert_(z == f)
- self.assert_(f2==11)
+ assert 50 <= x <= 57
+ eq_(y, 'imthedefault')
+ eq_(z, f)
+ eq_(f2, 11)
def test_py_vs_server_default_detection(self):
@@ -202,6 +201,8 @@ class DefaultTest(TestBase):
has_('col8', 'default', 'onupdate')
has_('col9', 'default', 'server_default')
+ ColumnDefault, DefaultClause = sa.ColumnDefault, sa.DefaultClause
+
t2 = Table('t2', MetaData(),
Column('col1', Integer, Sequence('foo')),
Column('col2', Integer,
@@ -244,31 +245,38 @@ class DefaultTest(TestBase):
has_('col7', 'default', 'server_default', 'onupdate')
has_('col8', 'default', 'server_default', 'onupdate', 'server_onupdate')
+ @testing.fails_on('firebird') # 'Data type unknown'
def test_insert(self):
r = t.insert().execute()
assert r.lastrow_has_defaults()
- eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+ eq_(set(r.context.postfetch_cols),
+ set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
r = t.insert(inline=True).execute()
assert r.lastrow_has_defaults()
- eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+ eq_(set(r.context.postfetch_cols),
+ set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
t.insert().execute()
- ctexec = select([currenttime.label('now')], bind=testing.db).scalar()
+ ctexec = sa.select([currenttime.label('now')], bind=testing.db).scalar()
l = t.select().execute()
today = datetime.date.today()
eq_(l.fetchall(), [
- (x, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py')
+ (x, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py')
for x in range(51, 54)])
t.insert().execute(col9=None)
assert r.lastrow_has_defaults()
- eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+ eq_(set(r.context.postfetch_cols),
+ set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
eq_(t.select(t.c.col1==54).execute().fetchall(),
- [(54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, None)])
+ [(54, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, None)])
+ @testing.fails_on('firebird') # 'Data type unknown'
def test_insertmany(self):
# MySQL-Python 1.2.2 breaks functions in execute_many :(
if (testing.against('mysql') and
@@ -281,15 +289,19 @@ class DefaultTest(TestBase):
l = t.select().execute()
today = datetime.date.today()
eq_(l.fetchall(),
- [(51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py'),
- (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py'),
- (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py')])
+ [(51, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py'),
+ (52, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py'),
+ (53, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py')])
def test_insert_values(self):
t.insert(values={'col3':50}).execute()
l = t.select().execute()
- self.assert_(l.fetchone()['col3'] == 50)
+ eq_(50, l.fetchone()['col3'])
+ @testing.fails_on('firebird') # 'Data type unknown'
def test_updatemany(self):
# MySQL-Python 1.2.2 breaks functions in execute_many :(
if (testing.against('mysql') and
@@ -298,44 +310,49 @@ class DefaultTest(TestBase):
t.insert().execute({}, {}, {})
- t.update(t.c.col1==bindparam('pkval')).execute(
- {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False},
- )
+ t.update(t.c.col1==sa.bindparam('pkval')).execute(
+ {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False})
- t.update(t.c.col1==bindparam('pkval')).execute(
+ t.update(t.c.col1==sa.bindparam('pkval')).execute(
{'pkval':51,},
{'pkval':52,},
- {'pkval':53,},
- )
+ {'pkval':53,})
l = t.select().execute()
ctexec = currenttime.scalar()
today = datetime.date.today()
eq_(l.fetchall(),
- [(51, 'im the update', f2, ts, ts, ctexec, False, False, 13, today, 'py'),
- (52, 'im the update', f2, ts, ts, ctexec, True, False, 13, today, 'py'),
- (53, 'im the update', f2, ts, ts, ctexec, True, False, 13, today, 'py')])
-
- def testupdate(self):
+ [(51, 'im the update', f2, ts, ts, ctexec, False, False,
+ 13, today, 'py'),
+ (52, 'im the update', f2, ts, ts, ctexec, True, False,
+ 13, today, 'py'),
+ (53, 'im the update', f2, ts, ts, ctexec, True, False,
+ 13, today, 'py')])
+
+ @testing.fails_on('firebird') # 'Data type unknown'
+ def test_update(self):
r = t.insert().execute()
pk = r.last_inserted_ids()[0]
t.update(t.c.col1==pk).execute(col4=None, col5=None)
ctexec = currenttime.scalar()
l = t.select(t.c.col1==pk).execute()
l = l.fetchone()
- self.assert_(l == (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today(), 'py'))
- self.assert_(f2==11)
+ eq_(l,
+ (pk, 'im the update', f2, None, None, ctexec, True, False,
+ 13, datetime.date.today(), 'py'))
+ eq_(11, f2)
- def testupdatevalues(self):
+ @testing.fails_on('firebird') # 'Data type unknown'
+ def test_update_values(self):
r = t.insert().execute()
pk = r.last_inserted_ids()[0]
t.update(t.c.col1==pk, values={'col3': 55}).execute()
l = t.select(t.c.col1==pk).execute()
l = l.fetchone()
- self.assert_(l['col3'] == 55)
+ eq_(55, l['col3'])
@testing.fails_on_everything_except('postgres')
- def testpassiveoverride(self):
+ def test_passive_override(self):
"""
Primarily for postgres, tests that when we get a primary key column
back from reflecting a table which has a default value on it, we
@@ -345,6 +362,7 @@ class DefaultTest(TestBase):
locate the just inserted row.
"""
+ # TODO: move this to dialect/postgres
try:
meta = MetaData(testing.db)
testing.db.execute("""
@@ -360,58 +378,48 @@ class DefaultTest(TestBase):
t = Table("speedy_users", meta, autoload=True)
t.insert().execute(user_name='user', user_password='lala')
l = t.select().execute().fetchall()
- self.assert_(l == [(1, 'user', 'lala')])
+ eq_(l, [(1, 'user', 'lala')])
finally:
testing.db.execute("drop table speedy_users", None)
-class PKDefaultTest(TestBase):
- __requires__ = ('subqueries',)
-
- def setUpAll(self):
- global metadata, t1, t2
- metadata = MetaData(testing.db)
+class PKDefaultTest(_base.TablesTest):
+ __requires__ = ('subqueries',)
+ def define_tables(self, metadata):
t2 = Table('t2', metadata,
Column('nextid', Integer))
- t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True,
- default=select([func.max(t2.c.nextid)]).as_scalar()),
- Column('data', String(30)))
-
- metadata.create_all()
-
- def tearDownAll(self):
- metadata.drop_all()
+ Table('t1', metadata,
+ Column('id', Integer, primary_key=True,
+ default=sa.select([func.max(t2.c.nextid)]).as_scalar()),
+ Column('data', String(30)))
@testing.crashes('mssql', 'FIXME: unknown, verify not fails_on')
+ @testing.resolve_artifact_names
def test_basic(self):
t2.insert().execute(nextid=1)
r = t1.insert().execute(data='hi')
- assert r.last_inserted_ids() == [1]
+ eq_([1], r.last_inserted_ids())
t2.insert().execute(nextid=2)
r = t1.insert().execute(data='there')
- assert r.last_inserted_ids() == [2]
-
+ eq_([2], r.last_inserted_ids())
-class PKIncrementTest(TestBase):
- def setUp(self):
- global aitable, aimeta
- aimeta = MetaData(testing.db)
- aitable = Table("aitest", aimeta,
- Column('id', Integer, Sequence('ai_id_seq', optional=True),
- primary_key=True),
- Column('int1', Integer),
- Column('str1', String(20)))
- aimeta.create_all()
+class PKIncrementTest(_base.TablesTest):
+ run_define_tables = 'each'
- def tearDown(self):
- aimeta.drop_all()
+ def define_tables(self, metadata):
+ Table("aitable", metadata,
+ Column('id', Integer, Sequence('ai_id_seq', optional=True),
+ primary_key=True),
+ Column('int1', Integer),
+ Column('str1', String(20)))
# TODO: add coverage for increment on a secondary column in a key
+ @testing.fails_on('firebird') # data type unknown
+ @testing.resolve_artifact_names
def _test_autoincrement(self, bind):
ids = set()
rs = bind.execute(aitable.insert(), int1=1)
@@ -438,13 +446,14 @@ class PKIncrementTest(TestBase):
self.assert_(last not in ids)
ids.add(last)
- self.assert_(
- list(bind.execute(aitable.select().order_by(aitable.c.id))) ==
+ eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))),
[(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)])
+ @testing.resolve_artifact_names
def test_autoincrement_autocommit(self):
self._test_autoincrement(testing.db)
+ @testing.resolve_artifact_names
def test_autoincrement_transaction(self):
con = testing.db.connect()
tx = con.begin()
@@ -463,64 +472,57 @@ class PKIncrementTest(TestBase):
con.close()
-class AutoIncrementTest(TestBase):
+class AutoIncrementTest(_base.TablesTest):
__requires__ = ('identity',)
+ run_define_tables = 'each'
+
+ def define_tables(self, metadata):
+ """Each test manipulates self.metadata individually."""
@testing.exclude('sqlite', '<', (3, 4), 'no database support')
def test_autoincrement_single_col(self):
- metadata = MetaData(testing.db)
+ single = Table('single', self.metadata,
+ Column('id', Integer, primary_key=True))
+ single.create()
- single = Table('single', metadata,
- Column('id', Integer, primary_key=True))
- metadata.create_all()
- try:
- r = single.insert().execute()
- id_ = r.last_inserted_ids()[0]
- assert id_ is not None
- eq_(select([func.count(text('*'))], from_obj=single).scalar(), 1)
- finally:
- metadata.drop_all()
+ r = single.insert().execute()
+ id_ = r.last_inserted_ids()[0]
+ assert id_ is not None
+ eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar())
def test_autoincrement_fk(self):
- metadata = MetaData(testing.db)
-
- nodes = Table('nodes', metadata,
+ nodes = Table('nodes', self.metadata,
Column('id', Integer, primary_key=True),
Column('parent_id', Integer, ForeignKey('nodes.id')),
Column('data', String(30)))
- metadata.create_all()
- try:
- r = nodes.insert().execute(data='foo')
- id_ = r.last_inserted_ids()[0]
- nodes.insert().execute(data='bar', parent_id=id_)
- finally:
- metadata.drop_all()
+ nodes.create()
+
+ r = nodes.insert().execute(data='foo')
+ id_ = r.last_inserted_ids()[0]
+ nodes.insert().execute(data='bar', parent_id=id_)
@testing.fails_on('sqlite')
def test_non_autoincrement(self):
# sqlite INT primary keys can be non-unique! (only for ints)
- meta = MetaData(testing.db)
- nonai_table = Table("nonaitest", meta,
+ nonai = Table("nonaitest", self.metadata,
Column('id', Integer, autoincrement=False, primary_key=True),
Column('data', String(20)))
- nonai_table.create(checkfirst=True)
+ nonai.create()
+
+
try:
- try:
- # postgres + mysql strict will fail on first row,
- # mysql in legacy mode fails on second row
- nonai_table.insert().execute(data='row 1')
- nonai_table.insert().execute(data='row 2')
- assert False
- except exc.SQLError, e:
- print "Got exception", str(e)
- assert True
-
- nonai_table.insert().execute(id=1, data='row 1')
- finally:
- nonai_table.drop()
+ # postgres + mysql strict will fail on first row,
+ # mysql in legacy mode fails on second row
+ nonai.insert().execute(data='row 1')
+ nonai.insert().execute(data='row 2')
+ assert False
+ except sa.exc.SQLError, e:
+ assert True
+
+ nonai.insert().execute(id=1, data='row 1')
-class SequenceTest(TestBase):
+class SequenceTest(testing.TestBase):
__requires__ = ('sequences',)
def setUpAll(self):
@@ -529,7 +531,7 @@ class SequenceTest(TestBase):
cartitems = Table("cartitems", metadata,
Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True),
Column("description", String(40)),
- Column("createdate", DateTime())
+ Column("createdate", sa.DateTime())
)
sometable = Table( 'Manager', metadata,
Column('obj_id', Integer, Sequence('obj_id_seq'), ),
@@ -551,14 +553,12 @@ class SequenceTest(TestBase):
sometable.insert().execute(
{'name':'name3'},
- {'name':'name4'}
- )
- assert sometable.select().execute().fetchall() == [
- (1, "somename", 1),
- (2, "someother", 2),
- (3, "name3", 3),
- (4, "name4", 4),
- ]
+ {'name':'name4'})
+ eq_(sometable.select().execute().fetchall(),
+ [(1, "somename", 1),
+ (2, "someother", 2),
+ (3, "name3", 3),
+ (4, "name4", 4)])
def testsequence(self):
cartitems.insert().execute(description='hi')
@@ -568,13 +568,13 @@ class SequenceTest(TestBase):
assert r.last_inserted_ids() and r.last_inserted_ids()[0] is not None
id_ = r.last_inserted_ids()[0]
- assert select([func.count(cartitems.c.cart_id)],
- and_(cartitems.c.description == 'lala',
- cartitems.c.cart_id == id_)).scalar() == 1
+ eq_(1,
+ sa.select([func.count(cartitems.c.cart_id)],
+ sa.and_(cartitems.c.description == 'lala',
+ cartitems.c.cart_id == id_)).scalar())
cartitems.select().execute().fetchall()
-
@testing.fails_on('maxdb')
# maxdb db-api seems to double-execute NEXTVAL internally somewhere,
# throwing off the numbers for these tests...
@@ -583,7 +583,7 @@ class SequenceTest(TestBase):
s.create()
try:
x = s.execute()
- self.assert_(x == 1)
+ eq_(x, 1)
finally:
s.drop()
@@ -593,7 +593,7 @@ class SequenceTest(TestBase):
s.create(bind=testing.db)
try:
x = s.execute(testing.db)
- self.assert_(x == 1)
+ eq_(x, 1)
finally:
s.drop(testing.db)