summaryrefslogtreecommitdiff
path: root/tests/test_async.py
diff options
context:
space:
mode:
authorJan UrbaƄski <wulczer@wulczer.org>2010-03-31 02:00:52 +0200
committerFederico Di Gregorio <fog@initd.org>2010-04-05 16:28:39 +0200
commite15bc9da05721a22e986feb0363da8c5e4eb2641 (patch)
tree855b237208b8db78da9449455dd6951f0bcb0b12 /tests/test_async.py
parent31f60be000a16c464217c260296a088e6dc020ff (diff)
downloadpsycopg2-e15bc9da05721a22e986feb0363da8c5e4eb2641.tar.gz
Adapt the tests to recent changes
Some methods were forbidden in asynchronous mode, the isolation level of an asynchronous connection is not always 0 and these changes influenced expected test results.
Diffstat (limited to 'tests/test_async.py')
-rw-r--r--tests/test_async.py134
1 files changed, 80 insertions, 54 deletions
diff --git a/tests/test_async.py b/tests/test_async.py
index c2bbf4c..5aad8d3 100644
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -33,7 +33,7 @@ class AsyncTests(unittest.TestCase):
CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY
)''')
- self.conn.commit()
+ self.wait_for_query(curs)
def tearDown(self):
self.sync_conn.close()
@@ -48,18 +48,19 @@ class AsyncTests(unittest.TestCase):
select.select([], [cur.fileno()], [])
state = cur.poll()
- def test_wrong_execution_type(self):
+ def test_connection_setup(self):
cur = self.conn.cursor()
sync_cur = self.sync_conn.cursor()
- self.assertRaises(psycopg2.ProgrammingError, cur.execute,
- "select 'a'", async=False)
- self.assertRaises(psycopg2.ProgrammingError, sync_cur.execute,
- "select 'a'", async=True)
+ self.assertEquals(self.conn.issync(), False)
+ self.assertEquals(self.sync_conn.issync(), True)
- # but this should work anyway
- sync_cur.execute("select 'a'", async=False)
- cur.execute("select 'a'", async=True)
+ # the async connection should be in isolevel 0
+ self.assertEquals(self.conn.isolation_level, 0)
+
+ def test_async_named_cursor(self):
+ self.assertRaises(psycopg2.ProgrammingError,
+ self.conn.cursor, "name")
def test_async_select(self):
cur = self.conn.cursor()
@@ -75,7 +76,7 @@ class AsyncTests(unittest.TestCase):
def test_async_callproc(self):
cur = self.conn.cursor()
try:
- cur.callproc("pg_sleep", (0.1, ), True)
+ cur.callproc("pg_sleep", (0.1, ))
except psycopg2.ProgrammingError:
# PG <8.1 did not have pg_sleep
return
@@ -91,7 +92,14 @@ class AsyncTests(unittest.TestCase):
cur.execute("insert into table1 values (1)")
- # an async execute after an async one blocks and waits for completion
+ # an async execute after an async one raises an exception
+ self.assertRaises(psycopg2.ProgrammingError,
+ cur.execute, "select * from table1")
+ # same for callproc
+ self.assertRaises(psycopg2.ProgrammingError,
+ cur.callproc, "version")
+ # but after you've waited it should be good
+ self.wait_for_query(cur)
cur.execute("select * from table1")
self.wait_for_query(cur)
@@ -109,7 +117,11 @@ class AsyncTests(unittest.TestCase):
cur = self.conn.cursor()
cur.execute("select 'a'")
- # a fetch after an asynchronous query blocks and waits for completion
+ # a fetch after an asynchronous query should raise an error
+ self.assertRaises(psycopg2.ProgrammingError,
+ cur.fetchall)
+ # but after waiting it should work
+ self.wait_for_query(cur)
self.assertEquals(cur.fetchall()[0][0], "a")
def test_rollback_while_async(self):
@@ -117,42 +129,38 @@ class AsyncTests(unittest.TestCase):
cur.execute("select 'a'")
- # a rollback blocks and should leave the connection in a workable state
- self.conn.rollback()
- self.assertFalse(self.conn.executing())
-
- # try a sync cursor first
- sync_cur = self.sync_conn.cursor()
- sync_cur.execute("select 'b'")
- self.assertEquals(sync_cur.fetchone()[0], "b")
-
- # now try the async cursor
- cur.execute("select 'c'")
- self.wait_for_query(cur)
- self.assertEquals(cur.fetchmany()[0][0], "c")
+ # a rollback should not work in asynchronous mode
+ self.assertRaises(psycopg2.ProgrammingError, self.conn.rollback)
def test_commit_while_async(self):
cur = self.conn.cursor()
+ cur.execute("begin")
+ self.wait_for_query(cur)
+
cur.execute("insert into table1 values (1)")
- # a commit blocks
- self.conn.commit()
- self.assertFalse(self.conn.executing())
+ # a commit should not work in asynchronous mode
+ self.assertRaises(psycopg2.ProgrammingError, self.conn.commit)
+ self.assertTrue(self.conn.executing())
+
+ # but a manual commit should
+ self.wait_for_query(cur)
+ cur.execute("commit")
+ self.wait_for_query(cur)
cur.execute("select * from table1")
self.wait_for_query(cur)
self.assertEquals(cur.fetchall()[0][0], 1)
cur.execute("delete from table1")
- self.conn.commit()
+ self.wait_for_query(cur)
cur.execute("select * from table1")
self.wait_for_query(cur)
self.assertEquals(cur.fetchone(), None)
def test_set_parameters_while_async(self):
- prev_encoding = self.conn.encoding
cur = self.conn.cursor()
cur.execute("select 'c'")
@@ -163,37 +171,36 @@ class AsyncTests(unittest.TestCase):
extensions.TRANSACTION_STATUS_ACTIVE)
self.assertTrue(self.conn.executing())
- # this issues a ROLLBACK internally
- self.conn.set_client_encoding("LATIN1")
-
- self.assertFalse(self.conn.executing())
- self.assertEquals(self.conn.encoding, "LATIN1")
+ # setting connection encoding should fail
+ self.assertRaises(psycopg2.ProgrammingError,
+ self.conn.set_client_encoding, "LATIN1")
- self.conn.set_client_encoding(prev_encoding)
+ # same for transaction isolation
+ self.assertRaises(psycopg2.ProgrammingError,
+ self.conn.set_isolation_level, 1)
def test_reset_while_async(self):
- prev_encoding = self.conn.encoding
- # pick something different than the current encoding
- new_encoding = (prev_encoding == "LATIN1") and "UTF8" or "LATIN1"
-
- self.conn.set_client_encoding(new_encoding)
-
cur = self.conn.cursor()
cur.execute("select 'c'")
self.assertTrue(self.conn.executing())
- self.conn.reset()
- self.assertFalse(self.conn.executing())
- self.assertEquals(self.conn.encoding, prev_encoding)
+ # a reset should fail
+ self.assertRaises(psycopg2.ProgrammingError, self.conn.reset)
def test_async_iter(self):
cur = self.conn.cursor()
+ cur.execute("begin")
+ self.wait_for_query(cur)
cur.execute("insert into table1 values (1), (2), (3)")
self.wait_for_query(cur)
cur.execute("select id from table1 order by id")
- # iteration just blocks
+ # iteration fails if a query is underway
+ self.assertRaises(psycopg2.ProgrammingError, list, cur)
+
+ # but after it's done it should work
+ self.wait_for_query(cur)
self.assertEquals(list(cur), [(1, ), (2, ), (3, )])
self.assertFalse(self.conn.executing())
@@ -201,11 +208,15 @@ class AsyncTests(unittest.TestCase):
cur = self.conn.cursor()
cur.execute("select 'a'")
- # copy just blocks
- cur.copy_from(StringIO.StringIO("1\n3\n5\n\\.\n"), "table1")
+ # copy should fail
+ self.assertRaises(psycopg2.ProgrammingError,
+ cur.copy_from,
+ StringIO.StringIO("1\n3\n5\n\\.\n"), "table1")
- cur.execute("select * from table1 order by id")
- self.assertEquals(cur.fetchall(), [(1, ), (3, ), (5, )])
+ def test_lobject_while_async(self):
+ # large objects should be prohibited
+ self.assertRaises(psycopg2.ProgrammingError,
+ self.conn.lobject)
def test_async_executemany(self):
cur = self.conn.cursor()
@@ -219,13 +230,18 @@ class AsyncTests(unittest.TestCase):
self.wait_for_query(cur)
cur.execute("select id from table1 order by id")
- # scroll blocks, but should work
+ # scroll should fail if a query is underway
+ self.assertRaises(psycopg2.ProgrammingError, cur.scroll, 1)
+ self.assertTrue(self.conn.executing())
+
+ # but after it's done it should work
+ self.wait_for_query(cur)
cur.scroll(1)
- self.assertFalse(self.conn.executing())
self.assertEquals(cur.fetchall(), [(2, ), (3, )])
cur = self.conn.cursor()
cur.execute("select id from table1 order by id")
+ self.wait_for_query(cur)
cur2 = self.conn.cursor()
self.assertRaises(psycopg2.ProgrammingError, cur2.scroll, 1)
@@ -234,19 +250,29 @@ class AsyncTests(unittest.TestCase):
cur = self.conn.cursor()
cur.execute("select id from table1 order by id")
+ self.wait_for_query(cur)
+ cur.scroll(2)
+ cur.scroll(-1)
+ self.assertEquals(cur.fetchall(), [(2, ), (3, )])
+
+ def test_scroll(self):
+ cur = self.sync_conn.cursor()
+ cur.execute("create table table1 (id int)")
+ cur.execute("insert into table1 values (1), (2), (3)")
+ cur.execute("select id from table1 order by id")
cur.scroll(2)
cur.scroll(-1)
self.assertEquals(cur.fetchall(), [(2, ), (3, )])
def test_async_dont_read_all(self):
cur = self.conn.cursor()
- cur.execute("select 'a'; select 'b'")
+ cur.execute("select repeat('a', 10000); select repeat('b', 10000)")
# fetch the result
self.wait_for_query(cur)
# it should be the result of the second query
- self.assertEquals(cur.fetchone()[0][0], "b")
+ self.assertEquals(cur.fetchone()[0], "b" * 10000)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)