Diffstat (limited to 'test/dialect/test_postgresql.py')
-rw-r--r--  test/dialect/test_postgresql.py  3605
1 file changed, 0 insertions(+), 3605 deletions(-)
diff --git a/test/dialect/test_postgresql.py b/test/dialect/test_postgresql.py
deleted file mode 100644
index ba42015e8..000000000
--- a/test/dialect/test_postgresql.py
+++ /dev/null
@@ -1,3605 +0,0 @@
-# coding: utf-8
-
-
-
-from sqlalchemy.testing.assertions import eq_, assert_raises, \
- assert_raises_message, is_, AssertsExecutionResults, \
- AssertsCompiledSQL, ComparesTables
-from sqlalchemy.testing import engines, fixtures
-from sqlalchemy import testing
-import datetime
-from sqlalchemy import Table, Column, select, MetaData, text, Integer, \
- String, Sequence, ForeignKey, join, Numeric, \
- PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \
- func, literal_column, literal, bindparam, cast, extract, \
- SmallInteger, Enum, REAL, update, insert, Index, delete, \
- and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text
-from sqlalchemy.orm import Session, mapper, aliased
-from sqlalchemy import exc, schema, types
-from sqlalchemy.dialects.postgresql import base as postgresql
-from sqlalchemy.dialects.postgresql import HSTORE, hstore, array, \
- INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, TSTZRANGE, \
- ExcludeConstraint
-import decimal
-from sqlalchemy import util
-from sqlalchemy.testing.util import round_decimal
-from sqlalchemy.sql import table, column, operators
-import logging
-import logging.handlers
-import re
-
-class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
-
- def test_format(self):
- seq = Sequence('my_seq_no_schema')
- dialect = postgresql.PGDialect()
- assert dialect.identifier_preparer.format_sequence(seq) \
- == 'my_seq_no_schema'
- seq = Sequence('my_seq', schema='some_schema')
- assert dialect.identifier_preparer.format_sequence(seq) \
- == 'some_schema.my_seq'
- seq = Sequence('My_Seq', schema='Some_Schema')
- assert dialect.identifier_preparer.format_sequence(seq) \
- == '"Some_Schema"."My_Seq"'
-
- @testing.only_on('postgresql', 'foo')
- @testing.provide_metadata
- def test_reverse_eng_name(self):
- metadata = self.metadata
- engine = engines.testing_engine(options=dict(implicit_returning=False))
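- # exercises reverse-engineering of the implicit SERIAL sequence
- # name: PostgreSQL truncates identifiers to 63 bytes, so the long
- # names below force the dialect to compute the same truncated
- # "<table>_<column>_seq" name the server generates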
- for tname, cname in [
- ('tb1' * 30, 'abc'),
- ('tb2', 'abc' * 30),
- ('tb3' * 30, 'abc' * 30),
- ('tb4', 'abc'),
- ]:
- t = Table(tname[:57],
- metadata,
- Column(cname[:57], Integer, primary_key=True)
- )
- t.create(engine)
- r = engine.execute(t.insert())
- assert r.inserted_primary_key == [1]
-
-class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
-
- __dialect__ = postgresql.dialect()
-
- def test_update_returning(self):
- dialect = postgresql.dialect()
- table1 = table('mytable', column('myid', Integer), column('name'
- , String(128)), column('description',
- String(128)))
- u = update(table1, values=dict(name='foo'
- )).returning(table1.c.myid, table1.c.name)
- self.assert_compile(u,
- 'UPDATE mytable SET name=%(name)s '
- 'RETURNING mytable.myid, mytable.name',
- dialect=dialect)
- u = update(table1, values=dict(name='foo')).returning(table1)
- self.assert_compile(u,
- 'UPDATE mytable SET name=%(name)s '
- 'RETURNING mytable.myid, mytable.name, '
- 'mytable.description', dialect=dialect)
- u = update(table1, values=dict(name='foo'
- )).returning(func.length(table1.c.name))
- self.assert_compile(u,
- 'UPDATE mytable SET name=%(name)s '
- 'RETURNING length(mytable.name) AS length_1'
- , dialect=dialect)
-
-
- def test_insert_returning(self):
- dialect = postgresql.dialect()
- table1 = table('mytable',
- column('myid', Integer),
- column('name', String(128)),
- column('description', String(128)),
- )
-
- i = insert(table1, values=dict(name='foo'
- )).returning(table1.c.myid, table1.c.name)
- self.assert_compile(i,
- 'INSERT INTO mytable (name) VALUES '
- '(%(name)s) RETURNING mytable.myid, '
- 'mytable.name', dialect=dialect)
- i = insert(table1, values=dict(name='foo')).returning(table1)
- self.assert_compile(i,
- 'INSERT INTO mytable (name) VALUES '
- '(%(name)s) RETURNING mytable.myid, '
- 'mytable.name, mytable.description',
- dialect=dialect)
- i = insert(table1, values=dict(name='foo'
- )).returning(func.length(table1.c.name))
- self.assert_compile(i,
- 'INSERT INTO mytable (name) VALUES '
- '(%(name)s) RETURNING length(mytable.name) '
- 'AS length_1', dialect=dialect)
-
-
- def test_create_partial_index(self):
- m = MetaData()
- tbl = Table('testtbl', m, Column('data', Integer))
- idx = Index('test_idx1', tbl.c.data,
- postgresql_where=and_(tbl.c.data > 5, tbl.c.data
- < 10))
-
- # test quoting and escaping of string literals in the WHERE clause
-
- idx2 = Index('test_idx2', tbl.c.data,
- postgresql_where=and_(tbl.c.data > 'a', tbl.c.data
- < "b's"))
- self.assert_compile(schema.CreateIndex(idx),
- 'CREATE INDEX test_idx1 ON testtbl (data) '
- 'WHERE data > 5 AND data < 10',
- dialect=postgresql.dialect())
- self.assert_compile(schema.CreateIndex(idx2),
- "CREATE INDEX test_idx2 ON testtbl (data) "
- "WHERE data > 'a' AND data < 'b''s'",
- dialect=postgresql.dialect())
-
- def test_create_index_with_ops(self):
- m = MetaData()
- tbl = Table('testtbl', m,
- Column('data', String),
- Column('data2', Integer, key='d2'))
-
- idx = Index('test_idx1', tbl.c.data,
- postgresql_ops={'data': 'text_pattern_ops'})
-
- idx2 = Index('test_idx2', tbl.c.data, tbl.c.d2,
- postgresql_ops={'data': 'text_pattern_ops',
- 'd2': 'int4_ops'})
-
- self.assert_compile(schema.CreateIndex(idx),
- 'CREATE INDEX test_idx1 ON testtbl '
- '(data text_pattern_ops)',
- dialect=postgresql.dialect())
- self.assert_compile(schema.CreateIndex(idx2),
- 'CREATE INDEX test_idx2 ON testtbl '
- '(data text_pattern_ops, data2 int4_ops)',
- dialect=postgresql.dialect())
-
- def test_create_index_with_using(self):
- m = MetaData()
- tbl = Table('testtbl', m, Column('data', String))
-
- idx1 = Index('test_idx1', tbl.c.data)
- idx2 = Index('test_idx2', tbl.c.data, postgresql_using='btree')
- idx3 = Index('test_idx3', tbl.c.data, postgresql_using='hash')
-
- self.assert_compile(schema.CreateIndex(idx1),
- 'CREATE INDEX test_idx1 ON testtbl '
- '(data)',
- dialect=postgresql.dialect())
- self.assert_compile(schema.CreateIndex(idx2),
- 'CREATE INDEX test_idx2 ON testtbl '
- 'USING btree (data)',
- dialect=postgresql.dialect())
- self.assert_compile(schema.CreateIndex(idx3),
- 'CREATE INDEX test_idx3 ON testtbl '
- 'USING hash (data)',
- dialect=postgresql.dialect())
-
- def test_exclude_constraint_min(self):
- m = MetaData()
- tbl = Table('testtbl', m,
- Column('room', Integer, primary_key=True))
- cons = ExcludeConstraint(('room', '='))
- tbl.append_constraint(cons)
- self.assert_compile(schema.AddConstraint(cons),
- 'ALTER TABLE testtbl ADD EXCLUDE USING gist '
- '(room WITH =)',
- dialect=postgresql.dialect())
-
- def test_exclude_constraint_full(self):
- m = MetaData()
- room = Column('room', Integer, primary_key=True)
- tbl = Table('testtbl', m,
- room,
- Column('during', TSRANGE))
- room = Column('room', Integer, primary_key=True)
- cons = ExcludeConstraint((room, '='), ('during', '&&'),
- name='my_name',
- using='gist',
- where="room > 100",
- deferrable=True,
- initially='immediate')
- tbl.append_constraint(cons)
- self.assert_compile(schema.AddConstraint(cons),
- 'ALTER TABLE testtbl ADD CONSTRAINT my_name '
- 'EXCLUDE USING gist '
- '(room WITH =, during WITH &&) WHERE '
- '(room > 100) DEFERRABLE INITIALLY immediate',
- dialect=postgresql.dialect())
-
- def test_exclude_constraint_copy(self):
- m = MetaData()
- cons = ExcludeConstraint(('room', '='))
- tbl = Table('testtbl', m,
- Column('room', Integer, primary_key=True),
- cons)
- # apparently you can't copy a ColumnCollectionConstraint until
- # after it has been bound to a table...
- cons_copy = cons.copy()
- tbl.append_constraint(cons_copy)
- self.assert_compile(schema.AddConstraint(cons_copy),
- 'ALTER TABLE testtbl ADD EXCLUDE USING gist '
- '(room WITH =)',
- dialect=postgresql.dialect())
-
- def test_substring(self):
- self.assert_compile(func.substring('abc', 1, 2),
- 'SUBSTRING(%(substring_1)s FROM %(substring_2)s '
- 'FOR %(substring_3)s)')
- self.assert_compile(func.substring('abc', 1),
- 'SUBSTRING(%(substring_1)s FROM %(substring_2)s)')
-
-
-
- def test_extract(self):
- t = table('t', column('col1', DateTime), column('col2', Date),
- column('col3', Time), column('col4',
- postgresql.INTERVAL))
- for field in 'year', 'month', 'day', 'epoch', 'hour':
- for expr, compiled_expr in [
- # these cases cover: plain columns (a cast to the column's
- # type is added), plain text (no cast), date/interval
- # arithmetic (a cast is added; addition is commutative,
- # subtraction is not), and entirely unsupported types
- # (no cast; don't crack up)
- (t.c.col1, 't.col1 :: timestamp'),
- (t.c.col2, 't.col2 :: date'),
- (t.c.col3, 't.col3 :: time'),
- (func.current_timestamp() - datetime.timedelta(days=5),
- '(CURRENT_TIMESTAMP - %(current_timestamp_1)s) :: '
- 'timestamp'),
- (func.current_timestamp() + func.current_timestamp(),
- 'CURRENT_TIMESTAMP + CURRENT_TIMESTAMP'),
- (text('foo.date + foo.time'), 'foo.date + foo.time'),
- (func.current_timestamp() + datetime.timedelta(days=5),
- '(CURRENT_TIMESTAMP + %(current_timestamp_1)s) :: '
- 'timestamp'),
- (t.c.col2 + t.c.col3, '(t.col2 + t.col3) :: timestamp'
- ),
- (t.c.col2 + datetime.timedelta(days=5),
- '(t.col2 + %(col2_1)s) :: timestamp'),
- (datetime.timedelta(days=5) + t.c.col2,
- '(%(col2_1)s + t.col2) :: timestamp'),
- (t.c.col1 + t.c.col4, '(t.col1 + t.col4) :: timestamp'
- ),
- (t.c.col1 - datetime.timedelta(seconds=30),
- '(t.col1 - %(col1_1)s) :: timestamp'),
- (datetime.timedelta(seconds=30) - t.c.col1,
- '%(col1_1)s - t.col1'),
- (func.coalesce(t.c.col1, func.current_timestamp()),
- 'coalesce(t.col1, CURRENT_TIMESTAMP) :: timestamp'),
- (t.c.col3 + datetime.timedelta(seconds=30),
- '(t.col3 + %(col3_1)s) :: time'),
- (func.current_timestamp() - func.coalesce(t.c.col1,
- func.current_timestamp()),
- '(CURRENT_TIMESTAMP - coalesce(t.col1, '
- 'CURRENT_TIMESTAMP)) :: interval'),
- (3 * func.foobar(type_=Interval),
- '(%(foobar_1)s * foobar()) :: interval'),
- (literal(datetime.timedelta(seconds=10))
- - literal(datetime.timedelta(seconds=10)),
- '(%(param_1)s - %(param_2)s) :: interval'),
- (t.c.col3 + 'some string', 't.col3 + %(col3_1)s'),
- ]:
- self.assert_compile(select([extract(field,
- expr)]).select_from(t),
- 'SELECT EXTRACT(%s FROM %s) AS '
- 'anon_1 FROM t' % (field,
- compiled_expr))
-
- def test_reserved_words(self):
- table = Table("pg_table", MetaData(),
- Column("col1", Integer),
- Column("variadic", Integer))
- x = select([table.c.col1, table.c.variadic])
-
- self.assert_compile(x,
- '''SELECT pg_table.col1, pg_table."variadic" FROM pg_table''')
-
- def test_array(self):
- c = Column('x', postgresql.ARRAY(Integer))
-
- self.assert_compile(
- cast(c, postgresql.ARRAY(Integer)),
- "CAST(x AS INTEGER[])"
- )
- self.assert_compile(
- c[5],
- "x[%(x_1)s]",
- checkparams={'x_1': 5}
- )
-
- self.assert_compile(
- c[5:7],
- "x[%(x_1)s:%(x_2)s]",
- checkparams={'x_2': 7, 'x_1': 5}
- )
- self.assert_compile(
- c[5:7][2:3],
- "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]",
- checkparams={'x_2': 7, 'x_1': 5, 'param_1':2, 'param_2':3}
- )
- self.assert_compile(
- c[5:7][3],
- "x[%(x_1)s:%(x_2)s][%(param_1)s]",
- checkparams={'x_2': 7, 'x_1': 5, 'param_1':3}
- )
-
- self.assert_compile(
- c.contains([1]),
- 'x @> %(x_1)s',
- checkparams={'x_1': [1]}
- )
- self.assert_compile(
- c.contained_by([2]),
- 'x <@ %(x_1)s',
- checkparams={'x_1': [2]}
- )
- self.assert_compile(
- c.overlap([3]),
- 'x && %(x_1)s',
- checkparams={'x_1': [3]}
- )
- self.assert_compile(
- postgresql.Any(4, c),
- '%(param_1)s = ANY (x)',
- checkparams={'param_1': 4}
- )
- self.assert_compile(
- c.any(5, operator=operators.ne),
- '%(param_1)s != ANY (x)',
- checkparams={'param_1': 5}
- )
- self.assert_compile(
- postgresql.All(6, c, operator=operators.gt),
- '%(param_1)s > ALL (x)',
- checkparams={'param_1': 6}
- )
- self.assert_compile(
- c.all(7, operator=operators.lt),
- '%(param_1)s < ALL (x)',
- checkparams={'param_1': 7}
- )
-
- def test_array_literal_type(self):
- is_(postgresql.array([1, 2]).type._type_affinity, postgresql.ARRAY)
- is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer)
-
- is_(postgresql.array([1, 2], type_=String).
- type.item_type._type_affinity, String)
-
- def test_array_literal(self):
- self.assert_compile(
- func.array_dims(postgresql.array([1, 2]) +
- postgresql.array([3, 4, 5])),
- "array_dims(ARRAY[%(param_1)s, %(param_2)s] || "
- "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])",
- checkparams={'param_5': 5, 'param_4': 4, 'param_1': 1,
- 'param_3': 3, 'param_2': 2}
- )
-
- def test_array_literal_insert(self):
- m = MetaData()
- t = Table('t', m, Column('data', postgresql.ARRAY(Integer)))
- self.assert_compile(
- t.insert().values(data=array([1, 2, 3])),
- "INSERT INTO t (data) VALUES (ARRAY[%(param_1)s, "
- "%(param_2)s, %(param_3)s])"
- )
-
- def test_update_array_element(self):
- m = MetaData()
- t = Table('t', m, Column('data', postgresql.ARRAY(Integer)))
- self.assert_compile(
- t.update().values({t.c.data[5]: 1}),
- "UPDATE t SET data[%(data_1)s]=%(param_1)s",
- checkparams={'data_1': 5, 'param_1': 1}
- )
-
- def test_update_array_slice(self):
- m = MetaData()
- t = Table('t', m, Column('data', postgresql.ARRAY(Integer)))
- self.assert_compile(
- t.update().values({t.c.data[2:5]: 2}),
- "UPDATE t SET data[%(data_1)s:%(data_2)s]=%(param_1)s",
- checkparams={'param_1': 2, 'data_2': 5, 'data_1': 2}
-
- )
-
- def test_from_only(self):
- m = MetaData()
- tbl1 = Table('testtbl1', m, Column('id', Integer))
- tbl2 = Table('testtbl2', m, Column('id', Integer))
-
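- # background: ONLY restricts a statement to the named table,
- # excluding descendant tables in a PostgreSQL inheritance
- # hierarchy; that's the hint compiled throughout this test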
- stmt = tbl1.select().with_hint(tbl1, 'ONLY', 'postgresql')
- expected = 'SELECT testtbl1.id FROM ONLY testtbl1'
- self.assert_compile(stmt, expected)
-
- talias1 = tbl1.alias('foo')
- stmt = talias1.select().with_hint(talias1, 'ONLY', 'postgresql')
- expected = 'SELECT foo.id FROM ONLY testtbl1 AS foo'
- self.assert_compile(stmt, expected)
-
- stmt = select([tbl1, tbl2]).with_hint(tbl1, 'ONLY', 'postgresql')
- expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, '
- 'testtbl2')
- self.assert_compile(stmt, expected)
-
- stmt = select([tbl1, tbl2]).with_hint(tbl2, 'ONLY', 'postgresql')
- expected = ('SELECT testtbl1.id, testtbl2.id FROM testtbl1, ONLY '
- 'testtbl2')
- self.assert_compile(stmt, expected)
-
- stmt = select([tbl1, tbl2])
- stmt = stmt.with_hint(tbl1, 'ONLY', 'postgresql')
- stmt = stmt.with_hint(tbl2, 'ONLY', 'postgresql')
- expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, '
- 'ONLY testtbl2')
- self.assert_compile(stmt, expected)
-
- stmt = update(tbl1, values=dict(id=1))
- stmt = stmt.with_hint('ONLY', dialect_name='postgresql')
- expected = 'UPDATE ONLY testtbl1 SET id=%(id)s'
- self.assert_compile(stmt, expected)
-
- stmt = delete(tbl1).with_hint('ONLY', selectable=tbl1, dialect_name='postgresql')
- expected = 'DELETE FROM ONLY testtbl1'
- self.assert_compile(stmt, expected)
-
- tbl3 = Table('testtbl3', m, Column('id', Integer), schema='testschema')
- stmt = tbl3.select().with_hint(tbl3, 'ONLY', 'postgresql')
- expected = 'SELECT testschema.testtbl3.id FROM ONLY testschema.testtbl3'
- self.assert_compile(stmt, expected)
-
- assert_raises(
- exc.CompileError,
- tbl3.select().with_hint(tbl3, "FAKE", "postgresql").compile,
- dialect=postgresql.dialect()
- )
-
-class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults):
- __only_on__ = 'postgresql'
- __dialect__ = postgresql.dialect()
-
- @classmethod
- def define_tables(cls, metadata):
- data_table = Table('data_table', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', Integer)
- )
-
- @classmethod
- def insert_data(cls):
- data_table = cls.tables.data_table
-
- data_table.insert().execute(
- {'data':3},
- {'data':5},
- {'data':7},
- {'data':2},
- {'data':15},
- {'data':12},
- {'data':6},
- {'data':478},
- {'data':52},
- {'data':9},
- )
-
- @testing.fails_on('postgresql+zxjdbc',
- 'XXX: postgresql+zxjdbc currently returns a Decimal result for Float')
- def test_float_coercion(self):
- data_table = self.tables.data_table
-
- for type_, result in [
- (Numeric, decimal.Decimal('140.381230939')),
- (Float, 140.381230939),
- (Float(asdecimal=True), decimal.Decimal('140.381230939')),
- (Numeric(asdecimal=False), 140.381230939),
- ]:
- ret = testing.db.execute(
- select([
- func.stddev_pop(data_table.c.data, type_=type_)
- ])
- ).scalar()
-
- eq_(round_decimal(ret, 9), result)
-
- ret = testing.db.execute(
- select([
- cast(func.stddev_pop(data_table.c.data), type_)
- ])
- ).scalar()
- eq_(round_decimal(ret, 9), result)
-
- @testing.fails_on('postgresql+zxjdbc',
- 'zxjdbc has no support for PG arrays')
- @testing.provide_metadata
- def test_arrays(self):
- metadata = self.metadata
- t1 = Table('t', metadata,
- Column('x', postgresql.ARRAY(Float)),
- Column('y', postgresql.ARRAY(REAL)),
- Column('z', postgresql.ARRAY(postgresql.DOUBLE_PRECISION)),
- Column('q', postgresql.ARRAY(Numeric))
- )
- metadata.create_all()
- t1.insert().execute(x=[5], y=[5], z=[6], q=[decimal.Decimal("6.4")])
- row = t1.select().execute().first()
- eq_(
- row,
- ([5], [5], [6], [decimal.Decimal("6.4")])
- )
-
-class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
-
- __only_on__ = 'postgresql'
- __dialect__ = postgresql.dialect()
-
- def test_compile(self):
- e1 = Enum('x', 'y', 'z', name='somename')
- e2 = Enum('x', 'y', 'z', name='somename', schema='someschema')
- self.assert_compile(postgresql.CreateEnumType(e1),
- "CREATE TYPE somename AS ENUM ('x','y','z')"
- )
- self.assert_compile(postgresql.CreateEnumType(e2),
- "CREATE TYPE someschema.somename AS ENUM "
- "('x','y','z')")
- self.assert_compile(postgresql.DropEnumType(e1),
- 'DROP TYPE somename')
- self.assert_compile(postgresql.DropEnumType(e2),
- 'DROP TYPE someschema.somename')
- t1 = Table('sometable', MetaData(), Column('somecolumn', e1))
- self.assert_compile(schema.CreateTable(t1),
- 'CREATE TABLE sometable (somecolumn '
- 'somename)')
- t1 = Table('sometable', MetaData(), Column('somecolumn',
- Enum('x', 'y', 'z', native_enum=False)))
- self.assert_compile(schema.CreateTable(t1),
- "CREATE TABLE sometable (somecolumn "
- "VARCHAR(1), CHECK (somecolumn IN ('x', "
- "'y', 'z')))")
-
- @testing.fails_on('postgresql+zxjdbc',
- 'zxjdbc fails on ENUM: column "XXX" is of type '
- 'XXX but expression is of type character varying')
- @testing.fails_on('postgresql+pg8000',
- 'pg8000 fails on ENUM: column "XXX" is of type '
- 'XXX but expression is of type text')
- def test_create_table(self):
- metadata = MetaData(testing.db)
- t1 = Table('table', metadata, Column('id', Integer,
- primary_key=True), Column('value', Enum('one', 'two'
- , 'three', name='onetwothreetype')))
- t1.create()
- t1.create(checkfirst=True) # check the create
- try:
- t1.insert().execute(value='two')
- t1.insert().execute(value='three')
- t1.insert().execute(value='three')
- eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
- [(1, 'two'), (2, 'three'), (3, 'three')])
- finally:
- metadata.drop_all()
- metadata.drop_all() # second call is a no-op exercising checkfirst
-
- def test_name_required(self):
- metadata = MetaData(testing.db)
- etype = Enum('four', 'five', 'six', metadata=metadata)
- assert_raises(exc.CompileError, etype.create)
- assert_raises(exc.CompileError, etype.compile,
- dialect=postgresql.dialect())
-
- @testing.fails_on('postgresql+zxjdbc',
- 'zxjdbc fails on ENUM: column "XXX" is of type '
- 'XXX but expression is of type character varying')
- @testing.fails_on('postgresql+pg8000',
- 'pg8000 fails on ENUM: column "XXX" is of type '
- 'XXX but expression is of type text')
- @testing.provide_metadata
- def test_unicode_labels(self):
- metadata = self.metadata
- t1 = Table('table', metadata,
- Column('id', Integer, primary_key=True),
- Column('value',
- Enum(util.u('réveillé'), util.u('drôle'), util.u('S’il'),
- name='onetwothreetype'))
- )
- metadata.create_all()
- t1.insert().execute(value=util.u('drôle'))
- t1.insert().execute(value=util.u('réveillé'))
- t1.insert().execute(value=util.u('S’il'))
- eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
- [(1, util.u('drôle')), (2, util.u('réveillé')),
- (3, util.u('S’il'))]
- )
- m2 = MetaData(testing.db)
- t2 = Table('table', m2, autoload=True)
- eq_(
- t2.c.value.type.enums,
- (util.u('réveillé'), util.u('drôle'), util.u('S’il'))
- )
-
- def test_non_native_type(self):
- metadata = MetaData()
- t1 = Table('foo', metadata, Column('bar', Enum('one', 'two',
- 'three', name='myenum', native_enum=False)))
-
- def go():
- t1.create(testing.db)
-
- try:
- self.assert_sql(testing.db, go, [],
- with_sequences=[("CREATE TABLE foo (\tbar "
- "VARCHAR(5), \tCONSTRAINT myenum CHECK "
- "(bar IN ('one', 'two', 'three')))", {})])
- finally:
- metadata.drop_all(testing.db)
-
- @testing.provide_metadata
- def test_disable_create(self):
- metadata = self.metadata
-
- e1 = postgresql.ENUM('one', 'two', 'three',
- name="myenum",
- create_type=False)
-
- t1 = Table('e1', metadata,
- Column('c1', e1)
- )
- # table can be created separately
- # without conflict
- e1.create(bind=testing.db)
- t1.create(testing.db)
- t1.drop(testing.db)
- e1.drop(bind=testing.db)
-
- @testing.provide_metadata
- def test_generate_multiple(self):
- """Test that the same enum twice only generates once
- for the create_all() call, without using checkfirst.
-
- A 'memo' collection held by the DDL runner
- now handles this.
-
- """
- metadata = self.metadata
-
- e1 = Enum('one', 'two', 'three',
- name="myenum")
- t1 = Table('e1', metadata,
- Column('c1', e1)
- )
-
- t2 = Table('e2', metadata,
- Column('c1', e1)
- )
-
- metadata.create_all(checkfirst=False)
- metadata.drop_all(checkfirst=False)
-
- def test_non_native_dialect(self):
- engine = engines.testing_engine()
- engine.connect()
- engine.dialect.supports_native_enum = False
- metadata = MetaData()
- t1 = Table('foo', metadata, Column('bar', Enum('one', 'two',
- 'three', name='myenum')))
-
- def go():
- t1.create(engine)
-
- try:
- self.assert_sql(engine, go, [],
- with_sequences=[("CREATE TABLE foo (\tbar "
- "VARCHAR(5), \tCONSTRAINT myenum CHECK "
- "(bar IN ('one', 'two', 'three')))", {})])
- finally:
- metadata.drop_all(engine)
-
- def test_standalone_enum(self):
- metadata = MetaData(testing.db)
- etype = Enum('four', 'five', 'six', name='fourfivesixtype',
- metadata=metadata)
- etype.create()
- try:
- assert testing.db.dialect.has_type(testing.db,
- 'fourfivesixtype')
- finally:
- etype.drop()
- assert not testing.db.dialect.has_type(testing.db,
- 'fourfivesixtype')
- metadata.create_all()
- try:
- assert testing.db.dialect.has_type(testing.db,
- 'fourfivesixtype')
- finally:
- metadata.drop_all()
- assert not testing.db.dialect.has_type(testing.db,
- 'fourfivesixtype')
-
- def test_no_support(self):
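- # simulate a server too old for native enums: PostgreSQL 8.2
- # predates native ENUM support (added in 8.3)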
- def server_version_info(self):
- return (8, 2)
-
- e = engines.testing_engine()
- dialect = e.dialect
- dialect._get_server_version_info = server_version_info
-
- assert dialect.supports_native_enum
- e.connect()
- assert not dialect.supports_native_enum
-
- # initialize is called again on new pool
- e.dispose()
- e.connect()
- assert not dialect.supports_native_enum
-
-
- def test_reflection(self):
- metadata = MetaData(testing.db)
- etype = Enum('four', 'five', 'six', name='fourfivesixtype',
- metadata=metadata)
- t1 = Table('table', metadata, Column('id', Integer,
- primary_key=True), Column('value', Enum('one', 'two'
- , 'three', name='onetwothreetype')), Column('value2'
- , etype))
- metadata.create_all()
- try:
- m2 = MetaData(testing.db)
- t2 = Table('table', m2, autoload=True)
- assert t2.c.value.type.enums == ('one', 'two', 'three')
- assert t2.c.value.type.name == 'onetwothreetype'
- assert t2.c.value2.type.enums == ('four', 'five', 'six')
- assert t2.c.value2.type.name == 'fourfivesixtype'
- finally:
- metadata.drop_all()
-
- def test_schema_reflection(self):
- metadata = MetaData(testing.db)
- etype = Enum(
- 'four',
- 'five',
- 'six',
- name='fourfivesixtype',
- schema='test_schema',
- metadata=metadata,
- )
- t1 = Table('table', metadata, Column('id', Integer,
- primary_key=True), Column('value', Enum('one', 'two'
- , 'three', name='onetwothreetype',
- schema='test_schema')), Column('value2', etype))
- metadata.create_all()
- try:
- m2 = MetaData(testing.db)
- t2 = Table('table', m2, autoload=True)
- assert t2.c.value.type.enums == ('one', 'two', 'three')
- assert t2.c.value.type.name == 'onetwothreetype'
- assert t2.c.value2.type.enums == ('four', 'five', 'six')
- assert t2.c.value2.type.name == 'fourfivesixtype'
- assert t2.c.value2.type.schema == 'test_schema'
- finally:
- metadata.drop_all()
-
-class NumericInterpretationTest(fixtures.TestBase):
- __only_on__ = 'postgresql'
-
- def test_numeric_codes(self):
- from sqlalchemy.dialects.postgresql import pg8000, psycopg2, base
-
- for dialect in (pg8000.dialect(), psycopg2.dialect()):
-
- typ = Numeric().dialect_impl(dialect)
- for code in base._INT_TYPES + base._FLOAT_TYPES + \
- base._DECIMAL_TYPES:
- proc = typ.result_processor(dialect, code)
- val = 23.7
- if proc is not None:
- val = proc(val)
- assert val in (23.7, decimal.Decimal("23.7"))
-
- @testing.provide_metadata
- def test_numeric_default(self):
- metadata = self.metadata
- # pg8000 appears to fail when the value is 0,
- # returns an int instead of decimal.
- t = Table('t', metadata,
- Column('id', Integer, primary_key=True),
- Column('nd', Numeric(asdecimal=True), default=1),
- Column('nf', Numeric(asdecimal=False), default=1),
- Column('fd', Float(asdecimal=True), default=1),
- Column('ff', Float(asdecimal=False), default=1),
- )
- metadata.create_all()
- r = t.insert().execute()
-
- row = t.select().execute().first()
- assert isinstance(row[1], decimal.Decimal)
- assert isinstance(row[2], float)
- assert isinstance(row[3], decimal.Decimal)
- assert isinstance(row[4], float)
- eq_(
- row,
- (1, decimal.Decimal("1"), 1, decimal.Decimal("1"), 1)
- )
-
-class InsertTest(fixtures.TestBase, AssertsExecutionResults):
-
- __only_on__ = 'postgresql'
-
- @classmethod
- def setup_class(cls):
- global metadata
- cls.engine = testing.db
- metadata = MetaData(testing.db)
-
- def teardown(self):
- metadata.drop_all()
- metadata.clear()
- if self.engine is not testing.db:
- self.engine.dispose()
-
- def test_compiled_insert(self):
- table = Table('testtable', metadata, Column('id', Integer,
- primary_key=True), Column('data', String(30)))
- metadata.create_all()
- ins = table.insert(inline=True, values={'data': bindparam('x'
- )}).compile()
- ins.execute({'x': 'five'}, {'x': 'seven'})
- assert table.select().execute().fetchall() == [(1, 'five'), (2,
- 'seven')]
-
- def test_foreignkey_missing_insert(self):
- t1 = Table('t1', metadata, Column('id', Integer,
- primary_key=True))
- t2 = Table('t2', metadata, Column('id', Integer,
- ForeignKey('t1.id'), primary_key=True))
- metadata.create_all()
-
- # want to ensure that "null value in column "id" violates not-
- # null constraint" is raised (IntegrityError on psycopg2, but
- # ProgrammingError on pg8000), and not "ProgrammingError:
- # (ProgrammingError) relation "t2_id_seq" does not exist".
- # the latter corresponds to autoincrement behavior, which is not
- # the case here due to the foreign key.
-
- for eng in [engines.testing_engine(options={'implicit_returning'
- : False}),
- engines.testing_engine(options={'implicit_returning'
- : True})]:
- assert_raises_message(exc.DBAPIError,
- 'violates not-null constraint',
- eng.execute, t2.insert())
-
- def test_sequence_insert(self):
- table = Table('testtable', metadata, Column('id', Integer,
- Sequence('my_seq'), primary_key=True),
- Column('data', String(30)))
- metadata.create_all()
- self._assert_data_with_sequence(table, 'my_seq')
-
- def test_sequence_returning_insert(self):
- table = Table('testtable', metadata, Column('id', Integer,
- Sequence('my_seq'), primary_key=True),
- Column('data', String(30)))
- metadata.create_all()
- self._assert_data_with_sequence_returning(table, 'my_seq')
-
- def test_opt_sequence_insert(self):
- table = Table('testtable', metadata, Column('id', Integer,
- Sequence('my_seq', optional=True),
- primary_key=True), Column('data', String(30)))
- metadata.create_all()
- self._assert_data_autoincrement(table)
-
- def test_opt_sequence_returning_insert(self):
- table = Table('testtable', metadata, Column('id', Integer,
- Sequence('my_seq', optional=True),
- primary_key=True), Column('data', String(30)))
- metadata.create_all()
- self._assert_data_autoincrement_returning(table)
-
- def test_autoincrement_insert(self):
- table = Table('testtable', metadata, Column('id', Integer,
- primary_key=True), Column('data', String(30)))
- metadata.create_all()
- self._assert_data_autoincrement(table)
-
- def test_autoincrement_returning_insert(self):
- table = Table('testtable', metadata, Column('id', Integer,
- primary_key=True), Column('data', String(30)))
- metadata.create_all()
- self._assert_data_autoincrement_returning(table)
-
- def test_noautoincrement_insert(self):
- table = Table('testtable', metadata, Column('id', Integer,
- primary_key=True, autoincrement=False),
- Column('data', String(30)))
- metadata.create_all()
- self._assert_data_noautoincrement(table)
-
- def _assert_data_autoincrement(self, table):
- self.engine = \
- engines.testing_engine(options={'implicit_returning'
- : False})
- metadata.bind = self.engine
-
- def go():
-
- # execute with explicit id
-
- r = table.insert().execute({'id': 30, 'data': 'd1'})
- assert r.inserted_primary_key == [30]
-
- # execute with prefetch id
-
- r = table.insert().execute({'data': 'd2'})
- assert r.inserted_primary_key == [1]
-
- # executemany with explicit ids
-
- table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
- 'data': 'd4'})
-
- # executemany, uses SERIAL
-
- table.insert().execute({'data': 'd5'}, {'data': 'd6'})
-
- # single execute, explicit id, inline
-
- table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
-
- # single execute, inline, uses SERIAL
-
- table.insert(inline=True).execute({'data': 'd8'})
-
- # note that the test framework doesn't capture the "preexecute"
- # of a sequence or default. we just see it in the bind params.
-
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 1, 'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
- : 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
- : 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
- table.delete().execute()
-
- # test the same series of events using a reflected version of
- # the table
-
- m2 = MetaData(self.engine)
- table = Table(table.name, m2, autoload=True)
-
- def go():
- table.insert().execute({'id': 30, 'data': 'd1'})
- r = table.insert().execute({'data': 'd2'})
- assert r.inserted_primary_key == [5]
- table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
- 'data': 'd4'})
- table.insert().execute({'data': 'd5'}, {'data': 'd6'})
- table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
- table.insert(inline=True).execute({'data': 'd8'})
-
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 5, 'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
- : 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
- : 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (5, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (6, 'd5'),
- (7, 'd6'),
- (33, 'd7'),
- (8, 'd8'),
- ]
- table.delete().execute()
-
- def _assert_data_autoincrement_returning(self, table):
- self.engine = \
- engines.testing_engine(options={'implicit_returning': True})
- metadata.bind = self.engine
-
- def go():
-
- # execute with explicit id
-
- r = table.insert().execute({'id': 30, 'data': 'd1'})
- assert r.inserted_primary_key == [30]
-
- # execute with prefetch id
-
- r = table.insert().execute({'data': 'd2'})
- assert r.inserted_primary_key == [1]
-
- # executemany with explicit ids
-
- table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
- 'data': 'd4'})
-
- # executemany, uses SERIAL
-
- table.insert().execute({'data': 'd5'}, {'data': 'd6'})
-
- # single execute, explicit id, inline
-
- table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
-
- # single execute, inline, uses SERIAL
-
- table.insert(inline=True).execute({'data': 'd8'})
-
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (data) VALUES (:data) RETURNING '
- 'testtable.id', {'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
- : 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
- : 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
- table.delete().execute()
-
- # test the same series of events using a reflected version of
- # the table
-
- m2 = MetaData(self.engine)
- table = Table(table.name, m2, autoload=True)
-
- def go():
- table.insert().execute({'id': 30, 'data': 'd1'})
- r = table.insert().execute({'data': 'd2'})
- assert r.inserted_primary_key == [5]
- table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
- 'data': 'd4'})
- table.insert().execute({'data': 'd5'}, {'data': 'd6'})
- table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
- table.insert(inline=True).execute({'data': 'd8'})
-
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (data) VALUES (:data) RETURNING '
- 'testtable.id', {'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
- : 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data'
- : 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (5, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (6, 'd5'),
- (7, 'd6'),
- (33, 'd7'),
- (8, 'd8'),
- ]
- table.delete().execute()
-
- def _assert_data_with_sequence(self, table, seqname):
- self.engine = \
- engines.testing_engine(options={'implicit_returning'
- : False})
- metadata.bind = self.engine
-
- def go():
- table.insert().execute({'id': 30, 'data': 'd1'})
- table.insert().execute({'data': 'd2'})
- table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
- 'data': 'd4'})
- table.insert().execute({'data': 'd5'}, {'data': 'd6'})
- table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
- table.insert(inline=True).execute({'data': 'd8'})
-
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 1, 'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
- ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 33, 'data': 'd7'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
- ":data)" % seqname, [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
-
- # can't test reflection here since the Sequence must be
- # explicitly specified
-
- def _assert_data_with_sequence_returning(self, table, seqname):
- self.engine = \
- engines.testing_engine(options={'implicit_returning': True})
- metadata.bind = self.engine
-
- def go():
- table.insert().execute({'id': 30, 'data': 'd1'})
- table.insert().execute({'data': 'd2'})
- table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
- 'data': 'd4'})
- table.insert().execute({'data': 'd5'}, {'data': 'd6'})
- table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
- table.insert(inline=True).execute({'data': 'd8'})
-
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 30, 'data': 'd1'}),
- ("INSERT INTO testtable (id, data) VALUES "
- "(nextval('my_seq'), :data) RETURNING testtable.id",
- {'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
- ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 33, 'data': 'd7'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
- ":data)" % seqname, [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
-
- # can't test reflection here since the Sequence must be
- # explicitly specified
-
- def _assert_data_noautoincrement(self, table):
- self.engine = \
- engines.testing_engine(options={'implicit_returning'
- : False})
- metadata.bind = self.engine
- table.insert().execute({'id': 30, 'data': 'd1'})
- if self.engine.driver == 'pg8000':
- exception_cls = exc.ProgrammingError
- elif self.engine.driver == 'pypostgresql':
- exception_cls = Exception
- else:
- exception_cls = exc.IntegrityError
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'})
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'},
- {'data': 'd3'})
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'})
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'},
- {'data': 'd3'})
- table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32,
- 'data': 'd3'})
- table.insert(inline=True).execute({'id': 33, 'data': 'd4'})
- assert table.select().execute().fetchall() == [(30, 'd1'), (31,
- 'd2'), (32, 'd3'), (33, 'd4')]
- table.delete().execute()
-
- # test the same series of events using a reflected version of
- # the table
-
- m2 = MetaData(self.engine)
- table = Table(table.name, m2, autoload=True)
- table.insert().execute({'id': 30, 'data': 'd1'})
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'})
- assert_raises_message(exception_cls,
- 'violates not-null constraint',
- table.insert().execute, {'data': 'd2'},
- {'data': 'd3'})
- table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32,
- 'data': 'd3'})
- table.insert(inline=True).execute({'id': 33, 'data': 'd4'})
- assert table.select().execute().fetchall() == [(30, 'd1'), (31,
- 'd2'), (32, 'd3'), (33, 'd4')]
-
-class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
-
- """Test PostgreSQL domains"""
-
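- # background: a domain is a named wrapper around a base type with
- # optional constraints and defaults; setup_class below issues e.g.
- # CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42, which
- # reflection should resolve to the underlying INTEGER
-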
- __only_on__ = 'postgresql'
-
- @classmethod
- def setup_class(cls):
- con = testing.db.connect()
- for ddl in \
- 'CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42', \
- 'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0', \
- "CREATE TYPE testtype AS ENUM ('test')", \
- 'CREATE DOMAIN enumdomain AS testtype'\
- :
- try:
- con.execute(ddl)
- except exc.DBAPIError as e:
- if 'already exists' not in str(e):
- raise
- con.execute('CREATE TABLE testtable (question integer, answer '
- 'testdomain)')
- con.execute('CREATE TABLE test_schema.testtable(question '
- 'integer, answer test_schema.testdomain, anything '
- 'integer)')
- con.execute('CREATE TABLE crosschema (question integer, answer '
- 'test_schema.testdomain)')
-
- con.execute('CREATE TABLE enum_test (id integer, data enumdomain)')
-
- @classmethod
- def teardown_class(cls):
- con = testing.db.connect()
- con.execute('DROP TABLE testtable')
- con.execute('DROP TABLE test_schema.testtable')
- con.execute('DROP TABLE crosschema')
- con.execute('DROP DOMAIN testdomain')
- con.execute('DROP DOMAIN test_schema.testdomain')
- con.execute("DROP TABLE enum_test")
- con.execute("DROP DOMAIN enumdomain")
- con.execute("DROP TYPE testtype")
-
- def test_table_is_reflected(self):
- metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True)
- eq_(set(table.columns.keys()), set(['question', 'answer']),
- "Columns of reflected table didn't equal expected columns")
- assert isinstance(table.c.answer.type, Integer)
-
- def test_domain_is_reflected(self):
- metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True)
- eq_(str(table.columns.answer.server_default.arg), '42',
- "Reflected default value didn't equal expected value")
- assert not table.columns.answer.nullable, \
- 'Expected reflected column to not be nullable.'
-
- def test_enum_domain_is_reflected(self):
- metadata = MetaData(testing.db)
- table = Table('enum_test', metadata, autoload=True)
- eq_(
- table.c.data.type.enums,
- ('test', )
- )
-
- def test_table_is_reflected_test_schema(self):
- metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True,
- schema='test_schema')
- eq_(set(table.columns.keys()), set(['question', 'answer',
- 'anything']),
- "Columns of reflected table didn't equal expected columns")
- assert isinstance(table.c.anything.type, Integer)
-
- def test_schema_domain_is_reflected(self):
- metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True,
- schema='test_schema')
- eq_(str(table.columns.answer.server_default.arg), '0',
- "Reflected default value didn't equal expected value")
- assert table.columns.answer.nullable, \
- 'Expected reflected column to be nullable.'
-
- def test_crosschema_domain_is_reflected(self):
- metadata = MetaData(testing.db)
- table = Table('crosschema', metadata, autoload=True)
- eq_(str(table.columns.answer.server_default.arg), '0',
- "Reflected default value didn't equal expected value")
- assert table.columns.answer.nullable, \
- 'Expected reflected column to be nullable.'
-
- def test_unknown_types(self):
- ischema_names = postgresql.PGDialect.ischema_names
- postgresql.PGDialect.ischema_names = {}
- try:
- m2 = MetaData(testing.db)
- assert_raises(exc.SAWarning, Table, 'testtable', m2,
- autoload=True)
-
- @testing.emits_warning('Did not recognize type')
- def warns():
- m3 = MetaData(testing.db)
- t3 = Table('testtable', m3, autoload=True)
- assert t3.c.answer.type.__class__ == types.NullType
-
- warns()
- finally:
- postgresql.PGDialect.ischema_names = ischema_names
-
-class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL):
- """Test 'DISTINCT' with SQL expression language and orm.Query with
- an emphasis on PG's 'DISTINCT ON' syntax.
-
- """
- __dialect__ = postgresql.dialect()
-
- def setup(self):
- self.table = Table('t', MetaData(),
- Column('id', Integer, primary_key=True),
- Column('a', String),
- Column('b', String),
- )
-
- def test_plain_generative(self):
- self.assert_compile(
- select([self.table]).distinct(),
- "SELECT DISTINCT t.id, t.a, t.b FROM t"
- )
-
- def test_on_columns_generative(self):
- self.assert_compile(
- select([self.table]).distinct(self.table.c.a),
- "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t"
- )
-
- def test_on_columns_generative_multi_call(self):
- self.assert_compile(
- select([self.table]).distinct(self.table.c.a).
- distinct(self.table.c.b),
- "SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t"
- )
-
- def test_plain_inline(self):
- self.assert_compile(
- select([self.table], distinct=True),
- "SELECT DISTINCT t.id, t.a, t.b FROM t"
- )
-
- def test_on_columns_inline_list(self):
- self.assert_compile(
- select([self.table],
- distinct=[self.table.c.a, self.table.c.b]).
- order_by(self.table.c.a, self.table.c.b),
- "SELECT DISTINCT ON (t.a, t.b) t.id, "
- "t.a, t.b FROM t ORDER BY t.a, t.b"
- )
-
- def test_on_columns_inline_scalar(self):
- self.assert_compile(
- select([self.table], distinct=self.table.c.a),
- "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t"
- )
-
- def test_query_plain(self):
- sess = Session()
- self.assert_compile(
- sess.query(self.table).distinct(),
- "SELECT DISTINCT t.id AS t_id, t.a AS t_a, "
- "t.b AS t_b FROM t"
- )
-
- def test_query_on_columns(self):
- sess = Session()
- self.assert_compile(
- sess.query(self.table).distinct(self.table.c.a),
- "SELECT DISTINCT ON (t.a) t.id AS t_id, t.a AS t_a, "
- "t.b AS t_b FROM t"
- )
-
- def test_query_on_columns_multi_call(self):
- sess = Session()
- self.assert_compile(
- sess.query(self.table).distinct(self.table.c.a).
- distinct(self.table.c.b),
- "SELECT DISTINCT ON (t.a, t.b) t.id AS t_id, t.a AS t_a, "
- "t.b AS t_b FROM t"
- )
-
- def test_query_on_columns_subquery(self):
- sess = Session()
- class Foo(object):
- pass
- mapper(Foo, self.table)
- sess = Session()
- self.assert_compile(
- sess.query(Foo).from_self().distinct(Foo.a, Foo.b),
- "SELECT DISTINCT ON (anon_1.t_a, anon_1.t_b) anon_1.t_id "
- "AS anon_1_t_id, anon_1.t_a AS anon_1_t_a, anon_1.t_b "
- "AS anon_1_t_b FROM (SELECT t.id AS t_id, t.a AS t_a, "
- "t.b AS t_b FROM t) AS anon_1"
- )
-
- def test_query_distinct_on_aliased(self):
- class Foo(object):
- pass
- mapper(Foo, self.table)
- a1 = aliased(Foo)
- sess = Session()
- self.assert_compile(
- sess.query(a1).distinct(a1.a),
- "SELECT DISTINCT ON (t_1.a) t_1.id AS t_1_id, "
- "t_1.a AS t_1_a, t_1.b AS t_1_b FROM t AS t_1"
- )
-
- def test_distinct_on_subquery_anon(self):
-
- sq = select([self.table]).alias()
- q = select([self.table.c.id, sq.c.id]).\
- distinct(sq.c.id).\
- where(self.table.c.id == sq.c.id)
-
- self.assert_compile(
- q,
- "SELECT DISTINCT ON (anon_1.id) t.id, anon_1.id "
- "FROM t, (SELECT t.id AS id, t.a AS a, t.b "
- "AS b FROM t) AS anon_1 WHERE t.id = anon_1.id"
- )
-
- def test_distinct_on_subquery_named(self):
- sq = select([self.table]).alias('sq')
- q = select([self.table.c.id, sq.c.id]).\
- distinct(sq.c.id).\
- where(self.table.c.id == sq.c.id)
- self.assert_compile(
- q,
- "SELECT DISTINCT ON (sq.id) t.id, sq.id "
- "FROM t, (SELECT t.id AS id, t.a AS a, "
- "t.b AS b FROM t) AS sq WHERE t.id = sq.id"
- )
-
-class ReflectionTest(fixtures.TestBase):
- __only_on__ = 'postgresql'
-
- @testing.fails_if(('postgresql', '<', (8, 4)),
- "newer query is bypassed due to unsupported SQL functions")
- @testing.provide_metadata
- def test_reflected_primary_key_order(self):
- meta1 = self.metadata
- subject = Table('subject', meta1,
- Column('p1', Integer, primary_key=True),
- Column('p2', Integer, primary_key=True),
- PrimaryKeyConstraint('p2', 'p1')
- )
- meta1.create_all()
- meta2 = MetaData(testing.db)
- subject = Table('subject', meta2, autoload=True)
- eq_(subject.primary_key.columns.keys(), ['p2', 'p1'])
-
- @testing.provide_metadata
- def test_pg_weirdchar_reflection(self):
- meta1 = self.metadata
- subject = Table('subject', meta1, Column('id$', Integer,
- primary_key=True))
- referer = Table('referer', meta1, Column('id', Integer,
- primary_key=True), Column('ref', Integer,
- ForeignKey('subject.id$')))
- meta1.create_all()
- meta2 = MetaData(testing.db)
- subject = Table('subject', meta2, autoload=True)
- referer = Table('referer', meta2, autoload=True)
- self.assert_((subject.c['id$']
- == referer.c.ref).compare(
- subject.join(referer).onclause))
-
- @testing.provide_metadata
- def test_renamed_sequence_reflection(self):
- metadata = self.metadata
- t = Table('t', metadata, Column('id', Integer, primary_key=True))
- metadata.create_all()
- m2 = MetaData(testing.db)
- t2 = Table('t', m2, autoload=True, implicit_returning=False)
- eq_(t2.c.id.server_default.arg.text,
- "nextval('t_id_seq'::regclass)")
- r = t2.insert().execute()
- eq_(r.inserted_primary_key, [1])
- testing.db.connect().execution_options(autocommit=True).\
- execute('alter table t_id_seq rename to foobar_id_seq'
- )
- m3 = MetaData(testing.db)
- t3 = Table('t', m3, autoload=True, implicit_returning=False)
- eq_(t3.c.id.server_default.arg.text,
- "nextval('foobar_id_seq'::regclass)")
- r = t3.insert().execute()
- eq_(r.inserted_primary_key, [2])
-
- @testing.provide_metadata
- def test_renamed_pk_reflection(self):
- metadata = self.metadata
- t = Table('t', metadata, Column('id', Integer, primary_key=True))
- metadata.create_all()
- testing.db.connect().execution_options(autocommit=True).\
- execute('alter table t rename id to t_id')
- m2 = MetaData(testing.db)
- t2 = Table('t', m2, autoload=True)
- eq_([c.name for c in t2.primary_key], ['t_id'])
-
- @testing.provide_metadata
- def test_schema_reflection(self):
- """note: this test requires that the 'test_schema' schema be
- separate and accessible by the test user"""
-
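- # e.g., one-time setup outside the suite (role name illustrative):
- #
- # CREATE SCHEMA test_schema;
- # GRANT ALL ON SCHEMA test_schema TO test_user;
-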
- meta1 = self.metadata
-
- users = Table('users', meta1, Column('user_id', Integer,
- primary_key=True), Column('user_name',
- String(30), nullable=False), schema='test_schema')
- addresses = Table(
- 'email_addresses',
- meta1,
- Column('address_id', Integer, primary_key=True),
- Column('remote_user_id', Integer,
- ForeignKey(users.c.user_id)),
- Column('email_address', String(20)),
- schema='test_schema',
- )
- meta1.create_all()
- meta2 = MetaData(testing.db)
- addresses = Table('email_addresses', meta2, autoload=True,
- schema='test_schema')
- users = Table('users', meta2, mustexist=True,
- schema='test_schema')
- j = join(users, addresses)
- self.assert_((users.c.user_id
- == addresses.c.remote_user_id).compare(j.onclause))
-
- @testing.provide_metadata
- def test_schema_reflection_2(self):
- meta1 = self.metadata
- subject = Table('subject', meta1, Column('id', Integer,
- primary_key=True))
- referer = Table('referer', meta1, Column('id', Integer,
- primary_key=True), Column('ref', Integer,
- ForeignKey('subject.id')), schema='test_schema')
- meta1.create_all()
- meta2 = MetaData(testing.db)
- subject = Table('subject', meta2, autoload=True)
- referer = Table('referer', meta2, schema='test_schema',
- autoload=True)
- self.assert_((subject.c.id
- == referer.c.ref).compare(
- subject.join(referer).onclause))
-
- @testing.provide_metadata
- def test_schema_reflection_3(self):
- meta1 = self.metadata
- subject = Table('subject', meta1, Column('id', Integer,
- primary_key=True), schema='test_schema_2')
- referer = Table('referer', meta1, Column('id', Integer,
- primary_key=True), Column('ref', Integer,
- ForeignKey('test_schema_2.subject.id')),
- schema='test_schema')
- meta1.create_all()
- meta2 = MetaData(testing.db)
- subject = Table('subject', meta2, autoload=True,
- schema='test_schema_2')
- referer = Table('referer', meta2, schema='test_schema',
- autoload=True)
- self.assert_((subject.c.id
- == referer.c.ref).compare(
- subject.join(referer).onclause))
-
- @testing.provide_metadata
- def test_uppercase_lowercase_table(self):
- metadata = self.metadata
-
- a_table = Table('a', metadata, Column('x', Integer))
- A_table = Table('A', metadata, Column('x', Integer))
-
- a_table.create()
- assert testing.db.has_table("a")
- assert not testing.db.has_table("A")
- A_table.create(checkfirst=True)
- assert testing.db.has_table("A")
-
- def test_uppercase_lowercase_sequence(self):
-
- a_seq = Sequence('a')
- A_seq = Sequence('A')
-
- a_seq.create(testing.db)
- assert testing.db.dialect.has_sequence(testing.db, "a")
- assert not testing.db.dialect.has_sequence(testing.db, "A")
- A_seq.create(testing.db, checkfirst=True)
- assert testing.db.dialect.has_sequence(testing.db, "A")
-
- a_seq.drop(testing.db)
- A_seq.drop(testing.db)
-
- def test_schema_reflection_multi_search_path(self):
- """test the 'set the same schema' rule when
- multiple schemas/search paths are in effect."""
-
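- # background: PostgreSQL resolves unqualified names via
- # search_path; the assertions at the end rely on the reflected
- # foreign key targeting the schema-less some_table, since
- # "test_schema" is on the search path here
-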
- db = engines.testing_engine()
- conn = db.connect()
- trans = conn.begin()
- try:
- conn.execute("set search_path to test_schema_2, "
- "test_schema, public")
- conn.dialect.default_schema_name = "test_schema_2"
-
- conn.execute("""
- CREATE TABLE test_schema.some_table (
- id SERIAL not null primary key
- )
- """)
-
- conn.execute("""
- CREATE TABLE test_schema_2.some_other_table (
- id SERIAL not null primary key,
- sid INTEGER REFERENCES test_schema.some_table(id)
- )
- """)
-
- m1 = MetaData()
-
- t2_schema = Table('some_other_table',
- m1,
- schema="test_schema_2",
- autoload=True,
- autoload_with=conn)
- t1_schema = Table('some_table',
- m1,
- schema="test_schema",
- autoload=True,
- autoload_with=conn)
-
- t2_no_schema = Table('some_other_table',
- m1,
- autoload=True,
- autoload_with=conn)
-
- t1_no_schema = Table('some_table',
- m1,
- autoload=True,
- autoload_with=conn)
-
- # this holds because "test_schema" is in the search path and
- # might as well be the default; there's no reason to assign
- # a "schema" to the Table in that case
- assert t2_schema.c.sid.references(
- t1_no_schema.c.id)
-
- assert t2_no_schema.c.sid.references(
- t1_no_schema.c.id)
-
- finally:
- trans.rollback()
- conn.close()
- db.dispose()
-
- @testing.provide_metadata
- def test_index_reflection(self):
- """ Reflecting partial & expression-based indexes should warn
- """
-
- metadata = self.metadata
-
- t1 = Table('party', metadata, Column('id', String(10),
- nullable=False), Column('name', String(20),
- index=True), Column('aname', String(20)))
- metadata.create_all()
- testing.db.execute("""
- create index idx1 on party ((id || name))
- """)
- testing.db.execute("""
- create unique index idx2 on party (id) where name = 'test'
- """)
- testing.db.execute("""
- create index idx3 on party using btree
- (lower(name::text), lower(aname::text))
- """)
-
- def go():
- m2 = MetaData(testing.db)
- t2 = Table('party', m2, autoload=True)
- assert len(t2.indexes) == 2
-
- # Make sure indexes are in the order we expect them in
-
- tmp = [(idx.name, idx) for idx in t2.indexes]
- tmp.sort()
- r1, r2 = [idx[1] for idx in tmp]
- assert r1.name == 'idx2'
- assert r1.unique is True
- assert r2.unique is False
- assert [t2.c.id] == r1.columns
- assert [t2.c.name] == r2.columns
-
- testing.assert_warnings(go,
- [
- 'Skipped unsupported reflection of '
- 'expression-based index idx1',
- 'Predicate of partial index idx2 ignored during '
- 'reflection',
- 'Skipped unsupported reflection of '
- 'expression-based index idx3'
- ])
-
- @testing.provide_metadata
- def test_index_reflection_modified(self):
- """reflect indexes when a column name has changed - PG 9
- does not update the name of the column in the index def.
- [ticket:2141]
-
- """
-
- metadata = self.metadata
-
- t1 = Table('t', metadata,
- Column('id', Integer, primary_key=True),
- Column('x', Integer)
- )
- metadata.create_all()
- conn = testing.db.connect().execution_options(autocommit=True)
- conn.execute("CREATE INDEX idx1 ON t (x)")
- conn.execute("ALTER TABLE t RENAME COLUMN x to y")
-
- ind = testing.db.dialect.get_indexes(conn, "t", None)
- eq_(ind, [{'unique': False, 'column_names': ['y'], 'name': 'idx1'}])
- conn.close()
-
-class CustomTypeReflectionTest(fixtures.TestBase):
-
- class CustomType(object):
- def __init__(self, arg1=None, arg2=None):
- self.arg1 = arg1
- self.arg2 = arg2
-
- ischema_names = None
-
- def setup(self):
- ischema_names = postgresql.PGDialect.ischema_names
- postgresql.PGDialect.ischema_names = ischema_names.copy()
- self.ischema_names = ischema_names
-
- def teardown(self):
- postgresql.PGDialect.ischema_names = self.ischema_names
- self.ischema_names = None
-
- def _assert_reflected(self, dialect):
- for sch, args in [
- ('my_custom_type', (None, None)),
- ('my_custom_type()', (None, None)),
- ('my_custom_type(ARG1)', ('ARG1', None)),
- ('my_custom_type(ARG1, ARG2)', ('ARG1', 'ARG2')),
- ]:
- column_info = dialect._get_column_info(
- 'colname', sch, None, False,
- {}, {}, 'public')
- assert isinstance(column_info['type'], self.CustomType)
- eq_(column_info['type'].arg1, args[0])
- eq_(column_info['type'].arg2, args[1])
-
- def test_clslevel(self):
- postgresql.PGDialect.ischema_names['my_custom_type'] = self.CustomType
- dialect = postgresql.PGDialect()
- self._assert_reflected(dialect)
-
- def test_instancelevel(self):
- dialect = postgresql.PGDialect()
- dialect.ischema_names = dialect.ischema_names.copy()
- dialect.ischema_names['my_custom_type'] = self.CustomType
- self._assert_reflected(dialect)
-
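- # Usage note (a sketch, with a hypothetical type name): this is the
- # same hook third-party types use to participate in reflection, e.g.
- #
- #     postgresql.PGDialect.ischema_names['geometry'] = GeometryType
- #
- # where the dict maps the type name as it appears in the PG catalogs
- # to a TypeEngine class.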
-
-class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
-
- __only_on__ = 'postgresql'
-
- @testing.provide_metadata
- def test_date_reflection(self):
- metadata = self.metadata
- t1 = Table('pgdate', metadata, Column('date1',
- DateTime(timezone=True)), Column('date2',
- DateTime(timezone=False)))
- metadata.create_all()
- m2 = MetaData(testing.db)
- t2 = Table('pgdate', m2, autoload=True)
- assert t2.c.date1.type.timezone is True
- assert t2.c.date2.type.timezone is False
-
- @testing.fails_on('+zxjdbc',
- 'The JDBC driver handles the version parsing')
- def test_version_parsing(self):
-
- class MockConn(object):
-
- def __init__(self, res):
- self.res = res
-
- def execute(self, stmt):
- return self
-
- def scalar(self):
- return self.res
-
- for string, version in \
- [('PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by '
- 'GCC gcc (GCC) 4.1.2 20070925 (Red Hat 4.1.2-33)', (8, 3,
- 8)),
- ('PostgreSQL 8.5devel on x86_64-unknown-linux-gnu, '
- 'compiled by GCC gcc (GCC) 4.4.2, 64-bit', (8, 5)),
- ('EnterpriseDB 9.1.2.2 on x86_64-unknown-linux-gnu, '
- 'compiled by gcc (GCC) 4.1.2 20080704 (Red Hat 4.1.2-50), '
- '64-bit', (9, 1, 2))]:
- eq_(testing.db.dialect._get_server_version_info(MockConn(string)),
- version)
-
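- # The parsing exercised above is regex-based; the pattern is roughly
- # r'(?:PostgreSQL|EnterpriseDB) (\d+)\.(\d+)(?:\.(\d+))?' (a
- # paraphrase, not the exact source), which is why '8.5devel' yields
- # the two-tuple (8, 5).
-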
- @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
- def test_psycopg2_version(self):
- v = testing.db.dialect.psycopg2_version
- assert testing.db.dialect.dbapi.__version__.\
- startswith(".".join(str(x) for x in v))
-
- @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
- def test_notice_logging(self):
- log = logging.getLogger('sqlalchemy.dialects.postgresql')
- buf = logging.handlers.BufferingHandler(100)
- lev = log.level
- log.addHandler(buf)
- log.setLevel(logging.INFO)
- try:
- conn = testing.db.connect()
- trans = conn.begin()
- try:
- conn.execute('create table foo (id serial primary key)')
- finally:
- trans.rollback()
- finally:
- log.removeHandler(buf)
- log.setLevel(lev)
- msgs = ' '.join(b.msg for b in buf.buffer)
- assert 'will create implicit sequence' in msgs
- assert 'will create implicit index' in msgs
-
- @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
- @engines.close_open_connections
- def test_client_encoding(self):
- c = testing.db.connect()
- current_encoding = c.connection.connection.encoding
- c.close()
-
- # attempt to use an encoding that's not
- # already set
- if current_encoding == 'UTF8':
- test_encoding = 'LATIN1'
- else:
- test_encoding = 'UTF8'
-
- e = engines.testing_engine(
- options={'client_encoding':test_encoding}
- )
- c = e.connect()
- eq_(c.connection.connection.encoding, test_encoding)
-
- @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
- @engines.close_open_connections
- def test_autocommit_isolation_level(self):
- extensions = __import__('psycopg2.extensions').extensions
-
- c = testing.db.connect()
- c = c.execution_options(isolation_level='AUTOCOMMIT')
- eq_(c.connection.connection.isolation_level,
- extensions.ISOLATION_LEVEL_AUTOCOMMIT)
-
- @testing.fails_on('+zxjdbc',
- "Can't infer the SQL type to use for an instance "
- "of org.python.core.PyObjectDerived.")
- @testing.fails_on('+pg8000', "Can't determine correct type.")
- def test_extract(self):
- fivedaysago = datetime.datetime.now() \
- - datetime.timedelta(days=5)
- for field, exp in [('year', fivedaysago.year),
- ('month', fivedaysago.month), ('day', fivedaysago.day)]:
- r = testing.db.execute(select([extract(field, func.now()
- + datetime.timedelta(days=-5))])).scalar()
- eq_(r, exp)
-
- def test_checks_for_sequence(self):
- meta1 = MetaData(testing.db)
- seq = Sequence('fooseq')
- t = Table('mytable', meta1, Column('col1', Integer,
- seq))
- seq.drop()
- try:
- testing.db.execute('CREATE SEQUENCE fooseq')
- t.create(checkfirst=True)
- finally:
- t.drop(checkfirst=True)
-
- def test_schema_roundtrips(self):
- meta = MetaData(testing.db)
- users = Table('users', meta, Column('id', Integer,
- primary_key=True), Column('name', String(50)),
- schema='test_schema')
- users.create()
- try:
- users.insert().execute(id=1, name='name1')
- users.insert().execute(id=2, name='name2')
- users.insert().execute(id=3, name='name3')
- users.insert().execute(id=4, name='name4')
- eq_(users.select().where(users.c.name == 'name2'
- ).execute().fetchall(), [(2, 'name2')])
- eq_(users.select(use_labels=True).where(users.c.name
- == 'name2').execute().fetchall(), [(2, 'name2')])
- users.delete().where(users.c.id == 3).execute()
- eq_(users.select().where(users.c.name == 'name3'
- ).execute().fetchall(), [])
- users.update().where(users.c.name == 'name4'
- ).execute(name='newname')
- eq_(users.select(use_labels=True).where(users.c.id
- == 4).execute().fetchall(), [(4, 'newname')])
- finally:
- users.drop()
-
- def test_preexecute_passivedefault(self):
- """test that when we get a primary key column back from
- reflecting a table which has a default value on it, we pre-
- execute that DefaultClause upon insert."""
-
- try:
- meta = MetaData(testing.db)
- testing.db.execute("""
- CREATE TABLE speedy_users
- (
- speedy_user_id SERIAL PRIMARY KEY,
-
- user_name VARCHAR NOT NULL,
- user_password VARCHAR NOT NULL
- );
- """)
- t = Table('speedy_users', meta, autoload=True)
- r = t.insert().execute(user_name='user',
- user_password='lala')
- assert r.inserted_primary_key == [1]
- rows = t.select().execute().fetchall()
- assert rows == [(1, 'user', 'lala')]
- finally:
- testing.db.execute('drop table speedy_users')
-
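- # What "pre-execute" means here (hedged; the sequence name follows
- # PostgreSQL's <table>_<column>_seq convention for SERIAL):
- #
- #     SELECT nextval('speedy_users_speedy_user_id_seq')
- #
- # is issued first, and the fetched value is used in the INSERT.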
-
- @testing.fails_on('+zxjdbc', 'psycopg2/pg8000 specific assertion')
- @testing.fails_on('pypostgresql',
- 'psycopg2/pg8000 specific assertion')
- def test_numeric_raise(self):
- stmt = text("select cast('hi' as char) as hi",
- typemap={'hi': Numeric})
- assert_raises(exc.InvalidRequestError, testing.db.execute, stmt)
-
- def test_serial_integer(self):
- for type_, expected in [
- (Integer, 'SERIAL'),
- (BigInteger, 'BIGSERIAL'),
- (SmallInteger, 'SMALLINT'),
- (postgresql.INTEGER, 'SERIAL'),
- (postgresql.BIGINT, 'BIGSERIAL'),
- ]:
- m = MetaData()
-
- t = Table('t', m, Column('c', type_, primary_key=True))
- ddl_compiler = testing.db.dialect.ddl_compiler(testing.db.dialect, schema.CreateTable(t))
- eq_(
- ddl_compiler.get_column_specification(t.c.c),
- "c %s NOT NULL" % expected
- )
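-
- # For reference, a hedged sketch of the full DDL rendered for the
- # Integer case above:
- #
- #     CREATE TABLE t (
- #         c SERIAL NOT NULL,
- #         PRIMARY KEY (c)
- #     )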
-
-class TimezoneTest(fixtures.TestBase):
-
- """Test timezone-aware datetimes.
-
- psycopg will return a datetime with a tzinfo attached to it if
- PostgreSQL returns one. Python then will not let you compare a
- datetime that has a tzinfo to one that doesn't. This test
- illustrates two ways to have datetime types with and without
- timezone info. """
-
- __only_on__ = 'postgresql'
-
- @classmethod
- def setup_class(cls):
- global tztable, notztable, metadata
- metadata = MetaData(testing.db)
-
- # current_timestamp() in PostgreSQL is assumed to return
- # TIMESTAMP WITH TIME ZONE
-
- tztable = Table('tztable', metadata, Column('id', Integer,
- primary_key=True), Column('date',
- DateTime(timezone=True),
- onupdate=func.current_timestamp()),
- Column('name', String(20)))
- notztable = Table('notztable', metadata, Column('id', Integer,
- primary_key=True), Column('date',
- DateTime(timezone=False),
- onupdate=cast(func.current_timestamp(),
- DateTime(timezone=False))), Column('name',
- String(20)))
- metadata.create_all()
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
-
- @testing.fails_on('postgresql+zxjdbc',
- "XXX: postgresql+zxjdbc doesn't give a tzinfo back")
- def test_with_timezone(self):
-
- # get a date with a tzinfo
-
- somedate = \
- testing.db.connect().scalar(func.current_timestamp().select())
- assert somedate.tzinfo
- tztable.insert().execute(id=1, name='row1', date=somedate)
- row = select([tztable.c.date], tztable.c.id
- == 1).execute().first()
- eq_(row[0], somedate)
- eq_(somedate.tzinfo.utcoffset(somedate),
- row[0].tzinfo.utcoffset(row[0]))
- result = tztable.update(tztable.c.id == 1).\
- returning(tztable.c.date).execute(name='newname')
- row = result.first()
- assert row[0] >= somedate
-
- def test_without_timezone(self):
-
- # get a date without a tzinfo
-
- somedate = datetime.datetime(2005, 10, 20, 11, 52, 0)
- assert not somedate.tzinfo
- notztable.insert().execute(id=1, name='row1', date=somedate)
- row = select([notztable.c.date], notztable.c.id
- == 1).execute().first()
- eq_(row[0], somedate)
- eq_(row[0].tzinfo, None)
- result = notztable.update(notztable.c.id == 1).\
- returning(notztable.c.date).execute(name='newname')
- row = result.first()
- assert row[0] >= somedate
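-
- # The incompatibility the class docstring refers to, in one line of
- # plain Python (hedged):
- #
- #     tzaware_dt > naive_dt
- #
- # raises TypeError: can't compare offset-naive and offset-aware
- # datetimes.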
-
-class TimePrecisionTest(fixtures.TestBase, AssertsCompiledSQL):
-
- __dialect__ = postgresql.dialect()
-
- def test_compile(self):
- for type_, expected in [
- (postgresql.TIME(), 'TIME WITHOUT TIME ZONE'),
- (postgresql.TIME(precision=5), 'TIME(5) WITHOUT TIME ZONE'
- ),
- (postgresql.TIME(timezone=True, precision=5),
- 'TIME(5) WITH TIME ZONE'),
- (postgresql.TIMESTAMP(), 'TIMESTAMP WITHOUT TIME ZONE'),
- (postgresql.TIMESTAMP(precision=5),
- 'TIMESTAMP(5) WITHOUT TIME ZONE'),
- (postgresql.TIMESTAMP(timezone=True, precision=5),
- 'TIMESTAMP(5) WITH TIME ZONE'),
- ]:
- self.assert_compile(type_, expected)
-
- @testing.only_on('postgresql', 'DB specific feature')
- @testing.provide_metadata
- def test_reflection(self):
- metadata = self.metadata
- t1 = Table(
- 't1',
- metadata,
- Column('c1', postgresql.TIME()),
- Column('c2', postgresql.TIME(precision=5)),
- Column('c3', postgresql.TIME(timezone=True, precision=5)),
- Column('c4', postgresql.TIMESTAMP()),
- Column('c5', postgresql.TIMESTAMP(precision=5)),
- Column('c6', postgresql.TIMESTAMP(timezone=True,
- precision=5)),
- )
- t1.create()
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True)
- eq_(t2.c.c1.type.precision, None)
- eq_(t2.c.c2.type.precision, 5)
- eq_(t2.c.c3.type.precision, 5)
- eq_(t2.c.c4.type.precision, None)
- eq_(t2.c.c5.type.precision, 5)
- eq_(t2.c.c6.type.precision, 5)
- eq_(t2.c.c1.type.timezone, False)
- eq_(t2.c.c2.type.timezone, False)
- eq_(t2.c.c3.type.timezone, True)
- eq_(t2.c.c4.type.timezone, False)
- eq_(t2.c.c5.type.timezone, False)
- eq_(t2.c.c6.type.timezone, True)
-
-class ArrayTest(fixtures.TablesTest, AssertsExecutionResults):
-
- __only_on__ = 'postgresql'
-
- __unsupported_on__ = 'postgresql+pg8000', 'postgresql+zxjdbc'
-
- @classmethod
- def define_tables(cls, metadata):
-
- class ProcValue(TypeDecorator):
- impl = postgresql.ARRAY(Integer, dimensions=2)
-
- def process_bind_param(self, value, dialect):
- if value is None:
- return None
- return [
- [x + 5 for x in v]
- for v in value
- ]
-
- def process_result_value(self, value, dialect):
- if value is None:
- return None
- return [
- [x - 7 for x in v]
- for v in value
- ]
-
- Table('arrtable', metadata,
- Column('id', Integer, primary_key=True),
- Column('intarr', postgresql.ARRAY(Integer)),
- Column('strarr', postgresql.ARRAY(Unicode())),
- Column('dimarr', ProcValue)
- )
-
- Table('dim_arrtable', metadata,
- Column('id', Integer, primary_key=True),
- Column('intarr', postgresql.ARRAY(Integer, dimensions=1)),
- Column('strarr', postgresql.ARRAY(Unicode(), dimensions=1)),
- Column('dimarr', ProcValue)
- )
-
- def _fixture_456(self, table):
- testing.db.execute(
- table.insert(),
- intarr=[4, 5, 6]
- )
-
- def test_reflect_array_column(self):
- metadata2 = MetaData(testing.db)
- tbl = Table('arrtable', metadata2, autoload=True)
- assert isinstance(tbl.c.intarr.type, postgresql.ARRAY)
- assert isinstance(tbl.c.strarr.type, postgresql.ARRAY)
- assert isinstance(tbl.c.intarr.type.item_type, Integer)
- assert isinstance(tbl.c.strarr.type.item_type, String)
-
- def test_insert_array(self):
- arrtable = self.tables.arrtable
- arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'),
- util.u('def')])
- results = arrtable.select().execute().fetchall()
- eq_(len(results), 1)
- eq_(results[0]['intarr'], [1, 2, 3])
- eq_(results[0]['strarr'], [util.u('abc'), util.u('def')])
-
- def test_array_where(self):
- arrtable = self.tables.arrtable
- arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'),
- util.u('def')])
- arrtable.insert().execute(intarr=[4, 5, 6], strarr=util.u('ABC'))
- results = arrtable.select().where(arrtable.c.intarr == [1, 2,
- 3]).execute().fetchall()
- eq_(len(results), 1)
- eq_(results[0]['intarr'], [1, 2, 3])
-
- def test_array_concat(self):
- arrtable = self.tables.arrtable
- arrtable.insert().execute(intarr=[1, 2, 3],
- strarr=[util.u('abc'), util.u('def')])
- results = select([arrtable.c.intarr + [4, 5,
- 6]]).execute().fetchall()
- eq_(len(results), 1)
- eq_(results[0][0], [1, 2, 3, 4, 5, 6])
-
- def test_array_subtype_resultprocessor(self):
- arrtable = self.tables.arrtable
- arrtable.insert().execute(intarr=[4, 5, 6],
- strarr=[[util.ue('m\xe4\xe4')], [util.ue('m\xf6\xf6')]])
- arrtable.insert().execute(intarr=[1, 2, 3], strarr=[
- util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')])
- results = \
- arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall()
- eq_(len(results), 2)
- eq_(results[0]['strarr'], [util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')])
- eq_(results[1]['strarr'], [[util.ue('m\xe4\xe4')], [util.ue('m\xf6\xf6')]])
-
- def test_array_literal(self):
- eq_(
- testing.db.scalar(
- select([
- postgresql.array([1, 2]) + postgresql.array([3, 4, 5])
- ])
- ), [1, 2, 3, 4, 5]
- )
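-
- # For reference (hedged): postgresql.array() renders a PG ARRAY
- # literal, so the expression above compiles to roughly
- #
- #     SELECT ARRAY[%(param_1)s, %(param_2)s] ||
- #         ARRAY[%(param_3)s, %(param_4)s, %(param_5)s] AS anon_1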
-
- def test_array_getitem_single_type(self):
- arrtable = self.tables.arrtable
- is_(arrtable.c.intarr[1].type._type_affinity, Integer)
- is_(arrtable.c.strarr[1].type._type_affinity, String)
-
- def test_array_getitem_slice_type(self):
- arrtable = self.tables.arrtable
- is_(arrtable.c.intarr[1:3].type._type_affinity, postgresql.ARRAY)
- is_(arrtable.c.strarr[1:3].type._type_affinity, postgresql.ARRAY)
-
- def test_array_getitem_single_exec(self):
- arrtable = self.tables.arrtable
- self._fixture_456(arrtable)
- eq_(
- testing.db.scalar(select([arrtable.c.intarr[2]])),
- 5
- )
- testing.db.execute(
- arrtable.update().values({arrtable.c.intarr[2]: 7})
- )
- eq_(
- testing.db.scalar(select([arrtable.c.intarr[2]])),
- 7
- )
-
- def test_undim_array_empty(self):
- arrtable = self.tables.arrtable
- self._fixture_456(arrtable)
- eq_(
- testing.db.scalar(
- select([arrtable.c.intarr]).
- where(arrtable.c.intarr.contains([]))
- ),
- [4, 5, 6]
- )
-
- def test_array_getitem_slice_exec(self):
- arrtable = self.tables.arrtable
- testing.db.execute(
- arrtable.insert(),
- intarr=[4, 5, 6],
- strarr=[util.u('abc'), util.u('def')]
- )
- eq_(
- testing.db.scalar(select([arrtable.c.intarr[2:3]])),
- [5, 6]
- )
- testing.db.execute(
- arrtable.update().values({arrtable.c.intarr[2:3]: [7, 8]})
- )
- eq_(
- testing.db.scalar(select([arrtable.c.intarr[2:3]])),
- [7, 8]
- )
-
-
- def _test_undim_array_contains_typed_exec(self, struct):
- arrtable = self.tables.arrtable
- self._fixture_456(arrtable)
- eq_(
- testing.db.scalar(
- select([arrtable.c.intarr]).
- where(arrtable.c.intarr.contains(struct([4, 5])))
- ),
- [4, 5, 6]
- )
-
- def test_undim_array_contains_set_exec(self):
- self._test_undim_array_contains_typed_exec(set)
-
- def test_undim_array_contains_list_exec(self):
- self._test_undim_array_contains_typed_exec(list)
-
- def test_undim_array_contains_generator_exec(self):
- self._test_undim_array_contains_typed_exec(
- lambda elem: (x for x in elem))
-
- def _test_dim_array_contains_typed_exec(self, struct):
- dim_arrtable = self.tables.dim_arrtable
- self._fixture_456(dim_arrtable)
- eq_(
- testing.db.scalar(
- select([dim_arrtable.c.intarr]).
- where(dim_arrtable.c.intarr.contains(struct([4, 5])))
- ),
- [4, 5, 6]
- )
-
- def test_dim_array_contains_set_exec(self):
- self._test_dim_array_contains_typed_exec(set)
-
- def test_dim_array_contains_list_exec(self):
- self._test_dim_array_contains_typed_exec(list)
-
- def test_dim_array_contains_generator_exec(self):
- self._test_dim_array_contains_typed_exec(lambda elem: (x for x in elem))
-
- def test_array_contained_by_exec(self):
- arrtable = self.tables.arrtable
- with testing.db.connect() as conn:
- conn.execute(
- arrtable.insert(),
- intarr=[6, 5, 4]
- )
- eq_(
- conn.scalar(
- select([arrtable.c.intarr.contained_by([4, 5, 6, 7])])
- ),
- True
- )
-
- def test_array_overlap_exec(self):
- arrtable = self.tables.arrtable
- with testing.db.connect() as conn:
- conn.execute(
- arrtable.insert(),
- intarr=[4, 5, 6]
- )
- eq_(
- conn.scalar(
- select([arrtable.c.intarr]).
- where(arrtable.c.intarr.overlap([7, 6]))
- ),
- [4, 5, 6]
- )
-
- def test_array_any_exec(self):
- arrtable = self.tables.arrtable
- with testing.db.connect() as conn:
- conn.execute(
- arrtable.insert(),
- intarr=[4, 5, 6]
- )
- eq_(
- conn.scalar(
- select([arrtable.c.intarr]).
- where(postgresql.Any(5, arrtable.c.intarr))
- ),
- [4, 5, 6]
- )
-
- def test_array_all_exec(self):
- arrtable = self.tables.arrtable
- with testing.db.connect() as conn:
- conn.execute(
- arrtable.insert(),
- intarr=[4, 5, 6]
- )
- eq_(
- conn.scalar(
- select([arrtable.c.intarr]).
- where(arrtable.c.intarr.all(4, operator=operators.le))
- ),
- [4, 5, 6]
- )
-
-
- @testing.provide_metadata
- def test_tuple_flag(self):
- metadata = self.metadata
-
- t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', postgresql.ARRAY(String(5), as_tuple=True)),
- Column('data2', postgresql.ARRAY(Numeric(asdecimal=False), as_tuple=True)),
- )
- metadata.create_all()
- testing.db.execute(t1.insert(), id=1, data=["1","2","3"], data2=[5.4, 5.6])
- testing.db.execute(t1.insert(), id=2, data=["4", "5", "6"], data2=[1.0])
- testing.db.execute(t1.insert(), id=3, data=[["4", "5"], ["6", "7"]],
- data2=[[5.4, 5.6], [1.0, 1.1]])
-
- r = testing.db.execute(t1.select().order_by(t1.c.id)).fetchall()
- eq_(
- r,
- [
- (1, ('1', '2', '3'), (5.4, 5.6)),
- (2, ('4', '5', '6'), (1.0,)),
- (3, (('4', '5'), ('6', '7')), ((5.4, 5.6), (1.0, 1.1)))
- ]
- )
- # hashable
- eq_(
- set(row[1] for row in r),
- set([('1', '2', '3'), ('4', '5', '6'), (('4', '5'), ('6', '7'))])
- )
-
- def test_dimension(self):
- arrtable = self.tables.arrtable
- testing.db.execute(arrtable.insert(), dimarr=[[1, 2, 3], [4, 5, 6]])
- eq_(
- testing.db.scalar(select([arrtable.c.dimarr])),
- [[-1, 0, 1], [2, 3, 4]]
- )
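-
- # The expected values follow from ProcValue's asymmetric round trip:
- # +5 applied elementwise on the way in, -7 on the way out, for a net
- # shift of -2 (e.g. 1 -> 6 in the database -> -1 back out).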
-
-class TimestampTest(fixtures.TestBase, AssertsExecutionResults):
- __only_on__ = 'postgresql'
-
- def test_timestamp(self):
- engine = testing.db
- connection = engine.connect()
-
- s = select(["timestamp '2007-12-25'"])
- result = connection.execute(s).first()
- eq_(result[0], datetime.datetime(2007, 12, 25, 0, 0))
-
-class ServerSideCursorsTest(fixtures.TestBase, AssertsExecutionResults):
-
- __only_on__ = 'postgresql+psycopg2'
-
- def _fixture(self, server_side_cursors):
- self.engine = engines.testing_engine(
- options={'server_side_cursors':server_side_cursors}
- )
- return self.engine
-
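- # Background (hedged): with psycopg2, server-side cursors are named
- # cursors, created roughly as connection.cursor('some_name'), which
- # is why these tests assert on result.cursor.name.
-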
- def tearDown(self):
- engines.testing_reaper.close_all()
- self.engine.dispose()
-
- def test_global_string(self):
- engine = self._fixture(True)
- result = engine.execute('select 1')
- assert result.cursor.name
-
- def test_global_text(self):
- engine = self._fixture(True)
- result = engine.execute(text('select 1'))
- assert result.cursor.name
-
- def test_global_expr(self):
- engine = self._fixture(True)
- result = engine.execute(select([1]))
- assert result.cursor.name
-
- def test_global_off_explicit(self):
- engine = self._fixture(False)
- result = engine.execute(text('select 1'))
-
- # It should be off globally ...
-
- assert not result.cursor.name
-
- def test_stmt_option(self):
- engine = self._fixture(False)
-
- s = select([1]).execution_options(stream_results=True)
- result = engine.execute(s)
-
- # ... but enabled for this one.
-
- assert result.cursor.name
-
-
- def test_conn_option(self):
- engine = self._fixture(False)
-
- # and this one
- result = \
- engine.connect().execution_options(stream_results=True).\
- execute('select 1')
- assert result.cursor.name
-
- def test_stmt_enabled_conn_option_disabled(self):
- engine = self._fixture(False)
-
- s = select([1]).execution_options(stream_results=True)
-
- # not this one
- result = \
- engine.connect().execution_options(stream_results=False).\
- execute(s)
- assert not result.cursor.name
-
- def test_stmt_option_disabled(self):
- engine = self._fixture(True)
- s = select([1]).execution_options(stream_results=False)
- result = engine.execute(s)
- assert not result.cursor.name
-
- def test_aliases_and_ss(self):
- engine = self._fixture(False)
- s1 = select([1]).execution_options(stream_results=True).alias()
- result = engine.execute(s1)
- assert result.cursor.name
-
- # s1's options shouldn't affect s2 when s2 is used as a
- # from_obj.
- s2 = select([1], from_obj=s1)
- result = engine.execute(s2)
- assert not result.cursor.name
-
- def test_for_update_expr(self):
- engine = self._fixture(True)
- s1 = select([1], for_update=True)
- result = engine.execute(s1)
- assert result.cursor.name
-
- def test_for_update_string(self):
- engine = self._fixture(True)
- result = engine.execute('SELECT 1 FOR UPDATE')
- assert result.cursor.name
-
- def test_text_no_ss(self):
- engine = self._fixture(False)
- s = text('select 42')
- result = engine.execute(s)
- assert not result.cursor.name
-
- def test_text_ss_option(self):
- engine = self._fixture(False)
- s = text('select 42').execution_options(stream_results=True)
- result = engine.execute(s)
- assert result.cursor.name
-
- def test_roundtrip(self):
- engine = self._fixture(True)
- test_table = Table('test_table', MetaData(engine),
- Column('id', Integer, primary_key=True),
- Column('data', String(50)))
- test_table.create(checkfirst=True)
- try:
- test_table.insert().execute(data='data1')
- nextid = engine.execute(Sequence('test_table_id_seq'))
- test_table.insert().execute(id=nextid, data='data2')
- eq_(test_table.select().execute().fetchall(),
- [(1, 'data1'), (2, 'data2')])
- test_table.update().where(test_table.c.id == 2).\
- values(data=test_table.c.data + ' updated').execute()
- eq_(test_table.select().execute().fetchall(),
- [(1, 'data1'), (2, 'data2 updated')])
- test_table.delete().execute()
- eq_(test_table.count().scalar(), 0)
- finally:
- test_table.drop(checkfirst=True)
-
-class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
- """test DDL and reflection of PG-specific types """
-
- __only_on__ = 'postgresql'
- __excluded_on__ = (('postgresql', '<', (8, 3, 0)),)
-
- @classmethod
- def setup_class(cls):
- global metadata, table
- metadata = MetaData(testing.db)
-
- # create these types so that we can issue
- # special SQL92 INTERVAL syntax
- class y2m(types.UserDefinedType, postgresql.INTERVAL):
- def get_col_spec(self):
- return "INTERVAL YEAR TO MONTH"
-
- class d2s(types.UserDefinedType, postgresql.INTERVAL):
- def get_col_spec(self):
- return "INTERVAL DAY TO SECOND"
-
- table = Table('sometable', metadata,
- Column('id', postgresql.UUID, primary_key=True),
- Column('flag', postgresql.BIT),
- Column('bitstring', postgresql.BIT(4)),
- Column('addr', postgresql.INET),
- Column('addr2', postgresql.MACADDR),
- Column('addr3', postgresql.CIDR),
- Column('doubleprec', postgresql.DOUBLE_PRECISION),
- Column('plain_interval', postgresql.INTERVAL),
- Column('year_interval', y2m()),
- Column('month_interval', d2s()),
- Column('precision_interval', postgresql.INTERVAL(precision=3))
- )
-
- metadata.create_all()
-
- # cheat so that the "strict type check"
- # works
- table.c.year_interval.type = postgresql.INTERVAL()
- table.c.month_interval.type = postgresql.INTERVAL()
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
-
- def test_reflection(self):
- m = MetaData(testing.db)
- t = Table('sometable', m, autoload=True)
-
- self.assert_tables_equal(table, t, strict_types=True)
- assert t.c.plain_interval.type.precision is None
- assert t.c.precision_interval.type.precision == 3
- assert t.c.bitstring.type.length == 4
-
- def test_bit_compile(self):
- pairs = [(postgresql.BIT(), 'BIT(1)'),
- (postgresql.BIT(5), 'BIT(5)'),
- (postgresql.BIT(varying=True), 'BIT VARYING'),
- (postgresql.BIT(5, varying=True), 'BIT VARYING(5)'),
- ]
- for type_, expected in pairs:
- self.assert_compile(type_, expected)
-
- @testing.provide_metadata
- def test_bit_reflection(self):
- metadata = self.metadata
- t1 = Table('t1', metadata,
- Column('bit1', postgresql.BIT()),
- Column('bit5', postgresql.BIT(5)),
- Column('bitvarying', postgresql.BIT(varying=True)),
- Column('bitvarying5', postgresql.BIT(5, varying=True)),
- )
- t1.create()
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True)
- eq_(t2.c.bit1.type.length, 1)
- eq_(t2.c.bit1.type.varying, False)
- eq_(t2.c.bit5.type.length, 5)
- eq_(t2.c.bit5.type.varying, False)
- eq_(t2.c.bitvarying.type.length, None)
- eq_(t2.c.bitvarying.type.varying, True)
- eq_(t2.c.bitvarying5.type.length, 5)
- eq_(t2.c.bitvarying5.type.varying, True)
-
-class UUIDTest(fixtures.TestBase):
- """Test the bind/return values of the UUID type."""
-
- __only_on__ = 'postgresql'
-
- @testing.requires.python25
- @testing.fails_on('postgresql+zxjdbc',
- 'column "data" is of type uuid but expression is of type character varying')
- @testing.fails_on('postgresql+pg8000', 'No support for UUID type')
- def test_uuid_string(self):
- import uuid
- self._test_round_trip(
- Table('utable', MetaData(),
- Column('data', postgresql.UUID())
- ),
- str(uuid.uuid4()),
- str(uuid.uuid4())
- )
-
- @testing.requires.python25
- @testing.fails_on('postgresql+zxjdbc',
- 'column "data" is of type uuid but expression is of type character varying')
- @testing.fails_on('postgresql+pg8000', 'No support for UUID type')
- def test_uuid_uuid(self):
- import uuid
- self._test_round_trip(
- Table('utable', MetaData(),
- Column('data', postgresql.UUID(as_uuid=True))
- ),
- uuid.uuid4(),
- uuid.uuid4()
- )
-
- def test_no_uuid_available(self):
- from sqlalchemy.dialects.postgresql import base
- uuid_type = base._python_UUID
- base._python_UUID = None
- try:
- assert_raises(
- NotImplementedError,
- postgresql.UUID, as_uuid=True
- )
- finally:
- base._python_UUID = uuid_type
-
- def setup(self):
- self.conn = testing.db.connect()
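- # beginning a transaction here means everything the tests insert
- # is implicitly rolled back when teardown() closes the connection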
- trans = self.conn.begin()
-
- def teardown(self):
- self.conn.close()
-
- def _test_round_trip(self, utable, value1, value2):
- utable.create(self.conn)
- self.conn.execute(utable.insert(), {'data':value1})
- self.conn.execute(utable.insert(), {'data':value2})
- r = self.conn.execute(
- select([utable.c.data]).
- where(utable.c.data != value1)
- )
- eq_(r.fetchone()[0], value2)
- eq_(r.fetchone(), None)
-
-
-class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
-
- __only_on__ = 'postgresql'
- __excluded_on__ = ('postgresql', '<', (8, 3, 0)),
-
- @classmethod
- def setup_class(cls):
- global metadata, cattable, matchtable
- metadata = MetaData(testing.db)
- cattable = Table('cattable', metadata, Column('id', Integer,
- primary_key=True), Column('description',
- String(50)))
- matchtable = Table('matchtable', metadata, Column('id',
- Integer, primary_key=True), Column('title',
- String(200)), Column('category_id', Integer,
- ForeignKey('cattable.id')))
- metadata.create_all()
- cattable.insert().execute([{'id': 1, 'description': 'Python'},
- {'id': 2, 'description': 'Ruby'}])
- matchtable.insert().execute([
- {'id': 1, 'title': 'Agile Web Development with Rails',
- 'category_id': 2},
- {'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
- {'id': 3, 'title': "Programming Matz's Ruby", 'category_id': 2},
- {'id': 4, 'title': 'The Definitive Guide to Django',
- 'category_id': 1},
- {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}])
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
-
- @testing.fails_on('postgresql+pg8000', 'uses positional')
- @testing.fails_on('postgresql+zxjdbc', 'uses qmark')
- def test_expression_pyformat(self):
- self.assert_compile(matchtable.c.title.match('somstr'),
- 'matchtable.title @@ to_tsquery(%(title_1)s'
- ')')
-
- @testing.fails_on('postgresql+psycopg2', 'uses pyformat')
- @testing.fails_on('postgresql+pypostgresql', 'uses pyformat')
- @testing.fails_on('postgresql+zxjdbc', 'uses qmark')
- def test_expression_positional(self):
- self.assert_compile(matchtable.c.title.match('somstr'),
- 'matchtable.title @@ to_tsquery(%s)')
-
- def test_simple_match(self):
- results = \
- matchtable.select().where(matchtable.c.title.match('python'
- )).order_by(matchtable.c.id).execute().fetchall()
- eq_([2, 5], [r.id for r in results])
-
- def test_simple_match_with_apostrophe(self):
- results = \
- matchtable.select().where(matchtable.c.title.match("Matz's"
- )).execute().fetchall()
- eq_([3], [r.id for r in results])
-
- def test_simple_derivative_match(self):
- results = \
- matchtable.select().where(matchtable.c.title.match('nutshells'
- )).execute().fetchall()
- eq_([5], [r.id for r in results])
-
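- # Why 'nutshells' matches 'Python in a Nutshell' (hedged):
- # to_tsquery() stems its input under the default text search
- # configuration, so 'nutshells' and 'Nutshell' normalize to the
- # same lexeme.
-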
- def test_or_match(self):
- results1 = \
- matchtable.select().where(or_(matchtable.c.title.match('nutshells'
- ), matchtable.c.title.match('rubies'
- ))).order_by(matchtable.c.id).execute().fetchall()
- eq_([3, 5], [r.id for r in results1])
- results2 = \
- matchtable.select().where(
- matchtable.c.title.match('nutshells | rubies'
- )).order_by(matchtable.c.id).execute().fetchall()
- eq_([3, 5], [r.id for r in results2])
-
- def test_and_match(self):
- results1 = \
- matchtable.select().where(and_(matchtable.c.title.match('python'
- ), matchtable.c.title.match('nutshells'
- ))).execute().fetchall()
- eq_([5], [r.id for r in results1])
- results2 = \
- matchtable.select().where(
- matchtable.c.title.match('python & nutshells'
- )).execute().fetchall()
- eq_([5], [r.id for r in results2])
-
- def test_match_across_joins(self):
- results = matchtable.select().where(and_(cattable.c.id
- == matchtable.c.category_id,
- or_(cattable.c.description.match('Ruby'),
- matchtable.c.title.match('nutshells'
- )))).order_by(matchtable.c.id).execute().fetchall()
- eq_([1, 3, 5], [r.id for r in results])
-
-
-class TupleTest(fixtures.TestBase):
- __only_on__ = 'postgresql'
-
- def test_tuple_containment(self):
-
- for test, exp in [
- ([('a', 'b')], True),
- ([('a', 'c')], False),
- ([('f', 'q'), ('a', 'b')], True),
- ([('f', 'q'), ('a', 'c')], False)
- ]:
- eq_(
- testing.db.execute(
- select([
- tuple_(
- literal_column("'a'"),
- literal_column("'b'")
- ).\
- in_([
- tuple_(*[
- literal_column("'%s'" % letter)
- for letter in elem
- ]) for elem in test
- ])
- ])
- ).scalar(),
- exp
- )
-
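- # Compiled form of the containment test (hedged paraphrase):
- #
- #     SELECT ('a', 'b') IN (('f', 'q'), ('a', 'b')) AS anon_1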
-
-class HStoreTest(fixtures.TestBase):
- def _assert_sql(self, construct, expected):
- dialect = postgresql.dialect()
- compiled = str(construct.compile(dialect=dialect))
- compiled = re.sub(r'\s+', ' ', compiled)
- expected = re.sub(r'\s+', ' ', expected)
- eq_(compiled, expected)
-
- def setup(self):
- metadata = MetaData()
- self.test_table = Table('test_table', metadata,
- Column('id', Integer, primary_key=True),
- Column('hash', HSTORE)
- )
- self.hashcol = self.test_table.c.hash
-
- def _test_where(self, whereclause, expected):
- stmt = select([self.test_table]).where(whereclause)
- self._assert_sql(
- stmt,
- "SELECT test_table.id, test_table.hash FROM test_table "
- "WHERE %s" % expected
- )
-
- def _test_cols(self, colclause, expected, from_=True):
- stmt = select([colclause])
- self._assert_sql(
- stmt,
- (
- "SELECT %s" +
- (" FROM test_table" if from_ else "")
- ) % expected
- )
-
- def test_bind_serialize_default(self):
- from sqlalchemy.engine import default
-
- dialect = default.DefaultDialect()
- proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
- eq_(
- proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])),
- '"key1"=>"value1", "key2"=>"value2"'
- )
-
- def test_bind_serialize_with_slashes_and_quotes(self):
- from sqlalchemy.engine import default
-
- dialect = default.DefaultDialect()
- proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
- eq_(
- proc({'\\"a': '\\"1'}),
- '"\\\\\\"a"=>"\\\\\\"1"'
- )
-
- def test_parse_error(self):
- from sqlalchemy.engine import default
-
- dialect = default.DefaultDialect()
- proc = self.test_table.c.hash.type._cached_result_processor(
- dialect, None)
- assert_raises_message(
- ValueError,
- r'''After u?'\[\.\.\.\], "key1"=>"value1", ', could not parse '''
- '''residual at position 36: u?'crapcrapcrap, "key3"\[\.\.\.\]''',
- proc,
- '"key2"=>"value2", "key1"=>"value1", '
- 'crapcrapcrap, "key3"=>"value3"'
- )
-
- def test_result_deserialize_default(self):
- from sqlalchemy.engine import default
-
- dialect = default.DefaultDialect()
- proc = self.test_table.c.hash.type._cached_result_processor(
- dialect, None)
- eq_(
- proc('"key2"=>"value2", "key1"=>"value1"'),
- {"key1": "value1", "key2": "value2"}
- )
-
- def test_result_deserialize_with_slashes_and_quotes(self):
- from sqlalchemy.engine import default
-
- dialect = default.DefaultDialect()
- proc = self.test_table.c.hash.type._cached_result_processor(
- dialect, None)
- eq_(
- proc('"\\\\\\"a"=>"\\\\\\"1"'),
- {'\\"a': '\\"1'}
- )
-
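- # Escaping rule exercised above (hedged): in the hstore text format
- # both backslash and double-quote are escaped with a backslash, so
- # the Python key \"a (backslash, quote, 'a') is written \\\"a.
-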
- def test_bind_serialize_psycopg2(self):
- from sqlalchemy.dialects.postgresql import psycopg2
-
- dialect = psycopg2.PGDialect_psycopg2()
- dialect._has_native_hstore = True
- proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
- is_(proc, None)
-
- dialect = psycopg2.PGDialect_psycopg2()
- dialect._has_native_hstore = False
- proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
- eq_(
- proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])),
- '"key1"=>"value1", "key2"=>"value2"'
- )
-
- def test_result_deserialize_psycopg2(self):
- from sqlalchemy.dialects.postgresql import psycopg2
-
- dialect = psycopg2.PGDialect_psycopg2()
- dialect._has_native_hstore = True
- proc = self.test_table.c.hash.type._cached_result_processor(
- dialect, None)
- is_(proc, None)
-
- dialect = psycopg2.PGDialect_psycopg2()
- dialect._has_native_hstore = False
- proc = self.test_table.c.hash.type._cached_result_processor(
- dialect, None)
- eq_(
- proc('"key2"=>"value2", "key1"=>"value1"'),
- {"key1": "value1", "key2": "value2"}
- )
-
- def test_where_has_key(self):
- self._test_where(
- # hide from 2to3
- getattr(self.hashcol, 'has_key')('foo'),
- "test_table.hash ? %(hash_1)s"
- )
-
- def test_where_has_all(self):
- self._test_where(
- self.hashcol.has_all(postgresql.array(['1', '2'])),
- "test_table.hash ?& ARRAY[%(param_1)s, %(param_2)s]"
- )
-
- def test_where_has_any(self):
- self._test_where(
- self.hashcol.has_any(postgresql.array(['1', '2'])),
- "test_table.hash ?| ARRAY[%(param_1)s, %(param_2)s]"
- )
-
- def test_where_defined(self):
- self._test_where(
- self.hashcol.defined('foo'),
- "defined(test_table.hash, %(param_1)s)"
- )
-
- def test_where_contains(self):
- self._test_where(
- self.hashcol.contains({'foo': '1'}),
- "test_table.hash @> %(hash_1)s"
- )
-
- def test_where_contained_by(self):
- self._test_where(
- self.hashcol.contained_by({'foo': '1', 'bar': None}),
- "test_table.hash <@ %(hash_1)s"
- )
-
- def test_where_getitem(self):
- self._test_where(
- self.hashcol['bar'] == None,
- "(test_table.hash -> %(hash_1)s) IS NULL"
- )
-
- def test_cols_get(self):
- self._test_cols(
- self.hashcol['foo'],
- "test_table.hash -> %(hash_1)s AS anon_1",
- True
- )
-
- def test_cols_delete_single_key(self):
- self._test_cols(
- self.hashcol.delete('foo'),
- "delete(test_table.hash, %(param_1)s) AS delete_1",
- True
- )
-
- def test_cols_delete_array_of_keys(self):
- self._test_cols(
- self.hashcol.delete(postgresql.array(['foo', 'bar'])),
- ("delete(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) "
- "AS delete_1"),
- True
- )
-
- def test_cols_delete_matching_pairs(self):
- self._test_cols(
- self.hashcol.delete(hstore('1', '2')),
- ("delete(test_table.hash, hstore(%(param_1)s, %(param_2)s)) "
- "AS delete_1"),
- True
- )
-
- def test_cols_slice(self):
- self._test_cols(
- self.hashcol.slice(postgresql.array(['1', '2'])),
- ("slice(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) "
- "AS slice_1"),
- True
- )
-
- def test_cols_hstore_pair_text(self):
- self._test_cols(
- hstore('foo', '3')['foo'],
- "hstore(%(param_1)s, %(param_2)s) -> %(hstore_1)s AS anon_1",
- False
- )
-
- def test_cols_hstore_pair_array(self):
- self._test_cols(
- hstore(postgresql.array(['1', '2']),
- postgresql.array(['3', None]))['1'],
- ("hstore(ARRAY[%(param_1)s, %(param_2)s], "
- "ARRAY[%(param_3)s, NULL]) -> %(hstore_1)s AS anon_1"),
- False
- )
-
- def test_cols_hstore_single_array(self):
- self._test_cols(
- hstore(postgresql.array(['1', '2', '3', None]))['3'],
- ("hstore(ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, NULL]) "
- "-> %(hstore_1)s AS anon_1"),
- False
- )
-
- def test_cols_concat(self):
- self._test_cols(
- self.hashcol.concat(hstore(cast(self.test_table.c.id, Text), '3')),
- ("test_table.hash || hstore(CAST(test_table.id AS TEXT), "
- "%(param_1)s) AS anon_1"),
- True
- )
-
- def test_cols_concat_op(self):
- self._test_cols(
- hstore('foo', 'bar') + self.hashcol,
- "hstore(%(param_1)s, %(param_2)s) || test_table.hash AS anon_1",
- True
- )
-
- def test_cols_concat_get(self):
- self._test_cols(
- (self.hashcol + self.hashcol)['foo'],
- "test_table.hash || test_table.hash -> %(param_1)s AS anon_1"
- )
-
- def test_cols_keys(self):
- self._test_cols(
- # hide from 2to3
- getattr(self.hashcol, 'keys')(),
- "akeys(test_table.hash) AS akeys_1",
- True
- )
-
- def test_cols_vals(self):
- self._test_cols(
- self.hashcol.vals(),
- "avals(test_table.hash) AS avals_1",
- True
- )
-
- def test_cols_array(self):
- self._test_cols(
- self.hashcol.array(),
- "hstore_to_array(test_table.hash) AS hstore_to_array_1",
- True
- )
-
- def test_cols_matrix(self):
- self._test_cols(
- self.hashcol.matrix(),
- "hstore_to_matrix(test_table.hash) AS hstore_to_matrix_1",
- True
- )
-
-
-class HStoreRoundTripTest(fixtures.TablesTest):
- __requires__ = 'hstore',
- __dialect__ = 'postgresql'
-
- @classmethod
- def define_tables(cls, metadata):
- Table('data_table', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(30), nullable=False),
- Column('data', HSTORE)
- )
-
- def _fixture_data(self, engine):
- data_table = self.tables.data_table
- engine.execute(
- data_table.insert(),
- {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}},
- {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}},
- {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}},
- {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}},
- {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2"}},
- )
-
- def _assert_data(self, compare):
- data = testing.db.execute(
- select([self.tables.data_table.c.data]).
- order_by(self.tables.data_table.c.name)
- ).fetchall()
- eq_([d for d, in data], compare)
-
- def _test_insert(self, engine):
- engine.execute(
- self.tables.data_table.insert(),
- {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}
- )
- self._assert_data([{"k1": "r1v1", "k2": "r1v2"}])
-
- def _non_native_engine(self):
- if testing.against("postgresql+psycopg2"):
- engine = engines.testing_engine(options=dict(use_native_hstore=False))
- else:
- engine = testing.db
- engine.connect()
- return engine
-
- def test_reflect(self):
- from sqlalchemy import inspect
- insp = inspect(testing.db)
- cols = insp.get_columns('data_table')
- assert isinstance(cols[2]['type'], HSTORE)
-
- @testing.only_on("postgresql+psycopg2")
- def test_insert_native(self):
- engine = testing.db
- self._test_insert(engine)
-
- def test_insert_python(self):
- engine = self._non_native_engine()
- self._test_insert(engine)
-
- @testing.only_on("postgresql+psycopg2")
- def test_criterion_native(self):
- engine = testing.db
- self._fixture_data(engine)
- self._test_criterion(engine)
-
- def test_criterion_python(self):
- engine = self._non_native_engine()
- self._fixture_data(engine)
- self._test_criterion(engine)
-
- def _test_criterion(self, engine):
- data_table = self.tables.data_table
- result = engine.execute(
- select([data_table.c.data]).where(data_table.c.data['k1'] == 'r3v1')
- ).first()
- eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},))
-
- def _test_fixed_round_trip(self, engine):
- s = select([
- hstore(
- array(['key1', 'key2', 'key3']),
- array(['value1', 'value2', 'value3'])
- )
- ])
- eq_(
- engine.scalar(s),
- {"key1": "value1", "key2": "value2", "key3": "value3"}
- )
-
- def test_fixed_round_trip_python(self):
- engine = self._non_native_engine()
- self._test_fixed_round_trip(engine)
-
- @testing.only_on("postgresql+psycopg2")
- def test_fixed_round_trip_native(self):
- engine = testing.db
- self._test_fixed_round_trip(engine)
-
- def _test_unicode_round_trip(self, engine):
- s = select([
- hstore(
- array([util.u('réveillé'), util.u('drôle'), util.u('S’il')]),
- array([util.u('réveillé'), util.u('drôle'), util.u('S’il')])
- )
- ])
- eq_(
- engine.scalar(s),
- {
- util.u('réveillé'): util.u('réveillé'),
- util.u('drôle'): util.u('drôle'),
- util.u('S’il'): util.u('S’il')
- }
- )
-
- def test_unicode_round_trip_python(self):
- engine = self._non_native_engine()
- self._test_unicode_round_trip(engine)
-
- @testing.only_on("postgresql+psycopg2")
- def test_unicode_round_trip_native(self):
- engine = testing.db
- self._test_unicode_round_trip(engine)
-
- def test_escaped_quotes_round_trip_python(self):
- engine = self._non_native_engine()
- self._test_escaped_quotes_round_trip(engine)
-
- @testing.only_on("postgresql+psycopg2")
- def test_escaped_quotes_round_trip_native(self):
- engine = testing.db
- self._test_escaped_quotes_round_trip(engine)
-
- def _test_escaped_quotes_round_trip(self, engine):
- engine.execute(
- self.tables.data_table.insert(),
- {'name': 'r1', 'data': {r'key \"foo\"': r'value \"bar"\ xyz'}}
- )
- self._assert_data([{r'key \"foo\"': r'value \"bar"\ xyz'}])
-
-class _RangeTypeMixin(object):
- __requires__ = 'range_types',
- __dialect__ = 'postgresql+psycopg2'
-
- @property
- def extras(self):
- # done this way so we don't get ImportErrors with
- # older psycopg2 versions.
- from psycopg2 import extras
- return extras
-
- @classmethod
- def define_tables(cls, metadata):
- # no reason ranges shouldn't be primary keys,
- # so let's just use them as such
- table = Table('data_table', metadata,
- Column('range', cls._col_type, primary_key=True),
- )
- cls.col = table.c.range
-
- def test_actual_type(self):
- eq_(str(self._col_type()), self._col_str)
-
- def test_reflect(self):
- from sqlalchemy import inspect
- insp = inspect(testing.db)
- cols = insp.get_columns('data_table')
- assert isinstance(cols[0]['type'], self._col_type)
-
- def _assert_data(self):
- data = testing.db.execute(
- select([self.tables.data_table.c.range])
- ).fetchall()
- eq_(data, [(self._data_obj(), )])
-
- def test_insert_obj(self):
- testing.db.engine.execute(
- self.tables.data_table.insert(),
- {'range': self._data_obj()}
- )
- self._assert_data()
-
- def test_insert_text(self):
- testing.db.engine.execute(
- self.tables.data_table.insert(),
- {'range': self._data_str}
- )
- self._assert_data()
-
- # operator tests
-
- def _test_clause(self, colclause, expected):
- dialect = postgresql.dialect()
- compiled = str(colclause.compile(dialect=dialect))
- eq_(compiled, expected)
-
- def test_where_equal(self):
- self._test_clause(
- self.col==self._data_str,
- "data_table.range = %(range_1)s"
- )
-
- def test_where_not_equal(self):
- self._test_clause(
- self.col!=self._data_str,
- "data_table.range <> %(range_1)s"
- )
-
- def test_where_less_than(self):
- self._test_clause(
- self.col < self._data_str,
- "data_table.range < %(range_1)s"
- )
-
- def test_where_greater_than(self):
- self._test_clause(
- self.col > self._data_str,
- "data_table.range > %(range_1)s"
- )
-
- def test_where_less_than_or_equal(self):
- self._test_clause(
- self.col <= self._data_str,
- "data_table.range <= %(range_1)s"
- )
-
- def test_where_greater_than_or_equal(self):
- self._test_clause(
- self.col >= self._data_str,
- "data_table.range >= %(range_1)s"
- )
-
- def test_contains(self):
- self._test_clause(
- self.col.contains(self._data_str),
- "data_table.range @> %(range_1)s"
- )
-
- def test_contained_by(self):
- self._test_clause(
- self.col.contained_by(self._data_str),
- "data_table.range <@ %(range_1)s"
- )
-
- def test_overlaps(self):
- self._test_clause(
- self.col.overlaps(self._data_str),
- "data_table.range && %(range_1)s"
- )
-
- def test_strictly_left_of(self):
- self._test_clause(
- self.col << self._data_str,
- "data_table.range << %(range_1)s"
- )
- self._test_clause(
- self.col.strictly_left_of(self._data_str),
- "data_table.range << %(range_1)s"
- )
-
- def test_strictly_right_of(self):
- self._test_clause(
- self.col >> self._data_str,
- "data_table.range >> %(range_1)s"
- )
- self._test_clause(
- self.col.strictly_right_of(self._data_str),
- "data_table.range >> %(range_1)s"
- )
-
- def test_not_extend_right_of(self):
- self._test_clause(
- self.col.not_extend_right_of(self._data_str),
- "data_table.range &< %(range_1)s"
- )
-
- def test_not_extend_left_of(self):
- self._test_clause(
- self.col.not_extend_left_of(self._data_str),
- "data_table.range &> %(range_1)s"
- )
-
- def test_adjacent_to(self):
- self._test_clause(
- self.col.adjacent_to(self._data_str),
- "data_table.range -|- %(range_1)s"
- )
-
- def test_union(self):
- self._test_clause(
- self.col + self.col,
- "data_table.range + data_table.range"
- )
-
- def test_union_result(self):
- # insert
- testing.db.engine.execute(
- self.tables.data_table.insert(),
- {'range': self._data_str}
- )
- # select
- range = self.tables.data_table.c.range
- data = testing.db.execute(
- select([range + range])
- ).fetchall()
- eq_(data, [(self._data_obj(), )])
-
-
- def test_intersection(self):
- self._test_clause(
- self.col * self.col,
- "data_table.range * data_table.range"
- )
-
- def test_intersection_result(self):
- # insert
- testing.db.engine.execute(
- self.tables.data_table.insert(),
- {'range': self._data_str}
- )
- # select
- range = self.tables.data_table.c.range
- data = testing.db.execute(
- select([range * range])
- ).fetchall()
- eq_(data, [(self._data_obj(), )])
-
- def test_difference(self):
- self._test_clause(
- self.col - self.col,
- "data_table.range - data_table.range"
- )
-
- def test_difference_result(self):
- # insert
- testing.db.engine.execute(
- self.tables.data_table.insert(),
- {'range': self._data_str}
- )
- # select
- range = self.tables.data_table.c.range
- data = testing.db.execute(
- select([range - range])
- ).fetchall()
- eq_(data, [(self._data_obj().__class__(empty=True), )])
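-
- # Subtracting a range from itself yields the empty range, hence the
- # comparison against _data_obj().__class__(empty=True); psycopg2's
- # range objects expose an 'empty' keyword for exactly this case.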
-
-class Int4RangeTests(_RangeTypeMixin, fixtures.TablesTest):
-
- _col_type = INT4RANGE
- _col_str = 'INT4RANGE'
- _data_str = '[1,2)'
- def _data_obj(self):
- return self.extras.NumericRange(1, 2)
-
-class Int8RangeTests(_RangeTypeMixin, fixtures.TablesTest):
-
- _col_type = INT8RANGE
- _col_str = 'INT8RANGE'
- _data_str = '[9223372036854775806,9223372036854775807)'
- def _data_obj(self):
- return self.extras.NumericRange(
- 9223372036854775806, 9223372036854775807
- )
-
-class NumRangeTests(_RangeTypeMixin, fixtures.TablesTest):
-
- _col_type = NUMRANGE
- _col_str = 'NUMRANGE'
- _data_str = '[1.0,2.0)'
- def _data_obj(self):
- return self.extras.NumericRange(
- decimal.Decimal('1.0'), decimal.Decimal('2.0')
- )
-
-class DateRangeTests(_RangeTypeMixin, fixtures.TablesTest):
-
- _col_type = DATERANGE
- _col_str = 'DATERANGE'
- _data_str = '[2013-03-23,2013-03-24)'
- def _data_obj(self):
- return self.extras.DateRange(
- datetime.date(2013, 3, 23), datetime.date(2013, 3, 24)
- )
-
-class DateTimeRangeTests(_RangeTypeMixin, fixtures.TablesTest):
-
- _col_type = TSRANGE
- _col_str = 'TSRANGE'
- _data_str = '[2013-03-23 14:30,2013-03-23 23:30)'
- def _data_obj(self):
- return self.extras.DateTimeRange(
- datetime.datetime(2013, 3, 23, 14, 30),
- datetime.datetime(2013, 3, 23, 23, 30)
- )
-
-class DateTimeTZRangeTests(_RangeTypeMixin, fixtures.TablesTest):
-
- _col_type = TSTZRANGE
- _col_str = 'TSTZRANGE'
-
- # make sure we use one steady timestamp-with-timezone pair
- # for all parts of all these tests
- _tstzs = None
- def tstzs(self):
- if self._tstzs is None:
- lower = testing.db.connect().scalar(
- func.current_timestamp().select()
- )
- upper = lower + datetime.timedelta(days=1)
- self._tstzs = (lower, upper)
- return self._tstzs
-
- @property
- def _data_str(self):
- return '[%s,%s)' % self.tstzs()
-
- def _data_obj(self):
- return self.extras.DateTimeTZRange(*self.tstzs())