diff options
Diffstat (limited to 'test/dialect/postgresql')
-rw-r--r-- | test/dialect/postgresql/test_compiler.py | 21 | ||||
-rw-r--r-- | test/dialect/postgresql/test_dialect.py | 88 |
2 files changed, 108 insertions, 1 deletions
diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py index dad7ccd3c..a031c3df9 100644 --- a/test/dialect/postgresql/test_compiler.py +++ b/test/dialect/postgresql/test_compiler.py @@ -820,13 +820,14 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): where="room > 100", deferrable=True, initially="immediate", + ops={"room": "my_opclass"}, ) tbl.append_constraint(cons) self.assert_compile( schema.AddConstraint(cons), "ALTER TABLE testtbl ADD CONSTRAINT my_name " "EXCLUDE USING gist " - "(room WITH =, during WITH " + "(room my_opclass WITH =, during WITH " "&&) WHERE " "(room > 100) DEFERRABLE INITIALLY immediate", dialect=postgresql.dialect(), @@ -935,6 +936,24 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): dialect=postgresql.dialect(), ) + def test_exclude_constraint_ops_many(self): + m = MetaData() + tbl = Table( + "testtbl", m, Column("room", String), Column("during", TSRANGE) + ) + cons = ExcludeConstraint( + ("room", "="), + ("during", "&&"), + ops={"room": "first_opsclass", "during": "second_opclass"}, + ) + tbl.append_constraint(cons) + self.assert_compile( + schema.AddConstraint(cons), + "ALTER TABLE testtbl ADD EXCLUDE USING gist " + "(room first_opsclass WITH =, during second_opclass WITH &&)", + dialect=postgresql.dialect(), + ) + def test_substring(self): self.assert_compile( func.substring("abc", 1, 2), diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index d1e9c2e6d..5cea604d6 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -39,6 +39,7 @@ from sqlalchemy.engine import url from sqlalchemy.testing import engines from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ +from sqlalchemy.testing import is_false from sqlalchemy.testing import is_true from sqlalchemy.testing import mock from sqlalchemy.testing.assertions import assert_raises @@ -103,6 +104,12 @@ class DialectTest(fixtures.TestBase): "(Red Hat 4.8.5-11), 64-bit", (10,), ), + ( + "PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc " + "(GCC) 3.4.2 20041017 (Red Hat 3.4.2-6.fc3), " + "Redshift 1.0.12103", + (8, 0, 2), + ), ]: eq_(dialect._get_server_version_info(mock_conn(string)), version) @@ -764,6 +771,87 @@ class MiscBackendTest( ".".join(str(x) for x in v) ) + @testing.combinations( + ((8, 1), False, False), + ((8, 1), None, False), + ((11, 5), True, False), + ((11, 5), False, True), + ) + def test_backslash_escapes_detection( + self, version, explicit_setting, expected + ): + engine = engines.testing_engine() + + def _server_version(conn): + return version + + if explicit_setting is not None: + + @event.listens_for(engine, "connect", insert=True) + @event.listens_for(engine, "first_connect", insert=True) + def connect(dbapi_connection, connection_record): + cursor = dbapi_connection.cursor() + cursor.execute( + "SET SESSION standard_conforming_strings = %s" + % ("off" if not explicit_setting else "on") + ) + dbapi_connection.commit() + + with mock.patch.object( + engine.dialect, "_get_server_version_info", _server_version + ): + with engine.connect(): + eq_(engine.dialect._backslash_escapes, expected) + + def test_dbapi_autocommit_attribute(self): + """all the supported DBAPIs have an .autocommit attribute. make + sure it works and preserves isolation level. + + This is added in particular to support the asyncpg dialect that + has a DBAPI compatibility layer. + + """ + + with testing.db.connect().execution_options( + isolation_level="SERIALIZABLE" + ) as conn: + dbapi_conn = conn.connection.connection + + is_false(dbapi_conn.autocommit) + + with conn.begin(): + + existing_isolation = conn.exec_driver_sql( + "show transaction isolation level" + ).scalar() + eq_(existing_isolation.upper(), "SERIALIZABLE") + + txid1 = conn.exec_driver_sql("select txid_current()").scalar() + txid2 = conn.exec_driver_sql("select txid_current()").scalar() + eq_(txid1, txid2) + + dbapi_conn.autocommit = True + + with conn.begin(): + # magic way to see if we are in autocommit mode from + # the server's perspective + txid1 = conn.exec_driver_sql("select txid_current()").scalar() + txid2 = conn.exec_driver_sql("select txid_current()").scalar() + ne_(txid1, txid2) + + dbapi_conn.autocommit = False + + with conn.begin(): + + existing_isolation = conn.exec_driver_sql( + "show transaction isolation level" + ).scalar() + eq_(existing_isolation.upper(), "SERIALIZABLE") + + txid1 = conn.exec_driver_sql("select txid_current()").scalar() + txid2 = conn.exec_driver_sql("select txid_current()").scalar() + eq_(txid1, txid2) + def test_readonly_flag_connection(self): with testing.db.connect() as conn: # asyncpg requires serializable for readonly.. |