summaryrefslogtreecommitdiff
path: root/test/sql/defaults.py
diff options
context:
space:
mode:
authorJason Kirtland <jek@discorporate.us>2008-05-20 00:14:51 +0000
committerJason Kirtland <jek@discorporate.us>2008-05-20 00:14:51 +0000
commit18a31d0316de48430ca5e8c8d442f11fdd3e26d6 (patch)
tree62acc61cb86450c516a0aa14b1b00b0fee96ac6b /test/sql/defaults.py
parent8e9fce417a04e647c6c3a3791b59a9cb704fc44d (diff)
downloadsqlalchemy-18a31d0316de48430ca5e8c8d442f11fdd3e26d6.tar.gz
- Quick cleanup of defaults.py. The main DefaultTest is still a mess.
Diffstat (limited to 'test/sql/defaults.py')
-rw-r--r--test/sql/defaults.py302
1 files changed, 151 insertions, 151 deletions
diff --git a/test/sql/defaults.py b/test/sql/defaults.py
index c496dfca8..fbea5888e 100644
--- a/test/sql/defaults.py
+++ b/test/sql/defaults.py
@@ -1,14 +1,14 @@
import testenv; testenv.configure_for_tests()
import datetime
-from sqlalchemy import *
-from sqlalchemy import exc, schema, util
-from sqlalchemy.orm import mapper, create_session
+from sqlalchemy import Sequence, Column, func
from testlib import sa, testing
+from testlib.sa import MetaData, Table, Integer, String, ForeignKey
from testlib.testing import eq_
-from testlib import *
+from testlib.compat import set
+from sql import _base
-class DefaultTest(TestBase):
+class DefaultTest(testing.TestBase):
def setUpAll(self):
global t, f, f2, ts, currenttime, metadata, default_generator
@@ -23,12 +23,12 @@ class DefaultTest(TestBase):
def myupdate_with_ctx(ctx):
conn = ctx.connection
- return conn.execute(select([text('13')])).scalar()
+ return conn.execute(sa.select([sa.text('13')])).scalar()
def mydefault_using_connection(ctx):
conn = ctx.connection
try:
- return conn.execute(select([text('12')])).scalar()
+ return conn.execute(sa.select([sa.text('12')])).scalar()
finally:
# ensure a "close()" on this connection does nothing,
# since its a "branched" connection
@@ -40,32 +40,32 @@ class DefaultTest(TestBase):
# select "count(1)" returns different results on different DBs also
# correct for "current_date" compatible as column default, value
# differences
- currenttime = func.current_date(type_=Date, bind=db)
+ currenttime = func.current_date(type_=sa.Date, bind=db)
if is_oracle:
- ts = db.scalar(select([func.trunc(func.sysdate(), literal_column("'DAY'"), type_=Date).label('today')]))
+ ts = db.scalar(sa.select([func.trunc(func.sysdate(), sa.literal_column("'DAY'"), type_=sa.Date).label('today')]))
assert isinstance(ts, datetime.date) and not isinstance(ts, datetime.datetime)
- f = select([func.length('abcdef')], bind=db).scalar()
- f2 = select([func.length('abcdefghijk')], bind=db).scalar()
+ f = sa.select([func.length('abcdef')], bind=db).scalar()
+ f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar()
# TODO: engine propigation across nested functions not working
- currenttime = func.trunc(currenttime, literal_column("'DAY'"), bind=db, type_=Date)
+ currenttime = func.trunc(currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date)
def1 = currenttime
- def2 = func.trunc(text("sysdate"), literal_column("'DAY'"), type_=Date)
+ def2 = func.trunc(sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date)
- deftype = Date
+ deftype = sa.Date
elif use_function_defaults:
- f = select([func.length('abcdef')], bind=db).scalar()
- f2 = select([func.length('abcdefghijk')], bind=db).scalar()
+ f = sa.select([func.length('abcdef')], bind=db).scalar()
+ f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar()
def1 = currenttime
if testing.against('maxdb'):
- def2 = text("curdate")
+ def2 = sa.text("curdate")
else:
- def2 = text("current_date")
- deftype = Date
+ def2 = sa.text("current_date")
+ deftype = sa.Date
ts = db.func.current_date().scalar()
else:
- f = select([func.length('abcdef')], bind=db).scalar()
- f2 = select([func.length('abcdefghijk')], bind=db).scalar()
+ f = len('abcdef')
+ f2 = len('abcdefghijk')
def1 = def2 = "3"
ts = 3
deftype = Integer
@@ -94,12 +94,12 @@ class DefaultTest(TestBase):
server_default=def2),
# preexecute + update timestamp
- Column('col6', Date,
+ Column('col6', sa.Date,
default=currenttime,
onupdate=currenttime),
- Column('boolcol1', Boolean, default=True),
- Column('boolcol2', Boolean, default=False),
+ Column('boolcol1', sa.Boolean, default=True),
+ Column('boolcol2', sa.Boolean, default=False),
# python function which uses ExecutionContext
Column('col7', Integer,
@@ -107,7 +107,7 @@ class DefaultTest(TestBase):
onupdate=myupdate_with_ctx),
# python builtin
- Column('col8', Date,
+ Column('col8', sa.Date,
default=datetime.date.today,
onupdate=datetime.date.today),
# combo
@@ -123,7 +123,7 @@ class DefaultTest(TestBase):
default_generator['x'] = 50
t.delete().execute()
- def test_bad_argsignature(self):
+ def test_bad_arg_signature(self):
ex_msg = \
"ColumnDefault Python function takes zero or one positional arguments"
@@ -138,13 +138,11 @@ class DefaultTest(TestBase):
fn4 = FN4()
for fn in fn1, fn2, fn3, fn4:
- try:
- c = ColumnDefault(fn)
- assert False, str(fn)
- except exc.ArgumentError, e:
- assert str(e) == ex_msg
+ self.assertRaisesMessage(sa.exc.ArgumentError,
+ ex_msg,
+ sa.ColumnDefault, fn)
- def test_argsignature(self):
+ def test_arg_signature(self):
def fn1(): pass
def fn2(): pass
def fn3(x=1): pass
@@ -166,17 +164,18 @@ class DefaultTest(TestBase):
fn8 = FN8()
for fn in fn1, fn2, fn3, fn4, fn5, fn6, fn7, fn8:
- c = ColumnDefault(fn)
+ c = sa.ColumnDefault(fn)
- def teststandalone(self):
+ @testing.fails_on('firebird') # 'Data type unknown'
+ def test_standalone(self):
c = testing.db.engine.contextual_connect()
x = c.execute(t.c.col1.default)
y = t.c.col2.default.execute()
z = c.execute(t.c.col3.default)
- self.assert_(50 <= x <= 57)
- self.assert_(y == 'imthedefault')
- self.assert_(z == f)
- self.assert_(f2==11)
+ assert 50 <= x <= 57
+ eq_(y, 'imthedefault')
+ eq_(z, f)
+ eq_(f2, 11)
def test_py_vs_server_default_detection(self):
@@ -202,6 +201,8 @@ class DefaultTest(TestBase):
has_('col8', 'default', 'onupdate')
has_('col9', 'default', 'server_default')
+ ColumnDefault, DefaultClause = sa.ColumnDefault, sa.DefaultClause
+
t2 = Table('t2', MetaData(),
Column('col1', Integer, Sequence('foo')),
Column('col2', Integer,
@@ -244,31 +245,38 @@ class DefaultTest(TestBase):
has_('col7', 'default', 'server_default', 'onupdate')
has_('col8', 'default', 'server_default', 'onupdate', 'server_onupdate')
+ @testing.fails_on('firebird') # 'Data type unknown'
def test_insert(self):
r = t.insert().execute()
assert r.lastrow_has_defaults()
- eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+ eq_(set(r.context.postfetch_cols),
+ set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
r = t.insert(inline=True).execute()
assert r.lastrow_has_defaults()
- eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+ eq_(set(r.context.postfetch_cols),
+ set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
t.insert().execute()
- ctexec = select([currenttime.label('now')], bind=testing.db).scalar()
+ ctexec = sa.select([currenttime.label('now')], bind=testing.db).scalar()
l = t.select().execute()
today = datetime.date.today()
eq_(l.fetchall(), [
- (x, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py')
+ (x, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py')
for x in range(51, 54)])
t.insert().execute(col9=None)
assert r.lastrow_has_defaults()
- eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+ eq_(set(r.context.postfetch_cols),
+ set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
eq_(t.select(t.c.col1==54).execute().fetchall(),
- [(54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, None)])
+ [(54, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, None)])
+ @testing.fails_on('firebird') # 'Data type unknown'
def test_insertmany(self):
# MySQL-Python 1.2.2 breaks functions in execute_many :(
if (testing.against('mysql') and
@@ -281,15 +289,19 @@ class DefaultTest(TestBase):
l = t.select().execute()
today = datetime.date.today()
eq_(l.fetchall(),
- [(51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py'),
- (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py'),
- (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py')])
+ [(51, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py'),
+ (52, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py'),
+ (53, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py')])
def test_insert_values(self):
t.insert(values={'col3':50}).execute()
l = t.select().execute()
- self.assert_(l.fetchone()['col3'] == 50)
+ eq_(50, l.fetchone()['col3'])
+ @testing.fails_on('firebird') # 'Data type unknown'
def test_updatemany(self):
# MySQL-Python 1.2.2 breaks functions in execute_many :(
if (testing.against('mysql') and
@@ -298,44 +310,49 @@ class DefaultTest(TestBase):
t.insert().execute({}, {}, {})
- t.update(t.c.col1==bindparam('pkval')).execute(
- {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False},
- )
+ t.update(t.c.col1==sa.bindparam('pkval')).execute(
+ {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False})
- t.update(t.c.col1==bindparam('pkval')).execute(
+ t.update(t.c.col1==sa.bindparam('pkval')).execute(
{'pkval':51,},
{'pkval':52,},
- {'pkval':53,},
- )
+ {'pkval':53,})
l = t.select().execute()
ctexec = currenttime.scalar()
today = datetime.date.today()
eq_(l.fetchall(),
- [(51, 'im the update', f2, ts, ts, ctexec, False, False, 13, today, 'py'),
- (52, 'im the update', f2, ts, ts, ctexec, True, False, 13, today, 'py'),
- (53, 'im the update', f2, ts, ts, ctexec, True, False, 13, today, 'py')])
-
- def testupdate(self):
+ [(51, 'im the update', f2, ts, ts, ctexec, False, False,
+ 13, today, 'py'),
+ (52, 'im the update', f2, ts, ts, ctexec, True, False,
+ 13, today, 'py'),
+ (53, 'im the update', f2, ts, ts, ctexec, True, False,
+ 13, today, 'py')])
+
+ @testing.fails_on('firebird') # 'Data type unknown'
+ def test_update(self):
r = t.insert().execute()
pk = r.last_inserted_ids()[0]
t.update(t.c.col1==pk).execute(col4=None, col5=None)
ctexec = currenttime.scalar()
l = t.select(t.c.col1==pk).execute()
l = l.fetchone()
- self.assert_(l == (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today(), 'py'))
- self.assert_(f2==11)
+ eq_(l,
+ (pk, 'im the update', f2, None, None, ctexec, True, False,
+ 13, datetime.date.today(), 'py'))
+ eq_(11, f2)
- def testupdatevalues(self):
+ @testing.fails_on('firebird') # 'Data type unknown'
+ def test_update_values(self):
r = t.insert().execute()
pk = r.last_inserted_ids()[0]
t.update(t.c.col1==pk, values={'col3': 55}).execute()
l = t.select(t.c.col1==pk).execute()
l = l.fetchone()
- self.assert_(l['col3'] == 55)
+ eq_(55, l['col3'])
@testing.fails_on_everything_except('postgres')
- def testpassiveoverride(self):
+ def test_passive_override(self):
"""
Primarily for postgres, tests that when we get a primary key column
back from reflecting a table which has a default value on it, we
@@ -345,6 +362,7 @@ class DefaultTest(TestBase):
locate the just inserted row.
"""
+ # TODO: move this to dialect/postgres
try:
meta = MetaData(testing.db)
testing.db.execute("""
@@ -360,58 +378,48 @@ class DefaultTest(TestBase):
t = Table("speedy_users", meta, autoload=True)
t.insert().execute(user_name='user', user_password='lala')
l = t.select().execute().fetchall()
- self.assert_(l == [(1, 'user', 'lala')])
+ eq_(l, [(1, 'user', 'lala')])
finally:
testing.db.execute("drop table speedy_users", None)
-class PKDefaultTest(TestBase):
- __requires__ = ('subqueries',)
-
- def setUpAll(self):
- global metadata, t1, t2
- metadata = MetaData(testing.db)
+class PKDefaultTest(_base.TablesTest):
+ __requires__ = ('subqueries',)
+ def define_tables(self, metadata):
t2 = Table('t2', metadata,
Column('nextid', Integer))
- t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True,
- default=select([func.max(t2.c.nextid)]).as_scalar()),
- Column('data', String(30)))
-
- metadata.create_all()
-
- def tearDownAll(self):
- metadata.drop_all()
+ Table('t1', metadata,
+ Column('id', Integer, primary_key=True,
+ default=sa.select([func.max(t2.c.nextid)]).as_scalar()),
+ Column('data', String(30)))
@testing.crashes('mssql', 'FIXME: unknown, verify not fails_on')
+ @testing.resolve_artifact_names
def test_basic(self):
t2.insert().execute(nextid=1)
r = t1.insert().execute(data='hi')
- assert r.last_inserted_ids() == [1]
+ eq_([1], r.last_inserted_ids())
t2.insert().execute(nextid=2)
r = t1.insert().execute(data='there')
- assert r.last_inserted_ids() == [2]
-
+ eq_([2], r.last_inserted_ids())
-class PKIncrementTest(TestBase):
- def setUp(self):
- global aitable, aimeta
- aimeta = MetaData(testing.db)
- aitable = Table("aitest", aimeta,
- Column('id', Integer, Sequence('ai_id_seq', optional=True),
- primary_key=True),
- Column('int1', Integer),
- Column('str1', String(20)))
- aimeta.create_all()
+class PKIncrementTest(_base.TablesTest):
+ run_define_tables = 'each'
- def tearDown(self):
- aimeta.drop_all()
+ def define_tables(self, metadata):
+ Table("aitable", metadata,
+ Column('id', Integer, Sequence('ai_id_seq', optional=True),
+ primary_key=True),
+ Column('int1', Integer),
+ Column('str1', String(20)))
# TODO: add coverage for increment on a secondary column in a key
+ @testing.fails_on('firebird') # data type unknown
+ @testing.resolve_artifact_names
def _test_autoincrement(self, bind):
ids = set()
rs = bind.execute(aitable.insert(), int1=1)
@@ -438,13 +446,14 @@ class PKIncrementTest(TestBase):
self.assert_(last not in ids)
ids.add(last)
- self.assert_(
- list(bind.execute(aitable.select().order_by(aitable.c.id))) ==
+ eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))),
[(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)])
+ @testing.resolve_artifact_names
def test_autoincrement_autocommit(self):
self._test_autoincrement(testing.db)
+ @testing.resolve_artifact_names
def test_autoincrement_transaction(self):
con = testing.db.connect()
tx = con.begin()
@@ -463,64 +472,57 @@ class PKIncrementTest(TestBase):
con.close()
-class AutoIncrementTest(TestBase):
+class AutoIncrementTest(_base.TablesTest):
__requires__ = ('identity',)
+ run_define_tables = 'each'
+
+ def define_tables(self, metadata):
+ """Each test manipulates self.metadata individually."""
@testing.exclude('sqlite', '<', (3, 4), 'no database support')
def test_autoincrement_single_col(self):
- metadata = MetaData(testing.db)
+ single = Table('single', self.metadata,
+ Column('id', Integer, primary_key=True))
+ single.create()
- single = Table('single', metadata,
- Column('id', Integer, primary_key=True))
- metadata.create_all()
- try:
- r = single.insert().execute()
- id_ = r.last_inserted_ids()[0]
- assert id_ is not None
- eq_(select([func.count(text('*'))], from_obj=single).scalar(), 1)
- finally:
- metadata.drop_all()
+ r = single.insert().execute()
+ id_ = r.last_inserted_ids()[0]
+ assert id_ is not None
+ eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar())
def test_autoincrement_fk(self):
- metadata = MetaData(testing.db)
-
- nodes = Table('nodes', metadata,
+ nodes = Table('nodes', self.metadata,
Column('id', Integer, primary_key=True),
Column('parent_id', Integer, ForeignKey('nodes.id')),
Column('data', String(30)))
- metadata.create_all()
- try:
- r = nodes.insert().execute(data='foo')
- id_ = r.last_inserted_ids()[0]
- nodes.insert().execute(data='bar', parent_id=id_)
- finally:
- metadata.drop_all()
+ nodes.create()
+
+ r = nodes.insert().execute(data='foo')
+ id_ = r.last_inserted_ids()[0]
+ nodes.insert().execute(data='bar', parent_id=id_)
@testing.fails_on('sqlite')
def test_non_autoincrement(self):
# sqlite INT primary keys can be non-unique! (only for ints)
- meta = MetaData(testing.db)
- nonai_table = Table("nonaitest", meta,
+ nonai = Table("nonaitest", self.metadata,
Column('id', Integer, autoincrement=False, primary_key=True),
Column('data', String(20)))
- nonai_table.create(checkfirst=True)
+ nonai.create()
+
+
try:
- try:
- # postgres + mysql strict will fail on first row,
- # mysql in legacy mode fails on second row
- nonai_table.insert().execute(data='row 1')
- nonai_table.insert().execute(data='row 2')
- assert False
- except exc.SQLError, e:
- print "Got exception", str(e)
- assert True
-
- nonai_table.insert().execute(id=1, data='row 1')
- finally:
- nonai_table.drop()
+ # postgres + mysql strict will fail on first row,
+ # mysql in legacy mode fails on second row
+ nonai.insert().execute(data='row 1')
+ nonai.insert().execute(data='row 2')
+ assert False
+ except sa.exc.SQLError, e:
+ assert True
+
+ nonai.insert().execute(id=1, data='row 1')
-class SequenceTest(TestBase):
+class SequenceTest(testing.TestBase):
__requires__ = ('sequences',)
def setUpAll(self):
@@ -529,7 +531,7 @@ class SequenceTest(TestBase):
cartitems = Table("cartitems", metadata,
Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True),
Column("description", String(40)),
- Column("createdate", DateTime())
+ Column("createdate", sa.DateTime())
)
sometable = Table( 'Manager', metadata,
Column('obj_id', Integer, Sequence('obj_id_seq'), ),
@@ -551,14 +553,12 @@ class SequenceTest(TestBase):
sometable.insert().execute(
{'name':'name3'},
- {'name':'name4'}
- )
- assert sometable.select().execute().fetchall() == [
- (1, "somename", 1),
- (2, "someother", 2),
- (3, "name3", 3),
- (4, "name4", 4),
- ]
+ {'name':'name4'})
+ eq_(sometable.select().execute().fetchall(),
+ [(1, "somename", 1),
+ (2, "someother", 2),
+ (3, "name3", 3),
+ (4, "name4", 4)])
def testsequence(self):
cartitems.insert().execute(description='hi')
@@ -568,13 +568,13 @@ class SequenceTest(TestBase):
assert r.last_inserted_ids() and r.last_inserted_ids()[0] is not None
id_ = r.last_inserted_ids()[0]
- assert select([func.count(cartitems.c.cart_id)],
- and_(cartitems.c.description == 'lala',
- cartitems.c.cart_id == id_)).scalar() == 1
+ eq_(1,
+ sa.select([func.count(cartitems.c.cart_id)],
+ sa.and_(cartitems.c.description == 'lala',
+ cartitems.c.cart_id == id_)).scalar())
cartitems.select().execute().fetchall()
-
@testing.fails_on('maxdb')
# maxdb db-api seems to double-execute NEXTVAL internally somewhere,
# throwing off the numbers for these tests...
@@ -583,7 +583,7 @@ class SequenceTest(TestBase):
s.create()
try:
x = s.execute()
- self.assert_(x == 1)
+ eq_(x, 1)
finally:
s.drop()
@@ -593,7 +593,7 @@ class SequenceTest(TestBase):
s.create(bind=testing.db)
try:
x = s.execute(testing.db)
- self.assert_(x == 1)
+ eq_(x, 1)
finally:
s.drop(testing.db)