summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2017-02-16 11:04:02 +0000
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2017-02-16 12:46:35 +0000
commitd50ed488074a3b809bf98fb1e6971bc295ba31c3 (patch)
tree333d66d7ddf85b5704ccd64a99283862aebbc9f8 /tests
parentb5d80b609d6f1b0922ed51d6182ff137f1b3ba3e (diff)
downloadpsycopg2-d50ed488074a3b809bf98fb1e6971bc295ba31c3.tar.gz
Added readonly and deferrable attributes
Diffstat (limited to 'tests')
-rwxr-xr-xtests/test_async.py17
-rwxr-xr-xtests/test_async_keyword.py4
-rwxr-xr-xtests/test_connection.py127
3 files changed, 123 insertions, 25 deletions
diff --git a/tests/test_async.py b/tests/test_async.py
index 63a5513..0a386d3 100755
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -26,7 +26,7 @@
from testutils import unittest, skip_before_postgres, slow
import psycopg2
-from psycopg2 import extensions
+from psycopg2 import extensions as ext
import time
import StringIO
@@ -74,13 +74,14 @@ class AsyncTests(ConnectingTestCase):
self.assert_(self.conn.async_)
self.assert_(not self.sync_conn.async_)
- # the async connection should be in isolevel 0
- self.assertEquals(self.conn.isolation_level, 0)
+ # the async connection should be autocommit
+ self.assert_(self.conn.autocommit)
+ self.assertEquals(self.conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
# check other properties to be found on the connection
self.assert_(self.conn.server_version)
self.assert_(self.conn.protocol_version in (2, 3))
- self.assert_(self.conn.encoding in psycopg2.extensions.encodings)
+ self.assert_(self.conn.encoding in ext.encodings)
def test_async_named_cursor(self):
self.assertRaises(psycopg2.ProgrammingError,
@@ -192,7 +193,7 @@ class AsyncTests(ConnectingTestCase):
# getting transaction status works
self.assertEquals(self.conn.get_transaction_status(),
- extensions.TRANSACTION_STATUS_ACTIVE)
+ ext.TRANSACTION_STATUS_ACTIVE)
self.assertTrue(self.conn.isexecuting())
# setting connection encoding should fail
@@ -311,9 +312,9 @@ class AsyncTests(ConnectingTestCase):
self.assertEquals(cur.fetchone()[0], "b" * 10000)
def test_async_subclass(self):
- class MyConn(psycopg2.extensions.connection):
+ class MyConn(ext.connection):
def __init__(self, dsn, async_=0):
- psycopg2.extensions.connection.__init__(self, dsn, async_=async_)
+ ext.connection.__init__(self, dsn, async_=async_)
conn = self.connect(connection_factory=MyConn, async_=True)
self.assert_(isinstance(conn, MyConn))
@@ -330,7 +331,7 @@ class AsyncTests(ConnectingTestCase):
curs.execute("select %s;", ('x' * size,))
self.wait(stub)
self.assertEqual(size, len(curs.fetchone()[0]))
- if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1:
+ if stub.polls.count(ext.POLL_WRITE) > 1:
return
# This is more a testing glitch than an error: it happens
diff --git a/tests/test_async_keyword.py b/tests/test_async_keyword.py
index 67dbb03..ff41e1b 100755
--- a/tests/test_async_keyword.py
+++ b/tests/test_async_keyword.py
@@ -59,8 +59,8 @@ class AsyncTests(ConnectingTestCase):
self.assert_(self.conn.async)
self.assert_(not self.sync_conn.async)
- # the async connection should be in isolevel 0
- self.assertEquals(self.conn.isolation_level, 0)
+ # the async connection should be autocommit
+ self.assert_(self.conn.autocommit)
# check other properties to be found on the connection
self.assert_(self.conn.server_version)
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 1d11ff1..a9525b4 100755
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -89,13 +89,26 @@ class ConnectionTests(ConnectingTestCase):
def test_reset(self):
conn = self.conn
- # switch isolation level, then reset
- level = conn.isolation_level
- conn.set_isolation_level(0)
- self.assertEqual(conn.isolation_level, 0)
+ # switch session characteristics
+ conn.autocommit = True
+ conn.isolation_level = 'serializable'
+ conn.readonly = True
+ if self.conn.server_version >= 90100:
+ conn.deferrable = False
+
+ self.assert_(conn.autocommit)
+ self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
+ self.assert_(conn.readonly is True)
+ if self.conn.server_version >= 90100:
+ self.assert_(conn.deferrable is False)
+
conn.reset()
- # now the isolation level should be equal to saved one
- self.assertEqual(conn.isolation_level, level)
+ # now the session characteristics should be reverted
+ self.assert_(not conn.autocommit)
+ self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
+ self.assert_(conn.readonly is None)
+ if self.conn.server_version >= 90100:
+ self.assert_(conn.deferrable is None)
def test_notices(self):
conn = self.conn
@@ -499,7 +512,6 @@ class IsolationLevelsTestCase(ConnectingTestCase):
curs = conn.cursor()
levels = [
- (None, ext.ISOLATION_LEVEL_AUTOCOMMIT),
('read uncommitted',
ext.ISOLATION_LEVEL_READ_UNCOMMITTED),
('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED),
@@ -521,16 +533,27 @@ class IsolationLevelsTestCase(ConnectingTestCase):
curs.execute('show transaction_isolation;')
got_name = curs.fetchone()[0]
- if name is None:
- curs.execute('show transaction_isolation;')
- name = curs.fetchone()[0]
-
self.assertEqual(name, got_name)
conn.commit()
self.assertRaises(ValueError, conn.set_isolation_level, -1)
self.assertRaises(ValueError, conn.set_isolation_level, 5)
+ def test_set_isolation_level_autocommit(self):
+ conn = self.connect()
+ curs = conn.cursor()
+
+ conn.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT)
+ self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
+ self.assert_(conn.autocommit)
+
+ conn.isolation_level = 'serializable'
+ self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
+ self.assert_(conn.autocommit)
+
+ curs.execute('show transaction_isolation;')
+ self.assertEqual(curs.fetchone()[0], 'serializable')
+
def test_set_isolation_level_default(self):
conn = self.connect()
curs = conn.cursor()
@@ -663,8 +686,6 @@ class IsolationLevelsTestCase(ConnectingTestCase):
def test_isolation_level_closed(self):
cnn = self.connect()
cnn.close()
- self.assertRaises(psycopg2.InterfaceError, getattr,
- cnn, 'isolation_level')
self.assertRaises(psycopg2.InterfaceError,
cnn.set_isolation_level, 0)
self.assertRaises(psycopg2.InterfaceError,
@@ -1226,8 +1247,11 @@ class TransactionControlTests(ConnectingTestCase):
self.assertRaises(ValueError, self.conn.set_session, 'whatever')
def test_set_read_only(self):
+ self.assert_(self.conn.readonly is None)
+
cur = self.conn.cursor()
self.conn.set_session(readonly=True)
+ self.assert_(self.conn.readonly is True)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
@@ -1235,13 +1259,35 @@ class TransactionControlTests(ConnectingTestCase):
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
+ self.conn.set_session(readonly=False)
+ self.assert_(self.conn.readonly is False)
+ cur.execute("SHOW transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'off')
+ self.conn.rollback()
+
+ def test_setattr_read_only(self):
cur = self.conn.cursor()
- self.conn.set_session(readonly=None)
+ self.conn.readonly = True
+ self.assert_(self.conn.readonly is True)
+ cur.execute("SHOW transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.assertRaises(self.conn.ProgrammingError,
+ setattr, self.conn, 'readonly', False)
+ self.assert_(self.conn.readonly is True)
+ self.conn.rollback()
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
- self.conn.set_session(readonly=False)
+ cur = self.conn.cursor()
+ self.conn.readonly = None
+ self.assert_(self.conn.readonly is None)
+ cur.execute("SHOW transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server
+ self.conn.rollback()
+
+ self.conn.readonly = False
+ self.assert_(self.conn.readonly is False)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'off')
self.conn.rollback()
@@ -1264,8 +1310,10 @@ class TransactionControlTests(ConnectingTestCase):
@skip_before_postgres(9, 1)
def test_set_deferrable(self):
+ self.assert_(self.conn.deferrable is None)
cur = self.conn.cursor()
self.conn.set_session(readonly=True, deferrable=True)
+ self.assert_(self.conn.deferrable is True)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW transaction_deferrable;")
@@ -1276,6 +1324,7 @@ class TransactionControlTests(ConnectingTestCase):
self.conn.rollback()
self.conn.set_session(deferrable=False)
+ self.assert_(self.conn.deferrable is False)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW transaction_deferrable;")
@@ -1286,6 +1335,54 @@ class TransactionControlTests(ConnectingTestCase):
def test_set_deferrable_error(self):
self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_session, readonly=True, deferrable=True)
+ self.assertRaises(psycopg2.ProgrammingError,
+ setattr, self.conn, 'deferrable', True)
+
+ @skip_before_postgres(9, 1)
+ def test_setattr_deferrable(self):
+ cur = self.conn.cursor()
+ self.conn.deferrable = True
+ self.assert_(self.conn.deferrable is True)
+ cur.execute("SHOW transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.assertRaises(self.conn.ProgrammingError,
+ setattr, self.conn, 'deferrable', False)
+ self.assert_(self.conn.deferrable is True)
+ self.conn.rollback()
+ cur.execute("SHOW transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+
+ cur = self.conn.cursor()
+ self.conn.deferrable = None
+ self.assert_(self.conn.deferrable is None)
+ cur.execute("SHOW transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server
+ self.conn.rollback()
+
+ self.conn.deferrable = False
+ self.assert_(self.conn.deferrable is False)
+ cur.execute("SHOW transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'off')
+ self.conn.rollback()
+
+ def test_mixing_session_attribs(self):
+ cur = self.conn.cursor()
+ self.conn.autocommit = True
+ self.conn.readonly = True
+
+ cur.execute("SHOW transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+
+ self.conn.autocommit = False
+ cur.execute("SHOW transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'off')
class AutocommitTests(ConnectingTestCase):