diff options
author | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2017-02-16 11:04:02 +0000 |
---|---|---|
committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2017-02-16 12:46:35 +0000 |
commit | d50ed488074a3b809bf98fb1e6971bc295ba31c3 (patch) | |
tree | 333d66d7ddf85b5704ccd64a99283862aebbc9f8 /tests | |
parent | b5d80b609d6f1b0922ed51d6182ff137f1b3ba3e (diff) | |
download | psycopg2-d50ed488074a3b809bf98fb1e6971bc295ba31c3.tar.gz |
Added readonly and deferrable attributes
Diffstat (limited to 'tests')
-rwxr-xr-x | tests/test_async.py | 17 | ||||
-rwxr-xr-x | tests/test_async_keyword.py | 4 | ||||
-rwxr-xr-x | tests/test_connection.py | 127 |
3 files changed, 123 insertions, 25 deletions
diff --git a/tests/test_async.py b/tests/test_async.py index 63a5513..0a386d3 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -26,7 +26,7 @@ from testutils import unittest, skip_before_postgres, slow import psycopg2 -from psycopg2 import extensions +from psycopg2 import extensions as ext import time import StringIO @@ -74,13 +74,14 @@ class AsyncTests(ConnectingTestCase): self.assert_(self.conn.async_) self.assert_(not self.sync_conn.async_) - # the async connection should be in isolevel 0 - self.assertEquals(self.conn.isolation_level, 0) + # the async connection should be autocommit + self.assert_(self.conn.autocommit) + self.assertEquals(self.conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT) # check other properties to be found on the connection self.assert_(self.conn.server_version) self.assert_(self.conn.protocol_version in (2, 3)) - self.assert_(self.conn.encoding in psycopg2.extensions.encodings) + self.assert_(self.conn.encoding in ext.encodings) def test_async_named_cursor(self): self.assertRaises(psycopg2.ProgrammingError, @@ -192,7 +193,7 @@ class AsyncTests(ConnectingTestCase): # getting transaction status works self.assertEquals(self.conn.get_transaction_status(), - extensions.TRANSACTION_STATUS_ACTIVE) + ext.TRANSACTION_STATUS_ACTIVE) self.assertTrue(self.conn.isexecuting()) # setting connection encoding should fail @@ -311,9 +312,9 @@ class AsyncTests(ConnectingTestCase): self.assertEquals(cur.fetchone()[0], "b" * 10000) def test_async_subclass(self): - class MyConn(psycopg2.extensions.connection): + class MyConn(ext.connection): def __init__(self, dsn, async_=0): - psycopg2.extensions.connection.__init__(self, dsn, async_=async_) + ext.connection.__init__(self, dsn, async_=async_) conn = self.connect(connection_factory=MyConn, async_=True) self.assert_(isinstance(conn, MyConn)) @@ -330,7 +331,7 @@ class AsyncTests(ConnectingTestCase): curs.execute("select %s;", ('x' * size,)) self.wait(stub) self.assertEqual(size, len(curs.fetchone()[0])) - if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1: + if stub.polls.count(ext.POLL_WRITE) > 1: return # This is more a testing glitch than an error: it happens diff --git a/tests/test_async_keyword.py b/tests/test_async_keyword.py index 67dbb03..ff41e1b 100755 --- a/tests/test_async_keyword.py +++ b/tests/test_async_keyword.py @@ -59,8 +59,8 @@ class AsyncTests(ConnectingTestCase): self.assert_(self.conn.async) self.assert_(not self.sync_conn.async) - # the async connection should be in isolevel 0 - self.assertEquals(self.conn.isolation_level, 0) + # the async connection should be autocommit + self.assert_(self.conn.autocommit) # check other properties to be found on the connection self.assert_(self.conn.server_version) diff --git a/tests/test_connection.py b/tests/test_connection.py index 1d11ff1..a9525b4 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -89,13 +89,26 @@ class ConnectionTests(ConnectingTestCase): def test_reset(self): conn = self.conn - # switch isolation level, then reset - level = conn.isolation_level - conn.set_isolation_level(0) - self.assertEqual(conn.isolation_level, 0) + # switch session characteristics + conn.autocommit = True + conn.isolation_level = 'serializable' + conn.readonly = True + if self.conn.server_version >= 90100: + conn.deferrable = False + + self.assert_(conn.autocommit) + self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) + self.assert_(conn.readonly is True) + if self.conn.server_version >= 90100: + self.assert_(conn.deferrable is False) + conn.reset() - # now the isolation level should be equal to saved one - self.assertEqual(conn.isolation_level, level) + # now the session characteristics should be reverted + self.assert_(not conn.autocommit) + self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT) + self.assert_(conn.readonly is None) + if self.conn.server_version >= 90100: + self.assert_(conn.deferrable is None) def test_notices(self): conn = self.conn @@ -499,7 +512,6 @@ class IsolationLevelsTestCase(ConnectingTestCase): curs = conn.cursor() levels = [ - (None, ext.ISOLATION_LEVEL_AUTOCOMMIT), ('read uncommitted', ext.ISOLATION_LEVEL_READ_UNCOMMITTED), ('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED), @@ -521,16 +533,27 @@ class IsolationLevelsTestCase(ConnectingTestCase): curs.execute('show transaction_isolation;') got_name = curs.fetchone()[0] - if name is None: - curs.execute('show transaction_isolation;') - name = curs.fetchone()[0] - self.assertEqual(name, got_name) conn.commit() self.assertRaises(ValueError, conn.set_isolation_level, -1) self.assertRaises(ValueError, conn.set_isolation_level, 5) + def test_set_isolation_level_autocommit(self): + conn = self.connect() + curs = conn.cursor() + + conn.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT) + self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT) + self.assert_(conn.autocommit) + + conn.isolation_level = 'serializable' + self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) + self.assert_(conn.autocommit) + + curs.execute('show transaction_isolation;') + self.assertEqual(curs.fetchone()[0], 'serializable') + def test_set_isolation_level_default(self): conn = self.connect() curs = conn.cursor() @@ -663,8 +686,6 @@ class IsolationLevelsTestCase(ConnectingTestCase): def test_isolation_level_closed(self): cnn = self.connect() cnn.close() - self.assertRaises(psycopg2.InterfaceError, getattr, - cnn, 'isolation_level') self.assertRaises(psycopg2.InterfaceError, cnn.set_isolation_level, 0) self.assertRaises(psycopg2.InterfaceError, @@ -1226,8 +1247,11 @@ class TransactionControlTests(ConnectingTestCase): self.assertRaises(ValueError, self.conn.set_session, 'whatever') def test_set_read_only(self): + self.assert_(self.conn.readonly is None) + cur = self.conn.cursor() self.conn.set_session(readonly=True) + self.assert_(self.conn.readonly is True) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') self.conn.rollback() @@ -1235,13 +1259,35 @@ class TransactionControlTests(ConnectingTestCase): self.assertEqual(cur.fetchone()[0], 'on') self.conn.rollback() + self.conn.set_session(readonly=False) + self.assert_(self.conn.readonly is False) + cur.execute("SHOW transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'off') + self.conn.rollback() + + def test_setattr_read_only(self): cur = self.conn.cursor() - self.conn.set_session(readonly=None) + self.conn.readonly = True + self.assert_(self.conn.readonly is True) + cur.execute("SHOW transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + self.assertRaises(self.conn.ProgrammingError, + setattr, self.conn, 'readonly', False) + self.assert_(self.conn.readonly is True) + self.conn.rollback() cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') self.conn.rollback() - self.conn.set_session(readonly=False) + cur = self.conn.cursor() + self.conn.readonly = None + self.assert_(self.conn.readonly is None) + cur.execute("SHOW transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server + self.conn.rollback() + + self.conn.readonly = False + self.assert_(self.conn.readonly is False) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'off') self.conn.rollback() @@ -1264,8 +1310,10 @@ class TransactionControlTests(ConnectingTestCase): @skip_before_postgres(9, 1) def test_set_deferrable(self): + self.assert_(self.conn.deferrable is None) cur = self.conn.cursor() self.conn.set_session(readonly=True, deferrable=True) + self.assert_(self.conn.deferrable is True) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') cur.execute("SHOW transaction_deferrable;") @@ -1276,6 +1324,7 @@ class TransactionControlTests(ConnectingTestCase): self.conn.rollback() self.conn.set_session(deferrable=False) + self.assert_(self.conn.deferrable is False) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') cur.execute("SHOW transaction_deferrable;") @@ -1286,6 +1335,54 @@ class TransactionControlTests(ConnectingTestCase): def test_set_deferrable_error(self): self.assertRaises(psycopg2.ProgrammingError, self.conn.set_session, readonly=True, deferrable=True) + self.assertRaises(psycopg2.ProgrammingError, + setattr, self.conn, 'deferrable', True) + + @skip_before_postgres(9, 1) + def test_setattr_deferrable(self): + cur = self.conn.cursor() + self.conn.deferrable = True + self.assert_(self.conn.deferrable is True) + cur.execute("SHOW transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'on') + self.assertRaises(self.conn.ProgrammingError, + setattr, self.conn, 'deferrable', False) + self.assert_(self.conn.deferrable is True) + self.conn.rollback() + cur.execute("SHOW transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + + cur = self.conn.cursor() + self.conn.deferrable = None + self.assert_(self.conn.deferrable is None) + cur.execute("SHOW transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server + self.conn.rollback() + + self.conn.deferrable = False + self.assert_(self.conn.deferrable is False) + cur.execute("SHOW transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'off') + self.conn.rollback() + + def test_mixing_session_attribs(self): + cur = self.conn.cursor() + self.conn.autocommit = True + self.conn.readonly = True + + cur.execute("SHOW transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + + self.conn.autocommit = False + cur.execute("SHOW transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'off') class AutocommitTests(ConnectingTestCase): |