diff options
author | Jason Kirtland <jek@discorporate.us> | 2008-05-20 00:14:51 +0000 |
---|---|---|
committer | Jason Kirtland <jek@discorporate.us> | 2008-05-20 00:14:51 +0000 |
commit | 18a31d0316de48430ca5e8c8d442f11fdd3e26d6 (patch) | |
tree | 62acc61cb86450c516a0aa14b1b00b0fee96ac6b /test/sql/defaults.py | |
parent | 8e9fce417a04e647c6c3a3791b59a9cb704fc44d (diff) | |
download | sqlalchemy-18a31d0316de48430ca5e8c8d442f11fdd3e26d6.tar.gz |
- Quick cleanup of defaults.py. The main DefaultTest is still a mess.
Diffstat (limited to 'test/sql/defaults.py')
-rw-r--r-- | test/sql/defaults.py | 302 |
1 files changed, 151 insertions, 151 deletions
diff --git a/test/sql/defaults.py b/test/sql/defaults.py index c496dfca8..fbea5888e 100644 --- a/test/sql/defaults.py +++ b/test/sql/defaults.py @@ -1,14 +1,14 @@ import testenv; testenv.configure_for_tests() import datetime -from sqlalchemy import * -from sqlalchemy import exc, schema, util -from sqlalchemy.orm import mapper, create_session +from sqlalchemy import Sequence, Column, func from testlib import sa, testing +from testlib.sa import MetaData, Table, Integer, String, ForeignKey from testlib.testing import eq_ -from testlib import * +from testlib.compat import set +from sql import _base -class DefaultTest(TestBase): +class DefaultTest(testing.TestBase): def setUpAll(self): global t, f, f2, ts, currenttime, metadata, default_generator @@ -23,12 +23,12 @@ class DefaultTest(TestBase): def myupdate_with_ctx(ctx): conn = ctx.connection - return conn.execute(select([text('13')])).scalar() + return conn.execute(sa.select([sa.text('13')])).scalar() def mydefault_using_connection(ctx): conn = ctx.connection try: - return conn.execute(select([text('12')])).scalar() + return conn.execute(sa.select([sa.text('12')])).scalar() finally: # ensure a "close()" on this connection does nothing, # since its a "branched" connection @@ -40,32 +40,32 @@ class DefaultTest(TestBase): # select "count(1)" returns different results on different DBs also # correct for "current_date" compatible as column default, value # differences - currenttime = func.current_date(type_=Date, bind=db) + currenttime = func.current_date(type_=sa.Date, bind=db) if is_oracle: - ts = db.scalar(select([func.trunc(func.sysdate(), literal_column("'DAY'"), type_=Date).label('today')])) + ts = db.scalar(sa.select([func.trunc(func.sysdate(), sa.literal_column("'DAY'"), type_=sa.Date).label('today')])) assert isinstance(ts, datetime.date) and not isinstance(ts, datetime.datetime) - f = select([func.length('abcdef')], bind=db).scalar() - f2 = select([func.length('abcdefghijk')], bind=db).scalar() + f = sa.select([func.length('abcdef')], bind=db).scalar() + f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar() # TODO: engine propigation across nested functions not working - currenttime = func.trunc(currenttime, literal_column("'DAY'"), bind=db, type_=Date) + currenttime = func.trunc(currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date) def1 = currenttime - def2 = func.trunc(text("sysdate"), literal_column("'DAY'"), type_=Date) + def2 = func.trunc(sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date) - deftype = Date + deftype = sa.Date elif use_function_defaults: - f = select([func.length('abcdef')], bind=db).scalar() - f2 = select([func.length('abcdefghijk')], bind=db).scalar() + f = sa.select([func.length('abcdef')], bind=db).scalar() + f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar() def1 = currenttime if testing.against('maxdb'): - def2 = text("curdate") + def2 = sa.text("curdate") else: - def2 = text("current_date") - deftype = Date + def2 = sa.text("current_date") + deftype = sa.Date ts = db.func.current_date().scalar() else: - f = select([func.length('abcdef')], bind=db).scalar() - f2 = select([func.length('abcdefghijk')], bind=db).scalar() + f = len('abcdef') + f2 = len('abcdefghijk') def1 = def2 = "3" ts = 3 deftype = Integer @@ -94,12 +94,12 @@ class DefaultTest(TestBase): server_default=def2), # preexecute + update timestamp - Column('col6', Date, + Column('col6', sa.Date, default=currenttime, onupdate=currenttime), - Column('boolcol1', Boolean, default=True), - Column('boolcol2', Boolean, default=False), + Column('boolcol1', sa.Boolean, default=True), + Column('boolcol2', sa.Boolean, default=False), # python function which uses ExecutionContext Column('col7', Integer, @@ -107,7 +107,7 @@ class DefaultTest(TestBase): onupdate=myupdate_with_ctx), # python builtin - Column('col8', Date, + Column('col8', sa.Date, default=datetime.date.today, onupdate=datetime.date.today), # combo @@ -123,7 +123,7 @@ class DefaultTest(TestBase): default_generator['x'] = 50 t.delete().execute() - def test_bad_argsignature(self): + def test_bad_arg_signature(self): ex_msg = \ "ColumnDefault Python function takes zero or one positional arguments" @@ -138,13 +138,11 @@ class DefaultTest(TestBase): fn4 = FN4() for fn in fn1, fn2, fn3, fn4: - try: - c = ColumnDefault(fn) - assert False, str(fn) - except exc.ArgumentError, e: - assert str(e) == ex_msg + self.assertRaisesMessage(sa.exc.ArgumentError, + ex_msg, + sa.ColumnDefault, fn) - def test_argsignature(self): + def test_arg_signature(self): def fn1(): pass def fn2(): pass def fn3(x=1): pass @@ -166,17 +164,18 @@ class DefaultTest(TestBase): fn8 = FN8() for fn in fn1, fn2, fn3, fn4, fn5, fn6, fn7, fn8: - c = ColumnDefault(fn) + c = sa.ColumnDefault(fn) - def teststandalone(self): + @testing.fails_on('firebird') # 'Data type unknown' + def test_standalone(self): c = testing.db.engine.contextual_connect() x = c.execute(t.c.col1.default) y = t.c.col2.default.execute() z = c.execute(t.c.col3.default) - self.assert_(50 <= x <= 57) - self.assert_(y == 'imthedefault') - self.assert_(z == f) - self.assert_(f2==11) + assert 50 <= x <= 57 + eq_(y, 'imthedefault') + eq_(z, f) + eq_(f2, 11) def test_py_vs_server_default_detection(self): @@ -202,6 +201,8 @@ class DefaultTest(TestBase): has_('col8', 'default', 'onupdate') has_('col9', 'default', 'server_default') + ColumnDefault, DefaultClause = sa.ColumnDefault, sa.DefaultClause + t2 = Table('t2', MetaData(), Column('col1', Integer, Sequence('foo')), Column('col2', Integer, @@ -244,31 +245,38 @@ class DefaultTest(TestBase): has_('col7', 'default', 'server_default', 'onupdate') has_('col8', 'default', 'server_default', 'onupdate', 'server_onupdate') + @testing.fails_on('firebird') # 'Data type unknown' def test_insert(self): r = t.insert().execute() assert r.lastrow_has_defaults() - eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])) + eq_(set(r.context.postfetch_cols), + set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])) r = t.insert(inline=True).execute() assert r.lastrow_has_defaults() - eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])) + eq_(set(r.context.postfetch_cols), + set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])) t.insert().execute() - ctexec = select([currenttime.label('now')], bind=testing.db).scalar() + ctexec = sa.select([currenttime.label('now')], bind=testing.db).scalar() l = t.select().execute() today = datetime.date.today() eq_(l.fetchall(), [ - (x, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py') + (x, 'imthedefault', f, ts, ts, ctexec, True, False, + 12, today, 'py') for x in range(51, 54)]) t.insert().execute(col9=None) assert r.lastrow_has_defaults() - eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])) + eq_(set(r.context.postfetch_cols), + set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])) eq_(t.select(t.c.col1==54).execute().fetchall(), - [(54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, None)]) + [(54, 'imthedefault', f, ts, ts, ctexec, True, False, + 12, today, None)]) + @testing.fails_on('firebird') # 'Data type unknown' def test_insertmany(self): # MySQL-Python 1.2.2 breaks functions in execute_many :( if (testing.against('mysql') and @@ -281,15 +289,19 @@ class DefaultTest(TestBase): l = t.select().execute() today = datetime.date.today() eq_(l.fetchall(), - [(51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py'), - (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py'), - (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py')]) + [(51, 'imthedefault', f, ts, ts, ctexec, True, False, + 12, today, 'py'), + (52, 'imthedefault', f, ts, ts, ctexec, True, False, + 12, today, 'py'), + (53, 'imthedefault', f, ts, ts, ctexec, True, False, + 12, today, 'py')]) def test_insert_values(self): t.insert(values={'col3':50}).execute() l = t.select().execute() - self.assert_(l.fetchone()['col3'] == 50) + eq_(50, l.fetchone()['col3']) + @testing.fails_on('firebird') # 'Data type unknown' def test_updatemany(self): # MySQL-Python 1.2.2 breaks functions in execute_many :( if (testing.against('mysql') and @@ -298,44 +310,49 @@ class DefaultTest(TestBase): t.insert().execute({}, {}, {}) - t.update(t.c.col1==bindparam('pkval')).execute( - {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False}, - ) + t.update(t.c.col1==sa.bindparam('pkval')).execute( + {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False}) - t.update(t.c.col1==bindparam('pkval')).execute( + t.update(t.c.col1==sa.bindparam('pkval')).execute( {'pkval':51,}, {'pkval':52,}, - {'pkval':53,}, - ) + {'pkval':53,}) l = t.select().execute() ctexec = currenttime.scalar() today = datetime.date.today() eq_(l.fetchall(), - [(51, 'im the update', f2, ts, ts, ctexec, False, False, 13, today, 'py'), - (52, 'im the update', f2, ts, ts, ctexec, True, False, 13, today, 'py'), - (53, 'im the update', f2, ts, ts, ctexec, True, False, 13, today, 'py')]) - - def testupdate(self): + [(51, 'im the update', f2, ts, ts, ctexec, False, False, + 13, today, 'py'), + (52, 'im the update', f2, ts, ts, ctexec, True, False, + 13, today, 'py'), + (53, 'im the update', f2, ts, ts, ctexec, True, False, + 13, today, 'py')]) + + @testing.fails_on('firebird') # 'Data type unknown' + def test_update(self): r = t.insert().execute() pk = r.last_inserted_ids()[0] t.update(t.c.col1==pk).execute(col4=None, col5=None) ctexec = currenttime.scalar() l = t.select(t.c.col1==pk).execute() l = l.fetchone() - self.assert_(l == (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today(), 'py')) - self.assert_(f2==11) + eq_(l, + (pk, 'im the update', f2, None, None, ctexec, True, False, + 13, datetime.date.today(), 'py')) + eq_(11, f2) - def testupdatevalues(self): + @testing.fails_on('firebird') # 'Data type unknown' + def test_update_values(self): r = t.insert().execute() pk = r.last_inserted_ids()[0] t.update(t.c.col1==pk, values={'col3': 55}).execute() l = t.select(t.c.col1==pk).execute() l = l.fetchone() - self.assert_(l['col3'] == 55) + eq_(55, l['col3']) @testing.fails_on_everything_except('postgres') - def testpassiveoverride(self): + def test_passive_override(self): """ Primarily for postgres, tests that when we get a primary key column back from reflecting a table which has a default value on it, we @@ -345,6 +362,7 @@ class DefaultTest(TestBase): locate the just inserted row. """ + # TODO: move this to dialect/postgres try: meta = MetaData(testing.db) testing.db.execute(""" @@ -360,58 +378,48 @@ class DefaultTest(TestBase): t = Table("speedy_users", meta, autoload=True) t.insert().execute(user_name='user', user_password='lala') l = t.select().execute().fetchall() - self.assert_(l == [(1, 'user', 'lala')]) + eq_(l, [(1, 'user', 'lala')]) finally: testing.db.execute("drop table speedy_users", None) -class PKDefaultTest(TestBase): - __requires__ = ('subqueries',) - - def setUpAll(self): - global metadata, t1, t2 - metadata = MetaData(testing.db) +class PKDefaultTest(_base.TablesTest): + __requires__ = ('subqueries',) + def define_tables(self, metadata): t2 = Table('t2', metadata, Column('nextid', Integer)) - t1 = Table('t1', metadata, - Column('id', Integer, primary_key=True, - default=select([func.max(t2.c.nextid)]).as_scalar()), - Column('data', String(30))) - - metadata.create_all() - - def tearDownAll(self): - metadata.drop_all() + Table('t1', metadata, + Column('id', Integer, primary_key=True, + default=sa.select([func.max(t2.c.nextid)]).as_scalar()), + Column('data', String(30))) @testing.crashes('mssql', 'FIXME: unknown, verify not fails_on') + @testing.resolve_artifact_names def test_basic(self): t2.insert().execute(nextid=1) r = t1.insert().execute(data='hi') - assert r.last_inserted_ids() == [1] + eq_([1], r.last_inserted_ids()) t2.insert().execute(nextid=2) r = t1.insert().execute(data='there') - assert r.last_inserted_ids() == [2] - + eq_([2], r.last_inserted_ids()) -class PKIncrementTest(TestBase): - def setUp(self): - global aitable, aimeta - aimeta = MetaData(testing.db) - aitable = Table("aitest", aimeta, - Column('id', Integer, Sequence('ai_id_seq', optional=True), - primary_key=True), - Column('int1', Integer), - Column('str1', String(20))) - aimeta.create_all() +class PKIncrementTest(_base.TablesTest): + run_define_tables = 'each' - def tearDown(self): - aimeta.drop_all() + def define_tables(self, metadata): + Table("aitable", metadata, + Column('id', Integer, Sequence('ai_id_seq', optional=True), + primary_key=True), + Column('int1', Integer), + Column('str1', String(20))) # TODO: add coverage for increment on a secondary column in a key + @testing.fails_on('firebird') # data type unknown + @testing.resolve_artifact_names def _test_autoincrement(self, bind): ids = set() rs = bind.execute(aitable.insert(), int1=1) @@ -438,13 +446,14 @@ class PKIncrementTest(TestBase): self.assert_(last not in ids) ids.add(last) - self.assert_( - list(bind.execute(aitable.select().order_by(aitable.c.id))) == + eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))), [(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)]) + @testing.resolve_artifact_names def test_autoincrement_autocommit(self): self._test_autoincrement(testing.db) + @testing.resolve_artifact_names def test_autoincrement_transaction(self): con = testing.db.connect() tx = con.begin() @@ -463,64 +472,57 @@ class PKIncrementTest(TestBase): con.close() -class AutoIncrementTest(TestBase): +class AutoIncrementTest(_base.TablesTest): __requires__ = ('identity',) + run_define_tables = 'each' + + def define_tables(self, metadata): + """Each test manipulates self.metadata individually.""" @testing.exclude('sqlite', '<', (3, 4), 'no database support') def test_autoincrement_single_col(self): - metadata = MetaData(testing.db) + single = Table('single', self.metadata, + Column('id', Integer, primary_key=True)) + single.create() - single = Table('single', metadata, - Column('id', Integer, primary_key=True)) - metadata.create_all() - try: - r = single.insert().execute() - id_ = r.last_inserted_ids()[0] - assert id_ is not None - eq_(select([func.count(text('*'))], from_obj=single).scalar(), 1) - finally: - metadata.drop_all() + r = single.insert().execute() + id_ = r.last_inserted_ids()[0] + assert id_ is not None + eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar()) def test_autoincrement_fk(self): - metadata = MetaData(testing.db) - - nodes = Table('nodes', metadata, + nodes = Table('nodes', self.metadata, Column('id', Integer, primary_key=True), Column('parent_id', Integer, ForeignKey('nodes.id')), Column('data', String(30))) - metadata.create_all() - try: - r = nodes.insert().execute(data='foo') - id_ = r.last_inserted_ids()[0] - nodes.insert().execute(data='bar', parent_id=id_) - finally: - metadata.drop_all() + nodes.create() + + r = nodes.insert().execute(data='foo') + id_ = r.last_inserted_ids()[0] + nodes.insert().execute(data='bar', parent_id=id_) @testing.fails_on('sqlite') def test_non_autoincrement(self): # sqlite INT primary keys can be non-unique! (only for ints) - meta = MetaData(testing.db) - nonai_table = Table("nonaitest", meta, + nonai = Table("nonaitest", self.metadata, Column('id', Integer, autoincrement=False, primary_key=True), Column('data', String(20))) - nonai_table.create(checkfirst=True) + nonai.create() + + try: - try: - # postgres + mysql strict will fail on first row, - # mysql in legacy mode fails on second row - nonai_table.insert().execute(data='row 1') - nonai_table.insert().execute(data='row 2') - assert False - except exc.SQLError, e: - print "Got exception", str(e) - assert True - - nonai_table.insert().execute(id=1, data='row 1') - finally: - nonai_table.drop() + # postgres + mysql strict will fail on first row, + # mysql in legacy mode fails on second row + nonai.insert().execute(data='row 1') + nonai.insert().execute(data='row 2') + assert False + except sa.exc.SQLError, e: + assert True + + nonai.insert().execute(id=1, data='row 1') -class SequenceTest(TestBase): +class SequenceTest(testing.TestBase): __requires__ = ('sequences',) def setUpAll(self): @@ -529,7 +531,7 @@ class SequenceTest(TestBase): cartitems = Table("cartitems", metadata, Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True), Column("description", String(40)), - Column("createdate", DateTime()) + Column("createdate", sa.DateTime()) ) sometable = Table( 'Manager', metadata, Column('obj_id', Integer, Sequence('obj_id_seq'), ), @@ -551,14 +553,12 @@ class SequenceTest(TestBase): sometable.insert().execute( {'name':'name3'}, - {'name':'name4'} - ) - assert sometable.select().execute().fetchall() == [ - (1, "somename", 1), - (2, "someother", 2), - (3, "name3", 3), - (4, "name4", 4), - ] + {'name':'name4'}) + eq_(sometable.select().execute().fetchall(), + [(1, "somename", 1), + (2, "someother", 2), + (3, "name3", 3), + (4, "name4", 4)]) def testsequence(self): cartitems.insert().execute(description='hi') @@ -568,13 +568,13 @@ class SequenceTest(TestBase): assert r.last_inserted_ids() and r.last_inserted_ids()[0] is not None id_ = r.last_inserted_ids()[0] - assert select([func.count(cartitems.c.cart_id)], - and_(cartitems.c.description == 'lala', - cartitems.c.cart_id == id_)).scalar() == 1 + eq_(1, + sa.select([func.count(cartitems.c.cart_id)], + sa.and_(cartitems.c.description == 'lala', + cartitems.c.cart_id == id_)).scalar()) cartitems.select().execute().fetchall() - @testing.fails_on('maxdb') # maxdb db-api seems to double-execute NEXTVAL internally somewhere, # throwing off the numbers for these tests... @@ -583,7 +583,7 @@ class SequenceTest(TestBase): s.create() try: x = s.execute() - self.assert_(x == 1) + eq_(x, 1) finally: s.drop() @@ -593,7 +593,7 @@ class SequenceTest(TestBase): s.create(bind=testing.db) try: x = s.execute(testing.db) - self.assert_(x == 1) + eq_(x, 1) finally: s.drop(testing.db) |