summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xtests/test_cursor.py454
1 files changed, 231 insertions, 223 deletions
diff --git a/tests/test_cursor.py b/tests/test_cursor.py
index 9bf9ccf..cc78338 100755
--- a/tests/test_cursor.py
+++ b/tests/test_cursor.py
@@ -36,7 +36,7 @@ from decimal import Decimal
from weakref import ref
from .testutils import (ConnectingTestCase, skip_before_postgres,
skip_if_no_getrefcount, slow, skip_if_no_superuser,
- skip_if_windows)
+ skip_if_windows, skip_if_crdb, crdb_version)
import psycopg2.extras
from psycopg2.compat import text_type
@@ -59,7 +59,7 @@ class CursorTests(ConnectingTestCase):
def test_executemany_propagate_exceptions(self):
conn = self.conn
cur = conn.cursor()
- cur.execute("create temp table test_exc (data int);")
+ cur.execute("create table test_exc (data int);")
def buggygen():
yield 1 // 0
@@ -177,9 +177,237 @@ class CursorTests(ConnectingTestCase):
curs = self.conn.cursor(None)
self.assertEqual(curs.name, None)
+ def test_description_attribs(self):
+ curs = self.conn.cursor()
+ curs.execute("""select
+ 3.14::decimal(10,2) as pi,
+ 'hello'::text as hi,
+ '2010-02-18'::date as now;
+ """)
+ self.assertEqual(len(curs.description), 3)
+ for c in curs.description:
+ self.assertEqual(len(c), 7) # DBAPI happy
+ for a in ('name', 'type_code', 'display_size', 'internal_size',
+ 'precision', 'scale', 'null_ok'):
+ self.assert_(hasattr(c, a), a)
+
+ c = curs.description[0]
+ self.assertEqual(c.name, 'pi')
+ self.assert_(c.type_code in psycopg2.extensions.DECIMAL.values)
+ if crdb_version(self.conn) is None:
+ self.assert_(c.internal_size > 0)
+ self.assertEqual(c.precision, 10)
+ self.assertEqual(c.scale, 2)
+
+ c = curs.description[1]
+ self.assertEqual(c.name, 'hi')
+ self.assert_(c.type_code in psycopg2.STRING.values)
+ self.assert_(c.internal_size < 0)
+ self.assertEqual(c.precision, None)
+ self.assertEqual(c.scale, None)
+
+ c = curs.description[2]
+ self.assertEqual(c.name, 'now')
+ self.assert_(c.type_code in psycopg2.extensions.DATE.values)
+ self.assert_(c.internal_size > 0)
+ self.assertEqual(c.precision, None)
+ self.assertEqual(c.scale, None)
+
+ @skip_if_crdb
+ def test_description_extra_attribs(self):
+ curs = self.conn.cursor()
+ curs.execute("""
+ create table testcol (
+ pi decimal(10,2),
+ hi text)
+ """)
+ curs.execute("select oid from pg_class where relname = %s", ('testcol',))
+ oid = curs.fetchone()[0]
+
+ curs.execute("insert into testcol values (3.14, 'hello')")
+ curs.execute("select hi, pi, 42 from testcol")
+ self.assertEqual(curs.description[0].table_oid, oid)
+ self.assertEqual(curs.description[0].table_column, 2)
+
+ self.assertEqual(curs.description[1].table_oid, oid)
+ self.assertEqual(curs.description[1].table_column, 1)
+
+ self.assertEqual(curs.description[2].table_oid, None)
+ self.assertEqual(curs.description[2].table_column, None)
+
+ def test_description_slice(self):
+ curs = self.conn.cursor()
+ curs.execute("select 1::int4 as a")
+ self.assertEqual(curs.description[0][0:2], ('a', 23))
+
+ def test_pickle_description(self):
+ curs = self.conn.cursor()
+ curs.execute('SELECT 1 AS foo')
+ description = curs.description
+
+ pickled = pickle.dumps(description, pickle.HIGHEST_PROTOCOL)
+ unpickled = pickle.loads(pickled)
+
+ self.assertEqual(description, unpickled)
+
+ def test_bad_subclass(self):
+ # check that we get an error message instead of a segfault
+ # for badly written subclasses.
+ # see https://stackoverflow.com/questions/22019341/
+ class StupidCursor(psycopg2.extensions.cursor):
+ def __init__(self, *args, **kwargs):
+ # I am stupid so not calling superclass init
+ pass
+
+ cur = StupidCursor()
+ self.assertRaises(psycopg2.InterfaceError, cur.execute, 'select 1')
+ self.assertRaises(psycopg2.InterfaceError, cur.executemany,
+ 'select 1', [])
+
+ def test_callproc_badparam(self):
+ cur = self.conn.cursor()
+ self.assertRaises(TypeError, cur.callproc, 'lower', 42)
+
+ # It would be inappropriate to test callproc's named parameters in the
+ # DBAPI2.0 test section because they are a psycopg2 extension.
+ @skip_before_postgres(9, 0)
+ @skip_if_crdb
+ def test_callproc_dict(self):
+ # This parameter name tests for injection and quote escaping
+ paramname = '''
+ Robert'); drop table "students" --
+ '''.strip()
+ escaped_paramname = '"%s"' % paramname.replace('"', '""')
+ procname = 'pg_temp.randall'
+
+ cur = self.conn.cursor()
+
+ # Set up the temporary function
+ cur.execute('''
+ CREATE FUNCTION %s(%s INT)
+ RETURNS INT AS
+ 'SELECT $1 * $1'
+ LANGUAGE SQL
+ ''' % (procname, escaped_paramname))
+
+ # Make sure callproc works right
+ cur.callproc(procname, {paramname: 2})
+ self.assertEquals(cur.fetchone()[0], 4)
+
+ # Make sure callproc fails right
+ failing_cases = [
+ ({paramname: 2, 'foo': 'bar'}, psycopg2.ProgrammingError),
+ ({paramname: '2'}, psycopg2.ProgrammingError),
+ ({paramname: 'two'}, psycopg2.ProgrammingError),
+ ({u'bj\xc3rn': 2}, psycopg2.ProgrammingError),
+ ({3: 2}, TypeError),
+ ({self: 2}, TypeError),
+ ]
+ for parameter_sequence, exception in failing_cases:
+ self.assertRaises(exception, cur.callproc, procname, parameter_sequence)
+ self.conn.rollback()
+
+ @skip_if_no_superuser
+ @skip_if_windows
+ @skip_if_crdb
+ @skip_before_postgres(8, 4)
+ def test_external_close_sync(self):
+ # If a "victim" connection is closed by a "control" connection
+ # behind psycopg2's back, psycopg2 always handles it correctly:
+ # raise OperationalError, set conn.closed to 2. This reproduces
+ # issue #443, a race between control_conn closing victim_conn and
+ # psycopg2 noticing.
+ control_conn = self.conn
+ connect_func = self.connect
+
+ def wait_func(conn):
+ pass
+
+ self._test_external_close(control_conn, connect_func, wait_func)
+
+ @skip_if_no_superuser
+ @skip_if_windows
+ @skip_if_crdb
+ @skip_before_postgres(8, 4)
+ def test_external_close_async(self):
+ # Issue #443 is in the async code too. Since the fix is duplicated,
+ # so is the test.
+ control_conn = self.conn
+
+ def connect_func():
+ return self.connect(async_=True)
+
+ wait_func = psycopg2.extras.wait_select
+ self._test_external_close(control_conn, connect_func, wait_func)
+
+ def _test_external_close(self, control_conn, connect_func, wait_func):
+ # The short sleep before using victim_conn the second time makes it
+ # much more likely to lose the race and see the bug. Repeating the
+ # test several times makes it even more likely.
+ for i in range(10):
+ victim_conn = connect_func()
+ wait_func(victim_conn)
+
+ with victim_conn.cursor() as cur:
+ cur.execute('select pg_backend_pid()')
+ wait_func(victim_conn)
+ pid1 = cur.fetchall()[0][0]
+
+ with control_conn.cursor() as cur:
+ cur.execute('select pg_terminate_backend(%s)', (pid1,))
+
+ time.sleep(0.001)
+
+ def f():
+ with victim_conn.cursor() as cur:
+ cur.execute('select 1')
+ wait_func(victim_conn)
+
+ self.assertRaises(psycopg2.OperationalError, f)
+
+ self.assertEqual(victim_conn.closed, 2)
+
+ @skip_before_postgres(8, 2)
+ def test_rowcount_on_executemany_returning(self):
+ cur = self.conn.cursor()
+ cur.execute("create table execmany(id serial primary key, data int)")
+ cur.executemany(
+ "insert into execmany (data) values (%s)",
+ [(i,) for i in range(4)])
+ self.assertEqual(cur.rowcount, 4)
+
+ cur.executemany(
+ "insert into execmany (data) values (%s) returning data",
+ [(i,) for i in range(5)])
+ self.assertEqual(cur.rowcount, 5)
+
+ @skip_before_postgres(9)
+ def test_pgresult_ptr(self):
+ curs = self.conn.cursor()
+ self.assert_(curs.pgresult_ptr is None)
+
+ curs.execute("select 'x'")
+ self.assert_(curs.pgresult_ptr is not None)
+
+ try:
+ f = self.libpq.PQcmdStatus
+ except AttributeError:
+ pass
+ else:
+ f.argtypes = [ctypes.c_void_p]
+ f.restype = ctypes.c_char_p
+ status = f(curs.pgresult_ptr)
+ self.assertEqual(status, b'SELECT 1')
+
+ curs.close()
+ self.assert_(curs.pgresult_ptr is None)
+
+
+@skip_if_crdb
+class NamedCursorTests(ConnectingTestCase):
def test_invalid_name(self):
curs = self.conn.cursor()
- curs.execute("create temp table invname (data int);")
+ curs.execute("create table invname (data int);")
for i in (10, 20, 30):
curs.execute("insert into invname values (%s)", (i,))
curs.close()
@@ -377,77 +605,6 @@ class CursorTests(ConnectingTestCase):
for i, rec in enumerate(curs):
self.assertEqual(i + 1, curs.rownumber)
- def test_description_attribs(self):
- curs = self.conn.cursor()
- curs.execute("""select
- 3.14::decimal(10,2) as pi,
- 'hello'::text as hi,
- '2010-02-18'::date as now;
- """)
- self.assertEqual(len(curs.description), 3)
- for c in curs.description:
- self.assertEqual(len(c), 7) # DBAPI happy
- for a in ('name', 'type_code', 'display_size', 'internal_size',
- 'precision', 'scale', 'null_ok'):
- self.assert_(hasattr(c, a), a)
-
- c = curs.description[0]
- self.assertEqual(c.name, 'pi')
- self.assert_(c.type_code in psycopg2.extensions.DECIMAL.values)
- self.assert_(c.internal_size > 0)
- self.assertEqual(c.precision, 10)
- self.assertEqual(c.scale, 2)
-
- c = curs.description[1]
- self.assertEqual(c.name, 'hi')
- self.assert_(c.type_code in psycopg2.STRING.values)
- self.assert_(c.internal_size < 0)
- self.assertEqual(c.precision, None)
- self.assertEqual(c.scale, None)
-
- c = curs.description[2]
- self.assertEqual(c.name, 'now')
- self.assert_(c.type_code in psycopg2.extensions.DATE.values)
- self.assert_(c.internal_size > 0)
- self.assertEqual(c.precision, None)
- self.assertEqual(c.scale, None)
-
- def test_description_extra_attribs(self):
- curs = self.conn.cursor()
- curs.execute("""
- create table testcol (
- pi decimal(10,2),
- hi text)
- """)
- curs.execute("select oid from pg_class where relname = %s", ('testcol',))
- oid = curs.fetchone()[0]
-
- curs.execute("insert into testcol values (3.14, 'hello')")
- curs.execute("select hi, pi, 42 from testcol")
- self.assertEqual(curs.description[0].table_oid, oid)
- self.assertEqual(curs.description[0].table_column, 2)
-
- self.assertEqual(curs.description[1].table_oid, oid)
- self.assertEqual(curs.description[1].table_column, 1)
-
- self.assertEqual(curs.description[2].table_oid, None)
- self.assertEqual(curs.description[2].table_column, None)
-
- def test_description_slice(self):
- curs = self.conn.cursor()
- curs.execute("select 1::int as a")
- self.assertEqual(curs.description[0][0:2], ('a', 23))
-
- def test_pickle_description(self):
- curs = self.conn.cursor()
- curs.execute('SELECT 1 AS foo')
- description = curs.description
-
- pickled = pickle.dumps(description, pickle.HIGHEST_PROTOCOL)
- unpickled = pickle.loads(pickled)
-
- self.assertEqual(description, unpickled)
-
@skip_before_postgres(8, 0)
def test_named_cursor_stealing(self):
# you can use a named cursor to iterate on a refcursor created
@@ -527,155 +684,6 @@ class CursorTests(ConnectingTestCase):
cur.scroll(9, mode='absolute')
self.assertEqual(cur.fetchone(), (9,))
- def test_bad_subclass(self):
- # check that we get an error message instead of a segfault
- # for badly written subclasses.
- # see https://stackoverflow.com/questions/22019341/
- class StupidCursor(psycopg2.extensions.cursor):
- def __init__(self, *args, **kwargs):
- # I am stupid so not calling superclass init
- pass
-
- cur = StupidCursor()
- self.assertRaises(psycopg2.InterfaceError, cur.execute, 'select 1')
- self.assertRaises(psycopg2.InterfaceError, cur.executemany,
- 'select 1', [])
-
- def test_callproc_badparam(self):
- cur = self.conn.cursor()
- self.assertRaises(TypeError, cur.callproc, 'lower', 42)
-
- # It would be inappropriate to test callproc's named parameters in the
- # DBAPI2.0 test section because they are a psycopg2 extension.
- @skip_before_postgres(9, 0)
- def test_callproc_dict(self):
- # This parameter name tests for injection and quote escaping
- paramname = '''
- Robert'); drop table "students" --
- '''.strip()
- escaped_paramname = '"%s"' % paramname.replace('"', '""')
- procname = 'pg_temp.randall'
-
- cur = self.conn.cursor()
-
- # Set up the temporary function
- cur.execute('''
- CREATE FUNCTION %s(%s INT)
- RETURNS INT AS
- 'SELECT $1 * $1'
- LANGUAGE SQL
- ''' % (procname, escaped_paramname))
-
- # Make sure callproc works right
- cur.callproc(procname, {paramname: 2})
- self.assertEquals(cur.fetchone()[0], 4)
-
- # Make sure callproc fails right
- failing_cases = [
- ({paramname: 2, 'foo': 'bar'}, psycopg2.ProgrammingError),
- ({paramname: '2'}, psycopg2.ProgrammingError),
- ({paramname: 'two'}, psycopg2.ProgrammingError),
- ({u'bj\xc3rn': 2}, psycopg2.ProgrammingError),
- ({3: 2}, TypeError),
- ({self: 2}, TypeError),
- ]
- for parameter_sequence, exception in failing_cases:
- self.assertRaises(exception, cur.callproc, procname, parameter_sequence)
- self.conn.rollback()
-
- @skip_if_no_superuser
- @skip_if_windows
- @skip_before_postgres(8, 4)
- def test_external_close_sync(self):
- # If a "victim" connection is closed by a "control" connection
- # behind psycopg2's back, psycopg2 always handles it correctly:
- # raise OperationalError, set conn.closed to 2. This reproduces
- # issue #443, a race between control_conn closing victim_conn and
- # psycopg2 noticing.
- control_conn = self.conn
- connect_func = self.connect
-
- def wait_func(conn):
- pass
-
- self._test_external_close(control_conn, connect_func, wait_func)
-
- @skip_if_no_superuser
- @skip_if_windows
- @skip_before_postgres(8, 4)
- def test_external_close_async(self):
- # Issue #443 is in the async code too. Since the fix is duplicated,
- # so is the test.
- control_conn = self.conn
-
- def connect_func():
- return self.connect(async_=True)
-
- wait_func = psycopg2.extras.wait_select
- self._test_external_close(control_conn, connect_func, wait_func)
-
- def _test_external_close(self, control_conn, connect_func, wait_func):
- # The short sleep before using victim_conn the second time makes it
- # much more likely to lose the race and see the bug. Repeating the
- # test several times makes it even more likely.
- for i in range(10):
- victim_conn = connect_func()
- wait_func(victim_conn)
-
- with victim_conn.cursor() as cur:
- cur.execute('select pg_backend_pid()')
- wait_func(victim_conn)
- pid1 = cur.fetchall()[0][0]
-
- with control_conn.cursor() as cur:
- cur.execute('select pg_terminate_backend(%s)', (pid1,))
-
- time.sleep(0.001)
-
- def f():
- with victim_conn.cursor() as cur:
- cur.execute('select 1')
- wait_func(victim_conn)
-
- self.assertRaises(psycopg2.OperationalError, f)
-
- self.assertEqual(victim_conn.closed, 2)
-
- @skip_before_postgres(8, 2)
- def test_rowcount_on_executemany_returning(self):
- cur = self.conn.cursor()
- cur.execute("create table execmany(id serial primary key, data int)")
- cur.executemany(
- "insert into execmany (data) values (%s)",
- [(i,) for i in range(4)])
- self.assertEqual(cur.rowcount, 4)
-
- cur.executemany(
- "insert into execmany (data) values (%s) returning data",
- [(i,) for i in range(5)])
- self.assertEqual(cur.rowcount, 5)
-
- @skip_before_postgres(9)
- def test_pgresult_ptr(self):
- curs = self.conn.cursor()
- self.assert_(curs.pgresult_ptr is None)
-
- curs.execute("select 'x'")
- self.assert_(curs.pgresult_ptr is not None)
-
- try:
- f = self.libpq.PQcmdStatus
- except AttributeError:
- pass
- else:
- f.argtypes = [ctypes.c_void_p]
- f.restype = ctypes.c_char_p
- status = f(curs.pgresult_ptr)
- self.assertEqual(status, b'SELECT 1')
-
- curs.close()
- self.assert_(curs.pgresult_ptr is None)
-
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)