summaryrefslogtreecommitdiff
path: root/test/dialect/test_postgresql.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/dialect/test_postgresql.py')
-rw-r--r--test/dialect/test_postgresql.py1844
1 files changed, 936 insertions, 908 deletions
diff --git a/test/dialect/test_postgresql.py b/test/dialect/test_postgresql.py
index d947ad4c4..86b3617ab 100644
--- a/test/dialect/test_postgresql.py
+++ b/test/dialect/test_postgresql.py
@@ -16,37 +16,45 @@ from test.engine._base import TablesTest
import logging
class SequenceTest(TestBase, AssertsCompiledSQL):
+
def test_basic(self):
- seq = Sequence("my_seq_no_schema")
+ seq = Sequence('my_seq_no_schema')
dialect = postgresql.PGDialect()
- assert dialect.identifier_preparer.format_sequence(seq) == "my_seq_no_schema"
-
- seq = Sequence("my_seq", schema="some_schema")
- assert dialect.identifier_preparer.format_sequence(seq) == "some_schema.my_seq"
-
- seq = Sequence("My_Seq", schema="Some_Schema")
- assert dialect.identifier_preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"'
+ assert dialect.identifier_preparer.format_sequence(seq) \
+ == 'my_seq_no_schema'
+ seq = Sequence('my_seq', schema='some_schema')
+ assert dialect.identifier_preparer.format_sequence(seq) \
+ == 'some_schema.my_seq'
+ seq = Sequence('My_Seq', schema='Some_Schema')
+ assert dialect.identifier_preparer.format_sequence(seq) \
+ == '"Some_Schema"."My_Seq"'
class CompileTest(TestBase, AssertsCompiledSQL):
+
__dialect__ = postgresql.dialect()
def test_update_returning(self):
dialect = postgresql.dialect()
- table1 = table('mytable',
- column('myid', Integer),
- column('name', String(128)),
- column('description', String(128)),
- )
-
- u = update(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
- self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)
-
+ table1 = table('mytable', column('myid', Integer), column('name'
+ , String(128)), column('description',
+ String(128)))
+ u = update(table1, values=dict(name='foo'
+ )).returning(table1.c.myid, table1.c.name)
+ self.assert_compile(u,
+ 'UPDATE mytable SET name=%(name)s '
+ 'RETURNING mytable.myid, mytable.name',
+ dialect=dialect)
u = update(table1, values=dict(name='foo')).returning(table1)
- self.assert_compile(u, "UPDATE mytable SET name=%(name)s "\
- "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)
-
- u = update(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
- self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING length(mytable.name) AS length_1", dialect=dialect)
+ self.assert_compile(u,
+ 'UPDATE mytable SET name=%(name)s '
+ 'RETURNING mytable.myid, mytable.name, '
+ 'mytable.description', dialect=dialect)
+ u = update(table1, values=dict(name='foo'
+ )).returning(func.length(table1.c.name))
+ self.assert_compile(u,
+ 'UPDATE mytable SET name=%(name)s '
+ 'RETURNING length(mytable.name) AS length_1'
+ , dialect=dialect)
def test_insert_returning(self):
@@ -57,126 +65,142 @@ class CompileTest(TestBase, AssertsCompiledSQL):
column('description', String(128)),
)
- i = insert(table1, values=dict(name='foo')).returning(table1.c.myid, table1.c.name)
- self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect)
-
+ i = insert(table1, values=dict(name='foo'
+ )).returning(table1.c.myid, table1.c.name)
+ self.assert_compile(i,
+ 'INSERT INTO mytable (name) VALUES '
+ '(%(name)s) RETURNING mytable.myid, '
+ 'mytable.name', dialect=dialect)
i = insert(table1, values=dict(name='foo')).returning(table1)
- self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) "\
- "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)
-
- i = insert(table1, values=dict(name='foo')).returning(func.length(table1.c.name))
- self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING length(mytable.name) AS length_1", dialect=dialect)
+ self.assert_compile(i,
+ 'INSERT INTO mytable (name) VALUES '
+ '(%(name)s) RETURNING mytable.myid, '
+ 'mytable.name, mytable.description',
+ dialect=dialect)
+ i = insert(table1, values=dict(name='foo'
+ )).returning(func.length(table1.c.name))
+ self.assert_compile(i,
+ 'INSERT INTO mytable (name) VALUES '
+ '(%(name)s) RETURNING length(mytable.name) '
+ 'AS length_1', dialect=dialect)
- @testing.uses_deprecated(r".*argument is deprecated. Please use statement.returning.*")
+ @testing.uses_deprecated('.*argument is deprecated. Please use '
+ 'statement.returning.*')
def test_old_returning_names(self):
dialect = postgresql.dialect()
- table1 = table('mytable',
- column('myid', Integer),
- column('name', String(128)),
- column('description', String(128)),
- )
-
- u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
- self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)
-
- u = update(table1, values=dict(name='foo'), postgresql_returning=[table1.c.myid, table1.c.name])
- self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)
-
- i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
- self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect)
+ table1 = table('mytable', column('myid', Integer), column('name'
+ , String(128)), column('description',
+ String(128)))
+ u = update(table1, values=dict(name='foo'),
+ postgres_returning=[table1.c.myid, table1.c.name])
+ self.assert_compile(u,
+ 'UPDATE mytable SET name=%(name)s '
+ 'RETURNING mytable.myid, mytable.name',
+ dialect=dialect)
+ u = update(table1, values=dict(name='foo'),
+ postgresql_returning=[table1.c.myid, table1.c.name])
+ self.assert_compile(u,
+ 'UPDATE mytable SET name=%(name)s '
+ 'RETURNING mytable.myid, mytable.name',
+ dialect=dialect)
+ i = insert(table1, values=dict(name='foo'),
+ postgres_returning=[table1.c.myid, table1.c.name])
+ self.assert_compile(i,
+ 'INSERT INTO mytable (name) VALUES '
+ '(%(name)s) RETURNING mytable.myid, '
+ 'mytable.name', dialect=dialect)
def test_create_partial_index(self):
m = MetaData()
- tbl = Table('testtbl', m, Column('data',Integer))
- idx = Index('test_idx1', tbl.c.data, postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10))
- idx = Index('test_idx1', tbl.c.data, postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10))
-
- # test quoting and all that
- idx2 = Index('test_idx2', tbl.c.data, postgresql_where=and_(tbl.c.data > 'a', tbl.c.data < "b's"))
+ tbl = Table('testtbl', m, Column('data', Integer))
+ idx = Index('test_idx1', tbl.c.data,
+ postgresql_where=and_(tbl.c.data > 5, tbl.c.data
+ < 10))
+ idx = Index('test_idx1', tbl.c.data,
+ postgresql_where=and_(tbl.c.data > 5, tbl.c.data
+ < 10))
- self.assert_compile(schema.CreateIndex(idx),
- "CREATE INDEX test_idx1 ON testtbl (data) WHERE data > 5 AND data < 10", dialect=postgresql.dialect())
+ # test quoting and all that
- self.assert_compile(schema.CreateIndex(idx2),
- "CREATE INDEX test_idx2 ON testtbl (data) WHERE data > 'a' AND data < 'b''s'", dialect=postgresql.dialect())
-
- @testing.uses_deprecated(r".*'postgres_where' argument has been renamed.*")
+ idx2 = Index('test_idx2', tbl.c.data,
+ postgresql_where=and_(tbl.c.data > 'a', tbl.c.data
+ < "b's"))
+ self.assert_compile(schema.CreateIndex(idx),
+ 'CREATE INDEX test_idx1 ON testtbl (data) '
+ 'WHERE data > 5 AND data < 10',
+ dialect=postgresql.dialect())
+ self.assert_compile(schema.CreateIndex(idx2),
+ "CREATE INDEX test_idx2 ON testtbl (data) "
+ "WHERE data > 'a' AND data < 'b''s'",
+ dialect=postgresql.dialect())
+
+ @testing.uses_deprecated(r".*'postgres_where' argument has been "
+ "renamed.*")
def test_old_create_partial_index(self):
- tbl = Table('testtbl', MetaData(), Column('data',Integer))
- idx = Index('test_idx1', tbl.c.data, postgres_where=and_(tbl.c.data > 5, tbl.c.data < 10))
+ tbl = Table('testtbl', MetaData(), Column('data', Integer))
+ idx = Index('test_idx1', tbl.c.data,
+ postgres_where=and_(tbl.c.data > 5, tbl.c.data
+ < 10))
+ self.assert_compile(schema.CreateIndex(idx),
+ 'CREATE INDEX test_idx1 ON testtbl (data) '
+ 'WHERE data > 5 AND data < 10',
+ dialect=postgresql.dialect())
- self.assert_compile(schema.CreateIndex(idx),
- "CREATE INDEX test_idx1 ON testtbl (data) WHERE data > 5 AND data < 10", dialect=postgresql.dialect())
-
def test_extract(self):
- t = table('t', column('col1', DateTime), column('col2', Date), column('col3', Time),
- column('col4', postgresql.INTERVAL)
- )
-
+ t = table('t', column('col1', DateTime), column('col2', Date),
+ column('col3', Time), column('col4',
+ postgresql.INTERVAL))
for field in 'year', 'month', 'day', 'epoch', 'hour':
- for expr, compiled_expr in [
-
- ( t.c.col1, "t.col1 :: timestamp" ),
- ( t.c.col2, "t.col2 :: date" ),
- ( t.c.col3, "t.col3 :: time" ),
+ for expr, compiled_expr in [ # invalid, no cast. plain
+ # text. no cast. addition is
+ # commutative subtraction is
+ # not invalid - no cast. dont
+ # crack up on entirely
+ # unsupported types
+ (t.c.col1, 't.col1 :: timestamp'),
+ (t.c.col2, 't.col2 :: date'),
+ (t.c.col3, 't.col3 :: time'),
(func.current_timestamp() - datetime.timedelta(days=5),
- "(CURRENT_TIMESTAMP - %(current_timestamp_1)s) :: timestamp"
- ),
- (func.current_timestamp() + func.current_timestamp(),
- "CURRENT_TIMESTAMP + CURRENT_TIMESTAMP" # invalid, no cast.
- ),
- (text("foo.date + foo.time"),
- "foo.date + foo.time" # plain text. no cast.
- ),
- (func.current_timestamp() + datetime.timedelta(days=5),
- "(CURRENT_TIMESTAMP + %(current_timestamp_1)s) :: timestamp"
- ),
- (t.c.col2 + t.c.col3,
- "(t.col2 + t.col3) :: timestamp"
- ),
- # addition is commutative
+ '(CURRENT_TIMESTAMP - %(current_timestamp_1)s) :: '
+ 'timestamp'),
+ (func.current_timestamp() + func.current_timestamp(),
+ 'CURRENT_TIMESTAMP + CURRENT_TIMESTAMP'),
+ (text('foo.date + foo.time'), 'foo.date + foo.time'),
+ (func.current_timestamp() + datetime.timedelta(days=5),
+ '(CURRENT_TIMESTAMP + %(current_timestamp_1)s) :: '
+ 'timestamp'),
+ (t.c.col2 + t.c.col3, '(t.col2 + t.col3) :: timestamp'
+ ),
(t.c.col2 + datetime.timedelta(days=5),
- "(t.col2 + %(col2_1)s) :: timestamp"
- ),
+ '(t.col2 + %(col2_1)s) :: timestamp'),
(datetime.timedelta(days=5) + t.c.col2,
- "(%(col2_1)s + t.col2) :: timestamp"
- ),
- (t.c.col1 + t.c.col4,
- "(t.col1 + t.col4) :: timestamp"
- ),
- # subtraction is not
+ '(%(col2_1)s + t.col2) :: timestamp'),
+ (t.c.col1 + t.c.col4, '(t.col1 + t.col4) :: timestamp'
+ ),
(t.c.col1 - datetime.timedelta(seconds=30),
- "(t.col1 - %(col1_1)s) :: timestamp"
- ),
+ '(t.col1 - %(col1_1)s) :: timestamp'),
(datetime.timedelta(seconds=30) - t.c.col1,
- "%(col1_1)s - t.col1" # invalid - no cast.
- ),
+ '%(col1_1)s - t.col1'),
(func.coalesce(t.c.col1, func.current_timestamp()),
- "coalesce(t.col1, CURRENT_TIMESTAMP) :: timestamp"
- ),
+ 'coalesce(t.col1, CURRENT_TIMESTAMP) :: timestamp'),
(t.c.col3 + datetime.timedelta(seconds=30),
- "(t.col3 + %(col3_1)s) :: time"
- ),
- (func.current_timestamp() - func.coalesce(t.c.col1, func.current_timestamp()),
- "(CURRENT_TIMESTAMP - coalesce(t.col1, CURRENT_TIMESTAMP)) :: interval",
- ),
+ '(t.col3 + %(col3_1)s) :: time'),
+ (func.current_timestamp() - func.coalesce(t.c.col1,
+ func.current_timestamp()),
+ '(CURRENT_TIMESTAMP - coalesce(t.col1, '
+ 'CURRENT_TIMESTAMP)) :: interval'),
(3 * func.foobar(type_=Interval),
- "(%(foobar_1)s * foobar()) :: interval"
- ),
- (literal(datetime.timedelta(seconds=10)) - literal(datetime.timedelta(seconds=10)),
- "(%(param_1)s - %(param_2)s) :: interval"
- ),
- (t.c.col3 + "some string", # dont crack up on entirely unsupported types
- "t.col3 + %(col3_1)s"
- )
- ]:
- self.assert_compile(
- select([extract(field, expr)]).select_from(t),
- "SELECT EXTRACT(%s FROM %s) AS anon_1 FROM t" % (
- field,
- compiled_expr
- )
- )
+ '(%(foobar_1)s * foobar()) :: interval'),
+ (literal(datetime.timedelta(seconds=10))
+ - literal(datetime.timedelta(seconds=10)),
+ '(%(param_1)s - %(param_2)s) :: interval'),
+ (t.c.col3 + 'some string', 't.col3 + %(col3_1)s'),
+ ]:
+ self.assert_compile(select([extract(field,
+ expr)]).select_from(t),
+ 'SELECT EXTRACT(%s FROM %s) AS '
+ 'anon_1 FROM t' % (field,
+ compiled_expr))
class FloatCoercionTest(TablesTest, AssertsExecutionResults):
__only_on__ = 'postgresql'
@@ -245,73 +269,53 @@ class FloatCoercionTest(TablesTest, AssertsExecutionResults):
)
class EnumTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+
__only_on__ = 'postgresql'
__dialect__ = postgresql.dialect()
-
- def test_compile(self):
- e1 = Enum('x', 'y', 'z', name="somename")
- e2 = Enum('x', 'y', 'z', name="somename", schema='someschema')
-
- self.assert_compile(
- postgresql.CreateEnumType(e1),
- "CREATE TYPE somename AS ENUM ('x','y','z')"
- )
-
- self.assert_compile(
- postgresql.CreateEnumType(e2),
- "CREATE TYPE someschema.somename AS ENUM ('x','y','z')"
- )
-
- self.assert_compile(
- postgresql.DropEnumType(e1),
- "DROP TYPE somename"
- )
- self.assert_compile(
- postgresql.DropEnumType(e2),
- "DROP TYPE someschema.somename"
- )
-
+ def test_compile(self):
+ e1 = Enum('x', 'y', 'z', name='somename')
+ e2 = Enum('x', 'y', 'z', name='somename', schema='someschema')
+ self.assert_compile(postgresql.CreateEnumType(e1),
+ "CREATE TYPE somename AS ENUM ('x','y','z')"
+ )
+ self.assert_compile(postgresql.CreateEnumType(e2),
+ "CREATE TYPE someschema.somename AS ENUM "
+ "('x','y','z')")
+ self.assert_compile(postgresql.DropEnumType(e1),
+ 'DROP TYPE somename')
+ self.assert_compile(postgresql.DropEnumType(e2),
+ 'DROP TYPE someschema.somename')
t1 = Table('sometable', MetaData(), Column('somecolumn', e1))
- self.assert_compile(
- schema.CreateTable(t1),
- "CREATE TABLE sometable ("
- "somecolumn somename"
- ")"
- )
- t1 = Table('sometable', MetaData(),
- Column('somecolumn', Enum('x', 'y', 'z', native_enum=False))
- )
- self.assert_compile(
- schema.CreateTable(t1),
- "CREATE TABLE sometable ("
- "somecolumn VARCHAR(1), "
- "CHECK (somecolumn IN ('x', 'y', 'z'))"
- ")"
- )
-
+ self.assert_compile(schema.CreateTable(t1),
+ 'CREATE TABLE sometable (somecolumn '
+ 'somename)')
+ t1 = Table('sometable', MetaData(), Column('somecolumn',
+ Enum('x', 'y', 'z', native_enum=False)))
+ self.assert_compile(schema.CreateTable(t1),
+ "CREATE TABLE sometable (somecolumn "
+ "VARCHAR(1), CHECK (somecolumn IN ('x', "
+ "'y', 'z')))")
- @testing.fails_on('postgresql+zxjdbc',
- 'zxjdbc fails on ENUM: column "XXX" is of type XXX '
- 'but expression is of type character varying')
- @testing.fails_on('postgresql+pg8000',
- 'zxjdbc fails on ENUM: column "XXX" is of type XXX '
- 'but expression is of type text')
+ @testing.fails_on('postgresql+zxjdbc',
+ 'zxjdbc fails on ENUM: column "XXX" is of type '
+ 'XXX but expression is of type character varying')
+ @testing.fails_on('postgresql+pg8000',
+ 'zxjdbc fails on ENUM: column "XXX" is of type '
+ 'XXX but expression is of type text')
def test_create_table(self):
metadata = MetaData(testing.db)
- t1 = Table('table', metadata,
- Column('id', Integer, primary_key=True),
- Column('value', Enum('one', 'two', 'three', name='onetwothreetype'))
- )
+ t1 = Table('table', metadata, Column('id', Integer,
+ primary_key=True), Column('value', Enum('one', 'two'
+ , 'three', name='onetwothreetype')))
t1.create()
- t1.create(checkfirst=True) # check the create
+ t1.create(checkfirst=True) # check the create
try:
t1.insert().execute(value='two')
t1.insert().execute(value='three')
t1.insert().execute(value='three')
- eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
- [(1, 'two'), (2, 'three'), (3, 'three')]
- )
+ eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
+ [(1, 'two'), (2, 'three'), (3, 'three')])
finally:
metadata.drop_all()
metadata.drop_all()
@@ -320,20 +324,24 @@ class EnumTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
metadata = MetaData(testing.db)
etype = Enum('four', 'five', 'six', metadata=metadata)
assert_raises(exc.ArgumentError, etype.create)
- assert_raises(exc.ArgumentError, etype.compile, dialect=postgresql.dialect())
-
- @testing.fails_on('postgresql+zxjdbc',
- 'zxjdbc fails on ENUM: column "XXX" is of type XXX '
- 'but expression is of type character varying')
- @testing.fails_on('postgresql+pg8000',
- 'zxjdbc fails on ENUM: column "XXX" is of type XXX '
- 'but expression is of type text')
+ assert_raises(exc.ArgumentError, etype.compile,
+ dialect=postgresql.dialect())
+
+ @testing.fails_on('postgresql+zxjdbc',
+ 'zxjdbc fails on ENUM: column "XXX" is of type '
+ 'XXX but expression is of type character varying')
+ @testing.fails_on('postgresql+pg8000',
+ 'zxjdbc fails on ENUM: column "XXX" is of type '
+ 'XXX but expression is of type text')
def test_unicode_labels(self):
metadata = MetaData(testing.db)
t1 = Table('table', metadata,
Column('id', Integer, primary_key=True),
- Column('value', Enum(u'réveillé', u'drôle', u'S’il', name='onetwothreetype'))
+ Column('value',
+ Enum(u'réveillé', u'drôle', u'S’il',
+ name='onetwothreetype'))
)
+
metadata.create_all()
try:
t1.insert().execute(value=u'drôle')
@@ -342,83 +350,76 @@ class EnumTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
[(1, u'drôle'), (2, u'réveillé'), (3, u'S’il')]
)
-
m2 = MetaData(testing.db)
t2 = Table('table', m2, autoload=True)
assert t2.c.value.type.enums == (u'réveillé', u'drôle', u'S’il')
-
finally:
metadata.drop_all()
-
+
def test_non_native_type(self):
metadata = MetaData()
- t1 = Table('foo', metadata,
- Column('bar', Enum('one', 'two', 'three', name='myenum', native_enum=False))
- )
-
+ t1 = Table('foo', metadata, Column('bar', Enum('one', 'two',
+ 'three', name='myenum', native_enum=False)))
+
def go():
t1.create(testing.db)
-
+
try:
- self.assert_sql(testing.db, go, [], with_sequences=[
- (
- "CREATE TABLE foo (\tbar VARCHAR(5), \t"
- "CONSTRAINT myenum CHECK (bar IN ('one', 'two', 'three')))",
- {}
- )]
- )
+ self.assert_sql(testing.db, go, [],
+ with_sequences=[("CREATE TABLE foo (\tbar "
+ "VARCHAR(5), \tCONSTRAINT myenum CHECK "
+ "(bar IN ('one', 'two', 'three')))", {})])
finally:
metadata.drop_all(testing.db)
-
+
def test_non_native_dialect(self):
engine = engines.testing_engine()
engine.connect()
engine.dialect.supports_native_enum = False
-
metadata = MetaData()
- t1 = Table('foo', metadata,
- Column('bar', Enum('one', 'two', 'three', name='myenum'))
- )
-
+ t1 = Table('foo', metadata, Column('bar', Enum('one', 'two',
+ 'three', name='myenum')))
+
def go():
t1.create(engine)
-
+
try:
- self.assert_sql(engine, go, [], with_sequences=[
- (
- "CREATE TABLE foo (\tbar VARCHAR(5), \t"
- "CONSTRAINT myenum CHECK (bar IN ('one', 'two', 'three')))",
- {}
- )]
- )
+ self.assert_sql(engine, go, [],
+ with_sequences=[("CREATE TABLE foo (\tbar "
+ "VARCHAR(5), \tCONSTRAINT myenum CHECK "
+ "(bar IN ('one', 'two', 'three')))", {})])
finally:
metadata.drop_all(engine)
-
+
def test_standalone_enum(self):
metadata = MetaData(testing.db)
- etype = Enum('four', 'five', 'six', name='fourfivesixtype', metadata=metadata)
+ etype = Enum('four', 'five', 'six', name='fourfivesixtype',
+ metadata=metadata)
etype.create()
try:
- assert testing.db.dialect.has_type(testing.db, 'fourfivesixtype')
+ assert testing.db.dialect.has_type(testing.db,
+ 'fourfivesixtype')
finally:
etype.drop()
- assert not testing.db.dialect.has_type(testing.db, 'fourfivesixtype')
-
+ assert not testing.db.dialect.has_type(testing.db,
+ 'fourfivesixtype')
metadata.create_all()
try:
- assert testing.db.dialect.has_type(testing.db, 'fourfivesixtype')
+ assert testing.db.dialect.has_type(testing.db,
+ 'fourfivesixtype')
finally:
metadata.drop_all()
- assert not testing.db.dialect.has_type(testing.db, 'fourfivesixtype')
-
+ assert not testing.db.dialect.has_type(testing.db,
+ 'fourfivesixtype')
+
def test_reflection(self):
metadata = MetaData(testing.db)
- etype = Enum('four', 'five', 'six', name='fourfivesixtype', metadata=metadata)
- t1 = Table('table', metadata,
- Column('id', Integer, primary_key=True),
- Column('value', Enum('one', 'two', 'three', name='onetwothreetype')),
- Column('value2', etype)
- )
+ etype = Enum('four', 'five', 'six', name='fourfivesixtype',
+ metadata=metadata)
+ t1 = Table('table', metadata, Column('id', Integer,
+ primary_key=True), Column('value', Enum('one', 'two'
+ , 'three', name='onetwothreetype')), Column('value2'
+ , etype))
metadata.create_all()
try:
m2 = MetaData(testing.db)
@@ -432,16 +433,18 @@ class EnumTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
def test_schema_reflection(self):
metadata = MetaData(testing.db)
- etype = Enum('four', 'five', 'six',
- name='fourfivesixtype',
- schema='test_schema',
- metadata=metadata)
- t1 = Table('table', metadata,
- Column('id', Integer, primary_key=True),
- Column('value', Enum('one', 'two', 'three',
- name='onetwothreetype', schema='test_schema')),
- Column('value2', etype)
- )
+ etype = Enum(
+ 'four',
+ 'five',
+ 'six',
+ name='fourfivesixtype',
+ schema='test_schema',
+ metadata=metadata,
+ )
+ t1 = Table('table', metadata, Column('id', Integer,
+ primary_key=True), Column('value', Enum('one', 'two'
+ , 'three', name='onetwothreetype',
+ schema='test_schema')), Column('value2', etype))
metadata.create_all()
try:
m2 = MetaData(testing.db)
@@ -455,12 +458,13 @@ class EnumTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
metadata.drop_all()
class InsertTest(TestBase, AssertsExecutionResults):
+
__only_on__ = 'postgresql'
@classmethod
def setup_class(cls):
global metadata
- cls.engine= testing.db
+ cls.engine = testing.db
metadata = MetaData(testing.db)
def teardown(self):
@@ -470,142 +474,136 @@ class InsertTest(TestBase, AssertsExecutionResults):
self.engine.dispose()
def test_compiled_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(30)))
-
+ table = Table('testtable', metadata, Column('id', Integer,
+ primary_key=True), Column('data', String(30)))
metadata.create_all()
-
- ins = table.insert(inline=True, values={'data':bindparam('x')}).compile()
- ins.execute({'x':"five"}, {'x':"seven"})
- assert table.select().execute().fetchall() == [(1, 'five'), (2, 'seven')]
+ ins = table.insert(inline=True, values={'data': bindparam('x'
+ )}).compile()
+ ins.execute({'x': 'five'}, {'x': 'seven'})
+ assert table.select().execute().fetchall() == [(1, 'five'), (2,
+ 'seven')]
def test_foreignkey_missing_insert(self):
- t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True)
- )
- t2 = Table('t2', metadata,
- Column('id', Integer, ForeignKey('t1.id'), primary_key=True)
- )
+ t1 = Table('t1', metadata, Column('id', Integer,
+ primary_key=True))
+ t2 = Table('t2', metadata, Column('id', Integer,
+ ForeignKey('t1.id'), primary_key=True))
metadata.create_all()
-
- # want to ensure that
- # "null value in column "id" violates not-null constraint" is raised (IntegrityError on psycoopg2,
- # but ProgrammingError on pg8000),
- # and not "ProgrammingError: (ProgrammingError) relationship "t2_id_seq" does not exist".
- # the latter corresponds to autoincrement behavior, which is not the case
- # here due to the foreign key.
- for eng in [
- engines.testing_engine(options={'implicit_returning':False}),
- engines.testing_engine(options={'implicit_returning':True}),
- ]:
- assert_raises_message(exc.DBAPIError, "violates not-null constraint", eng.execute, t2.insert())
-
+
+ # want to ensure that "null value in column "id" violates not-
+ # null constraint" is raised (IntegrityError on psycoopg2, but
+ # ProgrammingError on pg8000), and not "ProgrammingError:
+ # (ProgrammingError) relationship "t2_id_seq" does not exist".
+ # the latter corresponds to autoincrement behavior, which is not
+ # the case here due to the foreign key.
+
+ for eng in [engines.testing_engine(options={'implicit_returning'
+ : False}),
+ engines.testing_engine(options={'implicit_returning'
+ : True})]:
+ assert_raises_message(exc.DBAPIError,
+ 'violates not-null constraint',
+ eng.execute, t2.insert())
def test_sequence_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, Sequence('my_seq'), primary_key=True),
- Column('data', String(30)))
+ table = Table('testtable', metadata, Column('id', Integer,
+ Sequence('my_seq'), primary_key=True),
+ Column('data', String(30)))
metadata.create_all()
- self._assert_data_with_sequence(table, "my_seq")
+ self._assert_data_with_sequence(table, 'my_seq')
def test_sequence_returning_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, Sequence('my_seq'), primary_key=True),
- Column('data', String(30)))
+ table = Table('testtable', metadata, Column('id', Integer,
+ Sequence('my_seq'), primary_key=True),
+ Column('data', String(30)))
metadata.create_all()
- self._assert_data_with_sequence_returning(table, "my_seq")
+ self._assert_data_with_sequence_returning(table, 'my_seq')
def test_opt_sequence_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True),
- Column('data', String(30)))
+ table = Table('testtable', metadata, Column('id', Integer,
+ Sequence('my_seq', optional=True),
+ primary_key=True), Column('data', String(30)))
metadata.create_all()
self._assert_data_autoincrement(table)
def test_opt_sequence_returning_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True),
- Column('data', String(30)))
+ table = Table('testtable', metadata, Column('id', Integer,
+ Sequence('my_seq', optional=True),
+ primary_key=True), Column('data', String(30)))
metadata.create_all()
self._assert_data_autoincrement_returning(table)
def test_autoincrement_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(30)))
+ table = Table('testtable', metadata, Column('id', Integer,
+ primary_key=True), Column('data', String(30)))
metadata.create_all()
self._assert_data_autoincrement(table)
def test_autoincrement_returning_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(30)))
+ table = Table('testtable', metadata, Column('id', Integer,
+ primary_key=True), Column('data', String(30)))
metadata.create_all()
self._assert_data_autoincrement_returning(table)
def test_noautoincrement_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, primary_key=True, autoincrement=False),
- Column('data', String(30)))
+ table = Table('testtable', metadata, Column('id', Integer,
+ primary_key=True, autoincrement=False),
+ Column('data', String(30)))
metadata.create_all()
self._assert_data_noautoincrement(table)
def _assert_data_autoincrement(self, table):
- self.engine = engines.testing_engine(options={'implicit_returning':False})
+ self.engine = \
+ engines.testing_engine(options={'implicit_returning'
+ : False})
metadata.bind = self.engine
def go():
+
# execute with explicit id
- r = table.insert().execute({'id':30, 'data':'d1'})
+
+ r = table.insert().execute({'id': 30, 'data': 'd1'})
assert r.inserted_primary_key == [30]
# execute with prefetch id
- r = table.insert().execute({'data':'d2'})
+
+ r = table.insert().execute({'data': 'd2'})
assert r.inserted_primary_key == [1]
# executemany with explicit ids
- table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
+
+ table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
+ 'data': 'd4'})
# executemany, uses SERIAL
- table.insert().execute({'data':'d5'}, {'data':'d6'})
+
+ table.insert().execute({'data': 'd5'}, {'data': 'd6'})
# single execute, explicit id, inline
- table.insert(inline=True).execute({'id':33, 'data':'d7'})
+
+ table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
# single execute, inline, uses SERIAL
- table.insert(inline=True).execute({'data':'d8'})
- # note that the test framework doesnt capture the "preexecute" of a seqeuence
- # or default. we just see it in the bind params.
+ table.insert(inline=True).execute({'data': 'd8'})
- self.assert_sql(self.engine, go, [], with_sequences=[
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':30, 'data':'d1'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':1, 'data':'d2'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d5'}, {'data':'d6'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':33, 'data':'d7'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d8'}]
- ),
- ])
+ # note that the test framework doesnt capture the "preexecute"
+ # of a seqeuence or default. we just see it in the bind params.
+ self.assert_sql(self.engine, go, [], with_sequences=[
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 30, 'data': 'd1'}),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 1, 'data': 'd2'}),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+ ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
+ : 'd5'}, {'data': 'd6'}]),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 33, 'data': 'd7'}]),
+ ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
+ : 'd8'}]),
+ ])
assert table.select().execute().fetchall() == [
(30, 'd1'),
(1, 'd2'),
@@ -615,50 +613,39 @@ class InsertTest(TestBase, AssertsExecutionResults):
(3, 'd6'),
(33, 'd7'),
(4, 'd8'),
- ]
+ ]
table.delete().execute()
- # test the same series of events using a reflected
- # version of the table
+ # test the same series of events using a reflected version of
+ # the table
+
m2 = MetaData(self.engine)
table = Table(table.name, m2, autoload=True)
def go():
- table.insert().execute({'id':30, 'data':'d1'})
- r = table.insert().execute({'data':'d2'})
+ table.insert().execute({'id': 30, 'data': 'd1'})
+ r = table.insert().execute({'data': 'd2'})
assert r.inserted_primary_key == [5]
- table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
- table.insert().execute({'data':'d5'}, {'data':'d6'})
- table.insert(inline=True).execute({'id':33, 'data':'d7'})
- table.insert(inline=True).execute({'data':'d8'})
+ table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
+ 'data': 'd4'})
+ table.insert().execute({'data': 'd5'}, {'data': 'd6'})
+ table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
+ table.insert(inline=True).execute({'data': 'd8'})
self.assert_sql(self.engine, go, [], with_sequences=[
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':30, 'data':'d1'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':5, 'data':'d2'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d5'}, {'data':'d6'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':33, 'data':'d7'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d8'}]
- ),
- ])
-
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 30, 'data': 'd1'}),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 5, 'data': 'd2'}),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+ ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
+ : 'd5'}, {'data': 'd6'}]),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 33, 'data': 'd7'}]),
+ ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
+ : 'd8'}]),
+ ])
assert table.select().execute().fetchall() == [
(30, 'd1'),
(5, 'd2'),
@@ -668,61 +655,57 @@ class InsertTest(TestBase, AssertsExecutionResults):
(7, 'd6'),
(33, 'd7'),
(8, 'd8'),
- ]
+ ]
table.delete().execute()
def _assert_data_autoincrement_returning(self, table):
- self.engine = engines.testing_engine(options={'implicit_returning':True})
+ self.engine = \
+ engines.testing_engine(options={'implicit_returning': True})
metadata.bind = self.engine
def go():
+
# execute with explicit id
- r = table.insert().execute({'id':30, 'data':'d1'})
+
+ r = table.insert().execute({'id': 30, 'data': 'd1'})
assert r.inserted_primary_key == [30]
# execute with prefetch id
- r = table.insert().execute({'data':'d2'})
+
+ r = table.insert().execute({'data': 'd2'})
assert r.inserted_primary_key == [1]
# executemany with explicit ids
- table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
+
+ table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
+ 'data': 'd4'})
# executemany, uses SERIAL
- table.insert().execute({'data':'d5'}, {'data':'d6'})
+
+ table.insert().execute({'data': 'd5'}, {'data': 'd6'})
# single execute, explicit id, inline
- table.insert(inline=True).execute({'id':33, 'data':'d7'})
+
+ table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
# single execute, inline, uses SERIAL
- table.insert(inline=True).execute({'data':'d8'})
-
- self.assert_sql(self.engine, go, [], with_sequences=[
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':30, 'data':'d1'}
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data) RETURNING testtable.id",
- {'data': 'd2'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d5'}, {'data':'d6'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':33, 'data':'d7'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d8'}]
- ),
- ])
+ table.insert(inline=True).execute({'data': 'd8'})
+
+ self.assert_sql(self.engine, go, [], with_sequences=[
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 30, 'data': 'd1'}),
+ ('INSERT INTO testtable (data) VALUES (:data) RETURNING '
+ 'testtable.id', {'data': 'd2'}),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+ ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
+ : 'd5'}, {'data': 'd6'}]),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 33, 'data': 'd7'}]),
+ ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
+ : 'd8'}]),
+ ])
assert table.select().execute().fetchall() == [
(30, 'd1'),
(1, 'd2'),
@@ -732,50 +715,39 @@ class InsertTest(TestBase, AssertsExecutionResults):
(3, 'd6'),
(33, 'd7'),
(4, 'd8'),
- ]
+ ]
table.delete().execute()
- # test the same series of events using a reflected
- # version of the table
+ # test the same series of events using a reflected version of
+ # the table
+
m2 = MetaData(self.engine)
table = Table(table.name, m2, autoload=True)
def go():
- table.insert().execute({'id':30, 'data':'d1'})
- r = table.insert().execute({'data':'d2'})
+ table.insert().execute({'id': 30, 'data': 'd1'})
+ r = table.insert().execute({'data': 'd2'})
assert r.inserted_primary_key == [5]
- table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
- table.insert().execute({'data':'d5'}, {'data':'d6'})
- table.insert(inline=True).execute({'id':33, 'data':'d7'})
- table.insert(inline=True).execute({'data':'d8'})
+ table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
+ 'data': 'd4'})
+ table.insert().execute({'data': 'd5'}, {'data': 'd6'})
+ table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
+ table.insert(inline=True).execute({'data': 'd8'})
self.assert_sql(self.engine, go, [], with_sequences=[
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':30, 'data':'d1'}
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data) RETURNING testtable.id",
- {'data':'d2'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d5'}, {'data':'d6'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':33, 'data':'d7'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d8'}]
- ),
- ])
-
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 30, 'data': 'd1'}),
+ ('INSERT INTO testtable (data) VALUES (:data) RETURNING '
+ 'testtable.id', {'data': 'd2'}),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+ ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
+ : 'd5'}, {'data': 'd6'}]),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 33, 'data': 'd7'}]),
+ ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
+ : 'd8'}]),
+ ])
assert table.select().execute().fetchall() == [
(30, 'd1'),
(5, 'd2'),
@@ -785,48 +757,38 @@ class InsertTest(TestBase, AssertsExecutionResults):
(7, 'd6'),
(33, 'd7'),
(8, 'd8'),
- ]
+ ]
table.delete().execute()
def _assert_data_with_sequence(self, table, seqname):
- self.engine = engines.testing_engine(options={'implicit_returning':False})
+ self.engine = \
+ engines.testing_engine(options={'implicit_returning'
+ : False})
metadata.bind = self.engine
def go():
- table.insert().execute({'id':30, 'data':'d1'})
- table.insert().execute({'data':'d2'})
- table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
- table.insert().execute({'data':'d5'}, {'data':'d6'})
- table.insert(inline=True).execute({'id':33, 'data':'d7'})
- table.insert(inline=True).execute({'data':'d8'})
+ table.insert().execute({'id': 30, 'data': 'd1'})
+ table.insert().execute({'data': 'd2'})
+ table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
+ 'data': 'd4'})
+ table.insert().execute({'data': 'd5'}, {'data': 'd6'})
+ table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
+ table.insert(inline=True).execute({'data': 'd8'})
self.assert_sql(self.engine, go, [], with_sequences=[
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':30, 'data':'d1'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':1, 'data':'d2'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
- [{'data':'d5'}, {'data':'d6'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':33, 'data':'d7'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
- [{'data':'d8'}]
- ),
- ])
-
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 30, 'data': 'd1'}),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 1, 'data': 'd2'}),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+ ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 33, 'data': 'd7'}]),
+ ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ ":data)" % seqname, [{'data': 'd8'}]),
+ ])
assert table.select().execute().fetchall() == [
(30, 'd1'),
(1, 'd2'),
@@ -836,50 +798,40 @@ class InsertTest(TestBase, AssertsExecutionResults):
(3, 'd6'),
(33, 'd7'),
(4, 'd8'),
- ]
+ ]
# cant test reflection here since the Sequence must be
# explicitly specified
def _assert_data_with_sequence_returning(self, table, seqname):
- self.engine = engines.testing_engine(options={'implicit_returning':True})
+ self.engine = \
+ engines.testing_engine(options={'implicit_returning': True})
metadata.bind = self.engine
def go():
- table.insert().execute({'id':30, 'data':'d1'})
- table.insert().execute({'data':'d2'})
- table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
- table.insert().execute({'data':'d5'}, {'data':'d6'})
- table.insert(inline=True).execute({'id':33, 'data':'d7'})
- table.insert(inline=True).execute({'data':'d8'})
+ table.insert().execute({'id': 30, 'data': 'd1'})
+ table.insert().execute({'data': 'd2'})
+ table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
+ 'data': 'd4'})
+ table.insert().execute({'data': 'd5'}, {'data': 'd6'})
+ table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
+ table.insert(inline=True).execute({'data': 'd8'})
self.assert_sql(self.engine, go, [], with_sequences=[
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':30, 'data':'d1'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (nextval('my_seq'), :data) RETURNING testtable.id",
- {'data':'d2'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
- [{'data':'d5'}, {'data':'d6'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':33, 'data':'d7'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
- [{'data':'d8'}]
- ),
- ])
-
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 30, 'data': 'd1'}),
+ ("INSERT INTO testtable (id, data) VALUES "
+ "(nextval('my_seq'), :data) RETURNING testtable.id",
+ {'data': 'd2'}),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+ ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]),
+ ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 33, 'data': 'd7'}]),
+ ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ ":data)" % seqname, [{'data': 'd8'}]),
+ ])
assert table.select().execute().fetchall() == [
(30, 'd1'),
(1, 'd2'),
@@ -889,79 +841,87 @@ class InsertTest(TestBase, AssertsExecutionResults):
(3, 'd6'),
(33, 'd7'),
(4, 'd8'),
- ]
+ ]
# cant test reflection here since the Sequence must be
# explicitly specified
def _assert_data_noautoincrement(self, table):
- self.engine = engines.testing_engine(options={'implicit_returning':False})
+ self.engine = \
+ engines.testing_engine(options={'implicit_returning'
+ : False})
metadata.bind = self.engine
-
- table.insert().execute({'id':30, 'data':'d1'})
-
+ table.insert().execute({'id': 30, 'data': 'd1'})
if self.engine.driver == 'pg8000':
exception_cls = exc.ProgrammingError
elif self.engine.driver == 'pypostgresql':
exception_cls = Exception
else:
exception_cls = exc.IntegrityError
-
- assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'})
- assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'})
-
- assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'})
-
- assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'})
-
- table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
- table.insert(inline=True).execute({'id':33, 'data':'d4'})
-
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (31, 'd2'),
- (32, 'd3'),
- (33, 'd4'),
- ]
+ assert_raises_message(exception_cls,
+ 'violates not-null constraint',
+ table.insert().execute, {'data': 'd2'})
+ assert_raises_message(exception_cls,
+ 'violates not-null constraint',
+ table.insert().execute, {'data': 'd2'},
+ {'data': 'd3'})
+ assert_raises_message(exception_cls,
+ 'violates not-null constraint',
+ table.insert().execute, {'data': 'd2'})
+ assert_raises_message(exception_cls,
+ 'violates not-null constraint',
+ table.insert().execute, {'data': 'd2'},
+ {'data': 'd3'})
+ table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32,
+ 'data': 'd3'})
+ table.insert(inline=True).execute({'id': 33, 'data': 'd4'})
+ assert table.select().execute().fetchall() == [(30, 'd1'), (31,
+ 'd2'), (32, 'd3'), (33, 'd4')]
table.delete().execute()
- # test the same series of events using a reflected
- # version of the table
+ # test the same series of events using a reflected version of
+ # the table
+
m2 = MetaData(self.engine)
table = Table(table.name, m2, autoload=True)
- table.insert().execute({'id':30, 'data':'d1'})
-
- assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'})
- assert_raises_message(exception_cls, "violates not-null constraint", table.insert().execute, {'data':'d2'}, {'data':'d3'})
-
- table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
- table.insert(inline=True).execute({'id':33, 'data':'d4'})
-
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (31, 'd2'),
- (32, 'd3'),
- (33, 'd4'),
- ]
+ table.insert().execute({'id': 30, 'data': 'd1'})
+ assert_raises_message(exception_cls,
+ 'violates not-null constraint',
+ table.insert().execute, {'data': 'd2'})
+ assert_raises_message(exception_cls,
+ 'violates not-null constraint',
+ table.insert().execute, {'data': 'd2'},
+ {'data': 'd3'})
+ table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32,
+ 'data': 'd3'})
+ table.insert(inline=True).execute({'id': 33, 'data': 'd4'})
+ assert table.select().execute().fetchall() == [(30, 'd1'), (31,
+ 'd2'), (32, 'd3'), (33, 'd4')]
class DomainReflectionTest(TestBase, AssertsExecutionResults):
- "Test PostgreSQL domains"
+
+ """Test PostgreSQL domains"""
__only_on__ = 'postgresql'
@classmethod
def setup_class(cls):
con = testing.db.connect()
- for ddl in ('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42',
- 'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0'):
+ for ddl in \
+ 'CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42', \
+ 'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0':
try:
con.execute(ddl)
except exc.SQLError, e:
- if not "already exists" in str(e):
+ if not 'already exists' in str(e):
raise e
- con.execute('CREATE TABLE testtable (question integer, answer testdomain)')
- con.execute('CREATE TABLE test_schema.testtable(question integer, answer test_schema.testdomain, anything integer)')
- con.execute('CREATE TABLE crosschema (question integer, answer test_schema.testdomain)')
+ con.execute('CREATE TABLE testtable (question integer, answer '
+ 'testdomain)')
+ con.execute('CREATE TABLE test_schema.testtable(question '
+ 'integer, answer test_schema.testdomain, anything '
+ 'integer)')
+ con.execute('CREATE TABLE crosschema (question integer, answer '
+ 'test_schema.testdomain)')
@classmethod
def teardown_class(cls):
@@ -975,60 +935,70 @@ class DomainReflectionTest(TestBase, AssertsExecutionResults):
def test_table_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('testtable', metadata, autoload=True)
- eq_(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns")
+ eq_(set(table.columns.keys()), set(['question', 'answer']),
+ "Columns of reflected table didn't equal expected columns")
assert isinstance(table.c.answer.type, Integer)
def test_domain_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('testtable', metadata, autoload=True)
- eq_(str(table.columns.answer.server_default.arg), '42', "Reflected default value didn't equal expected value")
- assert not table.columns.answer.nullable, "Expected reflected column to not be nullable."
+ eq_(str(table.columns.answer.server_default.arg), '42',
+ "Reflected default value didn't equal expected value")
+ assert not table.columns.answer.nullable, \
+ 'Expected reflected column to not be nullable.'
def test_table_is_reflected_test_schema(self):
metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True, schema='test_schema')
- eq_(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns")
+ table = Table('testtable', metadata, autoload=True,
+ schema='test_schema')
+ eq_(set(table.columns.keys()), set(['question', 'answer',
+ 'anything']),
+ "Columns of reflected table didn't equal expected columns")
assert isinstance(table.c.anything.type, Integer)
def test_schema_domain_is_reflected(self):
metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True, schema='test_schema')
- eq_(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
- assert table.columns.answer.nullable, "Expected reflected column to be nullable."
+ table = Table('testtable', metadata, autoload=True,
+ schema='test_schema')
+ eq_(str(table.columns.answer.server_default.arg), '0',
+ "Reflected default value didn't equal expected value")
+ assert table.columns.answer.nullable, \
+ 'Expected reflected column to be nullable.'
def test_crosschema_domain_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('crosschema', metadata, autoload=True)
- eq_(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
- assert table.columns.answer.nullable, "Expected reflected column to be nullable."
+ eq_(str(table.columns.answer.server_default.arg), '0',
+ "Reflected default value didn't equal expected value")
+ assert table.columns.answer.nullable, \
+ 'Expected reflected column to be nullable.'
def test_unknown_types(self):
from sqlalchemy.databases import postgresql
-
ischema_names = postgresql.PGDialect.ischema_names
postgresql.PGDialect.ischema_names = {}
try:
m2 = MetaData(testing.db)
- assert_raises(exc.SAWarning, Table, "testtable", m2, autoload=True)
+ assert_raises(exc.SAWarning, Table, 'testtable', m2,
+ autoload=True)
@testing.emits_warning('Did not recognize type')
def warns():
m3 = MetaData(testing.db)
- t3 = Table("testtable", m3, autoload=True)
+ t3 = Table('testtable', m3, autoload=True)
assert t3.c.answer.type.__class__ == sa.types.NullType
-
finally:
postgresql.PGDialect.ischema_names = ischema_names
class MiscTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+
__only_on__ = 'postgresql'
def test_date_reflection(self):
m1 = MetaData(testing.db)
- t1 = Table('pgdate', m1,
- Column('date1', DateTime(timezone=True)),
- Column('date2', DateTime(timezone=False))
- )
+ t1 = Table('pgdate', m1, Column('date1',
+ DateTime(timezone=True)), Column('date2',
+ DateTime(timezone=False)))
m1.create_all()
try:
m2 = MetaData(testing.db)
@@ -1037,26 +1007,33 @@ class MiscTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
assert t2.c.date2.type.timezone is False
finally:
m1.drop_all()
-
- @testing.fails_on('+zxjdbc', 'The JDBC driver handles the version parsing')
+
+ @testing.fails_on('+zxjdbc',
+ 'The JDBC driver handles the version parsing')
def test_version_parsing(self):
+
+
class MockConn(object):
+
def __init__(self, res):
self.res = res
-
+
def execute(self, str):
return self
-
+
def scalar(self):
return self.res
-
- for string, version in [
- ("PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by GCC gcc (GCC) 4.1.2 20070925 (Red Hat 4.1.2-33)", (8, 3, 8)),
- ("PostgreSQL 8.5devel on x86_64-unknown-linux-gnu, compiled by GCC gcc (GCC) 4.4.2, 64-bit", (8, 5)),
- ]:
-
- eq_(testing.db.dialect._get_server_version_info(MockConn(string)), version)
-
+
+
+ for string, version in \
+ [('PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by '
+ 'GCC gcc (GCC) 4.1.2 20070925 (Red Hat 4.1.2-33)', (8, 3,
+ 8)),
+ ('PostgreSQL 8.5devel on x86_64-unknown-linux-gnu, '
+ 'compiled by GCC gcc (GCC) 4.4.2, 64-bit', (8, 5))]:
+ eq_(testing.db.dialect._get_server_version_info(MockConn(string)),
+ version)
+
@testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
def test_notice_logging(self):
log = logging.getLogger('sqlalchemy.dialects.postgresql')
@@ -1068,211 +1045,198 @@ class MiscTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
conn = testing.db.connect()
trans = conn.begin()
try:
- conn.execute("create table foo (id serial primary key)")
+ conn.execute('create table foo (id serial primary key)')
finally:
trans.rollback()
finally:
log.removeHandler(buf)
log.setLevel(lev)
+ msgs = ' '.join(b.msg for b in buf.buffer)
+ assert 'will create implicit sequence' in msgs
+ assert 'will create implicit index' in msgs
- msgs = " ".join(b.msg for b in buf.buffer)
- assert "will create implicit sequence" in msgs
- assert "will create implicit index" in msgs
-
-
def test_pg_weirdchar_reflection(self):
meta1 = MetaData(testing.db)
- subject = Table("subject", meta1,
- Column("id$", Integer, primary_key=True),
- )
-
- referer = Table("referer", meta1,
- Column("id", Integer, primary_key=True),
- Column("ref", Integer, ForeignKey('subject.id$')),
- )
+ subject = Table('subject', meta1, Column('id$', Integer,
+ primary_key=True))
+ referer = Table('referer', meta1, Column('id', Integer,
+ primary_key=True), Column('ref', Integer,
+ ForeignKey('subject.id$')))
meta1.create_all()
try:
meta2 = MetaData(testing.db)
- subject = Table("subject", meta2, autoload=True)
- referer = Table("referer", meta2, autoload=True)
+ subject = Table('subject', meta2, autoload=True)
+ referer = Table('referer', meta2, autoload=True)
print str(subject.join(referer).onclause)
- self.assert_((subject.c['id$']==referer.c.ref).compare(subject.join(referer).onclause))
+ self.assert_((subject.c['id$']
+ == referer.c.ref).compare(
+ subject.join(referer).onclause))
finally:
meta1.drop_all()
- @testing.fails_on('+zxjdbc', "Can't infer the SQL type to use "
- "for an instance of "
- "org.python.core.PyObjectDerived.")
+ @testing.fails_on('+zxjdbc',
+ "Can't infer the SQL type to use for an instance "
+ "of org.python.core.PyObjectDerived.")
@testing.fails_on('+pg8000', "Can't determine correct type.")
def test_extract(self):
- fivedaysago = datetime.datetime.now() - datetime.timedelta(days=5)
- for field, exp in (
- ('year', fivedaysago.year),
- ('month', fivedaysago.month),
- ('day', fivedaysago.day),
- ):
- r = testing.db.execute(
- select([extract(field, func.now() + datetime.timedelta(days =-5))])
- ).scalar()
+ fivedaysago = datetime.datetime.now() \
+ - datetime.timedelta(days=5)
+ for field, exp in ('year', fivedaysago.year), ('month',
+ fivedaysago.month), ('day', fivedaysago.day):
+ r = testing.db.execute(select([extract(field, func.now()
+ + datetime.timedelta(days=-5))])).scalar()
eq_(r, exp)
-
def test_checksfor_sequence(self):
meta1 = MetaData(testing.db)
- t = Table('mytable', meta1,
- Column('col1', Integer, Sequence('fooseq')))
+ t = Table('mytable', meta1, Column('col1', Integer,
+ Sequence('fooseq')))
try:
- testing.db.execute("CREATE SEQUENCE fooseq")
+ testing.db.execute('CREATE SEQUENCE fooseq')
t.create(checkfirst=True)
finally:
t.drop(checkfirst=True)
def test_renamed_sequence_reflection(self):
m1 = MetaData(testing.db)
- t = Table('t', m1,
- Column('id', Integer, primary_key=True)
- )
+ t = Table('t', m1, Column('id', Integer, primary_key=True))
m1.create_all()
try:
m2 = MetaData(testing.db)
t2 = Table('t', m2, autoload=True, implicit_returning=False)
- eq_(t2.c.id.server_default.arg.text, "nextval('t_id_seq'::regclass)")
-
+ eq_(t2.c.id.server_default.arg.text,
+ "nextval('t_id_seq'::regclass)")
r = t2.insert().execute()
eq_(r.inserted_primary_key, [1])
-
- testing.db.connect().\
- execution_options(autocommit=True).\
- execute("alter table t_id_seq rename to foobar_id_seq")
-
+ testing.db.connect().execution_options(autocommit=True).\
+ execute('alter table t_id_seq rename to foobar_id_seq'
+ )
m3 = MetaData(testing.db)
t3 = Table('t', m3, autoload=True, implicit_returning=False)
- eq_(t3.c.id.server_default.arg.text, "nextval('foobar_id_seq'::regclass)")
-
+ eq_(t3.c.id.server_default.arg.text,
+ "nextval('foobar_id_seq'::regclass)")
r = t3.insert().execute()
eq_(r.inserted_primary_key, [2])
-
finally:
m1.drop_all()
-
-
+
def test_distinct_on(self):
- t = Table('mytable', MetaData(testing.db),
- Column('id', Integer, primary_key=True),
- Column('a', String(8)))
- eq_(
- str(t.select(distinct=t.c.a)),
- 'SELECT DISTINCT ON (mytable.a) mytable.id, mytable.a \n'
- 'FROM mytable')
- eq_(
- str(t.select(distinct=['id','a'])),
- 'SELECT DISTINCT ON (id, a) mytable.id, mytable.a \n'
- 'FROM mytable')
- eq_(
- str(t.select(distinct=[t.c.id, t.c.a])),
- 'SELECT DISTINCT ON (mytable.id, mytable.a) mytable.id, mytable.a \n'
- 'FROM mytable')
+ t = Table('mytable', MetaData(testing.db), Column('id',
+ Integer, primary_key=True), Column('a', String(8)))
+ eq_(str(t.select(distinct=t.c.a)),
+ 'SELECT DISTINCT ON (mytable.a) mytable.id, mytable.a '
+ '\nFROM mytable')
+ eq_(str(t.select(distinct=['id', 'a'])),
+ 'SELECT DISTINCT ON (id, a) mytable.id, mytable.a \nFROM '
+ 'mytable')
+ eq_(str(t.select(distinct=[t.c.id, t.c.a])),
+ 'SELECT DISTINCT ON (mytable.id, mytable.a) mytable.id, '
+ 'mytable.a \nFROM mytable')
def test_schema_reflection(self):
- """note: this test requires that the 'test_schema' schema be separate and accessible by the test user"""
+ """note: this test requires that the 'test_schema' schema be
+ separate and accessible by the test user"""
meta1 = MetaData(testing.db)
- users = Table('users', meta1,
- Column('user_id', Integer, primary_key = True),
- Column('user_name', String(30), nullable = False),
- schema="test_schema"
- )
-
- addresses = Table('email_addresses', meta1,
- Column('address_id', Integer, primary_key = True),
- Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
+ users = Table('users', meta1, Column('user_id', Integer,
+ primary_key=True), Column('user_name',
+ String(30), nullable=False), schema='test_schema')
+ addresses = Table(
+ 'email_addresses',
+ meta1,
+ Column('address_id', Integer, primary_key=True),
+ Column('remote_user_id', Integer,
+ ForeignKey(users.c.user_id)),
Column('email_address', String(20)),
- schema="test_schema"
- )
+ schema='test_schema',
+ )
meta1.create_all()
try:
meta2 = MetaData(testing.db)
- addresses = Table('email_addresses', meta2, autoload=True, schema="test_schema")
- users = Table('users', meta2, mustexist=True, schema="test_schema")
-
+ addresses = Table('email_addresses', meta2, autoload=True,
+ schema='test_schema')
+ users = Table('users', meta2, mustexist=True,
+ schema='test_schema')
print users
print addresses
j = join(users, addresses)
print str(j.onclause)
- self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause))
+ self.assert_((users.c.user_id
+ == addresses.c.remote_user_id).compare(j.onclause))
finally:
meta1.drop_all()
def test_schema_reflection_2(self):
meta1 = MetaData(testing.db)
- subject = Table("subject", meta1,
- Column("id", Integer, primary_key=True),
- )
-
- referer = Table("referer", meta1,
- Column("id", Integer, primary_key=True),
- Column("ref", Integer, ForeignKey('subject.id')),
- schema="test_schema")
+ subject = Table('subject', meta1, Column('id', Integer,
+ primary_key=True))
+ referer = Table('referer', meta1, Column('id', Integer,
+ primary_key=True), Column('ref', Integer,
+ ForeignKey('subject.id')), schema='test_schema')
meta1.create_all()
try:
meta2 = MetaData(testing.db)
- subject = Table("subject", meta2, autoload=True)
- referer = Table("referer", meta2, schema="test_schema", autoload=True)
+ subject = Table('subject', meta2, autoload=True)
+ referer = Table('referer', meta2, schema='test_schema',
+ autoload=True)
print str(subject.join(referer).onclause)
- self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
+ self.assert_((subject.c.id
+ == referer.c.ref).compare(
+ subject.join(referer).onclause))
finally:
meta1.drop_all()
def test_schema_reflection_3(self):
meta1 = MetaData(testing.db)
- subject = Table("subject", meta1,
- Column("id", Integer, primary_key=True),
- schema='test_schema_2'
- )
-
- referer = Table("referer", meta1,
- Column("id", Integer, primary_key=True),
- Column("ref", Integer, ForeignKey('test_schema_2.subject.id')),
- schema="test_schema")
-
+ subject = Table('subject', meta1, Column('id', Integer,
+ primary_key=True), schema='test_schema_2')
+ referer = Table('referer', meta1, Column('id', Integer,
+ primary_key=True), Column('ref', Integer,
+ ForeignKey('test_schema_2.subject.id')),
+ schema='test_schema')
meta1.create_all()
try:
meta2 = MetaData(testing.db)
- subject = Table("subject", meta2, autoload=True, schema="test_schema_2")
- referer = Table("referer", meta2, schema="test_schema", autoload=True)
+ subject = Table('subject', meta2, autoload=True,
+ schema='test_schema_2')
+ referer = Table('referer', meta2, schema='test_schema',
+ autoload=True)
print str(subject.join(referer).onclause)
- self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
+ self.assert_((subject.c.id
+ == referer.c.ref).compare(
+ subject.join(referer).onclause))
finally:
meta1.drop_all()
def test_schema_roundtrips(self):
meta = MetaData(testing.db)
- users = Table('users', meta,
- Column('id', Integer, primary_key=True),
- Column('name', String(50)), schema='test_schema')
+ users = Table('users', meta, Column('id', Integer,
+ primary_key=True), Column('name', String(50)),
+ schema='test_schema')
users.create()
try:
users.insert().execute(id=1, name='name1')
users.insert().execute(id=2, name='name2')
users.insert().execute(id=3, name='name3')
users.insert().execute(id=4, name='name4')
-
- eq_(users.select().where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
- eq_(users.select(use_labels=True).where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
-
- users.delete().where(users.c.id==3).execute()
- eq_(users.select().where(users.c.name=='name3').execute().fetchall(), [])
-
- users.update().where(users.c.name=='name4').execute(name='newname')
- eq_(users.select(use_labels=True).where(users.c.id==4).execute().fetchall(), [(4, 'newname')])
-
+ eq_(users.select().where(users.c.name == 'name2'
+ ).execute().fetchall(), [(2, 'name2')])
+ eq_(users.select(use_labels=True).where(users.c.name
+ == 'name2').execute().fetchall(), [(2, 'name2')])
+ users.delete().where(users.c.id == 3).execute()
+ eq_(users.select().where(users.c.name == 'name3'
+ ).execute().fetchall(), [])
+ users.update().where(users.c.name == 'name4'
+ ).execute(name='newname')
+ eq_(users.select(use_labels=True).where(users.c.id
+ == 4).execute().fetchall(), [(4, 'newname')])
finally:
users.drop()
def test_preexecute_passivedefault(self):
- """test that when we get a primary key column back
- from reflecting a table which has a default value on it, we pre-execute
- that DefaultClause upon insert."""
+ """test that when we get a primary key column back from
+ reflecting a table which has a default value on it, we pre-
+ execute that DefaultClause upon insert."""
try:
meta = MetaData(testing.db)
@@ -1285,63 +1249,61 @@ class MiscTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
user_password VARCHAR NOT NULL
);
""")
-
- t = Table("speedy_users", meta, autoload=True)
- r = t.insert().execute(user_name='user', user_password='lala')
+ t = Table('speedy_users', meta, autoload=True)
+ r = t.insert().execute(user_name='user',
+ user_password='lala')
assert r.inserted_primary_key == [1]
l = t.select().execute().fetchall()
assert l == [(1, 'user', 'lala')]
finally:
- testing.db.execute("drop table speedy_users")
+ testing.db.execute('drop table speedy_users')
@testing.emits_warning()
def test_index_reflection(self):
- """ Reflecting partial & expression-based indexes should warn """
+ """ Reflecting partial & expression-based indexes should warn
+ """
+
import warnings
+
def capture_warnings(*args, **kw):
capture_warnings._orig_showwarning(*args, **kw)
capture_warnings.warnings.append(args)
+
capture_warnings._orig_showwarning = warnings.warn
capture_warnings.warnings = []
-
m1 = MetaData(testing.db)
- t1 = Table('party', m1,
- Column('id', String(10), nullable=False),
- Column('name', String(20), index=True),
- Column('aname', String(20))
- )
+ t1 = Table('party', m1, Column('id', String(10),
+ nullable=False), Column('name', String(20),
+ index=True), Column('aname', String(20)))
m1.create_all()
-
testing.db.execute("""
create index idx1 on party ((id || name))
- """)
+ """)
testing.db.execute("""
create unique index idx2 on party (id) where name = 'test'
""")
-
testing.db.execute("""
create index idx3 on party using btree
(lower(name::text), lower(aname::text))
""")
-
try:
m2 = MetaData(testing.db)
-
warnings.warn = capture_warnings
t2 = Table('party', m2, autoload=True)
-
wrn = capture_warnings.warnings
- assert str(wrn[0][0]) == (
- "Skipped unsupported reflection of expression-based index idx1")
- assert str(wrn[1][0]) == (
- "Predicate of partial index idx2 ignored during reflection")
+ assert str(wrn[0][0]) \
+ == 'Skipped unsupported reflection of '\
+ 'expression-based index idx1'
+ assert str(wrn[1][0]) \
+ == 'Predicate of partial index idx2 ignored during '\
+ 'reflection'
assert len(t2.indexes) == 2
+
# Make sure indexes are in the order we expect them in
+
tmp = [(idx.name, idx) for idx in t2.indexes]
tmp.sort()
-
r1, r2 = [idx[1] for idx in tmp]
-
assert r1.name == 'idx2'
assert r1.unique == True
assert r2.unique == False
@@ -1351,44 +1313,43 @@ class MiscTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
warnings.warn = capture_warnings._orig_showwarning
m1.drop_all()
- @testing.fails_on('postgresql+pypostgresql', 'pypostgresql bombs on multiple calls')
+ @testing.fails_on('postgresql+pypostgresql',
+ 'pypostgresql bombs on multiple calls')
def test_set_isolation_level(self):
"""Test setting the isolation level with create_engine"""
+
eng = create_engine(testing.db.url)
- eq_(
- eng.execute("show transaction isolation level").scalar(),
+ eq_(eng.execute('show transaction isolation level').scalar(),
'read committed')
- eng = create_engine(testing.db.url, isolation_level="SERIALIZABLE")
- eq_(
- eng.execute("show transaction isolation level").scalar(),
+ eng = create_engine(testing.db.url,
+ isolation_level='SERIALIZABLE')
+ eq_(eng.execute('show transaction isolation level').scalar(),
'serializable')
- eng = create_engine(testing.db.url, isolation_level="FOO")
-
+ eng = create_engine(testing.db.url, isolation_level='FOO')
if testing.db.driver == 'zxjdbc':
exception_cls = eng.dialect.dbapi.Error
else:
exception_cls = eng.dialect.dbapi.ProgrammingError
- assert_raises(exception_cls, eng.execute, "show transaction isolation level")
-
- @testing.fails_on('+zxjdbc',
- "psycopg2/pg8000 specific assertion")
- @testing.fails_on('pypostgresql',
- "psycopg2/pg8000 specific assertion")
+ assert_raises(exception_cls, eng.execute,
+ 'show transaction isolation level')
+
+ @testing.fails_on('+zxjdbc', 'psycopg2/pg8000 specific assertion')
+ @testing.fails_on('pypostgresql',
+ 'psycopg2/pg8000 specific assertion')
def test_numeric_raise(self):
- stmt = text("select cast('hi' as char) as hi", typemap={'hi':Numeric})
- assert_raises(
- exc.InvalidRequestError,
- testing.db.execute, stmt
- )
+ stmt = text("select cast('hi' as char) as hi", typemap={'hi'
+ : Numeric})
+ assert_raises(exc.InvalidRequestError, testing.db.execute, stmt)
class TimezoneTest(TestBase):
- """Test timezone-aware datetimes.
- psycopg will return a datetime with a tzinfo attached to it, if postgresql
- returns it. python then will not let you compare a datetime with a tzinfo
- to a datetime that doesnt have one. this test illustrates two ways to
- have datetime types with and without timezone info.
- """
+ """Test timezone-aware datetimes.
+
+ psycopg will return a datetime with a tzinfo attached to it, if
+ postgresql returns it. python then will not let you compare a
+ datetime with a tzinfo to a datetime that doesnt have one. this
+ test illustrates two ways to have datetime types with and without
+ timezone info. """
__only_on__ = 'postgresql'
@@ -1397,17 +1358,20 @@ class TimezoneTest(TestBase):
global tztable, notztable, metadata
metadata = MetaData(testing.db)
- # current_timestamp() in postgresql is assumed to return TIMESTAMP WITH TIMEZONE
- tztable = Table('tztable', metadata,
- Column("id", Integer, primary_key=True),
- Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
- Column("name", String(20)),
- )
- notztable = Table('notztable', metadata,
- Column("id", Integer, primary_key=True),
- Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))),
- Column("name", String(20)),
- )
+ # current_timestamp() in postgresql is assumed to return
+ # TIMESTAMP WITH TIMEZONE
+
+ tztable = Table('tztable', metadata, Column('id', Integer,
+ primary_key=True), Column('date',
+ DateTime(timezone=True),
+ onupdate=func.current_timestamp()),
+ Column('name', String(20)))
+ notztable = Table('notztable', metadata, Column('id', Integer,
+ primary_key=True), Column('date',
+ DateTime(timezone=False),
+ onupdate=cast(func.current_timestamp(),
+ DateTime(timezone=False))), Column('name',
+ String(20)))
metadata.create_all()
@classmethod
@@ -1415,63 +1379,76 @@ class TimezoneTest(TestBase):
metadata.drop_all()
def test_with_timezone(self):
+
# get a date with a tzinfo
- somedate = testing.db.connect().scalar(func.current_timestamp().select())
+
+ somedate = \
+ testing.db.connect().scalar(func.current_timestamp().select())
assert somedate.tzinfo
-
tztable.insert().execute(id=1, name='row1', date=somedate)
-
- row = select([tztable.c.date], tztable.c.id==1).execute().first()
+ row = select([tztable.c.date], tztable.c.id
+ == 1).execute().first()
eq_(row[0], somedate)
- eq_(somedate.tzinfo.utcoffset(somedate), row[0].tzinfo.utcoffset(row[0]))
-
- result = tztable.update(tztable.c.id==1).\
- returning(tztable.c.date).execute(name='newname')
+ eq_(somedate.tzinfo.utcoffset(somedate),
+ row[0].tzinfo.utcoffset(row[0]))
+ result = tztable.update(tztable.c.id
+ == 1).returning(tztable.c.date).\
+ execute(name='newname'
+ )
row = result.first()
assert row[0] >= somedate
def test_without_timezone(self):
+
# get a date without a tzinfo
- somedate = datetime.datetime(2005, 10, 20, 11, 52, 0)
+
+ somedate = datetime.datetime( 2005, 10, 20, 11, 52, 0, )
assert not somedate.tzinfo
-
notztable.insert().execute(id=1, name='row1', date=somedate)
-
- row = select([notztable.c.date], notztable.c.id==1).execute().first()
+ row = select([notztable.c.date], notztable.c.id
+ == 1).execute().first()
eq_(row[0], somedate)
eq_(row[0].tzinfo, None)
-
- result = notztable.update(notztable.c.id==1).\
- returning(notztable.c.date).execute(name='newname')
+ result = notztable.update(notztable.c.id
+ == 1).returning(notztable.c.date).\
+ execute(name='newname'
+ )
row = result.first()
assert row[0] >= somedate
class TimePrecisionTest(TestBase, AssertsCompiledSQL):
+
__dialect__ = postgresql.dialect()
-
+
def test_compile(self):
- for (type_, expected) in [
- (postgresql.TIME(), "TIME WITHOUT TIME ZONE"),
- (postgresql.TIME(precision=5), "TIME(5) WITHOUT TIME ZONE"),
- (postgresql.TIME(timezone=True, precision=5), "TIME(5) WITH TIME ZONE"),
- (postgresql.TIMESTAMP(), "TIMESTAMP WITHOUT TIME ZONE"),
- (postgresql.TIMESTAMP(precision=5), "TIMESTAMP(5) WITHOUT TIME ZONE"),
- (postgresql.TIMESTAMP(timezone=True, precision=5), "TIMESTAMP(5) WITH TIME ZONE"),
- ]:
+ for type_, expected in [
+ (postgresql.TIME(), 'TIME WITHOUT TIME ZONE'),
+ (postgresql.TIME(precision=5), 'TIME(5) WITHOUT TIME ZONE'
+ ),
+ (postgresql.TIME(timezone=True, precision=5),
+ 'TIME(5) WITH TIME ZONE'),
+ (postgresql.TIMESTAMP(), 'TIMESTAMP WITHOUT TIME ZONE'),
+ (postgresql.TIMESTAMP(precision=5),
+ 'TIMESTAMP(5) WITHOUT TIME ZONE'),
+ (postgresql.TIMESTAMP(timezone=True, precision=5),
+ 'TIMESTAMP(5) WITH TIME ZONE'),
+ ]:
self.assert_compile(type_, expected)
-
+
@testing.only_on('postgresql', 'DB specific feature')
def test_reflection(self):
m1 = MetaData(testing.db)
- t1 = Table('t1', m1,
+ t1 = Table(
+ 't1',
+ m1,
Column('c1', postgresql.TIME()),
Column('c2', postgresql.TIME(precision=5)),
- Column('c3', postgresql.TIME(timezone=True, precision=5)),
- Column('c4', postgresql.TIMESTAMP()),
- Column('c5', postgresql.TIMESTAMP(precision=5)),
- Column('c6', postgresql.TIMESTAMP(timezone=True, precision=5)),
-
- )
+ Column('c3', postgresql.TIME(timezone=True, precision=5)),
+ Column('c4', postgresql.TIMESTAMP()),
+ Column('c5', postgresql.TIMESTAMP(precision=5)),
+ Column('c6', postgresql.TIMESTAMP(timezone=True,
+ precision=5)),
+ )
t1.create()
try:
m2 = MetaData(testing.db)
@@ -1491,26 +1468,23 @@ class TimePrecisionTest(TestBase, AssertsCompiledSQL):
finally:
t1.drop()
-
-
class ArrayTest(TestBase, AssertsExecutionResults):
+
__only_on__ = 'postgresql'
@classmethod
def setup_class(cls):
global metadata, arrtable
metadata = MetaData(testing.db)
-
- arrtable = Table('arrtable', metadata,
- Column('id', Integer, primary_key=True),
- Column('intarr', postgresql.PGArray(Integer)),
- Column('strarr', postgresql.PGArray(Unicode()), nullable=False)
- )
+ arrtable = Table('arrtable', metadata, Column('id', Integer,
+ primary_key=True), Column('intarr',
+ postgresql.PGArray(Integer)), Column('strarr',
+ postgresql.PGArray(Unicode()), nullable=False))
metadata.create_all()
def teardown(self):
arrtable.delete().execute()
-
+
@classmethod
def teardown_class(cls):
metadata.drop_all()
@@ -1523,80 +1497,98 @@ class ArrayTest(TestBase, AssertsExecutionResults):
assert isinstance(tbl.c.intarr.type.item_type, Integer)
assert isinstance(tbl.c.strarr.type.item_type, String)
- @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
+ @testing.fails_on('postgresql+zxjdbc',
+ 'zxjdbc has no support for PG arrays')
def test_insert_array(self):
- arrtable.insert().execute(intarr=[1,2,3], strarr=[u'abc', u'def'])
+ arrtable.insert().execute(intarr=[1, 2, 3], strarr=[u'abc',
+ u'def'])
results = arrtable.select().execute().fetchall()
eq_(len(results), 1)
- eq_(results[0]['intarr'], [1,2,3])
- eq_(results[0]['strarr'], ['abc','def'])
+ eq_(results[0]['intarr'], [1, 2, 3])
+ eq_(results[0]['strarr'], ['abc', 'def'])
- @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
- @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
+ @testing.fails_on('postgresql+pg8000',
+ 'pg8000 has poor support for PG arrays')
+ @testing.fails_on('postgresql+zxjdbc',
+ 'zxjdbc has no support for PG arrays')
def test_array_where(self):
- arrtable.insert().execute(intarr=[1,2,3], strarr=[u'abc', u'def'])
- arrtable.insert().execute(intarr=[4,5,6], strarr=u'ABC')
- results = arrtable.select().where(arrtable.c.intarr == [1,2,3]).execute().fetchall()
+ arrtable.insert().execute(intarr=[1, 2, 3], strarr=[u'abc',
+ u'def'])
+ arrtable.insert().execute(intarr=[4, 5, 6], strarr=u'ABC')
+ results = arrtable.select().where(arrtable.c.intarr == [1, 2,
+ 3]).execute().fetchall()
eq_(len(results), 1)
- eq_(results[0]['intarr'], [1,2,3])
-
- @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
- @testing.fails_on('postgresql+pypostgresql', 'pypostgresql fails in coercing an array')
- @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
+ eq_(results[0]['intarr'], [1, 2, 3])
+
+ @testing.fails_on('postgresql+pg8000',
+ 'pg8000 has poor support for PG arrays')
+ @testing.fails_on('postgresql+pypostgresql',
+ 'pypostgresql fails in coercing an array')
+ @testing.fails_on('postgresql+zxjdbc',
+ 'zxjdbc has no support for PG arrays')
def test_array_concat(self):
- arrtable.insert().execute(intarr=[1,2,3], strarr=[u'abc', u'def'])
- results = select([arrtable.c.intarr + [4,5,6]]).execute().fetchall()
+ arrtable.insert().execute(intarr=[1, 2, 3], strarr=[u'abc',
+ u'def'])
+ results = select([arrtable.c.intarr + [4, 5,
+ 6]]).execute().fetchall()
eq_(len(results), 1)
- eq_(results[0][0], [1,2,3,4,5,6])
+ eq_(results[0][0], [ 1, 2, 3, 4, 5, 6, ])
- @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
- @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
+ @testing.fails_on('postgresql+pg8000',
+ 'pg8000 has poor support for PG arrays')
+ @testing.fails_on('postgresql+zxjdbc',
+ 'zxjdbc has no support for PG arrays')
def test_array_subtype_resultprocessor(self):
- arrtable.insert().execute(intarr=[4,5,6], strarr=[[u'm\xe4\xe4'], [u'm\xf6\xf6']])
- arrtable.insert().execute(intarr=[1,2,3], strarr=[u'm\xe4\xe4', u'm\xf6\xf6'])
- results = arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall()
+ arrtable.insert().execute(intarr=[4, 5, 6],
+ strarr=[[u'm\xe4\xe4'], [u'm\xf6\xf6'
+ ]])
+ arrtable.insert().execute(intarr=[1, 2, 3], strarr=[u'm\xe4\xe4'
+ , u'm\xf6\xf6'])
+ results = \
+ arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall()
eq_(len(results), 2)
eq_(results[0]['strarr'], [u'm\xe4\xe4', u'm\xf6\xf6'])
eq_(results[1]['strarr'], [[u'm\xe4\xe4'], [u'm\xf6\xf6']])
- @testing.fails_on('postgresql+pg8000', 'pg8000 has poor support for PG arrays')
- @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays')
+ @testing.fails_on('postgresql+pg8000',
+ 'pg8000 has poor support for PG arrays')
+ @testing.fails_on('postgresql+zxjdbc',
+ 'zxjdbc has no support for PG arrays')
def test_array_mutability(self):
- class Foo(object): pass
- footable = Table('foo', metadata,
- Column('id', Integer, primary_key=True),
- Column('intarr', postgresql.PGArray(Integer), nullable=True)
- )
+
+ class Foo(object):
+ pass
+
+ footable = Table('foo', metadata, Column('id', Integer,
+ primary_key=True), Column('intarr',
+ postgresql.PGArray(Integer), nullable=True))
mapper(Foo, footable)
metadata.create_all()
sess = create_session()
-
foo = Foo()
foo.id = 1
- foo.intarr = [1,2,3]
+ foo.intarr = [1, 2, 3]
sess.add(foo)
sess.flush()
sess.expunge_all()
foo = sess.query(Foo).get(1)
- eq_(foo.intarr, [1,2,3])
-
+ eq_(foo.intarr, [1, 2, 3])
foo.intarr.append(4)
sess.flush()
sess.expunge_all()
foo = sess.query(Foo).get(1)
- eq_(foo.intarr, [1,2,3,4])
-
+ eq_(foo.intarr, [1, 2, 3, 4])
foo.intarr = []
sess.flush()
sess.expunge_all()
eq_(foo.intarr, [])
-
foo.intarr = None
sess.flush()
sess.expunge_all()
eq_(foo.intarr, None)
# Errors in r4217:
+
foo = Foo()
foo.id = 2
sess.add(foo)
@@ -1614,58 +1606,73 @@ class TimestampTest(TestBase, AssertsExecutionResults):
eq_(result[0], datetime.datetime(2007, 12, 25, 0, 0))
class ServerSideCursorsTest(TestBase, AssertsExecutionResults):
+
__only_on__ = 'postgresql+psycopg2'
@classmethod
def setup_class(cls):
global ss_engine
- ss_engine = engines.testing_engine(options={'server_side_cursors':True})
+ ss_engine = \
+ engines.testing_engine(options={'server_side_cursors'
+ : True})
@classmethod
def teardown_class(cls):
ss_engine.dispose()
def test_uses_ss(self):
- result = ss_engine.execute("select 1")
+ result = ss_engine.execute('select 1')
assert result.cursor.name
-
- result = ss_engine.execute(text("select 1"))
+ result = ss_engine.execute(text('select 1'))
assert result.cursor.name
-
result = ss_engine.execute(select([1]))
assert result.cursor.name
def test_uses_ss_when_explicitly_enabled(self):
- engine = engines.testing_engine(options={'server_side_cursors':False})
- result = engine.execute(text("select 1"))
+ engine = engines.testing_engine(options={'server_side_cursors'
+ : False})
+ result = engine.execute(text('select 1'))
+
# It should be off globally ...
- assert not result.cursor.name
+ assert not result.cursor.name
s = select([1]).execution_options(stream_results=True)
result = engine.execute(s)
+
# ... but enabled for this one.
+
assert result.cursor.name
# and this one
- result = engine.connect().execution_options(stream_results=True).execute("select 1")
+
+ result = \
+ engine.connect().execution_options(stream_results=True).\
+ execute('select 1'
+ )
assert result.cursor.name
-
+
# not this one
- result = engine.connect().execution_options(stream_results=False).execute(s)
+
+ result = \
+ engine.connect().execution_options(stream_results=False).\
+ execute(s)
assert not result.cursor.name
-
+
def test_ss_explicitly_disabled(self):
s = select([1]).execution_options(stream_results=False)
result = ss_engine.execute(s)
assert not result.cursor.name
def test_aliases_and_ss(self):
- engine = engines.testing_engine(options={'server_side_cursors':False})
+ engine = engines.testing_engine(options={'server_side_cursors'
+ : False})
s1 = select([1]).execution_options(stream_results=True).alias()
result = engine.execute(s1)
assert result.cursor.name
- # s1's options shouldn't affect s2 when s2 is used as a from_obj.
+ # s1's options shouldn't affect s2 when s2 is used as a
+ # from_obj.
+
s2 = select([1], from_obj=s1)
result = engine.execute(s2)
assert not result.cursor.name
@@ -1674,39 +1681,45 @@ class ServerSideCursorsTest(TestBase, AssertsExecutionResults):
s1 = select([1], for_update=True)
result = ss_engine.execute(s1)
assert result.cursor.name
-
result = ss_engine.execute('SELECT 1 FOR UPDATE')
assert result.cursor.name
def test_orm_queries_with_ss(self):
metadata = MetaData(testing.db)
- class Foo(object): pass
- footable = Table('foobar', metadata,
- Column('id', Integer, primary_key=True),
- )
+
+
+ class Foo(object):
+
+ pass
+
+
+ footable = Table('foobar', metadata, Column('id', Integer,
+ primary_key=True))
mapper(Foo, footable)
metadata.create_all()
try:
sess = create_session()
-
- engine = engines.testing_engine(options={'server_side_cursors':False})
+ engine = \
+ engines.testing_engine(options={'server_side_cursors'
+ : False})
result = engine.execute(sess.query(Foo).statement)
assert not result.cursor.name, result.cursor.name
result.close()
-
q = sess.query(Foo).execution_options(stream_results=True)
result = engine.execute(q.statement)
assert result.cursor.name
result.close()
-
- result = sess.query(Foo).execution_options(stream_results=True).subquery().execute()
+ result = \
+ sess.query(Foo).execution_options(stream_results=True).\
+ subquery().execute()
assert result.cursor.name
result.close()
finally:
metadata.drop_all()
-
+
def test_text_with_ss(self):
- engine = engines.testing_engine(options={'server_side_cursors':False})
+ engine = engines.testing_engine(options={'server_side_cursors'
+ : False})
s = text('select 42')
result = engine.execute(s)
assert not result.cursor.name
@@ -1714,23 +1727,22 @@ class ServerSideCursorsTest(TestBase, AssertsExecutionResults):
result = engine.execute(s)
assert result.cursor.name
-
def test_roundtrip(self):
test_table = Table('test_table', MetaData(ss_engine),
- Column('id', Integer, primary_key=True),
- Column('data', String(50))
- )
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)))
test_table.create(checkfirst=True)
try:
test_table.insert().execute(data='data1')
-
nextid = ss_engine.execute(Sequence('test_table_id_seq'))
test_table.insert().execute(id=nextid, data='data2')
-
- eq_(test_table.select().execute().fetchall(), [(1, 'data1'), (2, 'data2')])
-
- test_table.update().where(test_table.c.id==2).values(data=test_table.c.data + ' updated').execute()
- eq_(test_table.select().execute().fetchall(), [(1, 'data1'), (2, 'data2 updated')])
+ eq_(test_table.select().execute().fetchall(), [(1, 'data1'
+ ), (2, 'data2')])
+ test_table.update().where(test_table.c.id
+ == 2).values(data=test_table.c.data + ' updated'
+ ).execute()
+ eq_(test_table.select().execute().fetchall(), [(1, 'data1'
+ ), (2, 'data2 updated')])
test_table.delete().execute()
eq_(test_table.count().scalar(), 0)
finally:
@@ -1790,36 +1802,39 @@ class SpecialTypesTest(TestBase, ComparesTables):
assert t.c.precision_interval.type.precision == 3
class MatchTest(TestBase, AssertsCompiledSQL):
+
__only_on__ = 'postgresql'
- __excluded_on__ = (('postgresql', '<', (8, 3, 0)),)
+ __excluded_on__ = ('postgresql', '<', (8, 3, 0)),
@classmethod
def setup_class(cls):
global metadata, cattable, matchtable
metadata = MetaData(testing.db)
-
- cattable = Table('cattable', metadata,
- Column('id', Integer, primary_key=True),
- Column('description', String(50)),
- )
- matchtable = Table('matchtable', metadata,
- Column('id', Integer, primary_key=True),
- Column('title', String(200)),
- Column('category_id', Integer, ForeignKey('cattable.id')),
- )
+ cattable = Table('cattable', metadata, Column('id', Integer,
+ primary_key=True), Column('description',
+ String(50)))
+ matchtable = Table('matchtable', metadata, Column('id',
+ Integer, primary_key=True), Column('title',
+ String(200)), Column('category_id', Integer,
+ ForeignKey('cattable.id')))
metadata.create_all()
-
- cattable.insert().execute([
- {'id': 1, 'description': 'Python'},
- {'id': 2, 'description': 'Ruby'},
- ])
- matchtable.insert().execute([
- {'id': 1, 'title': 'Agile Web Development with Rails', 'category_id': 2},
- {'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
- {'id': 3, 'title': "Programming Matz's Ruby", 'category_id': 2},
- {'id': 4, 'title': 'The Definitive Guide to Django', 'category_id': 1},
- {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}
- ])
+ cattable.insert().execute([{'id': 1, 'description': 'Python'},
+ {'id': 2, 'description': 'Ruby'}])
+ matchtable.insert().execute([{'id': 1, 'title'
+ : 'Agile Web Development with Rails'
+ , 'category_id': 2},
+ {'id': 2,
+ 'title': 'Dive Into Python',
+ 'category_id': 1},
+ {'id': 3, 'title'
+ : "Programming Matz's Ruby",
+ 'category_id': 2},
+ {'id': 4, 'title'
+ : 'The Definitive Guide to Django',
+ 'category_id': 1},
+ {'id': 5, 'title'
+ : 'Python in a Nutshell',
+ 'category_id': 1}])
@classmethod
def teardown_class(cls):
@@ -1828,50 +1843,63 @@ class MatchTest(TestBase, AssertsCompiledSQL):
@testing.fails_on('postgresql+pg8000', 'uses positional')
@testing.fails_on('postgresql+zxjdbc', 'uses qmark')
def test_expression_pyformat(self):
- self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%(title_1)s)")
+ self.assert_compile(matchtable.c.title.match('somstr'),
+ 'matchtable.title @@ to_tsquery(%(title_1)s'
+ ')')
@testing.fails_on('postgresql+psycopg2', 'uses pyformat')
@testing.fails_on('postgresql+pypostgresql', 'uses pyformat')
@testing.fails_on('postgresql+zxjdbc', 'uses qmark')
def test_expression_positional(self):
- self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%s)")
+ self.assert_compile(matchtable.c.title.match('somstr'),
+ 'matchtable.title @@ to_tsquery(%s)')
def test_simple_match(self):
- results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall()
+ results = \
+ matchtable.select().where(matchtable.c.title.match('python'
+ )).order_by(matchtable.c.id).execute().fetchall()
eq_([2, 5], [r.id for r in results])
def test_simple_match_with_apostrophe(self):
- results = matchtable.select().where(matchtable.c.title.match("Matz's")).execute().fetchall()
+ results = \
+ matchtable.select().where(matchtable.c.title.match("Matz's"
+ )).execute().fetchall()
eq_([3], [r.id for r in results])
def test_simple_derivative_match(self):
- results = matchtable.select().where(matchtable.c.title.match('nutshells')).execute().fetchall()
+ results = \
+ matchtable.select().where(matchtable.c.title.match('nutshells'
+ )).execute().fetchall()
eq_([5], [r.id for r in results])
def test_or_match(self):
- results1 = matchtable.select().where(or_(matchtable.c.title.match('nutshells'),
- matchtable.c.title.match('rubies'))
- ).order_by(matchtable.c.id).execute().fetchall()
+ results1 = \
+ matchtable.select().where(or_(matchtable.c.title.match('nutshells'
+ ), matchtable.c.title.match('rubies'
+ ))).order_by(matchtable.c.id).execute().fetchall()
eq_([3, 5], [r.id for r in results1])
- results2 = matchtable.select().where(matchtable.c.title.match('nutshells | rubies'),
- ).order_by(matchtable.c.id).execute().fetchall()
+ results2 = \
+ matchtable.select().where(
+ matchtable.c.title.match('nutshells | rubies'
+ )).order_by(matchtable.c.id).execute().fetchall()
eq_([3, 5], [r.id for r in results2])
-
def test_and_match(self):
- results1 = matchtable.select().where(and_(matchtable.c.title.match('python'),
- matchtable.c.title.match('nutshells'))
- ).execute().fetchall()
+ results1 = \
+ matchtable.select().where(and_(matchtable.c.title.match('python'
+ ), matchtable.c.title.match('nutshells'
+ ))).execute().fetchall()
eq_([5], [r.id for r in results1])
- results2 = matchtable.select().where(matchtable.c.title.match('python & nutshells'),
- ).execute().fetchall()
+ results2 = \
+ matchtable.select().where(
+ matchtable.c.title.match('python & nutshells'
+ )).execute().fetchall()
eq_([5], [r.id for r in results2])
def test_match_across_joins(self):
- results = matchtable.select().where(and_(cattable.c.id==matchtable.c.category_id,
- or_(cattable.c.description.match('Ruby'),
- matchtable.c.title.match('nutshells')))
- ).order_by(matchtable.c.id).execute().fetchall()
+ results = matchtable.select().where(and_(cattable.c.id
+ == matchtable.c.category_id,
+ or_(cattable.c.description.match('Ruby'),
+ matchtable.c.title.match('nutshells'
+ )))).order_by(matchtable.c.id).execute().fetchall()
eq_([1, 3, 5], [r.id for r in results])
-
-