summaryrefslogtreecommitdiff
path: root/test/dialect/postgresql
diff options
context:
space:
mode:
Diffstat (limited to 'test/dialect/postgresql')
-rw-r--r--test/dialect/postgresql/test_compiler.py21
-rw-r--r--test/dialect/postgresql/test_dialect.py88
2 files changed, 108 insertions, 1 deletions
diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py
index dad7ccd3c..a031c3df9 100644
--- a/test/dialect/postgresql/test_compiler.py
+++ b/test/dialect/postgresql/test_compiler.py
@@ -820,13 +820,14 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
where="room > 100",
deferrable=True,
initially="immediate",
+ ops={"room": "my_opclass"},
)
tbl.append_constraint(cons)
self.assert_compile(
schema.AddConstraint(cons),
"ALTER TABLE testtbl ADD CONSTRAINT my_name "
"EXCLUDE USING gist "
- "(room WITH =, during WITH "
+ "(room my_opclass WITH =, during WITH "
"&&) WHERE "
"(room > 100) DEFERRABLE INITIALLY immediate",
dialect=postgresql.dialect(),
@@ -935,6 +936,24 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
dialect=postgresql.dialect(),
)
+ def test_exclude_constraint_ops_many(self):
+ m = MetaData()
+ tbl = Table(
+ "testtbl", m, Column("room", String), Column("during", TSRANGE)
+ )
+ cons = ExcludeConstraint(
+ ("room", "="),
+ ("during", "&&"),
+ ops={"room": "first_opsclass", "during": "second_opclass"},
+ )
+ tbl.append_constraint(cons)
+ self.assert_compile(
+ schema.AddConstraint(cons),
+ "ALTER TABLE testtbl ADD EXCLUDE USING gist "
+ "(room first_opsclass WITH =, during second_opclass WITH &&)",
+ dialect=postgresql.dialect(),
+ )
+
def test_substring(self):
self.assert_compile(
func.substring("abc", 1, 2),
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index d1e9c2e6d..5cea604d6 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -39,6 +39,7 @@ from sqlalchemy.engine import url
from sqlalchemy.testing import engines
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
+from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import assert_raises
@@ -103,6 +104,12 @@ class DialectTest(fixtures.TestBase):
"(Red Hat 4.8.5-11), 64-bit",
(10,),
),
+ (
+ "PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc "
+ "(GCC) 3.4.2 20041017 (Red Hat 3.4.2-6.fc3), "
+ "Redshift 1.0.12103",
+ (8, 0, 2),
+ ),
]:
eq_(dialect._get_server_version_info(mock_conn(string)), version)
@@ -764,6 +771,87 @@ class MiscBackendTest(
".".join(str(x) for x in v)
)
+ @testing.combinations(
+ ((8, 1), False, False),
+ ((8, 1), None, False),
+ ((11, 5), True, False),
+ ((11, 5), False, True),
+ )
+ def test_backslash_escapes_detection(
+ self, version, explicit_setting, expected
+ ):
+ engine = engines.testing_engine()
+
+ def _server_version(conn):
+ return version
+
+ if explicit_setting is not None:
+
+ @event.listens_for(engine, "connect", insert=True)
+ @event.listens_for(engine, "first_connect", insert=True)
+ def connect(dbapi_connection, connection_record):
+ cursor = dbapi_connection.cursor()
+ cursor.execute(
+ "SET SESSION standard_conforming_strings = %s"
+ % ("off" if not explicit_setting else "on")
+ )
+ dbapi_connection.commit()
+
+ with mock.patch.object(
+ engine.dialect, "_get_server_version_info", _server_version
+ ):
+ with engine.connect():
+ eq_(engine.dialect._backslash_escapes, expected)
+
+ def test_dbapi_autocommit_attribute(self):
+ """all the supported DBAPIs have an .autocommit attribute. make
+ sure it works and preserves isolation level.
+
+ This is added in particular to support the asyncpg dialect that
+ has a DBAPI compatibility layer.
+
+ """
+
+ with testing.db.connect().execution_options(
+ isolation_level="SERIALIZABLE"
+ ) as conn:
+ dbapi_conn = conn.connection.connection
+
+ is_false(dbapi_conn.autocommit)
+
+ with conn.begin():
+
+ existing_isolation = conn.exec_driver_sql(
+ "show transaction isolation level"
+ ).scalar()
+ eq_(existing_isolation.upper(), "SERIALIZABLE")
+
+ txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+ txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+ eq_(txid1, txid2)
+
+ dbapi_conn.autocommit = True
+
+ with conn.begin():
+ # magic way to see if we are in autocommit mode from
+ # the server's perspective
+ txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+ txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+ ne_(txid1, txid2)
+
+ dbapi_conn.autocommit = False
+
+ with conn.begin():
+
+ existing_isolation = conn.exec_driver_sql(
+ "show transaction isolation level"
+ ).scalar()
+ eq_(existing_isolation.upper(), "SERIALIZABLE")
+
+ txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+ txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+ eq_(txid1, txid2)
+
def test_readonly_flag_connection(self):
with testing.db.connect() as conn:
# asyncpg requires serializable for readonly..