# coding: utf-8 from sqlalchemy.testing.assertions import eq_, assert_raises, \ assert_raises_message, is_, AssertsExecutionResults, \ AssertsCompiledSQL, ComparesTables from sqlalchemy.testing import engines, fixtures from sqlalchemy import testing import datetime from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ String, Sequence, ForeignKey, join, Numeric, \ PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ func, literal_column, literal, bindparam, cast, extract, \ SmallInteger, Enum, REAL, update, insert, Index, delete, \ and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text from sqlalchemy.orm import Session, mapper, aliased from sqlalchemy import exc, schema, types from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.dialects.postgresql import HSTORE, hstore, array, \ INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, TSTZRANGE, \ ExcludeConstraint import decimal from sqlalchemy import util from sqlalchemy.testing.util import round_decimal from sqlalchemy.sql import table, column, operators import logging import logging.handlers import re class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): def test_format(self): seq = Sequence('my_seq_no_schema') dialect = postgresql.PGDialect() assert dialect.identifier_preparer.format_sequence(seq) \ == 'my_seq_no_schema' seq = Sequence('my_seq', schema='some_schema') assert dialect.identifier_preparer.format_sequence(seq) \ == 'some_schema.my_seq' seq = Sequence('My_Seq', schema='Some_Schema') assert dialect.identifier_preparer.format_sequence(seq) \ == '"Some_Schema"."My_Seq"' @testing.only_on('postgresql', 'foo') @testing.provide_metadata def test_reverse_eng_name(self): metadata = self.metadata engine = engines.testing_engine(options=dict(implicit_returning=False)) for tname, cname in [ ('tb1' * 30, 'abc'), ('tb2', 'abc' * 30), ('tb3' * 30, 'abc' * 30), ('tb4', 'abc'), ]: t = Table(tname[:57], metadata, Column(cname[:57], Integer, primary_key=True) ) t.create(engine) r = engine.execute(t.insert()) assert r.inserted_primary_key == [1] class CompileTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = postgresql.dialect() def test_update_returning(self): dialect = postgresql.dialect() table1 = table('mytable', column('myid', Integer), column('name' , String(128)), column('description', String(128))) u = update(table1, values=dict(name='foo' )).returning(table1.c.myid, table1.c.name) self.assert_compile(u, 'UPDATE mytable SET name=%(name)s ' 'RETURNING mytable.myid, mytable.name', dialect=dialect) u = update(table1, values=dict(name='foo')).returning(table1) self.assert_compile(u, 'UPDATE mytable SET name=%(name)s ' 'RETURNING mytable.myid, mytable.name, ' 'mytable.description', dialect=dialect) u = update(table1, values=dict(name='foo' )).returning(func.length(table1.c.name)) self.assert_compile(u, 'UPDATE mytable SET name=%(name)s ' 'RETURNING length(mytable.name) AS length_1' , dialect=dialect) def test_insert_returning(self): dialect = postgresql.dialect() table1 = table('mytable', column('myid', Integer), column('name', String(128)), column('description', String(128)), ) i = insert(table1, values=dict(name='foo' )).returning(table1.c.myid, table1.c.name) self.assert_compile(i, 'INSERT INTO mytable (name) VALUES ' '(%(name)s) RETURNING mytable.myid, ' 'mytable.name', dialect=dialect) i = insert(table1, values=dict(name='foo')).returning(table1) self.assert_compile(i, 'INSERT INTO mytable (name) VALUES ' '(%(name)s) RETURNING mytable.myid, ' 'mytable.name, mytable.description', dialect=dialect) i = insert(table1, values=dict(name='foo' )).returning(func.length(table1.c.name)) self.assert_compile(i, 'INSERT INTO mytable (name) VALUES ' '(%(name)s) RETURNING length(mytable.name) ' 'AS length_1', dialect=dialect) def test_create_partial_index(self): m = MetaData() tbl = Table('testtbl', m, Column('data', Integer)) idx = Index('test_idx1', tbl.c.data, postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10)) idx = Index('test_idx1', tbl.c.data, postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10)) # test quoting and all that idx2 = Index('test_idx2', tbl.c.data, postgresql_where=and_(tbl.c.data > 'a', tbl.c.data < "b's")) self.assert_compile(schema.CreateIndex(idx), 'CREATE INDEX test_idx1 ON testtbl (data) ' 'WHERE data > 5 AND data < 10', dialect=postgresql.dialect()) self.assert_compile(schema.CreateIndex(idx2), "CREATE INDEX test_idx2 ON testtbl (data) " "WHERE data > 'a' AND data < 'b''s'", dialect=postgresql.dialect()) def test_create_index_with_ops(self): m = MetaData() tbl = Table('testtbl', m, Column('data', String), Column('data2', Integer, key='d2')) idx = Index('test_idx1', tbl.c.data, postgresql_ops={'data': 'text_pattern_ops'}) idx2 = Index('test_idx2', tbl.c.data, tbl.c.d2, postgresql_ops={'data': 'text_pattern_ops', 'd2': 'int4_ops'}) self.assert_compile(schema.CreateIndex(idx), 'CREATE INDEX test_idx1 ON testtbl ' '(data text_pattern_ops)', dialect=postgresql.dialect()) self.assert_compile(schema.CreateIndex(idx2), 'CREATE INDEX test_idx2 ON testtbl ' '(data text_pattern_ops, data2 int4_ops)', dialect=postgresql.dialect()) def test_create_index_with_using(self): m = MetaData() tbl = Table('testtbl', m, Column('data', String)) idx1 = Index('test_idx1', tbl.c.data) idx2 = Index('test_idx2', tbl.c.data, postgresql_using='btree') idx3 = Index('test_idx3', tbl.c.data, postgresql_using='hash') self.assert_compile(schema.CreateIndex(idx1), 'CREATE INDEX test_idx1 ON testtbl ' '(data)', dialect=postgresql.dialect()) self.assert_compile(schema.CreateIndex(idx2), 'CREATE INDEX test_idx2 ON testtbl ' 'USING btree (data)', dialect=postgresql.dialect()) self.assert_compile(schema.CreateIndex(idx3), 'CREATE INDEX test_idx3 ON testtbl ' 'USING hash (data)', dialect=postgresql.dialect()) def test_exclude_constraint_min(self): m = MetaData() tbl = Table('testtbl', m, Column('room', Integer, primary_key=True)) cons = ExcludeConstraint(('room', '=')) tbl.append_constraint(cons) self.assert_compile(schema.AddConstraint(cons), 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' '(room WITH =)', dialect=postgresql.dialect()) def test_exclude_constraint_full(self): m = MetaData() room = Column('room', Integer, primary_key=True) tbl = Table('testtbl', m, room, Column('during', TSRANGE)) room = Column('room', Integer, primary_key=True) cons = ExcludeConstraint((room, '='), ('during', '&&'), name='my_name', using='gist', where="room > 100", deferrable=True, initially='immediate') tbl.append_constraint(cons) self.assert_compile(schema.AddConstraint(cons), 'ALTER TABLE testtbl ADD CONSTRAINT my_name ' 'EXCLUDE USING gist ' '(room WITH =, during WITH ''&&) WHERE ' '(room > 100) DEFERRABLE INITIALLY immediate', dialect=postgresql.dialect()) def test_exclude_constraint_copy(self): m = MetaData() cons = ExcludeConstraint(('room', '=')) tbl = Table('testtbl', m, Column('room', Integer, primary_key=True), cons) # apparently you can't copy a ColumnCollectionConstraint until # after it has been bound to a table... cons_copy = cons.copy() tbl.append_constraint(cons_copy) self.assert_compile(schema.AddConstraint(cons_copy), 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' '(room WITH =)', dialect=postgresql.dialect()) def test_substring(self): self.assert_compile(func.substring('abc', 1, 2), 'SUBSTRING(%(substring_1)s FROM %(substring_2)s ' 'FOR %(substring_3)s)') self.assert_compile(func.substring('abc', 1), 'SUBSTRING(%(substring_1)s FROM %(substring_2)s)') def test_extract(self): t = table('t', column('col1', DateTime), column('col2', Date), column('col3', Time), column('col4', postgresql.INTERVAL)) for field in 'year', 'month', 'day', 'epoch', 'hour': for expr, compiled_expr in [ # invalid, no cast. plain # text. no cast. addition is # commutative subtraction is # not invalid - no cast. dont # crack up on entirely # unsupported types (t.c.col1, 't.col1 :: timestamp'), (t.c.col2, 't.col2 :: date'), (t.c.col3, 't.col3 :: time'), (func.current_timestamp() - datetime.timedelta(days=5), '(CURRENT_TIMESTAMP - %(current_timestamp_1)s) :: ' 'timestamp'), (func.current_timestamp() + func.current_timestamp(), 'CURRENT_TIMESTAMP + CURRENT_TIMESTAMP'), (text('foo.date + foo.time'), 'foo.date + foo.time'), (func.current_timestamp() + datetime.timedelta(days=5), '(CURRENT_TIMESTAMP + %(current_timestamp_1)s) :: ' 'timestamp'), (t.c.col2 + t.c.col3, '(t.col2 + t.col3) :: timestamp' ), (t.c.col2 + datetime.timedelta(days=5), '(t.col2 + %(col2_1)s) :: timestamp'), (datetime.timedelta(days=5) + t.c.col2, '(%(col2_1)s + t.col2) :: timestamp'), (t.c.col1 + t.c.col4, '(t.col1 + t.col4) :: timestamp' ), (t.c.col1 - datetime.timedelta(seconds=30), '(t.col1 - %(col1_1)s) :: timestamp'), (datetime.timedelta(seconds=30) - t.c.col1, '%(col1_1)s - t.col1'), (func.coalesce(t.c.col1, func.current_timestamp()), 'coalesce(t.col1, CURRENT_TIMESTAMP) :: timestamp'), (t.c.col3 + datetime.timedelta(seconds=30), '(t.col3 + %(col3_1)s) :: time'), (func.current_timestamp() - func.coalesce(t.c.col1, func.current_timestamp()), '(CURRENT_TIMESTAMP - coalesce(t.col1, ' 'CURRENT_TIMESTAMP)) :: interval'), (3 * func.foobar(type_=Interval), '(%(foobar_1)s * foobar()) :: interval'), (literal(datetime.timedelta(seconds=10)) - literal(datetime.timedelta(seconds=10)), '(%(param_1)s - %(param_2)s) :: interval'), (t.c.col3 + 'some string', 't.col3 + %(col3_1)s'), ]: self.assert_compile(select([extract(field, expr)]).select_from(t), 'SELECT EXTRACT(%s FROM %s) AS ' 'anon_1 FROM t' % (field, compiled_expr)) def test_reserved_words(self): table = Table("pg_table", MetaData(), Column("col1", Integer), Column("variadic", Integer)) x = select([table.c.col1, table.c.variadic]) self.assert_compile(x, '''SELECT pg_table.col1, pg_table."variadic" FROM pg_table''') def test_array(self): c = Column('x', postgresql.ARRAY(Integer)) self.assert_compile( cast(c, postgresql.ARRAY(Integer)), "CAST(x AS INTEGER[])" ) self.assert_compile( c[5], "x[%(x_1)s]", checkparams={'x_1': 5} ) self.assert_compile( c[5:7], "x[%(x_1)s:%(x_2)s]", checkparams={'x_2': 7, 'x_1': 5} ) self.assert_compile( c[5:7][2:3], "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]", checkparams={'x_2': 7, 'x_1': 5, 'param_1':2, 'param_2':3} ) self.assert_compile( c[5:7][3], "x[%(x_1)s:%(x_2)s][%(param_1)s]", checkparams={'x_2': 7, 'x_1': 5, 'param_1':3} ) self.assert_compile( c.contains([1]), 'x @> %(x_1)s', checkparams={'x_1': [1]} ) self.assert_compile( c.contained_by([2]), 'x <@ %(x_1)s', checkparams={'x_1': [2]} ) self.assert_compile( c.overlap([3]), 'x && %(x_1)s', checkparams={'x_1': [3]} ) self.assert_compile( postgresql.Any(4, c), '%(param_1)s = ANY (x)', checkparams={'param_1': 4} ) self.assert_compile( c.any(5, operator=operators.ne), '%(param_1)s != ANY (x)', checkparams={'param_1': 5} ) self.assert_compile( postgresql.All(6, c, operator=operators.gt), '%(param_1)s > ALL (x)', checkparams={'param_1': 6} ) self.assert_compile( c.all(7, operator=operators.lt), '%(param_1)s < ALL (x)', checkparams={'param_1': 7} ) def test_array_literal_type(self): is_(postgresql.array([1, 2]).type._type_affinity, postgresql.ARRAY) is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer) is_(postgresql.array([1, 2], type_=String). type.item_type._type_affinity, String) def test_array_literal(self): self.assert_compile( func.array_dims(postgresql.array([1, 2]) + postgresql.array([3, 4, 5])), "array_dims(ARRAY[%(param_1)s, %(param_2)s] || " "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])", checkparams={'param_5': 5, 'param_4': 4, 'param_1': 1, 'param_3': 3, 'param_2': 2} ) def test_array_literal_insert(self): m = MetaData() t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) self.assert_compile( t.insert().values(data=array([1, 2, 3])), "INSERT INTO t (data) VALUES (ARRAY[%(param_1)s, " "%(param_2)s, %(param_3)s])" ) def test_update_array_element(self): m = MetaData() t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) self.assert_compile( t.update().values({t.c.data[5]: 1}), "UPDATE t SET data[%(data_1)s]=%(param_1)s", checkparams={'data_1': 5, 'param_1': 1} ) def test_update_array_slice(self): m = MetaData() t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) self.assert_compile( t.update().values({t.c.data[2:5]: 2}), "UPDATE t SET data[%(data_1)s:%(data_2)s]=%(param_1)s", checkparams={'param_1': 2, 'data_2': 5, 'data_1': 2} ) def test_from_only(self): m = MetaData() tbl1 = Table('testtbl1', m, Column('id', Integer)) tbl2 = Table('testtbl2', m, Column('id', Integer)) stmt = tbl1.select().with_hint(tbl1, 'ONLY', 'postgresql') expected = 'SELECT testtbl1.id FROM ONLY testtbl1' self.assert_compile(stmt, expected) talias1 = tbl1.alias('foo') stmt = talias1.select().with_hint(talias1, 'ONLY', 'postgresql') expected = 'SELECT foo.id FROM ONLY testtbl1 AS foo' self.assert_compile(stmt, expected) stmt = select([tbl1, tbl2]).with_hint(tbl1, 'ONLY', 'postgresql') expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, ' 'testtbl2') self.assert_compile(stmt, expected) stmt = select([tbl1, tbl2]).with_hint(tbl2, 'ONLY', 'postgresql') expected = ('SELECT testtbl1.id, testtbl2.id FROM testtbl1, ONLY ' 'testtbl2') self.assert_compile(stmt, expected) stmt = select([tbl1, tbl2]) stmt = stmt.with_hint(tbl1, 'ONLY', 'postgresql') stmt = stmt.with_hint(tbl2, 'ONLY', 'postgresql') expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, ' 'ONLY testtbl2') self.assert_compile(stmt, expected) stmt = update(tbl1, values=dict(id=1)) stmt = stmt.with_hint('ONLY', dialect_name='postgresql') expected = 'UPDATE ONLY testtbl1 SET id=%(id)s' self.assert_compile(stmt, expected) stmt = delete(tbl1).with_hint('ONLY', selectable=tbl1, dialect_name='postgresql') expected = 'DELETE FROM ONLY testtbl1' self.assert_compile(stmt, expected) tbl3 = Table('testtbl3', m, Column('id', Integer), schema='testschema') stmt = tbl3.select().with_hint(tbl3, 'ONLY', 'postgresql') expected = 'SELECT testschema.testtbl3.id FROM ONLY testschema.testtbl3' self.assert_compile(stmt, expected) assert_raises( exc.CompileError, tbl3.select().with_hint(tbl3, "FAKE", "postgresql").compile, dialect=postgresql.dialect() ) class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults): __only_on__ = 'postgresql' __dialect__ = postgresql.dialect() @classmethod def define_tables(cls, metadata): data_table = Table('data_table', metadata, Column('id', Integer, primary_key=True), Column('data', Integer) ) @classmethod def insert_data(cls): data_table = cls.tables.data_table data_table.insert().execute( {'data':3}, {'data':5}, {'data':7}, {'data':2}, {'data':15}, {'data':12}, {'data':6}, {'data':478}, {'data':52}, {'data':9}, ) @testing.fails_on('postgresql+zxjdbc', 'XXX: postgresql+zxjdbc currently returns a Decimal result for Float') def test_float_coercion(self): data_table = self.tables.data_table for type_, result in [ (Numeric, decimal.Decimal('140.381230939')), (Float, 140.381230939), (Float(asdecimal=True), decimal.Decimal('140.381230939')), (Numeric(asdecimal=False), 140.381230939), ]: ret = testing.db.execute( select([ func.stddev_pop(data_table.c.data, type_=type_) ]) ).scalar() eq_(round_decimal(ret, 9), result) ret = testing.db.execute( select([ cast(func.stddev_pop(data_table.c.data), type_) ]) ).scalar() eq_(round_decimal(ret, 9), result) @testing.fails_on('postgresql+zxjdbc', 'zxjdbc has no support for PG arrays') @testing.provide_metadata def test_arrays(self): metadata = self.metadata t1 = Table('t', metadata, Column('x', postgresql.ARRAY(Float)), Column('y', postgresql.ARRAY(REAL)), Column('z', postgresql.ARRAY(postgresql.DOUBLE_PRECISION)), Column('q', postgresql.ARRAY(Numeric)) ) metadata.create_all() t1.insert().execute(x=[5], y=[5], z=[6], q=[decimal.Decimal("6.4")]) row = t1.select().execute().first() eq_( row, ([5], [5], [6], [decimal.Decimal("6.4")]) ) class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): __only_on__ = 'postgresql' __dialect__ = postgresql.dialect() def test_compile(self): e1 = Enum('x', 'y', 'z', name='somename') e2 = Enum('x', 'y', 'z', name='somename', schema='someschema') self.assert_compile(postgresql.CreateEnumType(e1), "CREATE TYPE somename AS ENUM ('x','y','z')" ) self.assert_compile(postgresql.CreateEnumType(e2), "CREATE TYPE someschema.somename AS ENUM " "('x','y','z')") self.assert_compile(postgresql.DropEnumType(e1), 'DROP TYPE somename') self.assert_compile(postgresql.DropEnumType(e2), 'DROP TYPE someschema.somename') t1 = Table('sometable', MetaData(), Column('somecolumn', e1)) self.assert_compile(schema.CreateTable(t1), 'CREATE TABLE sometable (somecolumn ' 'somename)') t1 = Table('sometable', MetaData(), Column('somecolumn', Enum('x', 'y', 'z', native_enum=False))) self.assert_compile(schema.CreateTable(t1), "CREATE TABLE sometable (somecolumn " "VARCHAR(1), CHECK (somecolumn IN ('x', " "'y', 'z')))") @testing.fails_on('postgresql+zxjdbc', 'zxjdbc fails on ENUM: column "XXX" is of type ' 'XXX but expression is of type character varying') @testing.fails_on('postgresql+pg8000', 'zxjdbc fails on ENUM: column "XXX" is of type ' 'XXX but expression is of type text') def test_create_table(self): metadata = MetaData(testing.db) t1 = Table('table', metadata, Column('id', Integer, primary_key=True), Column('value', Enum('one', 'two' , 'three', name='onetwothreetype'))) t1.create() t1.create(checkfirst=True) # check the create try: t1.insert().execute(value='two') t1.insert().execute(value='three') t1.insert().execute(value='three') eq_(t1.select().order_by(t1.c.id).execute().fetchall(), [(1, 'two'), (2, 'three'), (3, 'three')]) finally: metadata.drop_all() metadata.drop_all() def test_name_required(self): metadata = MetaData(testing.db) etype = Enum('four', 'five', 'six', metadata=metadata) assert_raises(exc.CompileError, etype.create) assert_raises(exc.CompileError, etype.compile, dialect=postgresql.dialect()) @testing.fails_on('postgresql+zxjdbc', 'zxjdbc fails on ENUM: column "XXX" is of type ' 'XXX but expression is of type character varying') @testing.fails_on('postgresql+pg8000', 'zxjdbc fails on ENUM: column "XXX" is of type ' 'XXX but expression is of type text') @testing.provide_metadata def test_unicode_labels(self): metadata = self.metadata t1 = Table('table', metadata, Column('id', Integer, primary_key=True), Column('value', Enum(util.u('réveillé'), util.u('drôle'), util.u('S’il'), name='onetwothreetype')) ) metadata.create_all() t1.insert().execute(value=util.u('drôle')) t1.insert().execute(value=util.u('réveillé')) t1.insert().execute(value=util.u('S’il')) eq_(t1.select().order_by(t1.c.id).execute().fetchall(), [(1, util.u('drôle')), (2, util.u('réveillé')), (3, util.u('S’il'))] ) m2 = MetaData(testing.db) t2 = Table('table', m2, autoload=True) eq_( t2.c.value.type.enums, (util.u('réveillé'), util.u('drôle'), util.u('S’il')) ) def test_non_native_type(self): metadata = MetaData() t1 = Table('foo', metadata, Column('bar', Enum('one', 'two', 'three', name='myenum', native_enum=False))) def go(): t1.create(testing.db) try: self.assert_sql(testing.db, go, [], with_sequences=[("CREATE TABLE foo (\tbar " "VARCHAR(5), \tCONSTRAINT myenum CHECK " "(bar IN ('one', 'two', 'three')))", {})]) finally: metadata.drop_all(testing.db) @testing.provide_metadata def test_disable_create(self): metadata = self.metadata e1 = postgresql.ENUM('one', 'two', 'three', name="myenum", create_type=False) t1 = Table('e1', metadata, Column('c1', e1) ) # table can be created separately # without conflict e1.create(bind=testing.db) t1.create(testing.db) t1.drop(testing.db) e1.drop(bind=testing.db) @testing.provide_metadata def test_generate_multiple(self): """Test that the same enum twice only generates once for the create_all() call, without using checkfirst. A 'memo' collection held by the DDL runner now handles this. """ metadata = self.metadata e1 = Enum('one', 'two', 'three', name="myenum") t1 = Table('e1', metadata, Column('c1', e1) ) t2 = Table('e2', metadata, Column('c1', e1) ) metadata.create_all(checkfirst=False) metadata.drop_all(checkfirst=False) def test_non_native_dialect(self): engine = engines.testing_engine() engine.connect() engine.dialect.supports_native_enum = False metadata = MetaData() t1 = Table('foo', metadata, Column('bar', Enum('one', 'two', 'three', name='myenum'))) def go(): t1.create(engine) try: self.assert_sql(engine, go, [], with_sequences=[("CREATE TABLE foo (\tbar " "VARCHAR(5), \tCONSTRAINT myenum CHECK " "(bar IN ('one', 'two', 'three')))", {})]) finally: metadata.drop_all(engine) def test_standalone_enum(self): metadata = MetaData(testing.db) etype = Enum('four', 'five', 'six', name='fourfivesixtype', metadata=metadata) etype.create() try: assert testing.db.dialect.has_type(testing.db, 'fourfivesixtype') finally: etype.drop() assert not testing.db.dialect.has_type(testing.db, 'fourfivesixtype') metadata.create_all() try: assert testing.db.dialect.has_type(testing.db, 'fourfivesixtype') finally: metadata.drop_all() assert not testing.db.dialect.has_type(testing.db, 'fourfivesixtype') def test_no_support(self): def server_version_info(self): return (8, 2) e = engines.testing_engine() dialect = e.dialect dialect._get_server_version_info = server_version_info assert dialect.supports_native_enum e.connect() assert not dialect.supports_native_enum # initialize is called again on new pool e.dispose() e.connect() assert not dialect.supports_native_enum def test_reflection(self): metadata = MetaData(testing.db) etype = Enum('four', 'five', 'six', name='fourfivesixtype', metadata=metadata) t1 = Table('table', metadata, Column('id', Integer, primary_key=True), Column('value', Enum('one', 'two' , 'three', name='onetwothreetype')), Column('value2' , etype)) metadata.create_all() try: m2 = MetaData(testing.db) t2 = Table('table', m2, autoload=True) assert t2.c.value.type.enums == ('one', 'two', 'three') assert t2.c.value.type.name == 'onetwothreetype' assert t2.c.value2.type.enums == ('four', 'five', 'six') assert t2.c.value2.type.name == 'fourfivesixtype' finally: metadata.drop_all() def test_schema_reflection(self): metadata = MetaData(testing.db) etype = Enum( 'four', 'five', 'six', name='fourfivesixtype', schema='test_schema', metadata=metadata, ) t1 = Table('table', metadata, Column('id', Integer, primary_key=True), Column('value', Enum('one', 'two' , 'three', name='onetwothreetype', schema='test_schema')), Column('value2', etype)) metadata.create_all() try: m2 = MetaData(testing.db) t2 = Table('table', m2, autoload=True) assert t2.c.value.type.enums == ('one', 'two', 'three') assert t2.c.value.type.name == 'onetwothreetype' assert t2.c.value2.type.enums == ('four', 'five', 'six') assert t2.c.value2.type.name == 'fourfivesixtype' assert t2.c.value2.type.schema == 'test_schema' finally: metadata.drop_all() class NumericInterpretationTest(fixtures.TestBase): __only_on__ = 'postgresql' def test_numeric_codes(self): from sqlalchemy.dialects.postgresql import pg8000, psycopg2, base for dialect in (pg8000.dialect(), psycopg2.dialect()): typ = Numeric().dialect_impl(dialect) for code in base._INT_TYPES + base._FLOAT_TYPES + \ base._DECIMAL_TYPES: proc = typ.result_processor(dialect, code) val = 23.7 if proc is not None: val = proc(val) assert val in (23.7, decimal.Decimal("23.7")) @testing.provide_metadata def test_numeric_default(self): metadata = self.metadata # pg8000 appears to fail when the value is 0, # returns an int instead of decimal. t =Table('t', metadata, Column('id', Integer, primary_key=True), Column('nd', Numeric(asdecimal=True), default=1), Column('nf', Numeric(asdecimal=False), default=1), Column('fd', Float(asdecimal=True), default=1), Column('ff', Float(asdecimal=False), default=1), ) metadata.create_all() r = t.insert().execute() row = t.select().execute().first() assert isinstance(row[1], decimal.Decimal) assert isinstance(row[2], float) assert isinstance(row[3], decimal.Decimal) assert isinstance(row[4], float) eq_( row, (1, decimal.Decimal("1"), 1, decimal.Decimal("1"), 1) ) class InsertTest(fixtures.TestBase, AssertsExecutionResults): __only_on__ = 'postgresql' @classmethod def setup_class(cls): global metadata cls.engine = testing.db metadata = MetaData(testing.db) def teardown(self): metadata.drop_all() metadata.clear() if self.engine is not testing.db: self.engine.dispose() def test_compiled_insert(self): table = Table('testtable', metadata, Column('id', Integer, primary_key=True), Column('data', String(30))) metadata.create_all() ins = table.insert(inline=True, values={'data': bindparam('x' )}).compile() ins.execute({'x': 'five'}, {'x': 'seven'}) assert table.select().execute().fetchall() == [(1, 'five'), (2, 'seven')] def test_foreignkey_missing_insert(self): t1 = Table('t1', metadata, Column('id', Integer, primary_key=True)) t2 = Table('t2', metadata, Column('id', Integer, ForeignKey('t1.id'), primary_key=True)) metadata.create_all() # want to ensure that "null value in column "id" violates not- # null constraint" is raised (IntegrityError on psycoopg2, but # ProgrammingError on pg8000), and not "ProgrammingError: # (ProgrammingError) relationship "t2_id_seq" does not exist". # the latter corresponds to autoincrement behavior, which is not # the case here due to the foreign key. for eng in [engines.testing_engine(options={'implicit_returning' : False}), engines.testing_engine(options={'implicit_returning' : True})]: assert_raises_message(exc.DBAPIError, 'violates not-null constraint', eng.execute, t2.insert()) def test_sequence_insert(self): table = Table('testtable', metadata, Column('id', Integer, Sequence('my_seq'), primary_key=True), Column('data', String(30))) metadata.create_all() self._assert_data_with_sequence(table, 'my_seq') def test_sequence_returning_insert(self): table = Table('testtable', metadata, Column('id', Integer, Sequence('my_seq'), primary_key=True), Column('data', String(30))) metadata.create_all() self._assert_data_with_sequence_returning(table, 'my_seq') def test_opt_sequence_insert(self): table = Table('testtable', metadata, Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True), Column('data', String(30))) metadata.create_all() self._assert_data_autoincrement(table) def test_opt_sequence_returning_insert(self): table = Table('testtable', metadata, Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True), Column('data', String(30))) metadata.create_all() self._assert_data_autoincrement_returning(table) def test_autoincrement_insert(self): table = Table('testtable', metadata, Column('id', Integer, primary_key=True), Column('data', String(30))) metadata.create_all() self._assert_data_autoincrement(table) def test_autoincrement_returning_insert(self): table = Table('testtable', metadata, Column('id', Integer, primary_key=True), Column('data', String(30))) metadata.create_all() self._assert_data_autoincrement_returning(table) def test_noautoincrement_insert(self): table = Table('testtable', metadata, Column('id', Integer, primary_key=True, autoincrement=False), Column('data', String(30))) metadata.create_all() self._assert_data_noautoincrement(table) def _assert_data_autoincrement(self, table): self.engine = \ engines.testing_engine(options={'implicit_returning' : False}) metadata.bind = self.engine def go(): # execute with explicit id r = table.insert().execute({'id': 30, 'data': 'd1'}) assert r.inserted_primary_key == [30] # execute with prefetch id r = table.insert().execute({'data': 'd2'}) assert r.inserted_primary_key == [1] # executemany with explicit ids table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}) # executemany, uses SERIAL table.insert().execute({'data': 'd5'}, {'data': 'd6'}) # single execute, explicit id, inline table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) # single execute, inline, uses SERIAL table.insert(inline=True).execute({'data': 'd8'}) # note that the test framework doesnt capture the "preexecute" # of a seqeuence or default. we just see it in the bind params. self.assert_sql(self.engine, go, [], with_sequences=[ ('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 30, 'data': 'd1'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 1, 'data': 'd2'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), ('INSERT INTO testtable (data) VALUES (:data)', [{'data' : 'd5'}, {'data': 'd6'}]), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), ('INSERT INTO testtable (data) VALUES (:data)', [{'data' : 'd8'}]), ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (1, 'd2'), (31, 'd3'), (32, 'd4'), (2, 'd5'), (3, 'd6'), (33, 'd7'), (4, 'd8'), ] table.delete().execute() # test the same series of events using a reflected version of # the table m2 = MetaData(self.engine) table = Table(table.name, m2, autoload=True) def go(): table.insert().execute({'id': 30, 'data': 'd1'}) r = table.insert().execute({'data': 'd2'}) assert r.inserted_primary_key == [5] table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}) table.insert().execute({'data': 'd5'}, {'data': 'd6'}) table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) self.assert_sql(self.engine, go, [], with_sequences=[ ('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 30, 'data': 'd1'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 5, 'data': 'd2'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), ('INSERT INTO testtable (data) VALUES (:data)', [{'data' : 'd5'}, {'data': 'd6'}]), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), ('INSERT INTO testtable (data) VALUES (:data)', [{'data' : 'd8'}]), ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (5, 'd2'), (31, 'd3'), (32, 'd4'), (6, 'd5'), (7, 'd6'), (33, 'd7'), (8, 'd8'), ] table.delete().execute() def _assert_data_autoincrement_returning(self, table): self.engine = \ engines.testing_engine(options={'implicit_returning': True}) metadata.bind = self.engine def go(): # execute with explicit id r = table.insert().execute({'id': 30, 'data': 'd1'}) assert r.inserted_primary_key == [30] # execute with prefetch id r = table.insert().execute({'data': 'd2'}) assert r.inserted_primary_key == [1] # executemany with explicit ids table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}) # executemany, uses SERIAL table.insert().execute({'data': 'd5'}, {'data': 'd6'}) # single execute, explicit id, inline table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) # single execute, inline, uses SERIAL table.insert(inline=True).execute({'data': 'd8'}) self.assert_sql(self.engine, go, [], with_sequences=[ ('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 30, 'data': 'd1'}), ('INSERT INTO testtable (data) VALUES (:data) RETURNING ' 'testtable.id', {'data': 'd2'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), ('INSERT INTO testtable (data) VALUES (:data)', [{'data' : 'd5'}, {'data': 'd6'}]), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), ('INSERT INTO testtable (data) VALUES (:data)', [{'data' : 'd8'}]), ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (1, 'd2'), (31, 'd3'), (32, 'd4'), (2, 'd5'), (3, 'd6'), (33, 'd7'), (4, 'd8'), ] table.delete().execute() # test the same series of events using a reflected version of # the table m2 = MetaData(self.engine) table = Table(table.name, m2, autoload=True) def go(): table.insert().execute({'id': 30, 'data': 'd1'}) r = table.insert().execute({'data': 'd2'}) assert r.inserted_primary_key == [5] table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}) table.insert().execute({'data': 'd5'}, {'data': 'd6'}) table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) self.assert_sql(self.engine, go, [], with_sequences=[ ('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 30, 'data': 'd1'}), ('INSERT INTO testtable (data) VALUES (:data) RETURNING ' 'testtable.id', {'data': 'd2'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), ('INSERT INTO testtable (data) VALUES (:data)', [{'data' : 'd5'}, {'data': 'd6'}]), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), ('INSERT INTO testtable (data) VALUES (:data)', [{'data' : 'd8'}]), ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (5, 'd2'), (31, 'd3'), (32, 'd4'), (6, 'd5'), (7, 'd6'), (33, 'd7'), (8, 'd8'), ] table.delete().execute() def _assert_data_with_sequence(self, table, seqname): self.engine = \ engines.testing_engine(options={'implicit_returning' : False}) metadata.bind = self.engine def go(): table.insert().execute({'id': 30, 'data': 'd1'}) table.insert().execute({'data': 'd2'}) table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}) table.insert().execute({'data': 'd5'}, {'data': 'd6'}) table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) self.assert_sql(self.engine, go, [], with_sequences=[ ('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 30, 'data': 'd1'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 1, 'data': 'd2'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname, [{'data': 'd8'}]), ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (1, 'd2'), (31, 'd3'), (32, 'd4'), (2, 'd5'), (3, 'd6'), (33, 'd7'), (4, 'd8'), ] # cant test reflection here since the Sequence must be # explicitly specified def _assert_data_with_sequence_returning(self, table, seqname): self.engine = \ engines.testing_engine(options={'implicit_returning': True}) metadata.bind = self.engine def go(): table.insert().execute({'id': 30, 'data': 'd1'}) table.insert().execute({'data': 'd2'}) table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}) table.insert().execute({'data': 'd5'}, {'data': 'd6'}) table.insert(inline=True).execute({'id': 33, 'data': 'd7'}) table.insert(inline=True).execute({'data': 'd8'}) self.assert_sql(self.engine, go, [], with_sequences=[ ('INSERT INTO testtable (id, data) VALUES (:id, :data)', {'id': 30, 'data': 'd1'}), ("INSERT INTO testtable (id, data) VALUES " "(nextval('my_seq'), :data) RETURNING testtable.id", {'data': 'd2'}), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]), ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]), ('INSERT INTO testtable (id, data) VALUES (:id, :data)', [{'id': 33, 'data': 'd7'}]), ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), " ":data)" % seqname, [{'data': 'd8'}]), ]) assert table.select().execute().fetchall() == [ (30, 'd1'), (1, 'd2'), (31, 'd3'), (32, 'd4'), (2, 'd5'), (3, 'd6'), (33, 'd7'), (4, 'd8'), ] # cant test reflection here since the Sequence must be # explicitly specified def _assert_data_noautoincrement(self, table): self.engine = \ engines.testing_engine(options={'implicit_returning' : False}) metadata.bind = self.engine table.insert().execute({'id': 30, 'data': 'd1'}) if self.engine.driver == 'pg8000': exception_cls = exc.ProgrammingError elif self.engine.driver == 'pypostgresql': exception_cls = Exception else: exception_cls = exc.IntegrityError assert_raises_message(exception_cls, 'violates not-null constraint', table.insert().execute, {'data': 'd2'}) assert_raises_message(exception_cls, 'violates not-null constraint', table.insert().execute, {'data': 'd2'}, {'data': 'd3'}) assert_raises_message(exception_cls, 'violates not-null constraint', table.insert().execute, {'data': 'd2'}) assert_raises_message(exception_cls, 'violates not-null constraint', table.insert().execute, {'data': 'd2'}, {'data': 'd3'}) table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32, 'data': 'd3'}) table.insert(inline=True).execute({'id': 33, 'data': 'd4'}) assert table.select().execute().fetchall() == [(30, 'd1'), (31, 'd2'), (32, 'd3'), (33, 'd4')] table.delete().execute() # test the same series of events using a reflected version of # the table m2 = MetaData(self.engine) table = Table(table.name, m2, autoload=True) table.insert().execute({'id': 30, 'data': 'd1'}) assert_raises_message(exception_cls, 'violates not-null constraint', table.insert().execute, {'data': 'd2'}) assert_raises_message(exception_cls, 'violates not-null constraint', table.insert().execute, {'data': 'd2'}, {'data': 'd3'}) table.insert().execute({'id': 31, 'data': 'd2'}, {'id': 32, 'data': 'd3'}) table.insert(inline=True).execute({'id': 33, 'data': 'd4'}) assert table.select().execute().fetchall() == [(30, 'd1'), (31, 'd2'), (32, 'd3'), (33, 'd4')] class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): """Test PostgreSQL domains""" __only_on__ = 'postgresql' @classmethod def setup_class(cls): con = testing.db.connect() for ddl in \ 'CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42', \ 'CREATE DOMAIN test_schema.testdomain INTEGER DEFAULT 0', \ "CREATE TYPE testtype AS ENUM ('test')", \ 'CREATE DOMAIN enumdomain AS testtype'\ : try: con.execute(ddl) except exc.DBAPIError as e: if not 'already exists' in str(e): raise e con.execute('CREATE TABLE testtable (question integer, answer ' 'testdomain)') con.execute('CREATE TABLE test_schema.testtable(question ' 'integer, answer test_schema.testdomain, anything ' 'integer)') con.execute('CREATE TABLE crosschema (question integer, answer ' 'test_schema.testdomain)') con.execute('CREATE TABLE enum_test (id integer, data enumdomain)') @classmethod def teardown_class(cls): con = testing.db.connect() con.execute('DROP TABLE testtable') con.execute('DROP TABLE test_schema.testtable') con.execute('DROP TABLE crosschema') con.execute('DROP DOMAIN testdomain') con.execute('DROP DOMAIN test_schema.testdomain') con.execute("DROP TABLE enum_test") con.execute("DROP DOMAIN enumdomain") con.execute("DROP TYPE testtype") def test_table_is_reflected(self): metadata = MetaData(testing.db) table = Table('testtable', metadata, autoload=True) eq_(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns") assert isinstance(table.c.answer.type, Integer) def test_domain_is_reflected(self): metadata = MetaData(testing.db) table = Table('testtable', metadata, autoload=True) eq_(str(table.columns.answer.server_default.arg), '42', "Reflected default value didn't equal expected value") assert not table.columns.answer.nullable, \ 'Expected reflected column to not be nullable.' def test_enum_domain_is_reflected(self): metadata = MetaData(testing.db) table = Table('enum_test', metadata, autoload=True) eq_( table.c.data.type.enums, ('test', ) ) def test_table_is_reflected_test_schema(self): metadata = MetaData(testing.db) table = Table('testtable', metadata, autoload=True, schema='test_schema') eq_(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns") assert isinstance(table.c.anything.type, Integer) def test_schema_domain_is_reflected(self): metadata = MetaData(testing.db) table = Table('testtable', metadata, autoload=True, schema='test_schema') eq_(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value") assert table.columns.answer.nullable, \ 'Expected reflected column to be nullable.' def test_crosschema_domain_is_reflected(self): metadata = MetaData(testing.db) table = Table('crosschema', metadata, autoload=True) eq_(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value") assert table.columns.answer.nullable, \ 'Expected reflected column to be nullable.' def test_unknown_types(self): from sqlalchemy.databases import postgresql ischema_names = postgresql.PGDialect.ischema_names postgresql.PGDialect.ischema_names = {} try: m2 = MetaData(testing.db) assert_raises(exc.SAWarning, Table, 'testtable', m2, autoload=True) @testing.emits_warning('Did not recognize type') def warns(): m3 = MetaData(testing.db) t3 = Table('testtable', m3, autoload=True) assert t3.c.answer.type.__class__ == sa.types.NullType finally: postgresql.PGDialect.ischema_names = ischema_names class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): """Test 'DISTINCT' with SQL expression language and orm.Query with an emphasis on PG's 'DISTINCT ON' syntax. """ __dialect__ = postgresql.dialect() def setup(self): self.table = Table('t', MetaData(), Column('id',Integer, primary_key=True), Column('a', String), Column('b', String), ) def test_plain_generative(self): self.assert_compile( select([self.table]).distinct(), "SELECT DISTINCT t.id, t.a, t.b FROM t" ) def test_on_columns_generative(self): self.assert_compile( select([self.table]).distinct(self.table.c.a), "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t" ) def test_on_columns_generative_multi_call(self): self.assert_compile( select([self.table]).distinct(self.table.c.a). distinct(self.table.c.b), "SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t" ) def test_plain_inline(self): self.assert_compile( select([self.table], distinct=True), "SELECT DISTINCT t.id, t.a, t.b FROM t" ) def test_on_columns_inline_list(self): self.assert_compile( select([self.table], distinct=[self.table.c.a, self.table.c.b]). order_by(self.table.c.a, self.table.c.b), "SELECT DISTINCT ON (t.a, t.b) t.id, " "t.a, t.b FROM t ORDER BY t.a, t.b" ) def test_on_columns_inline_scalar(self): self.assert_compile( select([self.table], distinct=self.table.c.a), "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t" ) def test_query_plain(self): sess = Session() self.assert_compile( sess.query(self.table).distinct(), "SELECT DISTINCT t.id AS t_id, t.a AS t_a, " "t.b AS t_b FROM t" ) def test_query_on_columns(self): sess = Session() self.assert_compile( sess.query(self.table).distinct(self.table.c.a), "SELECT DISTINCT ON (t.a) t.id AS t_id, t.a AS t_a, " "t.b AS t_b FROM t" ) def test_query_on_columns_multi_call(self): sess = Session() self.assert_compile( sess.query(self.table).distinct(self.table.c.a). distinct(self.table.c.b), "SELECT DISTINCT ON (t.a, t.b) t.id AS t_id, t.a AS t_a, " "t.b AS t_b FROM t" ) def test_query_on_columns_subquery(self): sess = Session() class Foo(object): pass mapper(Foo, self.table) sess = Session() self.assert_compile( sess.query(Foo).from_self().distinct(Foo.a, Foo.b), "SELECT DISTINCT ON (anon_1.t_a, anon_1.t_b) anon_1.t_id " "AS anon_1_t_id, anon_1.t_a AS anon_1_t_a, anon_1.t_b " "AS anon_1_t_b FROM (SELECT t.id AS t_id, t.a AS t_a, " "t.b AS t_b FROM t) AS anon_1" ) def test_query_distinct_on_aliased(self): class Foo(object): pass mapper(Foo, self.table) a1 = aliased(Foo) sess = Session() self.assert_compile( sess.query(a1).distinct(a1.a), "SELECT DISTINCT ON (t_1.a) t_1.id AS t_1_id, " "t_1.a AS t_1_a, t_1.b AS t_1_b FROM t AS t_1" ) def test_distinct_on_subquery_anon(self): sq = select([self.table]).alias() q = select([self.table.c.id,sq.c.id]).\ distinct(sq.c.id).\ where(self.table.c.id==sq.c.id) self.assert_compile( q, "SELECT DISTINCT ON (anon_1.id) t.id, anon_1.id " "FROM t, (SELECT t.id AS id, t.a AS a, t.b " "AS b FROM t) AS anon_1 WHERE t.id = anon_1.id" ) def test_distinct_on_subquery_named(self): sq = select([self.table]).alias('sq') q = select([self.table.c.id,sq.c.id]).\ distinct(sq.c.id).\ where(self.table.c.id==sq.c.id) self.assert_compile( q, "SELECT DISTINCT ON (sq.id) t.id, sq.id " "FROM t, (SELECT t.id AS id, t.a AS a, " "t.b AS b FROM t) AS sq WHERE t.id = sq.id" ) class ReflectionTest(fixtures.TestBase): __only_on__ = 'postgresql' @testing.fails_if(('postgresql', '<', (8, 4)), "newer query is bypassed due to unsupported SQL functions") @testing.provide_metadata def test_reflected_primary_key_order(self): meta1 = self.metadata subject = Table('subject', meta1, Column('p1', Integer, primary_key=True), Column('p2', Integer, primary_key=True), PrimaryKeyConstraint('p2', 'p1') ) meta1.create_all() meta2 = MetaData(testing.db) subject = Table('subject', meta2, autoload=True) eq_(subject.primary_key.columns.keys(), ['p2', 'p1']) @testing.provide_metadata def test_pg_weirdchar_reflection(self): meta1 = self.metadata subject = Table('subject', meta1, Column('id$', Integer, primary_key=True)) referer = Table('referer', meta1, Column('id', Integer, primary_key=True), Column('ref', Integer, ForeignKey('subject.id$'))) meta1.create_all() meta2 = MetaData(testing.db) subject = Table('subject', meta2, autoload=True) referer = Table('referer', meta2, autoload=True) self.assert_((subject.c['id$'] == referer.c.ref).compare( subject.join(referer).onclause)) @testing.provide_metadata def test_renamed_sequence_reflection(self): metadata = self.metadata t = Table('t', metadata, Column('id', Integer, primary_key=True)) metadata.create_all() m2 = MetaData(testing.db) t2 = Table('t', m2, autoload=True, implicit_returning=False) eq_(t2.c.id.server_default.arg.text, "nextval('t_id_seq'::regclass)") r = t2.insert().execute() eq_(r.inserted_primary_key, [1]) testing.db.connect().execution_options(autocommit=True).\ execute('alter table t_id_seq rename to foobar_id_seq' ) m3 = MetaData(testing.db) t3 = Table('t', m3, autoload=True, implicit_returning=False) eq_(t3.c.id.server_default.arg.text, "nextval('foobar_id_seq'::regclass)") r = t3.insert().execute() eq_(r.inserted_primary_key, [2]) @testing.provide_metadata def test_renamed_pk_reflection(self): metadata = self.metadata t = Table('t', metadata, Column('id', Integer, primary_key=True)) metadata.create_all() testing.db.connect().execution_options(autocommit=True).\ execute('alter table t rename id to t_id') m2 = MetaData(testing.db) t2 = Table('t', m2, autoload=True) eq_([c.name for c in t2.primary_key], ['t_id']) @testing.provide_metadata def test_schema_reflection(self): """note: this test requires that the 'test_schema' schema be separate and accessible by the test user""" meta1 = self.metadata users = Table('users', meta1, Column('user_id', Integer, primary_key=True), Column('user_name', String(30), nullable=False), schema='test_schema') addresses = Table( 'email_addresses', meta1, Column('address_id', Integer, primary_key=True), Column('remote_user_id', Integer, ForeignKey(users.c.user_id)), Column('email_address', String(20)), schema='test_schema', ) meta1.create_all() meta2 = MetaData(testing.db) addresses = Table('email_addresses', meta2, autoload=True, schema='test_schema') users = Table('users', meta2, mustexist=True, schema='test_schema') j = join(users, addresses) self.assert_((users.c.user_id == addresses.c.remote_user_id).compare(j.onclause)) @testing.provide_metadata def test_schema_reflection_2(self): meta1 = self.metadata subject = Table('subject', meta1, Column('id', Integer, primary_key=True)) referer = Table('referer', meta1, Column('id', Integer, primary_key=True), Column('ref', Integer, ForeignKey('subject.id')), schema='test_schema') meta1.create_all() meta2 = MetaData(testing.db) subject = Table('subject', meta2, autoload=True) referer = Table('referer', meta2, schema='test_schema', autoload=True) self.assert_((subject.c.id == referer.c.ref).compare( subject.join(referer).onclause)) @testing.provide_metadata def test_schema_reflection_3(self): meta1 = self.metadata subject = Table('subject', meta1, Column('id', Integer, primary_key=True), schema='test_schema_2') referer = Table('referer', meta1, Column('id', Integer, primary_key=True), Column('ref', Integer, ForeignKey('test_schema_2.subject.id')), schema='test_schema') meta1.create_all() meta2 = MetaData(testing.db) subject = Table('subject', meta2, autoload=True, schema='test_schema_2') referer = Table('referer', meta2, schema='test_schema', autoload=True) self.assert_((subject.c.id == referer.c.ref).compare( subject.join(referer).onclause)) @testing.provide_metadata def test_uppercase_lowercase_table(self): metadata = self.metadata a_table = Table('a', metadata, Column('x', Integer)) A_table = Table('A', metadata, Column('x', Integer)) a_table.create() assert testing.db.has_table("a") assert not testing.db.has_table("A") A_table.create(checkfirst=True) assert testing.db.has_table("A") def test_uppercase_lowercase_sequence(self): a_seq = Sequence('a') A_seq = Sequence('A') a_seq.create(testing.db) assert testing.db.dialect.has_sequence(testing.db, "a") assert not testing.db.dialect.has_sequence(testing.db, "A") A_seq.create(testing.db, checkfirst=True) assert testing.db.dialect.has_sequence(testing.db, "A") a_seq.drop(testing.db) A_seq.drop(testing.db) def test_schema_reflection_multi_search_path(self): """test the 'set the same schema' rule when multiple schemas/search paths are in effect.""" db = engines.testing_engine() conn = db.connect() trans = conn.begin() try: conn.execute("set search_path to test_schema_2, " "test_schema, public") conn.dialect.default_schema_name = "test_schema_2" conn.execute(""" CREATE TABLE test_schema.some_table ( id SERIAL not null primary key ) """) conn.execute(""" CREATE TABLE test_schema_2.some_other_table ( id SERIAL not null primary key, sid INTEGER REFERENCES test_schema.some_table(id) ) """) m1 = MetaData() t2_schema = Table('some_other_table', m1, schema="test_schema_2", autoload=True, autoload_with=conn) t1_schema = Table('some_table', m1, schema="test_schema", autoload=True, autoload_with=conn) t2_no_schema = Table('some_other_table', m1, autoload=True, autoload_with=conn) t1_no_schema = Table('some_table', m1, autoload=True, autoload_with=conn) # OK, this because, "test_schema" is # in the search path, and might as well be # the default too. why would we assign # a "schema" to the Table ? assert t2_schema.c.sid.references( t1_no_schema.c.id) assert t2_no_schema.c.sid.references( t1_no_schema.c.id) finally: trans.rollback() conn.close() db.dispose() @testing.provide_metadata def test_index_reflection(self): """ Reflecting partial & expression-based indexes should warn """ metadata = self.metadata t1 = Table('party', metadata, Column('id', String(10), nullable=False), Column('name', String(20), index=True), Column('aname', String(20))) metadata.create_all() testing.db.execute(""" create index idx1 on party ((id || name)) """) testing.db.execute(""" create unique index idx2 on party (id) where name = 'test' """) testing.db.execute(""" create index idx3 on party using btree (lower(name::text), lower(aname::text)) """) def go(): m2 = MetaData(testing.db) t2 = Table('party', m2, autoload=True) assert len(t2.indexes) == 2 # Make sure indexes are in the order we expect them in tmp = [(idx.name, idx) for idx in t2.indexes] tmp.sort() r1, r2 = [idx[1] for idx in tmp] assert r1.name == 'idx2' assert r1.unique == True assert r2.unique == False assert [t2.c.id] == r1.columns assert [t2.c.name] == r2.columns testing.assert_warnings(go, [ 'Skipped unsupported reflection of ' 'expression-based index idx1', 'Predicate of partial index idx2 ignored during ' 'reflection', 'Skipped unsupported reflection of ' 'expression-based index idx3' ]) @testing.provide_metadata def test_index_reflection_modified(self): """reflect indexes when a column name has changed - PG 9 does not update the name of the column in the index def. [ticket:2141] """ metadata = self.metadata t1 = Table('t', metadata, Column('id', Integer, primary_key=True), Column('x', Integer) ) metadata.create_all() conn = testing.db.connect().execution_options(autocommit=True) conn.execute("CREATE INDEX idx1 ON t (x)") conn.execute("ALTER TABLE t RENAME COLUMN x to y") ind = testing.db.dialect.get_indexes(conn, "t", None) eq_(ind, [{'unique': False, 'column_names': ['y'], 'name': 'idx1'}]) conn.close() class CustomTypeReflectionTest(fixtures.TestBase): class CustomType(object): def __init__(self, arg1=None, arg2=None): self.arg1 = arg1 self.arg2 = arg2 ischema_names = None def setup(self): ischema_names = postgresql.PGDialect.ischema_names postgresql.PGDialect.ischema_names = ischema_names.copy() self.ischema_names = ischema_names def teardown(self): postgresql.PGDialect.ischema_names = self.ischema_names self.ischema_names = None def _assert_reflected(self, dialect): for sch, args in [ ('my_custom_type', (None, None)), ('my_custom_type()', (None, None)), ('my_custom_type(ARG1)', ('ARG1', None)), ('my_custom_type(ARG1, ARG2)', ('ARG1', 'ARG2')), ]: column_info = dialect._get_column_info( 'colname', sch, None, False, {}, {}, 'public') assert isinstance(column_info['type'], self.CustomType) eq_(column_info['type'].arg1, args[0]) eq_(column_info['type'].arg2, args[1]) def test_clslevel(self): postgresql.PGDialect.ischema_names['my_custom_type'] = self.CustomType dialect = postgresql.PGDialect() self._assert_reflected(dialect) def test_instancelevel(self): dialect = postgresql.PGDialect() dialect.ischema_names = dialect.ischema_names.copy() dialect.ischema_names['my_custom_type'] = self.CustomType self._assert_reflected(dialect) class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): __only_on__ = 'postgresql' @testing.provide_metadata def test_date_reflection(self): metadata = self.metadata t1 = Table('pgdate', metadata, Column('date1', DateTime(timezone=True)), Column('date2', DateTime(timezone=False))) metadata.create_all() m2 = MetaData(testing.db) t2 = Table('pgdate', m2, autoload=True) assert t2.c.date1.type.timezone is True assert t2.c.date2.type.timezone is False @testing.fails_on('+zxjdbc', 'The JDBC driver handles the version parsing') def test_version_parsing(self): class MockConn(object): def __init__(self, res): self.res = res def execute(self, str): return self def scalar(self): return self.res for string, version in \ [('PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by ' 'GCC gcc (GCC) 4.1.2 20070925 (Red Hat 4.1.2-33)', (8, 3, 8)), ('PostgreSQL 8.5devel on x86_64-unknown-linux-gnu, ' 'compiled by GCC gcc (GCC) 4.4.2, 64-bit', (8, 5)), ('EnterpriseDB 9.1.2.2 on x86_64-unknown-linux-gnu, ' 'compiled by gcc (GCC) 4.1.2 20080704 (Red Hat 4.1.2-50), ' '64-bit', (9, 1, 2))]: eq_(testing.db.dialect._get_server_version_info(MockConn(string)), version) @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature') def test_psycopg2_version(self): v = testing.db.dialect.psycopg2_version assert testing.db.dialect.dbapi.__version__.\ startswith(".".join(str(x) for x in v)) @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature') def test_notice_logging(self): log = logging.getLogger('sqlalchemy.dialects.postgresql') buf = logging.handlers.BufferingHandler(100) lev = log.level log.addHandler(buf) log.setLevel(logging.INFO) try: conn = testing.db.connect() trans = conn.begin() try: conn.execute('create table foo (id serial primary key)') finally: trans.rollback() finally: log.removeHandler(buf) log.setLevel(lev) msgs = ' '.join(b.msg for b in buf.buffer) assert 'will create implicit sequence' in msgs assert 'will create implicit index' in msgs @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature') @engines.close_open_connections def test_client_encoding(self): c = testing.db.connect() current_encoding = c.connection.connection.encoding c.close() # attempt to use an encoding that's not # already set if current_encoding == 'UTF8': test_encoding = 'LATIN1' else: test_encoding = 'UTF8' e = engines.testing_engine( options={'client_encoding':test_encoding} ) c = e.connect() eq_(c.connection.connection.encoding, test_encoding) @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature') @engines.close_open_connections def test_autocommit_isolation_level(self): extensions = __import__('psycopg2.extensions').extensions c = testing.db.connect() c = c.execution_options(isolation_level='AUTOCOMMIT') eq_(c.connection.connection.isolation_level, extensions.ISOLATION_LEVEL_AUTOCOMMIT) @testing.fails_on('+zxjdbc', "Can't infer the SQL type to use for an instance " "of org.python.core.PyObjectDerived.") @testing.fails_on('+pg8000', "Can't determine correct type.") def test_extract(self): fivedaysago = datetime.datetime.now() \ - datetime.timedelta(days=5) for field, exp in ('year', fivedaysago.year), ('month', fivedaysago.month), ('day', fivedaysago.day): r = testing.db.execute(select([extract(field, func.now() + datetime.timedelta(days=-5))])).scalar() eq_(r, exp) def test_checksfor_sequence(self): meta1 = MetaData(testing.db) seq = Sequence('fooseq') t = Table('mytable', meta1, Column('col1', Integer, seq)) seq.drop() try: testing.db.execute('CREATE SEQUENCE fooseq') t.create(checkfirst=True) finally: t.drop(checkfirst=True) def test_schema_roundtrips(self): meta = MetaData(testing.db) users = Table('users', meta, Column('id', Integer, primary_key=True), Column('name', String(50)), schema='test_schema') users.create() try: users.insert().execute(id=1, name='name1') users.insert().execute(id=2, name='name2') users.insert().execute(id=3, name='name3') users.insert().execute(id=4, name='name4') eq_(users.select().where(users.c.name == 'name2' ).execute().fetchall(), [(2, 'name2')]) eq_(users.select(use_labels=True).where(users.c.name == 'name2').execute().fetchall(), [(2, 'name2')]) users.delete().where(users.c.id == 3).execute() eq_(users.select().where(users.c.name == 'name3' ).execute().fetchall(), []) users.update().where(users.c.name == 'name4' ).execute(name='newname') eq_(users.select(use_labels=True).where(users.c.id == 4).execute().fetchall(), [(4, 'newname')]) finally: users.drop() def test_preexecute_passivedefault(self): """test that when we get a primary key column back from reflecting a table which has a default value on it, we pre- execute that DefaultClause upon insert.""" try: meta = MetaData(testing.db) testing.db.execute(""" CREATE TABLE speedy_users ( speedy_user_id SERIAL PRIMARY KEY, user_name VARCHAR NOT NULL, user_password VARCHAR NOT NULL ); """) t = Table('speedy_users', meta, autoload=True) r = t.insert().execute(user_name='user', user_password='lala') assert r.inserted_primary_key == [1] l = t.select().execute().fetchall() assert l == [(1, 'user', 'lala')] finally: testing.db.execute('drop table speedy_users') @testing.fails_on('+zxjdbc', 'psycopg2/pg8000 specific assertion') @testing.fails_on('pypostgresql', 'psycopg2/pg8000 specific assertion') def test_numeric_raise(self): stmt = text("select cast('hi' as char) as hi", typemap={'hi' : Numeric}) assert_raises(exc.InvalidRequestError, testing.db.execute, stmt) def test_serial_integer(self): for type_, expected in [ (Integer, 'SERIAL'), (BigInteger, 'BIGSERIAL'), (SmallInteger, 'SMALLINT'), (postgresql.INTEGER, 'SERIAL'), (postgresql.BIGINT, 'BIGSERIAL'), ]: m = MetaData() t = Table('t', m, Column('c', type_, primary_key=True)) ddl_compiler = testing.db.dialect.ddl_compiler(testing.db.dialect, schema.CreateTable(t)) eq_( ddl_compiler.get_column_specification(t.c.c), "c %s NOT NULL" % expected ) class TimezoneTest(fixtures.TestBase): """Test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it, if postgresql returns it. python then will not let you compare a datetime with a tzinfo to a datetime that doesnt have one. this test illustrates two ways to have datetime types with and without timezone info. """ __only_on__ = 'postgresql' @classmethod def setup_class(cls): global tztable, notztable, metadata metadata = MetaData(testing.db) # current_timestamp() in postgresql is assumed to return # TIMESTAMP WITH TIMEZONE tztable = Table('tztable', metadata, Column('id', Integer, primary_key=True), Column('date', DateTime(timezone=True), onupdate=func.current_timestamp()), Column('name', String(20))) notztable = Table('notztable', metadata, Column('id', Integer, primary_key=True), Column('date', DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))), Column('name', String(20))) metadata.create_all() @classmethod def teardown_class(cls): metadata.drop_all() @testing.fails_on('postgresql+zxjdbc', "XXX: postgresql+zxjdbc doesn't give a tzinfo back") def test_with_timezone(self): # get a date with a tzinfo somedate = \ testing.db.connect().scalar(func.current_timestamp().select()) assert somedate.tzinfo tztable.insert().execute(id=1, name='row1', date=somedate) row = select([tztable.c.date], tztable.c.id == 1).execute().first() eq_(row[0], somedate) eq_(somedate.tzinfo.utcoffset(somedate), row[0].tzinfo.utcoffset(row[0])) result = tztable.update(tztable.c.id == 1).returning(tztable.c.date).\ execute(name='newname' ) row = result.first() assert row[0] >= somedate def test_without_timezone(self): # get a date without a tzinfo somedate = datetime.datetime( 2005, 10, 20, 11, 52, 0, ) assert not somedate.tzinfo notztable.insert().execute(id=1, name='row1', date=somedate) row = select([notztable.c.date], notztable.c.id == 1).execute().first() eq_(row[0], somedate) eq_(row[0].tzinfo, None) result = notztable.update(notztable.c.id == 1).returning(notztable.c.date).\ execute(name='newname' ) row = result.first() assert row[0] >= somedate class TimePrecisionTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = postgresql.dialect() def test_compile(self): for type_, expected in [ (postgresql.TIME(), 'TIME WITHOUT TIME ZONE'), (postgresql.TIME(precision=5), 'TIME(5) WITHOUT TIME ZONE' ), (postgresql.TIME(timezone=True, precision=5), 'TIME(5) WITH TIME ZONE'), (postgresql.TIMESTAMP(), 'TIMESTAMP WITHOUT TIME ZONE'), (postgresql.TIMESTAMP(precision=5), 'TIMESTAMP(5) WITHOUT TIME ZONE'), (postgresql.TIMESTAMP(timezone=True, precision=5), 'TIMESTAMP(5) WITH TIME ZONE'), ]: self.assert_compile(type_, expected) @testing.only_on('postgresql', 'DB specific feature') @testing.provide_metadata def test_reflection(self): metadata = self.metadata t1 = Table( 't1', metadata, Column('c1', postgresql.TIME()), Column('c2', postgresql.TIME(precision=5)), Column('c3', postgresql.TIME(timezone=True, precision=5)), Column('c4', postgresql.TIMESTAMP()), Column('c5', postgresql.TIMESTAMP(precision=5)), Column('c6', postgresql.TIMESTAMP(timezone=True, precision=5)), ) t1.create() m2 = MetaData(testing.db) t2 = Table('t1', m2, autoload=True) eq_(t2.c.c1.type.precision, None) eq_(t2.c.c2.type.precision, 5) eq_(t2.c.c3.type.precision, 5) eq_(t2.c.c4.type.precision, None) eq_(t2.c.c5.type.precision, 5) eq_(t2.c.c6.type.precision, 5) eq_(t2.c.c1.type.timezone, False) eq_(t2.c.c2.type.timezone, False) eq_(t2.c.c3.type.timezone, True) eq_(t2.c.c4.type.timezone, False) eq_(t2.c.c5.type.timezone, False) eq_(t2.c.c6.type.timezone, True) class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): __only_on__ = 'postgresql' __unsupported_on__ = 'postgresql+pg8000', 'postgresql+zxjdbc' @classmethod def define_tables(cls, metadata): class ProcValue(TypeDecorator): impl = postgresql.ARRAY(Integer, dimensions=2) def process_bind_param(self, value, dialect): if value is None: return None return [ [x + 5 for x in v] for v in value ] def process_result_value(self, value, dialect): if value is None: return None return [ [x - 7 for x in v] for v in value ] Table('arrtable', metadata, Column('id', Integer, primary_key=True), Column('intarr', postgresql.ARRAY(Integer)), Column('strarr', postgresql.ARRAY(Unicode())), Column('dimarr', ProcValue) ) Table('dim_arrtable', metadata, Column('id', Integer, primary_key=True), Column('intarr', postgresql.ARRAY(Integer, dimensions=1)), Column('strarr', postgresql.ARRAY(Unicode(), dimensions=1)), Column('dimarr', ProcValue) ) def _fixture_456(self, table): testing.db.execute( table.insert(), intarr=[4, 5, 6] ) def test_reflect_array_column(self): metadata2 = MetaData(testing.db) tbl = Table('arrtable', metadata2, autoload=True) assert isinstance(tbl.c.intarr.type, postgresql.ARRAY) assert isinstance(tbl.c.strarr.type, postgresql.ARRAY) assert isinstance(tbl.c.intarr.type.item_type, Integer) assert isinstance(tbl.c.strarr.type.item_type, String) def test_insert_array(self): arrtable = self.tables.arrtable arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'), util.u('def')]) results = arrtable.select().execute().fetchall() eq_(len(results), 1) eq_(results[0]['intarr'], [1, 2, 3]) eq_(results[0]['strarr'], [util.u('abc'), util.u('def')]) def test_array_where(self): arrtable = self.tables.arrtable arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'), util.u('def')]) arrtable.insert().execute(intarr=[4, 5, 6], strarr=util.u('ABC')) results = arrtable.select().where(arrtable.c.intarr == [1, 2, 3]).execute().fetchall() eq_(len(results), 1) eq_(results[0]['intarr'], [1, 2, 3]) def test_array_concat(self): arrtable = self.tables.arrtable arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'), util.u('def')]) results = select([arrtable.c.intarr + [4, 5, 6]]).execute().fetchall() eq_(len(results), 1) eq_(results[0][0], [ 1, 2, 3, 4, 5, 6, ]) def test_array_subtype_resultprocessor(self): arrtable = self.tables.arrtable arrtable.insert().execute(intarr=[4, 5, 6], strarr=[[util.ue('m\xe4\xe4')], [ util.ue('m\xf6\xf6')]]) arrtable.insert().execute(intarr=[1, 2, 3], strarr=[ util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')]) results = \ arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall() eq_(len(results), 2) eq_(results[0]['strarr'], [util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')]) eq_(results[1]['strarr'], [[util.ue('m\xe4\xe4')], [util.ue('m\xf6\xf6')]]) def test_array_literal(self): eq_( testing.db.scalar( select([ postgresql.array([1, 2]) + postgresql.array([3, 4, 5]) ]) ), [1,2,3,4,5] ) def test_array_getitem_single_type(self): arrtable = self.tables.arrtable is_(arrtable.c.intarr[1].type._type_affinity, Integer) is_(arrtable.c.strarr[1].type._type_affinity, String) def test_array_getitem_slice_type(self): arrtable = self.tables.arrtable is_(arrtable.c.intarr[1:3].type._type_affinity, postgresql.ARRAY) is_(arrtable.c.strarr[1:3].type._type_affinity, postgresql.ARRAY) def test_array_getitem_single_exec(self): arrtable = self.tables.arrtable self._fixture_456(arrtable) eq_( testing.db.scalar(select([arrtable.c.intarr[2]])), 5 ) testing.db.execute( arrtable.update().values({arrtable.c.intarr[2]: 7}) ) eq_( testing.db.scalar(select([arrtable.c.intarr[2]])), 7 ) def test_undim_array_empty(self): arrtable = self.tables.arrtable self._fixture_456(arrtable) eq_( testing.db.scalar( select([arrtable.c.intarr]). where(arrtable.c.intarr.contains([])) ), [4, 5, 6] ) def test_array_getitem_slice_exec(self): arrtable = self.tables.arrtable testing.db.execute( arrtable.insert(), intarr=[4, 5, 6], strarr=[util.u('abc'), util.u('def')] ) eq_( testing.db.scalar(select([arrtable.c.intarr[2:3]])), [5, 6] ) testing.db.execute( arrtable.update().values({arrtable.c.intarr[2:3]: [7, 8]}) ) eq_( testing.db.scalar(select([arrtable.c.intarr[2:3]])), [7, 8] ) def _test_undim_array_contains_typed_exec(self, struct): arrtable = self.tables.arrtable self._fixture_456(arrtable) eq_( testing.db.scalar( select([arrtable.c.intarr]). where(arrtable.c.intarr.contains(struct([4, 5]))) ), [4, 5, 6] ) def test_undim_array_contains_set_exec(self): self._test_undim_array_contains_typed_exec(set) def test_undim_array_contains_list_exec(self): self._test_undim_array_contains_typed_exec(list) def test_undim_array_contains_generator_exec(self): self._test_undim_array_contains_typed_exec( lambda elem: (x for x in elem)) def _test_dim_array_contains_typed_exec(self, struct): dim_arrtable = self.tables.dim_arrtable self._fixture_456(dim_arrtable) eq_( testing.db.scalar( select([dim_arrtable.c.intarr]). where(dim_arrtable.c.intarr.contains(struct([4, 5]))) ), [4, 5, 6] ) def test_dim_array_contains_set_exec(self): self._test_dim_array_contains_typed_exec(set) def test_dim_array_contains_list_exec(self): self._test_dim_array_contains_typed_exec(list) def test_dim_array_contains_generator_exec(self): self._test_dim_array_contains_typed_exec(lambda elem: (x for x in elem)) def test_array_contained_by_exec(self): arrtable = self.tables.arrtable with testing.db.connect() as conn: conn.execute( arrtable.insert(), intarr=[6, 5, 4] ) eq_( conn.scalar( select([arrtable.c.intarr.contained_by([4, 5, 6, 7])]) ), True ) def test_array_overlap_exec(self): arrtable = self.tables.arrtable with testing.db.connect() as conn: conn.execute( arrtable.insert(), intarr=[4, 5, 6] ) eq_( conn.scalar( select([arrtable.c.intarr]). where(arrtable.c.intarr.overlap([7, 6])) ), [4, 5, 6] ) def test_array_any_exec(self): arrtable = self.tables.arrtable with testing.db.connect() as conn: conn.execute( arrtable.insert(), intarr=[4, 5, 6] ) eq_( conn.scalar( select([arrtable.c.intarr]). where(postgresql.Any(5, arrtable.c.intarr)) ), [4, 5, 6] ) def test_array_all_exec(self): arrtable = self.tables.arrtable with testing.db.connect() as conn: conn.execute( arrtable.insert(), intarr=[4, 5, 6] ) eq_( conn.scalar( select([arrtable.c.intarr]). where(arrtable.c.intarr.all(4, operator=operators.le)) ), [4, 5, 6] ) @testing.provide_metadata def test_tuple_flag(self): metadata = self.metadata t1 = Table('t1', metadata, Column('id', Integer, primary_key=True), Column('data', postgresql.ARRAY(String(5), as_tuple=True)), Column('data2', postgresql.ARRAY(Numeric(asdecimal=False), as_tuple=True)), ) metadata.create_all() testing.db.execute(t1.insert(), id=1, data=["1","2","3"], data2=[5.4, 5.6]) testing.db.execute(t1.insert(), id=2, data=["4", "5", "6"], data2=[1.0]) testing.db.execute(t1.insert(), id=3, data=[["4", "5"], ["6", "7"]], data2=[[5.4, 5.6], [1.0, 1.1]]) r = testing.db.execute(t1.select().order_by(t1.c.id)).fetchall() eq_( r, [ (1, ('1', '2', '3'), (5.4, 5.6)), (2, ('4', '5', '6'), (1.0,)), (3, (('4', '5'), ('6', '7')), ((5.4, 5.6), (1.0, 1.1))) ] ) # hashable eq_( set(row[1] for row in r), set([('1', '2', '3'), ('4', '5', '6'), (('4', '5'), ('6', '7'))]) ) def test_dimension(self): arrtable = self.tables.arrtable testing.db.execute(arrtable.insert(), dimarr=[[1, 2, 3], [4,5, 6]]) eq_( testing.db.scalar(select([arrtable.c.dimarr])), [[-1, 0, 1], [2, 3, 4]] ) class TimestampTest(fixtures.TestBase, AssertsExecutionResults): __only_on__ = 'postgresql' def test_timestamp(self): engine = testing.db connection = engine.connect() s = select(["timestamp '2007-12-25'"]) result = connection.execute(s).first() eq_(result[0], datetime.datetime(2007, 12, 25, 0, 0)) class ServerSideCursorsTest(fixtures.TestBase, AssertsExecutionResults): __only_on__ = 'postgresql+psycopg2' def _fixture(self, server_side_cursors): self.engine = engines.testing_engine( options={'server_side_cursors':server_side_cursors} ) return self.engine def tearDown(self): engines.testing_reaper.close_all() self.engine.dispose() def test_global_string(self): engine = self._fixture(True) result = engine.execute('select 1') assert result.cursor.name def test_global_text(self): engine = self._fixture(True) result = engine.execute(text('select 1')) assert result.cursor.name def test_global_expr(self): engine = self._fixture(True) result = engine.execute(select([1])) assert result.cursor.name def test_global_off_explicit(self): engine = self._fixture(False) result = engine.execute(text('select 1')) # It should be off globally ... assert not result.cursor.name def test_stmt_option(self): engine = self._fixture(False) s = select([1]).execution_options(stream_results=True) result = engine.execute(s) # ... but enabled for this one. assert result.cursor.name def test_conn_option(self): engine = self._fixture(False) # and this one result = \ engine.connect().execution_options(stream_results=True).\ execute('select 1' ) assert result.cursor.name def test_stmt_enabled_conn_option_disabled(self): engine = self._fixture(False) s = select([1]).execution_options(stream_results=True) # not this one result = \ engine.connect().execution_options(stream_results=False).\ execute(s) assert not result.cursor.name def test_stmt_option_disabled(self): engine = self._fixture(True) s = select([1]).execution_options(stream_results=False) result = engine.execute(s) assert not result.cursor.name def test_aliases_and_ss(self): engine = self._fixture(False) s1 = select([1]).execution_options(stream_results=True).alias() result = engine.execute(s1) assert result.cursor.name # s1's options shouldn't affect s2 when s2 is used as a # from_obj. s2 = select([1], from_obj=s1) result = engine.execute(s2) assert not result.cursor.name def test_for_update_expr(self): engine = self._fixture(True) s1 = select([1], for_update=True) result = engine.execute(s1) assert result.cursor.name def test_for_update_string(self): engine = self._fixture(True) result = engine.execute('SELECT 1 FOR UPDATE') assert result.cursor.name def test_text_no_ss(self): engine = self._fixture(False) s = text('select 42') result = engine.execute(s) assert not result.cursor.name def test_text_ss_option(self): engine = self._fixture(False) s = text('select 42').execution_options(stream_results=True) result = engine.execute(s) assert result.cursor.name def test_roundtrip(self): engine = self._fixture(True) test_table = Table('test_table', MetaData(engine), Column('id', Integer, primary_key=True), Column('data', String(50))) test_table.create(checkfirst=True) try: test_table.insert().execute(data='data1') nextid = engine.execute(Sequence('test_table_id_seq')) test_table.insert().execute(id=nextid, data='data2') eq_(test_table.select().execute().fetchall(), [(1, 'data1' ), (2, 'data2')]) test_table.update().where(test_table.c.id == 2).values(data=test_table.c.data + ' updated' ).execute() eq_(test_table.select().execute().fetchall(), [(1, 'data1' ), (2, 'data2 updated')]) test_table.delete().execute() eq_(test_table.count().scalar(), 0) finally: test_table.drop(checkfirst=True) class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): """test DDL and reflection of PG-specific types """ __only_on__ = 'postgresql' __excluded_on__ = (('postgresql', '<', (8, 3, 0)),) @classmethod def setup_class(cls): global metadata, table metadata = MetaData(testing.db) # create these types so that we can issue # special SQL92 INTERVAL syntax class y2m(types.UserDefinedType, postgresql.INTERVAL): def get_col_spec(self): return "INTERVAL YEAR TO MONTH" class d2s(types.UserDefinedType, postgresql.INTERVAL): def get_col_spec(self): return "INTERVAL DAY TO SECOND" table = Table('sometable', metadata, Column('id', postgresql.UUID, primary_key=True), Column('flag', postgresql.BIT), Column('bitstring', postgresql.BIT(4)), Column('addr', postgresql.INET), Column('addr2', postgresql.MACADDR), Column('addr3', postgresql.CIDR), Column('doubleprec', postgresql.DOUBLE_PRECISION), Column('plain_interval', postgresql.INTERVAL), Column('year_interval', y2m()), Column('month_interval', d2s()), Column('precision_interval', postgresql.INTERVAL(precision=3)) ) metadata.create_all() # cheat so that the "strict type check" # works table.c.year_interval.type = postgresql.INTERVAL() table.c.month_interval.type = postgresql.INTERVAL() @classmethod def teardown_class(cls): metadata.drop_all() def test_reflection(self): m = MetaData(testing.db) t = Table('sometable', m, autoload=True) self.assert_tables_equal(table, t, strict_types=True) assert t.c.plain_interval.type.precision is None assert t.c.precision_interval.type.precision == 3 assert t.c.bitstring.type.length == 4 def test_bit_compile(self): pairs = [(postgresql.BIT(), 'BIT(1)'), (postgresql.BIT(5), 'BIT(5)'), (postgresql.BIT(varying=True), 'BIT VARYING'), (postgresql.BIT(5, varying=True), 'BIT VARYING(5)'), ] for type_, expected in pairs: self.assert_compile(type_, expected) @testing.provide_metadata def test_bit_reflection(self): metadata = self.metadata t1 = Table('t1', metadata, Column('bit1', postgresql.BIT()), Column('bit5', postgresql.BIT(5)), Column('bitvarying', postgresql.BIT(varying=True)), Column('bitvarying5', postgresql.BIT(5, varying=True)), ) t1.create() m2 = MetaData(testing.db) t2 = Table('t1', m2, autoload=True) eq_(t2.c.bit1.type.length, 1) eq_(t2.c.bit1.type.varying, False) eq_(t2.c.bit5.type.length, 5) eq_(t2.c.bit5.type.varying, False) eq_(t2.c.bitvarying.type.length, None) eq_(t2.c.bitvarying.type.varying, True) eq_(t2.c.bitvarying5.type.length, 5) eq_(t2.c.bitvarying5.type.varying, True) class UUIDTest(fixtures.TestBase): """Test the bind/return values of the UUID type.""" __only_on__ = 'postgresql' @testing.requires.python25 @testing.fails_on('postgresql+zxjdbc', 'column "data" is of type uuid but expression is of type character varying') @testing.fails_on('postgresql+pg8000', 'No support for UUID type') def test_uuid_string(self): import uuid self._test_round_trip( Table('utable', MetaData(), Column('data', postgresql.UUID()) ), str(uuid.uuid4()), str(uuid.uuid4()) ) @testing.requires.python25 @testing.fails_on('postgresql+zxjdbc', 'column "data" is of type uuid but expression is of type character varying') @testing.fails_on('postgresql+pg8000', 'No support for UUID type') def test_uuid_uuid(self): import uuid self._test_round_trip( Table('utable', MetaData(), Column('data', postgresql.UUID(as_uuid=True)) ), uuid.uuid4(), uuid.uuid4() ) def test_no_uuid_available(self): from sqlalchemy.dialects.postgresql import base uuid_type = base._python_UUID base._python_UUID = None try: assert_raises( NotImplementedError, postgresql.UUID, as_uuid=True ) finally: base._python_UUID = uuid_type def setup(self): self.conn = testing.db.connect() trans = self.conn.begin() def teardown(self): self.conn.close() def _test_round_trip(self, utable, value1, value2): utable.create(self.conn) self.conn.execute(utable.insert(), {'data':value1}) self.conn.execute(utable.insert(), {'data':value2}) r = self.conn.execute( select([utable.c.data]). where(utable.c.data != value1) ) eq_(r.fetchone()[0], value2) eq_(r.fetchone(), None) class MatchTest(fixtures.TestBase, AssertsCompiledSQL): __only_on__ = 'postgresql' __excluded_on__ = ('postgresql', '<', (8, 3, 0)), @classmethod def setup_class(cls): global metadata, cattable, matchtable metadata = MetaData(testing.db) cattable = Table('cattable', metadata, Column('id', Integer, primary_key=True), Column('description', String(50))) matchtable = Table('matchtable', metadata, Column('id', Integer, primary_key=True), Column('title', String(200)), Column('category_id', Integer, ForeignKey('cattable.id'))) metadata.create_all() cattable.insert().execute([{'id': 1, 'description': 'Python'}, {'id': 2, 'description': 'Ruby'}]) matchtable.insert().execute([{'id': 1, 'title' : 'Agile Web Development with Rails' , 'category_id': 2}, {'id': 2, 'title': 'Dive Into Python', 'category_id': 1}, {'id': 3, 'title' : "Programming Matz's Ruby", 'category_id': 2}, {'id': 4, 'title' : 'The Definitive Guide to Django', 'category_id': 1}, {'id': 5, 'title' : 'Python in a Nutshell', 'category_id': 1}]) @classmethod def teardown_class(cls): metadata.drop_all() @testing.fails_on('postgresql+pg8000', 'uses positional') @testing.fails_on('postgresql+zxjdbc', 'uses qmark') def test_expression_pyformat(self): self.assert_compile(matchtable.c.title.match('somstr'), 'matchtable.title @@ to_tsquery(%(title_1)s' ')') @testing.fails_on('postgresql+psycopg2', 'uses pyformat') @testing.fails_on('postgresql+pypostgresql', 'uses pyformat') @testing.fails_on('postgresql+zxjdbc', 'uses qmark') def test_expression_positional(self): self.assert_compile(matchtable.c.title.match('somstr'), 'matchtable.title @@ to_tsquery(%s)') def test_simple_match(self): results = \ matchtable.select().where(matchtable.c.title.match('python' )).order_by(matchtable.c.id).execute().fetchall() eq_([2, 5], [r.id for r in results]) def test_simple_match_with_apostrophe(self): results = \ matchtable.select().where(matchtable.c.title.match("Matz's" )).execute().fetchall() eq_([3], [r.id for r in results]) def test_simple_derivative_match(self): results = \ matchtable.select().where(matchtable.c.title.match('nutshells' )).execute().fetchall() eq_([5], [r.id for r in results]) def test_or_match(self): results1 = \ matchtable.select().where(or_(matchtable.c.title.match('nutshells' ), matchtable.c.title.match('rubies' ))).order_by(matchtable.c.id).execute().fetchall() eq_([3, 5], [r.id for r in results1]) results2 = \ matchtable.select().where( matchtable.c.title.match('nutshells | rubies' )).order_by(matchtable.c.id).execute().fetchall() eq_([3, 5], [r.id for r in results2]) def test_and_match(self): results1 = \ matchtable.select().where(and_(matchtable.c.title.match('python' ), matchtable.c.title.match('nutshells' ))).execute().fetchall() eq_([5], [r.id for r in results1]) results2 = \ matchtable.select().where( matchtable.c.title.match('python & nutshells' )).execute().fetchall() eq_([5], [r.id for r in results2]) def test_match_across_joins(self): results = matchtable.select().where(and_(cattable.c.id == matchtable.c.category_id, or_(cattable.c.description.match('Ruby'), matchtable.c.title.match('nutshells' )))).order_by(matchtable.c.id).execute().fetchall() eq_([1, 3, 5], [r.id for r in results]) class TupleTest(fixtures.TestBase): __only_on__ = 'postgresql' def test_tuple_containment(self): for test, exp in [ ([('a', 'b')], True), ([('a', 'c')], False), ([('f', 'q'), ('a', 'b')], True), ([('f', 'q'), ('a', 'c')], False) ]: eq_( testing.db.execute( select([ tuple_( literal_column("'a'"), literal_column("'b'") ).\ in_([ tuple_(*[ literal_column("'%s'" % letter) for letter in elem ]) for elem in test ]) ]) ).scalar(), exp ) class HStoreTest(fixtures.TestBase): def _assert_sql(self, construct, expected): dialect = postgresql.dialect() compiled = str(construct.compile(dialect=dialect)) compiled = re.sub(r'\s+', ' ', compiled) expected = re.sub(r'\s+', ' ', expected) eq_(compiled, expected) def setup(self): metadata = MetaData() self.test_table = Table('test_table', metadata, Column('id', Integer, primary_key=True), Column('hash', HSTORE) ) self.hashcol = self.test_table.c.hash def _test_where(self, whereclause, expected): stmt = select([self.test_table]).where(whereclause) self._assert_sql( stmt, "SELECT test_table.id, test_table.hash FROM test_table " "WHERE %s" % expected ) def _test_cols(self, colclause, expected, from_=True): stmt = select([colclause]) self._assert_sql( stmt, ( "SELECT %s" + (" FROM test_table" if from_ else "") ) % expected ) def test_bind_serialize_default(self): from sqlalchemy.engine import default dialect = default.DefaultDialect() proc = self.test_table.c.hash.type._cached_bind_processor(dialect) eq_( proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])), '"key1"=>"value1", "key2"=>"value2"' ) def test_parse_error(self): from sqlalchemy.engine import default dialect = default.DefaultDialect() proc = self.test_table.c.hash.type._cached_result_processor( dialect, None) assert_raises_message( ValueError, r'''After u?'\[\.\.\.\], "key1"=>"value1", ', could not parse ''' '''residual at position 36: u?'crapcrapcrap, "key3"\[\.\.\.\]''', proc, '"key2"=>"value2", "key1"=>"value1", ' 'crapcrapcrap, "key3"=>"value3"' ) def test_result_deserialize_default(self): from sqlalchemy.engine import default dialect = default.DefaultDialect() proc = self.test_table.c.hash.type._cached_result_processor( dialect, None) eq_( proc('"key2"=>"value2", "key1"=>"value1"'), {"key1": "value1", "key2": "value2"} ) def test_bind_serialize_psycopg2(self): from sqlalchemy.dialects.postgresql import psycopg2 dialect = psycopg2.PGDialect_psycopg2() dialect._has_native_hstore = True proc = self.test_table.c.hash.type._cached_bind_processor(dialect) is_(proc, None) dialect = psycopg2.PGDialect_psycopg2() dialect._has_native_hstore = False proc = self.test_table.c.hash.type._cached_bind_processor(dialect) eq_( proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])), '"key1"=>"value1", "key2"=>"value2"' ) def test_result_deserialize_psycopg2(self): from sqlalchemy.dialects.postgresql import psycopg2 dialect = psycopg2.PGDialect_psycopg2() dialect._has_native_hstore = True proc = self.test_table.c.hash.type._cached_result_processor( dialect, None) is_(proc, None) dialect = psycopg2.PGDialect_psycopg2() dialect._has_native_hstore = False proc = self.test_table.c.hash.type._cached_result_processor( dialect, None) eq_( proc('"key2"=>"value2", "key1"=>"value1"'), {"key1": "value1", "key2": "value2"} ) def test_where_has_key(self): self._test_where( # hide from 2to3 getattr(self.hashcol, 'has_key')('foo'), "test_table.hash ? %(hash_1)s" ) def test_where_has_all(self): self._test_where( self.hashcol.has_all(postgresql.array(['1', '2'])), "test_table.hash ?& ARRAY[%(param_1)s, %(param_2)s]" ) def test_where_has_any(self): self._test_where( self.hashcol.has_any(postgresql.array(['1', '2'])), "test_table.hash ?| ARRAY[%(param_1)s, %(param_2)s]" ) def test_where_defined(self): self._test_where( self.hashcol.defined('foo'), "defined(test_table.hash, %(param_1)s)" ) def test_where_contains(self): self._test_where( self.hashcol.contains({'foo': '1'}), "test_table.hash @> %(hash_1)s" ) def test_where_contained_by(self): self._test_where( self.hashcol.contained_by({'foo': '1', 'bar': None}), "test_table.hash <@ %(hash_1)s" ) def test_where_getitem(self): self._test_where( self.hashcol['bar'] == None, "(test_table.hash -> %(hash_1)s) IS NULL" ) def test_cols_get(self): self._test_cols( self.hashcol['foo'], "test_table.hash -> %(hash_1)s AS anon_1", True ) def test_cols_delete_single_key(self): self._test_cols( self.hashcol.delete('foo'), "delete(test_table.hash, %(param_1)s) AS delete_1", True ) def test_cols_delete_array_of_keys(self): self._test_cols( self.hashcol.delete(postgresql.array(['foo', 'bar'])), ("delete(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) " "AS delete_1"), True ) def test_cols_delete_matching_pairs(self): self._test_cols( self.hashcol.delete(hstore('1', '2')), ("delete(test_table.hash, hstore(%(param_1)s, %(param_2)s)) " "AS delete_1"), True ) def test_cols_slice(self): self._test_cols( self.hashcol.slice(postgresql.array(['1', '2'])), ("slice(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) " "AS slice_1"), True ) def test_cols_hstore_pair_text(self): self._test_cols( hstore('foo', '3')['foo'], "hstore(%(param_1)s, %(param_2)s) -> %(hstore_1)s AS anon_1", False ) def test_cols_hstore_pair_array(self): self._test_cols( hstore(postgresql.array(['1', '2']), postgresql.array(['3', None]))['1'], ("hstore(ARRAY[%(param_1)s, %(param_2)s], " "ARRAY[%(param_3)s, NULL]) -> %(hstore_1)s AS anon_1"), False ) def test_cols_hstore_single_array(self): self._test_cols( hstore(postgresql.array(['1', '2', '3', None]))['3'], ("hstore(ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, NULL]) " "-> %(hstore_1)s AS anon_1"), False ) def test_cols_concat(self): self._test_cols( self.hashcol.concat(hstore(cast(self.test_table.c.id, Text), '3')), ("test_table.hash || hstore(CAST(test_table.id AS TEXT), " "%(param_1)s) AS anon_1"), True ) def test_cols_concat_op(self): self._test_cols( hstore('foo', 'bar') + self.hashcol, "hstore(%(param_1)s, %(param_2)s) || test_table.hash AS anon_1", True ) def test_cols_concat_get(self): self._test_cols( (self.hashcol + self.hashcol)['foo'], "test_table.hash || test_table.hash -> %(param_1)s AS anon_1" ) def test_cols_keys(self): self._test_cols( # hide from 2to3 getattr(self.hashcol, 'keys')(), "akeys(test_table.hash) AS akeys_1", True ) def test_cols_vals(self): self._test_cols( self.hashcol.vals(), "avals(test_table.hash) AS avals_1", True ) def test_cols_array(self): self._test_cols( self.hashcol.array(), "hstore_to_array(test_table.hash) AS hstore_to_array_1", True ) def test_cols_matrix(self): self._test_cols( self.hashcol.matrix(), "hstore_to_matrix(test_table.hash) AS hstore_to_matrix_1", True ) class HStoreRoundTripTest(fixtures.TablesTest): __requires__ = 'hstore', __dialect__ = 'postgresql' @classmethod def define_tables(cls, metadata): Table('data_table', metadata, Column('id', Integer, primary_key=True), Column('name', String(30), nullable=False), Column('data', HSTORE) ) def _fixture_data(self, engine): data_table = self.tables.data_table engine.execute( data_table.insert(), {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}, {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}}, {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}}, {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}}, {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2"}}, ) def _assert_data(self, compare): data = testing.db.execute( select([self.tables.data_table.c.data]). order_by(self.tables.data_table.c.name) ).fetchall() eq_([d for d, in data], compare) def _test_insert(self, engine): engine.execute( self.tables.data_table.insert(), {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}} ) self._assert_data([{"k1": "r1v1", "k2": "r1v2"}]) def _non_native_engine(self): if testing.against("postgresql+psycopg2"): engine = engines.testing_engine(options=dict(use_native_hstore=False)) else: engine = testing.db engine.connect() return engine def test_reflect(self): from sqlalchemy import inspect insp = inspect(testing.db) cols = insp.get_columns('data_table') assert isinstance(cols[2]['type'], HSTORE) @testing.only_on("postgresql+psycopg2") def test_insert_native(self): engine = testing.db self._test_insert(engine) def test_insert_python(self): engine = self._non_native_engine() self._test_insert(engine) @testing.only_on("postgresql+psycopg2") def test_criterion_native(self): engine = testing.db self._fixture_data(engine) self._test_criterion(engine) def test_criterion_python(self): engine = self._non_native_engine() self._fixture_data(engine) self._test_criterion(engine) def _test_criterion(self, engine): data_table = self.tables.data_table result = engine.execute( select([data_table.c.data]).where(data_table.c.data['k1'] == 'r3v1') ).first() eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},)) def _test_fixed_round_trip(self, engine): s = select([ hstore( array(['key1', 'key2', 'key3']), array(['value1', 'value2', 'value3']) ) ]) eq_( engine.scalar(s), {"key1": "value1", "key2": "value2", "key3": "value3"} ) def test_fixed_round_trip_python(self): engine = self._non_native_engine() self._test_fixed_round_trip(engine) @testing.only_on("postgresql+psycopg2") def test_fixed_round_trip_native(self): engine = testing.db self._test_fixed_round_trip(engine) def _test_unicode_round_trip(self, engine): s = select([ hstore( array([util.u('réveillé'), util.u('drôle'), util.u('S’il')]), array([util.u('réveillé'), util.u('drôle'), util.u('S’il')]) ) ]) eq_( engine.scalar(s), { util.u('réveillé'): util.u('réveillé'), util.u('drôle'): util.u('drôle'), util.u('S’il'): util.u('S’il') } ) def test_unicode_round_trip_python(self): engine = self._non_native_engine() self._test_unicode_round_trip(engine) @testing.only_on("postgresql+psycopg2") def test_unicode_round_trip_native(self): engine = testing.db self._test_unicode_round_trip(engine) class _RangeTypeMixin(object): __requires__ = 'range_types', __dialect__ = 'postgresql+psycopg2' @property def extras(self): # done this way so we don't get ImportErrors with # older psycopg2 versions. from psycopg2 import extras return extras @classmethod def define_tables(cls, metadata): # no reason ranges shouldn't be primary keys, # so lets just use them as such table = Table('data_table', metadata, Column('range', cls._col_type, primary_key=True), ) cls.col = table.c.range def test_actual_type(self): eq_(str(self._col_type()), self._col_str) def test_reflect(self): from sqlalchemy import inspect insp = inspect(testing.db) cols = insp.get_columns('data_table') assert isinstance(cols[0]['type'], self._col_type) def _assert_data(self): data = testing.db.execute( select([self.tables.data_table.c.range]) ).fetchall() eq_(data, [(self._data_obj(), )]) def test_insert_obj(self): testing.db.engine.execute( self.tables.data_table.insert(), {'range': self._data_obj()} ) self._assert_data() def test_insert_text(self): testing.db.engine.execute( self.tables.data_table.insert(), {'range': self._data_str} ) self._assert_data() # operator tests def _test_clause(self, colclause, expected): dialect = postgresql.dialect() compiled = str(colclause.compile(dialect=dialect)) eq_(compiled, expected) def test_where_equal(self): self._test_clause( self.col==self._data_str, "data_table.range = %(range_1)s" ) def test_where_not_equal(self): self._test_clause( self.col!=self._data_str, "data_table.range <> %(range_1)s" ) def test_where_less_than(self): self._test_clause( self.col < self._data_str, "data_table.range < %(range_1)s" ) def test_where_greater_than(self): self._test_clause( self.col > self._data_str, "data_table.range > %(range_1)s" ) def test_where_less_than_or_equal(self): self._test_clause( self.col <= self._data_str, "data_table.range <= %(range_1)s" ) def test_where_greater_than_or_equal(self): self._test_clause( self.col >= self._data_str, "data_table.range >= %(range_1)s" ) def test_contains(self): self._test_clause( self.col.contains(self._data_str), "data_table.range @> %(range_1)s" ) def test_contained_by(self): self._test_clause( self.col.contained_by(self._data_str), "data_table.range <@ %(range_1)s" ) def test_overlaps(self): self._test_clause( self.col.overlaps(self._data_str), "data_table.range && %(range_1)s" ) def test_strictly_left_of(self): self._test_clause( self.col << self._data_str, "data_table.range << %(range_1)s" ) self._test_clause( self.col.strictly_left_of(self._data_str), "data_table.range << %(range_1)s" ) def test_strictly_right_of(self): self._test_clause( self.col >> self._data_str, "data_table.range >> %(range_1)s" ) self._test_clause( self.col.strictly_right_of(self._data_str), "data_table.range >> %(range_1)s" ) def test_not_extend_right_of(self): self._test_clause( self.col.not_extend_right_of(self._data_str), "data_table.range &< %(range_1)s" ) def test_not_extend_left_of(self): self._test_clause( self.col.not_extend_left_of(self._data_str), "data_table.range &> %(range_1)s" ) def test_adjacent_to(self): self._test_clause( self.col.adjacent_to(self._data_str), "data_table.range -|- %(range_1)s" ) def test_union(self): self._test_clause( self.col + self.col, "data_table.range + data_table.range" ) def test_union_result(self): # insert testing.db.engine.execute( self.tables.data_table.insert(), {'range': self._data_str} ) # select range = self.tables.data_table.c.range data = testing.db.execute( select([range + range]) ).fetchall() eq_(data, [(self._data_obj(), )]) def test_intersection(self): self._test_clause( self.col * self.col, "data_table.range * data_table.range" ) def test_intersection_result(self): # insert testing.db.engine.execute( self.tables.data_table.insert(), {'range': self._data_str} ) # select range = self.tables.data_table.c.range data = testing.db.execute( select([range * range]) ).fetchall() eq_(data, [(self._data_obj(), )]) def test_different(self): self._test_clause( self.col - self.col, "data_table.range - data_table.range" ) def test_difference_result(self): # insert testing.db.engine.execute( self.tables.data_table.insert(), {'range': self._data_str} ) # select range = self.tables.data_table.c.range data = testing.db.execute( select([range - range]) ).fetchall() eq_(data, [(self._data_obj().__class__(empty=True), )]) class Int4RangeTests(_RangeTypeMixin, fixtures.TablesTest): _col_type = INT4RANGE _col_str = 'INT4RANGE' _data_str = '[1,2)' def _data_obj(self): return self.extras.NumericRange(1, 2) class Int8RangeTests(_RangeTypeMixin, fixtures.TablesTest): _col_type = INT8RANGE _col_str = 'INT8RANGE' _data_str = '[9223372036854775806,9223372036854775807)' def _data_obj(self): return self.extras.NumericRange( 9223372036854775806, 9223372036854775807 ) class NumRangeTests(_RangeTypeMixin, fixtures.TablesTest): _col_type = NUMRANGE _col_str = 'NUMRANGE' _data_str = '[1.0,2.0)' def _data_obj(self): return self.extras.NumericRange( decimal.Decimal('1.0'), decimal.Decimal('2.0') ) class DateRangeTests(_RangeTypeMixin, fixtures.TablesTest): _col_type = DATERANGE _col_str = 'DATERANGE' _data_str = '[2013-03-23,2013-03-24)' def _data_obj(self): return self.extras.DateRange( datetime.date(2013, 3, 23), datetime.date(2013, 3, 24) ) class DateTimeRangeTests(_RangeTypeMixin, fixtures.TablesTest): _col_type = TSRANGE _col_str = 'TSRANGE' _data_str = '[2013-03-23 14:30,2013-03-23 23:30)' def _data_obj(self): return self.extras.DateTimeRange( datetime.datetime(2013, 3, 23, 14, 30), datetime.datetime(2013, 3, 23, 23, 30) ) class DateTimeTZRangeTests(_RangeTypeMixin, fixtures.TablesTest): _col_type = TSTZRANGE _col_str = 'TSTZRANGE' # make sure we use one, steady timestamp with timezone pair # for all parts of all these tests _tstzs = None def tstzs(self): if self._tstzs is None: lower = testing.db.connect().scalar( func.current_timestamp().select() ) upper = lower+datetime.timedelta(1) self._tstzs = (lower, upper) return self._tstzs @property def _data_str(self): return '[%s,%s)' % self.tstzs() def _data_obj(self): return self.extras.DateTimeTZRange(*self.tstzs())