summaryrefslogtreecommitdiff
path: root/tests/test_async.py
diff options
context:
space:
mode:
authorJan UrbaƄski <wulczer@wulczer.org>2010-04-11 20:25:17 +0200
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-04-20 23:01:01 +0100
commitc4ebc0f702dfc3bae92ff3ffc01b9d6ed5d2fbde (patch)
tree46d8cd7c5875f77c574b63504525b70ec1f2aa92 /tests/test_async.py
parent249b3ef88fcb57cddf9992175d60c782ee663b25 (diff)
downloadpsycopg2-c4ebc0f702dfc3bae92ff3ffc01b9d6ed5d2fbde.tar.gz
Handle errors in asynchronous queries.
Do it by keeping the reference to the last PGresult in the cursor and calling pq_fetch() before ending the asynchronous execution. This takes care of handling the possible error state of the PGresult and also allows the removal of the needsfetch flag, since now after execution ends the results are already fetched and parsed.
Diffstat (limited to 'tests/test_async.py')
-rwxr-xr-xtests/test_async.py23
1 files changed, 23 insertions, 0 deletions
diff --git a/tests/test_async.py b/tests/test_async.py
index 5333b0d..bc23ab7 100755
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -338,6 +338,29 @@ class AsyncTests(unittest.TestCase):
# fetching from the correct cursor works
self.assertEquals(cur1.fetchone()[0], 1)
+ def test_error(self):
+ cur = self.conn.cursor()
+ cur.execute("insert into table1 values (%s)", (1, ))
+ self.wait(cur)
+ cur.execute("insert into table1 values (%s)", (1, ))
+ # this should fail
+ self.assertRaises(psycopg2.IntegrityError, self.wait, cur)
+ cur.execute("insert into table1 values (%s); "
+ "insert into table1 values (%s)", (2, 2))
+ # this should fail as well
+ self.assertRaises(psycopg2.IntegrityError, self.wait, cur)
+ # but this should work
+ cur.execute("insert into table1 values (%s)", (2, ))
+ self.wait(cur)
+ # and the cursor should be usable afterwards
+ cur.execute("insert into table1 values (%s)", (3, ))
+ self.wait(cur)
+ cur.execute("select * from table1 order by id")
+ self.wait(cur)
+ self.assertEquals(cur.fetchall(), [(1, ), (2, ), (3, )])
+ cur.execute("delete from table1")
+ self.wait(cur)
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)