summaryrefslogtreecommitdiff
path: root/tests/test_connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_connection.py')
-rwxr-xr-xtests/test_connection.py404
1 files changed, 311 insertions, 93 deletions
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 703d8f1..a9525b4 100755
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -89,13 +89,26 @@ class ConnectionTests(ConnectingTestCase):
def test_reset(self):
conn = self.conn
- # switch isolation level, then reset
- level = conn.isolation_level
- conn.set_isolation_level(0)
- self.assertEqual(conn.isolation_level, 0)
+ # switch session characteristics
+ conn.autocommit = True
+ conn.isolation_level = 'serializable'
+ conn.readonly = True
+ if self.conn.server_version >= 90100:
+ conn.deferrable = False
+
+ self.assert_(conn.autocommit)
+ self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
+ self.assert_(conn.readonly is True)
+ if self.conn.server_version >= 90100:
+ self.assert_(conn.deferrable is False)
+
conn.reset()
- # now the isolation level should be equal to saved one
- self.assertEqual(conn.isolation_level, level)
+ # now the session characteristics should be reverted
+ self.assert_(not conn.autocommit)
+ self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
+ self.assert_(conn.readonly is None)
+ if self.conn.server_version >= 90100:
+ self.assert_(conn.deferrable is None)
def test_notices(self):
conn = self.conn
@@ -222,7 +235,7 @@ class ConnectionTests(ConnectingTestCase):
self.conn.set_client_encoding("EUC_JP")
# conn.encoding is 'EUCJP' now.
cur = self.conn.cursor()
- psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, cur)
+ ext.register_type(ext.UNICODE, cur)
cur.execute("select 'foo'::text;")
self.assertEqual(cur.fetchone()[0], u'foo')
@@ -308,14 +321,14 @@ class ConnectionTests(ConnectingTestCase):
# issue #210
conn = self.connect()
cur = conn.cursor(cursor_factory=None)
- self.assertEqual(type(cur), psycopg2.extensions.cursor)
+ self.assertEqual(type(cur), ext.cursor)
conn = self.connect(cursor_factory=psycopg2.extras.DictCursor)
cur = conn.cursor(cursor_factory=None)
self.assertEqual(type(cur), psycopg2.extras.DictCursor)
def test_failed_init_status(self):
- class SubConnection(psycopg2.extensions.connection):
+ class SubConnection(ext.connection):
def __init__(self, dsn):
try:
super(SubConnection, self).__init__(dsn)
@@ -488,23 +501,22 @@ class IsolationLevelsTestCase(ConnectingTestCase):
conn = self.connect()
self.assertEqual(
conn.isolation_level,
- psycopg2.extensions.ISOLATION_LEVEL_DEFAULT)
+ ext.ISOLATION_LEVEL_DEFAULT)
def test_encoding(self):
conn = self.connect()
- self.assert_(conn.encoding in psycopg2.extensions.encodings)
+ self.assert_(conn.encoding in ext.encodings)
def test_set_isolation_level(self):
conn = self.connect()
curs = conn.cursor()
levels = [
- (None, psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT),
('read uncommitted',
- psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED),
- ('read committed', psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED),
- ('repeatable read', psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ),
- ('serializable', psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE),
+ ext.ISOLATION_LEVEL_READ_UNCOMMITTED),
+ ('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED),
+ ('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ),
+ ('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE),
]
for name, level in levels:
conn.set_isolation_level(level)
@@ -512,8 +524,8 @@ class IsolationLevelsTestCase(ConnectingTestCase):
# the only values available on prehistoric PG versions
if conn.server_version < 80000:
if level in (
- psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
- psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ):
+ ext.ISOLATION_LEVEL_READ_UNCOMMITTED,
+ ext.ISOLATION_LEVEL_REPEATABLE_READ):
name, level = levels[levels.index((name, level)) + 1]
self.assertEqual(conn.isolation_level, level)
@@ -521,15 +533,26 @@ class IsolationLevelsTestCase(ConnectingTestCase):
curs.execute('show transaction_isolation;')
got_name = curs.fetchone()[0]
- if name is None:
- curs.execute('show transaction_isolation;')
- name = curs.fetchone()[0]
-
self.assertEqual(name, got_name)
conn.commit()
self.assertRaises(ValueError, conn.set_isolation_level, -1)
- self.assertRaises(ValueError, conn.set_isolation_level, 6)
+ self.assertRaises(ValueError, conn.set_isolation_level, 5)
+
+ def test_set_isolation_level_autocommit(self):
+ conn = self.connect()
+ curs = conn.cursor()
+
+ conn.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT)
+ self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
+ self.assert_(conn.autocommit)
+
+ conn.isolation_level = 'serializable'
+ self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
+ self.assert_(conn.autocommit)
+
+ curs.execute('show transaction_isolation;')
+ self.assertEqual(curs.fetchone()[0], 'serializable')
def test_set_isolation_level_default(self):
conn = self.connect()
@@ -539,14 +562,14 @@ class IsolationLevelsTestCase(ConnectingTestCase):
curs.execute("set default_transaction_isolation to 'read committed'")
conn.autocommit = False
- conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ conn.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(conn.isolation_level,
- psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ ext.ISOLATION_LEVEL_SERIALIZABLE)
curs.execute("show transaction_isolation")
self.assertEqual(curs.fetchone()[0], "serializable")
conn.rollback()
- conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_DEFAULT)
+ conn.set_isolation_level(ext.ISOLATION_LEVEL_DEFAULT)
curs.execute("show transaction_isolation")
self.assertEqual(curs.fetchone()[0], "read committed")
@@ -554,25 +577,45 @@ class IsolationLevelsTestCase(ConnectingTestCase):
conn = self.connect()
cur = conn.cursor()
- self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
+ self.assertEqual(ext.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("insert into isolevel values (10);")
- self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS,
+ self.assertEqual(ext.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status())
- # changed in psycopg 2.7
- self.assertRaises(psycopg2.ProgrammingError,
- conn.set_isolation_level,
+ conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
+ conn.get_transaction_status())
+ cur.execute("select count(*) from isolevel;")
+ self.assertEqual(0, cur.fetchone()[0])
+
+ cur.execute("insert into isolevel values (10);")
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status())
+ conn.set_isolation_level(
+ psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+ self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
+ conn.get_transaction_status())
+ cur.execute("select count(*) from isolevel;")
+ self.assertEqual(0, cur.fetchone()[0])
+
+ cur.execute("insert into isolevel values (10);")
+ self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
+ conn.get_transaction_status())
+ conn.set_isolation_level(
+ psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+ self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
+ conn.get_transaction_status())
+ cur.execute("select count(*) from isolevel;")
+ self.assertEqual(1, cur.fetchone()[0])
self.assertEqual(conn.isolation_level,
- psycopg2.extensions.ISOLATION_LEVEL_DEFAULT)
+ psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
def test_isolation_level_autocommit(self):
cnn1 = self.connect()
cnn2 = self.connect()
- cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+ cnn2.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT)
cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;")
@@ -588,7 +631,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
def test_isolation_level_read_committed(self):
cnn1 = self.connect()
cnn2 = self.connect()
- cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+ cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)
cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;")
@@ -614,7 +657,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
def test_isolation_level_serializable(self):
cnn1 = self.connect()
cnn2 = self.connect()
- cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ cnn2.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE)
cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;")
@@ -643,13 +686,112 @@ class IsolationLevelsTestCase(ConnectingTestCase):
def test_isolation_level_closed(self):
cnn = self.connect()
cnn.close()
- self.assertRaises(psycopg2.InterfaceError, getattr,
- cnn, 'isolation_level')
self.assertRaises(psycopg2.InterfaceError,
cnn.set_isolation_level, 0)
self.assertRaises(psycopg2.InterfaceError,
cnn.set_isolation_level, 1)
+ def test_setattr_isolation_level_int(self):
+ cur = self.conn.cursor()
+ self.conn.isolation_level = ext.ISOLATION_LEVEL_SERIALIZABLE
+ self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
+
+ cur.execute("SHOW transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.isolation_level = ext.ISOLATION_LEVEL_REPEATABLE_READ
+ cur.execute("SHOW transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(self.conn.isolation_level,
+ ext.ISOLATION_LEVEL_REPEATABLE_READ)
+ self.assertEqual(cur.fetchone()[0], 'repeatable read')
+ else:
+ self.assertEqual(self.conn.isolation_level,
+ ext.ISOLATION_LEVEL_SERIALIZABLE)
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.isolation_level = ext.ISOLATION_LEVEL_READ_COMMITTED
+ self.assertEqual(self.conn.isolation_level,
+ ext.ISOLATION_LEVEL_READ_COMMITTED)
+ cur.execute("SHOW transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ self.conn.isolation_level = ext.ISOLATION_LEVEL_READ_UNCOMMITTED
+ cur.execute("SHOW transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(self.conn.isolation_level,
+ ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
+ self.assertEqual(cur.fetchone()[0], 'read uncommitted')
+ else:
+ self.assertEqual(self.conn.isolation_level,
+ ext.ISOLATION_LEVEL_READ_COMMITTED)
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ self.assertEqual(ext.ISOLATION_LEVEL_DEFAULT, None)
+ self.conn.isolation_level = ext.ISOLATION_LEVEL_DEFAULT
+ self.assertEqual(self.conn.isolation_level, None)
+ cur.execute("SHOW transaction_isolation;")
+ isol = cur.fetchone()[0]
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], isol)
+
+ def test_setattr_isolation_level_str(self):
+ cur = self.conn.cursor()
+ self.conn.isolation_level = "serializable"
+ self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
+
+ cur.execute("SHOW transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.isolation_level = "repeatable read"
+ cur.execute("SHOW transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(self.conn.isolation_level,
+ ext.ISOLATION_LEVEL_REPEATABLE_READ)
+ self.assertEqual(cur.fetchone()[0], 'repeatable read')
+ else:
+ self.assertEqual(self.conn.isolation_level,
+ ext.ISOLATION_LEVEL_SERIALIZABLE)
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.isolation_level = "read committed"
+ self.assertEqual(self.conn.isolation_level,
+ ext.ISOLATION_LEVEL_READ_COMMITTED)
+ cur.execute("SHOW transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ self.conn.isolation_level = "read uncommitted"
+ cur.execute("SHOW transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(self.conn.isolation_level,
+ ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
+ self.assertEqual(cur.fetchone()[0], 'read uncommitted')
+ else:
+ self.assertEqual(self.conn.isolation_level,
+ ext.ISOLATION_LEVEL_READ_COMMITTED)
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ self.conn.isolation_level = "default"
+ self.assertEqual(self.conn.isolation_level, None)
+ cur.execute("SHOW transaction_isolation;")
+ isol = cur.fetchone()[0]
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], isol)
+
+ def test_setattr_isolation_level_invalid(self):
+ self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 0)
+ self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', -1)
+ self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 5)
+ self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 'bah')
+
class ConnectionTwoPhaseTests(ConnectingTestCase):
def setUp(self):
@@ -716,10 +858,10 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
def test_tpc_commit(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid)
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN)
+ self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit');")
@@ -727,22 +869,22 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records())
cnn.tpc_prepare()
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_PREPARED)
+ self.assertEqual(cnn.status, ext.STATUS_PREPARED)
self.assertEqual(1, self.count_xacts())
self.assertEqual(0, self.count_test_records())
cnn.tpc_commit()
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records())
def test_tpc_commit_one_phase(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid)
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN)
+ self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit_1p');")
@@ -750,17 +892,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records())
cnn.tpc_commit()
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records())
def test_tpc_commit_recovered(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid)
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN)
+ self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit_rec');")
@@ -776,17 +918,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
xid = cnn.xid(1, "gtrid", "bqual")
cnn.tpc_commit(xid)
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records())
def test_tpc_rollback(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid)
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN)
+ self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_rollback');")
@@ -794,22 +936,22 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records())
cnn.tpc_prepare()
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_PREPARED)
+ self.assertEqual(cnn.status, ext.STATUS_PREPARED)
self.assertEqual(1, self.count_xacts())
self.assertEqual(0, self.count_test_records())
cnn.tpc_rollback()
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records())
def test_tpc_rollback_one_phase(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid)
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN)
+ self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_rollback_1p');")
@@ -817,17 +959,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records())
cnn.tpc_rollback()
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records())
def test_tpc_rollback_recovered(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid)
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN)
+ self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit_rec');")
@@ -843,21 +985,21 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
xid = cnn.xid(1, "gtrid", "bqual")
cnn.tpc_rollback(xid)
- self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records())
def test_status_after_recover(self):
cnn = self.connect()
- self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status)
+ self.assertEqual(ext.STATUS_READY, cnn.status)
cnn.tpc_recover()
- self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status)
+ self.assertEqual(ext.STATUS_READY, cnn.status)
cur = cnn.cursor()
cur.execute("select 1")
- self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status)
+ self.assertEqual(ext.STATUS_BEGIN, cnn.status)
cnn.tpc_recover()
- self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status)
+ self.assertEqual(ext.STATUS_BEGIN, cnn.status)
def test_recovered_xids(self):
# insert a few test xns
@@ -1030,25 +1172,25 @@ class TransactionControlTests(ConnectingTestCase):
self.conn.close()
self.assertRaises(psycopg2.InterfaceError,
self.conn.set_session,
- psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ ext.ISOLATION_LEVEL_SERIALIZABLE)
def test_not_in_transaction(self):
cur = self.conn.cursor()
cur.execute("select 1")
self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_session,
- psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ ext.ISOLATION_LEVEL_SERIALIZABLE)
def test_set_isolation_level(self):
cur = self.conn.cursor()
self.conn.set_session(
- psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ ext.ISOLATION_LEVEL_SERIALIZABLE)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.set_session(
- psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ)
+ ext.ISOLATION_LEVEL_REPEATABLE_READ)
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'repeatable read')
@@ -1057,13 +1199,13 @@ class TransactionControlTests(ConnectingTestCase):
self.conn.rollback()
self.conn.set_session(
- isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+ isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.set_session(
- isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED)
+ isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'read uncommitted')
@@ -1105,8 +1247,11 @@ class TransactionControlTests(ConnectingTestCase):
self.assertRaises(ValueError, self.conn.set_session, 'whatever')
def test_set_read_only(self):
+ self.assert_(self.conn.readonly is None)
+
cur = self.conn.cursor()
self.conn.set_session(readonly=True)
+ self.assert_(self.conn.readonly is True)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
@@ -1114,13 +1259,35 @@ class TransactionControlTests(ConnectingTestCase):
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
+ self.conn.set_session(readonly=False)
+ self.assert_(self.conn.readonly is False)
+ cur.execute("SHOW transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'off')
+ self.conn.rollback()
+
+ def test_setattr_read_only(self):
cur = self.conn.cursor()
- self.conn.set_session(readonly=None)
+ self.conn.readonly = True
+ self.assert_(self.conn.readonly is True)
+ cur.execute("SHOW transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.assertRaises(self.conn.ProgrammingError,
+ setattr, self.conn, 'readonly', False)
+ self.assert_(self.conn.readonly is True)
+ self.conn.rollback()
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
- self.conn.set_session(readonly=False)
+ cur = self.conn.cursor()
+ self.conn.readonly = None
+ self.assert_(self.conn.readonly is None)
+ cur.execute("SHOW transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server
+ self.conn.rollback()
+
+ self.conn.readonly = False
+ self.assert_(self.conn.readonly is False)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'off')
self.conn.rollback()
@@ -1143,8 +1310,10 @@ class TransactionControlTests(ConnectingTestCase):
@skip_before_postgres(9, 1)
def test_set_deferrable(self):
+ self.assert_(self.conn.deferrable is None)
cur = self.conn.cursor()
self.conn.set_session(readonly=True, deferrable=True)
+ self.assert_(self.conn.deferrable is True)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW transaction_deferrable;")
@@ -1155,6 +1324,7 @@ class TransactionControlTests(ConnectingTestCase):
self.conn.rollback()
self.conn.set_session(deferrable=False)
+ self.assert_(self.conn.deferrable is False)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW transaction_deferrable;")
@@ -1165,6 +1335,54 @@ class TransactionControlTests(ConnectingTestCase):
def test_set_deferrable_error(self):
self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_session, readonly=True, deferrable=True)
+ self.assertRaises(psycopg2.ProgrammingError,
+ setattr, self.conn, 'deferrable', True)
+
+ @skip_before_postgres(9, 1)
+ def test_setattr_deferrable(self):
+ cur = self.conn.cursor()
+ self.conn.deferrable = True
+ self.assert_(self.conn.deferrable is True)
+ cur.execute("SHOW transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.assertRaises(self.conn.ProgrammingError,
+ setattr, self.conn, 'deferrable', False)
+ self.assert_(self.conn.deferrable is True)
+ self.conn.rollback()
+ cur.execute("SHOW transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+
+ cur = self.conn.cursor()
+ self.conn.deferrable = None
+ self.assert_(self.conn.deferrable is None)
+ cur.execute("SHOW transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server
+ self.conn.rollback()
+
+ self.conn.deferrable = False
+ self.assert_(self.conn.deferrable is False)
+ cur.execute("SHOW transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'off')
+ self.conn.rollback()
+
+ def test_mixing_session_attribs(self):
+ cur = self.conn.cursor()
+ self.conn.autocommit = True
+ self.conn.readonly = True
+
+ cur.execute("SHOW transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+
+ self.conn.autocommit = False
+ cur.execute("SHOW transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'off')
class AutocommitTests(ConnectingTestCase):
@@ -1183,44 +1401,44 @@ class AutocommitTests(ConnectingTestCase):
def test_default_no_autocommit(self):
self.assert_(not self.conn.autocommit)
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_IDLE)
+ ext.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor()
cur.execute('select 1;')
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN)
+ self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_INTRANS)
+ ext.TRANSACTION_STATUS_INTRANS)
self.conn.rollback()
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_IDLE)
+ ext.TRANSACTION_STATUS_IDLE)
def test_set_autocommit(self):
self.conn.autocommit = True
self.assert_(self.conn.autocommit)
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_IDLE)
+ ext.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor()
cur.execute('select 1;')
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_IDLE)
+ ext.TRANSACTION_STATUS_IDLE)
self.conn.autocommit = False
self.assert_(not self.conn.autocommit)
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_IDLE)
+ ext.TRANSACTION_STATUS_IDLE)
cur.execute('select 1;')
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN)
+ self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_INTRANS)
+ ext.TRANSACTION_STATUS_INTRANS)
def test_set_intrans_error(self):
cur = self.conn.cursor()
@@ -1231,34 +1449,34 @@ class AutocommitTests(ConnectingTestCase):
def test_set_session_autocommit(self):
self.conn.set_session(autocommit=True)
self.assert_(self.conn.autocommit)
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_IDLE)
+ ext.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor()
cur.execute('select 1;')
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_IDLE)
+ ext.TRANSACTION_STATUS_IDLE)
self.conn.set_session(autocommit=False)
self.assert_(not self.conn.autocommit)
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_IDLE)
+ ext.TRANSACTION_STATUS_IDLE)
cur.execute('select 1;')
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN)
+ self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_INTRANS)
+ ext.TRANSACTION_STATUS_INTRANS)
self.conn.rollback()
self.conn.set_session('serializable', readonly=True, autocommit=True)
self.assert_(self.conn.autocommit)
cur.execute('select 1;')
- self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
+ self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
- psycopg2.extensions.TRANSACTION_STATUS_IDLE)
+ ext.TRANSACTION_STATUS_IDLE)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
cur.execute("SHOW transaction_read_only;")