summaryrefslogtreecommitdiff
path: root/test/dialect/postgresql/test_dialect.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/dialect/postgresql/test_dialect.py')
-rw-r--r--test/dialect/postgresql/test_dialect.py88
1 files changed, 88 insertions, 0 deletions
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index d1e9c2e6d..5cea604d6 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -39,6 +39,7 @@ from sqlalchemy.engine import url
from sqlalchemy.testing import engines
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
+from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import assert_raises
@@ -103,6 +104,12 @@ class DialectTest(fixtures.TestBase):
"(Red Hat 4.8.5-11), 64-bit",
(10,),
),
+ (
+ "PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc "
+ "(GCC) 3.4.2 20041017 (Red Hat 3.4.2-6.fc3), "
+ "Redshift 1.0.12103",
+ (8, 0, 2),
+ ),
]:
eq_(dialect._get_server_version_info(mock_conn(string)), version)
@@ -764,6 +771,87 @@ class MiscBackendTest(
".".join(str(x) for x in v)
)
+ @testing.combinations(
+ ((8, 1), False, False),
+ ((8, 1), None, False),
+ ((11, 5), True, False),
+ ((11, 5), False, True),
+ )
+ def test_backslash_escapes_detection(
+ self, version, explicit_setting, expected
+ ):
+ engine = engines.testing_engine()
+
+ def _server_version(conn):
+ return version
+
+ if explicit_setting is not None:
+
+ @event.listens_for(engine, "connect", insert=True)
+ @event.listens_for(engine, "first_connect", insert=True)
+ def connect(dbapi_connection, connection_record):
+ cursor = dbapi_connection.cursor()
+ cursor.execute(
+ "SET SESSION standard_conforming_strings = %s"
+ % ("off" if not explicit_setting else "on")
+ )
+ dbapi_connection.commit()
+
+ with mock.patch.object(
+ engine.dialect, "_get_server_version_info", _server_version
+ ):
+ with engine.connect():
+ eq_(engine.dialect._backslash_escapes, expected)
+
+ def test_dbapi_autocommit_attribute(self):
+ """all the supported DBAPIs have an .autocommit attribute. make
+ sure it works and preserves isolation level.
+
+ This is added in particular to support the asyncpg dialect that
+ has a DBAPI compatibility layer.
+
+ """
+
+ with testing.db.connect().execution_options(
+ isolation_level="SERIALIZABLE"
+ ) as conn:
+ dbapi_conn = conn.connection.connection
+
+ is_false(dbapi_conn.autocommit)
+
+ with conn.begin():
+
+ existing_isolation = conn.exec_driver_sql(
+ "show transaction isolation level"
+ ).scalar()
+ eq_(existing_isolation.upper(), "SERIALIZABLE")
+
+ txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+ txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+ eq_(txid1, txid2)
+
+ dbapi_conn.autocommit = True
+
+ with conn.begin():
+ # magic way to see if we are in autocommit mode from
+ # the server's perspective
+ txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+ txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+ ne_(txid1, txid2)
+
+ dbapi_conn.autocommit = False
+
+ with conn.begin():
+
+ existing_isolation = conn.exec_driver_sql(
+ "show transaction isolation level"
+ ).scalar()
+ eq_(existing_isolation.upper(), "SERIALIZABLE")
+
+ txid1 = conn.exec_driver_sql("select txid_current()").scalar()
+ txid2 = conn.exec_driver_sql("select txid_current()").scalar()
+ eq_(txid1, txid2)
+
def test_readonly_flag_connection(self):
with testing.db.connect() as conn:
# asyncpg requires serializable for readonly..