diff options
Diffstat (limited to 'tests/test_connection.py')
-rwxr-xr-x | tests/test_connection.py | 404 |
1 files changed, 311 insertions, 93 deletions
diff --git a/tests/test_connection.py b/tests/test_connection.py index 703d8f1..a9525b4 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -89,13 +89,26 @@ class ConnectionTests(ConnectingTestCase): def test_reset(self): conn = self.conn - # switch isolation level, then reset - level = conn.isolation_level - conn.set_isolation_level(0) - self.assertEqual(conn.isolation_level, 0) + # switch session characteristics + conn.autocommit = True + conn.isolation_level = 'serializable' + conn.readonly = True + if self.conn.server_version >= 90100: + conn.deferrable = False + + self.assert_(conn.autocommit) + self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) + self.assert_(conn.readonly is True) + if self.conn.server_version >= 90100: + self.assert_(conn.deferrable is False) + conn.reset() - # now the isolation level should be equal to saved one - self.assertEqual(conn.isolation_level, level) + # now the session characteristics should be reverted + self.assert_(not conn.autocommit) + self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT) + self.assert_(conn.readonly is None) + if self.conn.server_version >= 90100: + self.assert_(conn.deferrable is None) def test_notices(self): conn = self.conn @@ -222,7 +235,7 @@ class ConnectionTests(ConnectingTestCase): self.conn.set_client_encoding("EUC_JP") # conn.encoding is 'EUCJP' now. cur = self.conn.cursor() - psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, cur) + ext.register_type(ext.UNICODE, cur) cur.execute("select 'foo'::text;") self.assertEqual(cur.fetchone()[0], u'foo') @@ -308,14 +321,14 @@ class ConnectionTests(ConnectingTestCase): # issue #210 conn = self.connect() cur = conn.cursor(cursor_factory=None) - self.assertEqual(type(cur), psycopg2.extensions.cursor) + self.assertEqual(type(cur), ext.cursor) conn = self.connect(cursor_factory=psycopg2.extras.DictCursor) cur = conn.cursor(cursor_factory=None) self.assertEqual(type(cur), psycopg2.extras.DictCursor) def test_failed_init_status(self): - class SubConnection(psycopg2.extensions.connection): + class SubConnection(ext.connection): def __init__(self, dsn): try: super(SubConnection, self).__init__(dsn) @@ -488,23 +501,22 @@ class IsolationLevelsTestCase(ConnectingTestCase): conn = self.connect() self.assertEqual( conn.isolation_level, - psycopg2.extensions.ISOLATION_LEVEL_DEFAULT) + ext.ISOLATION_LEVEL_DEFAULT) def test_encoding(self): conn = self.connect() - self.assert_(conn.encoding in psycopg2.extensions.encodings) + self.assert_(conn.encoding in ext.encodings) def test_set_isolation_level(self): conn = self.connect() curs = conn.cursor() levels = [ - (None, psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT), ('read uncommitted', - psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED), - ('read committed', psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED), - ('repeatable read', psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ), - ('serializable', psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE), + ext.ISOLATION_LEVEL_READ_UNCOMMITTED), + ('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED), + ('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ), + ('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE), ] for name, level in levels: conn.set_isolation_level(level) @@ -512,8 +524,8 @@ class IsolationLevelsTestCase(ConnectingTestCase): # the only values available on prehistoric PG versions if conn.server_version < 80000: if level in ( - psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED, - psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ): + ext.ISOLATION_LEVEL_READ_UNCOMMITTED, + ext.ISOLATION_LEVEL_REPEATABLE_READ): name, level = levels[levels.index((name, level)) + 1] self.assertEqual(conn.isolation_level, level) @@ -521,15 +533,26 @@ class IsolationLevelsTestCase(ConnectingTestCase): curs.execute('show transaction_isolation;') got_name = curs.fetchone()[0] - if name is None: - curs.execute('show transaction_isolation;') - name = curs.fetchone()[0] - self.assertEqual(name, got_name) conn.commit() self.assertRaises(ValueError, conn.set_isolation_level, -1) - self.assertRaises(ValueError, conn.set_isolation_level, 6) + self.assertRaises(ValueError, conn.set_isolation_level, 5) + + def test_set_isolation_level_autocommit(self): + conn = self.connect() + curs = conn.cursor() + + conn.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT) + self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT) + self.assert_(conn.autocommit) + + conn.isolation_level = 'serializable' + self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) + self.assert_(conn.autocommit) + + curs.execute('show transaction_isolation;') + self.assertEqual(curs.fetchone()[0], 'serializable') def test_set_isolation_level_default(self): conn = self.connect() @@ -539,14 +562,14 @@ class IsolationLevelsTestCase(ConnectingTestCase): curs.execute("set default_transaction_isolation to 'read committed'") conn.autocommit = False - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + conn.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE) self.assertEqual(conn.isolation_level, - psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + ext.ISOLATION_LEVEL_SERIALIZABLE) curs.execute("show transaction_isolation") self.assertEqual(curs.fetchone()[0], "serializable") conn.rollback() - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_DEFAULT) + conn.set_isolation_level(ext.ISOLATION_LEVEL_DEFAULT) curs.execute("show transaction_isolation") self.assertEqual(curs.fetchone()[0], "read committed") @@ -554,25 +577,45 @@ class IsolationLevelsTestCase(ConnectingTestCase): conn = self.connect() cur = conn.cursor() - self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE, + self.assertEqual(ext.TRANSACTION_STATUS_IDLE, conn.get_transaction_status()) cur.execute("insert into isolevel values (10);") - self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS, + self.assertEqual(ext.TRANSACTION_STATUS_INTRANS, conn.get_transaction_status()) - # changed in psycopg 2.7 - self.assertRaises(psycopg2.ProgrammingError, - conn.set_isolation_level, + conn.set_isolation_level( psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE, + conn.get_transaction_status()) + cur.execute("select count(*) from isolevel;") + self.assertEqual(0, cur.fetchone()[0]) + + cur.execute("insert into isolevel values (10);") self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS, conn.get_transaction_status()) + conn.set_isolation_level( + psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE, + conn.get_transaction_status()) + cur.execute("select count(*) from isolevel;") + self.assertEqual(0, cur.fetchone()[0]) + + cur.execute("insert into isolevel values (10);") + self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE, + conn.get_transaction_status()) + conn.set_isolation_level( + psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) + self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE, + conn.get_transaction_status()) + cur.execute("select count(*) from isolevel;") + self.assertEqual(1, cur.fetchone()[0]) self.assertEqual(conn.isolation_level, - psycopg2.extensions.ISOLATION_LEVEL_DEFAULT) + psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) def test_isolation_level_autocommit(self): cnn1 = self.connect() cnn2 = self.connect() - cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + cnn2.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT) cur1 = cnn1.cursor() cur1.execute("select count(*) from isolevel;") @@ -588,7 +631,7 @@ class IsolationLevelsTestCase(ConnectingTestCase): def test_isolation_level_read_committed(self): cnn1 = self.connect() cnn2 = self.connect() - cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) + cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED) cur1 = cnn1.cursor() cur1.execute("select count(*) from isolevel;") @@ -614,7 +657,7 @@ class IsolationLevelsTestCase(ConnectingTestCase): def test_isolation_level_serializable(self): cnn1 = self.connect() cnn2 = self.connect() - cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + cnn2.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE) cur1 = cnn1.cursor() cur1.execute("select count(*) from isolevel;") @@ -643,13 +686,112 @@ class IsolationLevelsTestCase(ConnectingTestCase): def test_isolation_level_closed(self): cnn = self.connect() cnn.close() - self.assertRaises(psycopg2.InterfaceError, getattr, - cnn, 'isolation_level') self.assertRaises(psycopg2.InterfaceError, cnn.set_isolation_level, 0) self.assertRaises(psycopg2.InterfaceError, cnn.set_isolation_level, 1) + def test_setattr_isolation_level_int(self): + cur = self.conn.cursor() + self.conn.isolation_level = ext.ISOLATION_LEVEL_SERIALIZABLE + self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) + + cur.execute("SHOW transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.isolation_level = ext.ISOLATION_LEVEL_REPEATABLE_READ + cur.execute("SHOW transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(self.conn.isolation_level, + ext.ISOLATION_LEVEL_REPEATABLE_READ) + self.assertEqual(cur.fetchone()[0], 'repeatable read') + else: + self.assertEqual(self.conn.isolation_level, + ext.ISOLATION_LEVEL_SERIALIZABLE) + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.isolation_level = ext.ISOLATION_LEVEL_READ_COMMITTED + self.assertEqual(self.conn.isolation_level, + ext.ISOLATION_LEVEL_READ_COMMITTED) + cur.execute("SHOW transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + self.conn.isolation_level = ext.ISOLATION_LEVEL_READ_UNCOMMITTED + cur.execute("SHOW transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(self.conn.isolation_level, + ext.ISOLATION_LEVEL_READ_UNCOMMITTED) + self.assertEqual(cur.fetchone()[0], 'read uncommitted') + else: + self.assertEqual(self.conn.isolation_level, + ext.ISOLATION_LEVEL_READ_COMMITTED) + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + self.assertEqual(ext.ISOLATION_LEVEL_DEFAULT, None) + self.conn.isolation_level = ext.ISOLATION_LEVEL_DEFAULT + self.assertEqual(self.conn.isolation_level, None) + cur.execute("SHOW transaction_isolation;") + isol = cur.fetchone()[0] + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], isol) + + def test_setattr_isolation_level_str(self): + cur = self.conn.cursor() + self.conn.isolation_level = "serializable" + self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) + + cur.execute("SHOW transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.isolation_level = "repeatable read" + cur.execute("SHOW transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(self.conn.isolation_level, + ext.ISOLATION_LEVEL_REPEATABLE_READ) + self.assertEqual(cur.fetchone()[0], 'repeatable read') + else: + self.assertEqual(self.conn.isolation_level, + ext.ISOLATION_LEVEL_SERIALIZABLE) + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.isolation_level = "read committed" + self.assertEqual(self.conn.isolation_level, + ext.ISOLATION_LEVEL_READ_COMMITTED) + cur.execute("SHOW transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + self.conn.isolation_level = "read uncommitted" + cur.execute("SHOW transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(self.conn.isolation_level, + ext.ISOLATION_LEVEL_READ_UNCOMMITTED) + self.assertEqual(cur.fetchone()[0], 'read uncommitted') + else: + self.assertEqual(self.conn.isolation_level, + ext.ISOLATION_LEVEL_READ_COMMITTED) + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + self.conn.isolation_level = "default" + self.assertEqual(self.conn.isolation_level, None) + cur.execute("SHOW transaction_isolation;") + isol = cur.fetchone()[0] + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], isol) + + def test_setattr_isolation_level_invalid(self): + self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 0) + self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', -1) + self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 5) + self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 'bah') + class ConnectionTwoPhaseTests(ConnectingTestCase): def setUp(self): @@ -716,10 +858,10 @@ class ConnectionTwoPhaseTests(ConnectingTestCase): def test_tpc_commit(self): cnn = self.connect() xid = cnn.xid(1, "gtrid", "bqual") - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) cnn.tpc_begin(xid) - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + self.assertEqual(cnn.status, ext.STATUS_BEGIN) cur = cnn.cursor() cur.execute("insert into test_tpc values ('test_tpc_commit');") @@ -727,22 +869,22 @@ class ConnectionTwoPhaseTests(ConnectingTestCase): self.assertEqual(0, self.count_test_records()) cnn.tpc_prepare() - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_PREPARED) + self.assertEqual(cnn.status, ext.STATUS_PREPARED) self.assertEqual(1, self.count_xacts()) self.assertEqual(0, self.count_test_records()) cnn.tpc_commit() - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) self.assertEqual(0, self.count_xacts()) self.assertEqual(1, self.count_test_records()) def test_tpc_commit_one_phase(self): cnn = self.connect() xid = cnn.xid(1, "gtrid", "bqual") - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) cnn.tpc_begin(xid) - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + self.assertEqual(cnn.status, ext.STATUS_BEGIN) cur = cnn.cursor() cur.execute("insert into test_tpc values ('test_tpc_commit_1p');") @@ -750,17 +892,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase): self.assertEqual(0, self.count_test_records()) cnn.tpc_commit() - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) self.assertEqual(0, self.count_xacts()) self.assertEqual(1, self.count_test_records()) def test_tpc_commit_recovered(self): cnn = self.connect() xid = cnn.xid(1, "gtrid", "bqual") - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) cnn.tpc_begin(xid) - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + self.assertEqual(cnn.status, ext.STATUS_BEGIN) cur = cnn.cursor() cur.execute("insert into test_tpc values ('test_tpc_commit_rec');") @@ -776,17 +918,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase): xid = cnn.xid(1, "gtrid", "bqual") cnn.tpc_commit(xid) - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) self.assertEqual(0, self.count_xacts()) self.assertEqual(1, self.count_test_records()) def test_tpc_rollback(self): cnn = self.connect() xid = cnn.xid(1, "gtrid", "bqual") - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) cnn.tpc_begin(xid) - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + self.assertEqual(cnn.status, ext.STATUS_BEGIN) cur = cnn.cursor() cur.execute("insert into test_tpc values ('test_tpc_rollback');") @@ -794,22 +936,22 @@ class ConnectionTwoPhaseTests(ConnectingTestCase): self.assertEqual(0, self.count_test_records()) cnn.tpc_prepare() - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_PREPARED) + self.assertEqual(cnn.status, ext.STATUS_PREPARED) self.assertEqual(1, self.count_xacts()) self.assertEqual(0, self.count_test_records()) cnn.tpc_rollback() - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_test_records()) def test_tpc_rollback_one_phase(self): cnn = self.connect() xid = cnn.xid(1, "gtrid", "bqual") - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) cnn.tpc_begin(xid) - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + self.assertEqual(cnn.status, ext.STATUS_BEGIN) cur = cnn.cursor() cur.execute("insert into test_tpc values ('test_tpc_rollback_1p');") @@ -817,17 +959,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase): self.assertEqual(0, self.count_test_records()) cnn.tpc_rollback() - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_test_records()) def test_tpc_rollback_recovered(self): cnn = self.connect() xid = cnn.xid(1, "gtrid", "bqual") - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) cnn.tpc_begin(xid) - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + self.assertEqual(cnn.status, ext.STATUS_BEGIN) cur = cnn.cursor() cur.execute("insert into test_tpc values ('test_tpc_commit_rec');") @@ -843,21 +985,21 @@ class ConnectionTwoPhaseTests(ConnectingTestCase): xid = cnn.xid(1, "gtrid", "bqual") cnn.tpc_rollback(xid) - self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(cnn.status, ext.STATUS_READY) self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_test_records()) def test_status_after_recover(self): cnn = self.connect() - self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status) + self.assertEqual(ext.STATUS_READY, cnn.status) cnn.tpc_recover() - self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status) + self.assertEqual(ext.STATUS_READY, cnn.status) cur = cnn.cursor() cur.execute("select 1") - self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status) + self.assertEqual(ext.STATUS_BEGIN, cnn.status) cnn.tpc_recover() - self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status) + self.assertEqual(ext.STATUS_BEGIN, cnn.status) def test_recovered_xids(self): # insert a few test xns @@ -1030,25 +1172,25 @@ class TransactionControlTests(ConnectingTestCase): self.conn.close() self.assertRaises(psycopg2.InterfaceError, self.conn.set_session, - psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + ext.ISOLATION_LEVEL_SERIALIZABLE) def test_not_in_transaction(self): cur = self.conn.cursor() cur.execute("select 1") self.assertRaises(psycopg2.ProgrammingError, self.conn.set_session, - psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + ext.ISOLATION_LEVEL_SERIALIZABLE) def test_set_isolation_level(self): cur = self.conn.cursor() self.conn.set_session( - psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + ext.ISOLATION_LEVEL_SERIALIZABLE) cur.execute("SHOW transaction_isolation;") self.assertEqual(cur.fetchone()[0], 'serializable') self.conn.rollback() self.conn.set_session( - psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ) + ext.ISOLATION_LEVEL_REPEATABLE_READ) cur.execute("SHOW transaction_isolation;") if self.conn.server_version > 80000: self.assertEqual(cur.fetchone()[0], 'repeatable read') @@ -1057,13 +1199,13 @@ class TransactionControlTests(ConnectingTestCase): self.conn.rollback() self.conn.set_session( - isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) + isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED) cur.execute("SHOW transaction_isolation;") self.assertEqual(cur.fetchone()[0], 'read committed') self.conn.rollback() self.conn.set_session( - isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED) + isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED) cur.execute("SHOW transaction_isolation;") if self.conn.server_version > 80000: self.assertEqual(cur.fetchone()[0], 'read uncommitted') @@ -1105,8 +1247,11 @@ class TransactionControlTests(ConnectingTestCase): self.assertRaises(ValueError, self.conn.set_session, 'whatever') def test_set_read_only(self): + self.assert_(self.conn.readonly is None) + cur = self.conn.cursor() self.conn.set_session(readonly=True) + self.assert_(self.conn.readonly is True) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') self.conn.rollback() @@ -1114,13 +1259,35 @@ class TransactionControlTests(ConnectingTestCase): self.assertEqual(cur.fetchone()[0], 'on') self.conn.rollback() + self.conn.set_session(readonly=False) + self.assert_(self.conn.readonly is False) + cur.execute("SHOW transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'off') + self.conn.rollback() + + def test_setattr_read_only(self): cur = self.conn.cursor() - self.conn.set_session(readonly=None) + self.conn.readonly = True + self.assert_(self.conn.readonly is True) + cur.execute("SHOW transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + self.assertRaises(self.conn.ProgrammingError, + setattr, self.conn, 'readonly', False) + self.assert_(self.conn.readonly is True) + self.conn.rollback() cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') self.conn.rollback() - self.conn.set_session(readonly=False) + cur = self.conn.cursor() + self.conn.readonly = None + self.assert_(self.conn.readonly is None) + cur.execute("SHOW transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server + self.conn.rollback() + + self.conn.readonly = False + self.assert_(self.conn.readonly is False) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'off') self.conn.rollback() @@ -1143,8 +1310,10 @@ class TransactionControlTests(ConnectingTestCase): @skip_before_postgres(9, 1) def test_set_deferrable(self): + self.assert_(self.conn.deferrable is None) cur = self.conn.cursor() self.conn.set_session(readonly=True, deferrable=True) + self.assert_(self.conn.deferrable is True) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') cur.execute("SHOW transaction_deferrable;") @@ -1155,6 +1324,7 @@ class TransactionControlTests(ConnectingTestCase): self.conn.rollback() self.conn.set_session(deferrable=False) + self.assert_(self.conn.deferrable is False) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') cur.execute("SHOW transaction_deferrable;") @@ -1165,6 +1335,54 @@ class TransactionControlTests(ConnectingTestCase): def test_set_deferrable_error(self): self.assertRaises(psycopg2.ProgrammingError, self.conn.set_session, readonly=True, deferrable=True) + self.assertRaises(psycopg2.ProgrammingError, + setattr, self.conn, 'deferrable', True) + + @skip_before_postgres(9, 1) + def test_setattr_deferrable(self): + cur = self.conn.cursor() + self.conn.deferrable = True + self.assert_(self.conn.deferrable is True) + cur.execute("SHOW transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'on') + self.assertRaises(self.conn.ProgrammingError, + setattr, self.conn, 'deferrable', False) + self.assert_(self.conn.deferrable is True) + self.conn.rollback() + cur.execute("SHOW transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + + cur = self.conn.cursor() + self.conn.deferrable = None + self.assert_(self.conn.deferrable is None) + cur.execute("SHOW transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server + self.conn.rollback() + + self.conn.deferrable = False + self.assert_(self.conn.deferrable is False) + cur.execute("SHOW transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'off') + self.conn.rollback() + + def test_mixing_session_attribs(self): + cur = self.conn.cursor() + self.conn.autocommit = True + self.conn.readonly = True + + cur.execute("SHOW transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + + self.conn.autocommit = False + cur.execute("SHOW transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'off') class AutocommitTests(ConnectingTestCase): @@ -1183,44 +1401,44 @@ class AutocommitTests(ConnectingTestCase): def test_default_no_autocommit(self): self.assert_(not self.conn.autocommit) - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_IDLE) + ext.TRANSACTION_STATUS_IDLE) cur = self.conn.cursor() cur.execute('select 1;') - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN) + self.assertEqual(self.conn.status, ext.STATUS_BEGIN) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_INTRANS) + ext.TRANSACTION_STATUS_INTRANS) self.conn.rollback() - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_IDLE) + ext.TRANSACTION_STATUS_IDLE) def test_set_autocommit(self): self.conn.autocommit = True self.assert_(self.conn.autocommit) - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_IDLE) + ext.TRANSACTION_STATUS_IDLE) cur = self.conn.cursor() cur.execute('select 1;') - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_IDLE) + ext.TRANSACTION_STATUS_IDLE) self.conn.autocommit = False self.assert_(not self.conn.autocommit) - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_IDLE) + ext.TRANSACTION_STATUS_IDLE) cur.execute('select 1;') - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN) + self.assertEqual(self.conn.status, ext.STATUS_BEGIN) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_INTRANS) + ext.TRANSACTION_STATUS_INTRANS) def test_set_intrans_error(self): cur = self.conn.cursor() @@ -1231,34 +1449,34 @@ class AutocommitTests(ConnectingTestCase): def test_set_session_autocommit(self): self.conn.set_session(autocommit=True) self.assert_(self.conn.autocommit) - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_IDLE) + ext.TRANSACTION_STATUS_IDLE) cur = self.conn.cursor() cur.execute('select 1;') - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_IDLE) + ext.TRANSACTION_STATUS_IDLE) self.conn.set_session(autocommit=False) self.assert_(not self.conn.autocommit) - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_IDLE) + ext.TRANSACTION_STATUS_IDLE) cur.execute('select 1;') - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN) + self.assertEqual(self.conn.status, ext.STATUS_BEGIN) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_INTRANS) + ext.TRANSACTION_STATUS_INTRANS) self.conn.rollback() self.conn.set_session('serializable', readonly=True, autocommit=True) self.assert_(self.conn.autocommit) cur.execute('select 1;') - self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.get_transaction_status(), - psycopg2.extensions.TRANSACTION_STATUS_IDLE) + ext.TRANSACTION_STATUS_IDLE) cur.execute("SHOW transaction_isolation;") self.assertEqual(cur.fetchone()[0], 'serializable') cur.execute("SHOW transaction_read_only;") |