diff options
Diffstat (limited to 'tests')
-rwxr-xr-x | tests/test_async.py | 6 | ||||
-rwxr-xr-x | tests/test_connection.py | 120 | ||||
-rwxr-xr-x | tests/test_cursor.py | 40 | ||||
-rwxr-xr-x | tests/test_green.py | 8 | ||||
-rwxr-xr-x | tests/test_module.py | 13 | ||||
-rwxr-xr-x | tests/test_quote.py | 4 | ||||
-rwxr-xr-x | tests/test_sql.py | 28 | ||||
-rwxr-xr-x | tests/test_types_basic.py | 4 | ||||
-rwxr-xr-x | tests/test_types_extras.py | 46 | ||||
-rwxr-xr-x | tests/test_with.py | 2 | ||||
-rw-r--r-- | tests/testutils.py | 2 |
11 files changed, 203 insertions, 70 deletions
diff --git a/tests/test_async.py b/tests/test_async.py index ce5e9b1..dfdc44e 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -450,6 +450,12 @@ class AsyncTests(ConnectingTestCase): else: self.fail("no exception raised") + @skip_before_postgres(8, 2) + def test_copy_no_hang(self): + cur = self.conn.cursor() + cur.execute("copy (select 1) to stdout") + self.assertRaises(psycopg2.ProgrammingError, self.wait, self.conn) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/tests/test_connection.py b/tests/test_connection.py index 13635f1..f9fdff6 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -35,13 +35,11 @@ import psycopg2.errorcodes from psycopg2 import extensions as ext from .testutils import ( - unittest, decorate_all_tests, skip_if_no_superuser, - skip_before_postgres, skip_after_postgres, skip_before_libpq, - ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow, - libpq_version -) + unittest, decorate_all_tests, skip_if_no_superuser, skip_before_postgres, + skip_after_postgres, skip_before_libpq, skip_after_libpq, + ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow) -from .testconfig import dsn, dbname +from .testconfig import dbhost, dsn, dbname class ConnectionTests(ConnectingTestCase): @@ -805,6 +803,14 @@ class IsolationLevelsTestCase(ConnectingTestCase): self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 5) self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 'bah') + def test_attribs_segfault(self): + # bug #790 + for i in range(10000): + self.conn.autocommit + self.conn.readonly + self.conn.deferrable + self.conn.isolation_level + class ConnectionTwoPhaseTests(ConnectingTestCase): def setUp(self): @@ -1411,55 +1417,37 @@ class TransactionControlTests(ConnectingTestCase): class TestEncryptPassword(ConnectingTestCase): @skip_before_postgres(10) def test_encrypt_password_post_9_6(self): - cur = self.conn.cursor() - cur.execute("SHOW password_encryption;") - server_encryption_algorithm = cur.fetchone()[0] - # MD5 algorithm self.assertEqual( ext.encrypt_password('psycopg2', 'ashesh', self.conn, 'md5'), - 'md594839d658c28a357126f105b9cb14cfc' - ) + 'md594839d658c28a357126f105b9cb14cfc') # keywords self.assertEqual( ext.encrypt_password( password='psycopg2', user='ashesh', scope=self.conn, algorithm='md5'), - 'md594839d658c28a357126f105b9cb14cfc' - ) - if libpq_version() < 100000: - self.assertRaises( - psycopg2.NotSupportedError, - ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, - 'scram-sha-256' - ) - else: - enc_password = ext.encrypt_password( - 'psycopg2', 'ashesh', self.conn - ) - if server_encryption_algorithm == 'md5': - self.assertEqual( - enc_password, 'md594839d658c28a357126f105b9cb14cfc' - ) - elif server_encryption_algorithm == 'scram-sha-256': - self.assertEqual(enc_password[:14], 'SCRAM-SHA-256$') + 'md594839d658c28a357126f105b9cb14cfc') - self.assertEqual( - ext.encrypt_password( - 'psycopg2', 'ashesh', self.conn, 'scram-sha-256' - )[:14], 'SCRAM-SHA-256$' - ) + @skip_before_postgres(10) + def test_encrypt_server(self): + cur = self.conn.cursor() + cur.execute("SHOW password_encryption;") + server_encryption_algorithm = cur.fetchone()[0] - self.assertRaises(psycopg2.ProgrammingError, - ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc') + enc_password = ext.encrypt_password( + 'psycopg2', 'ashesh', self.conn) + + if server_encryption_algorithm == 'md5': + self.assertEqual( + enc_password, 'md594839d658c28a357126f105b9cb14cfc') + elif server_encryption_algorithm == 'scram-sha-256': + self.assertEqual(enc_password[:14], 'SCRAM-SHA-256$') - @skip_after_postgres(10) - def test_encrypt_password_pre_10(self): self.assertEqual( - ext.encrypt_password('psycopg2', 'ashesh', self.conn), - 'md594839d658c28a357126f105b9cb14cfc' - ) + ext.encrypt_password( + 'psycopg2', 'ashesh', self.conn, 'scram-sha-256' + )[:14], 'SCRAM-SHA-256$') self.assertRaises(psycopg2.ProgrammingError, ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc') @@ -1467,20 +1455,31 @@ class TestEncryptPassword(ConnectingTestCase): def test_encrypt_md5(self): self.assertEqual( ext.encrypt_password('psycopg2', 'ashesh', algorithm='md5'), - 'md594839d658c28a357126f105b9cb14cfc' - ) + 'md594839d658c28a357126f105b9cb14cfc') + @skip_before_libpq(10) + def test_encrypt_bad_libpq_10(self): + self.assertRaises(psycopg2.ProgrammingError, + ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc') + + @skip_after_libpq(10) + def test_encrypt_bad_before_libpq_10(self): + self.assertRaises(psycopg2.NotSupportedError, + ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc') + + @skip_before_libpq(10) def test_encrypt_scram(self): - if libpq_version() >= 100000: - self.assert_( - ext.encrypt_password( - 'psycopg2', 'ashesh', self.conn, 'scram-sha-256') - .startswith('SCRAM-SHA-256$')) - else: - self.assertRaises(psycopg2.NotSupportedError, - ext.encrypt_password, - password='psycopg2', user='ashesh', - scope=self.conn, algorithm='scram-sha-256') + self.assert_( + ext.encrypt_password( + 'psycopg2', 'ashesh', self.conn, 'scram-sha-256') + .startswith('SCRAM-SHA-256$')) + + @skip_after_libpq(10) + def test_encrypt_scram_pre_10(self): + self.assertRaises(psycopg2.NotSupportedError, + ext.encrypt_password, + password='psycopg2', user='ashesh', + scope=self.conn, algorithm='scram-sha-256') def test_bad_types(self): self.assertRaises(TypeError, ext.encrypt_password) @@ -1691,6 +1690,19 @@ while True: self.assert_(not err, err) +class TestConnectionProps(ConnectingTestCase): + def test_host(self): + self.assertFalse(self.conn.closed) + expected = dbhost if dbhost else "/" + self.assertIn(expected, self.conn.host) + + def test_host_readonly(self): + self.assertFalse(self.conn.closed) + with self.assertRaises(AttributeError): + self.conn.host = 'override' + + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/tests/test_cursor.py b/tests/test_cursor.py index b48fe7f..d048f3e 100755 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -377,7 +377,7 @@ class CursorTests(ConnectingTestCase): for i, rec in enumerate(curs): self.assertEqual(i + 1, curs.rownumber) - def test_namedtuple_description(self): + def test_description_attribs(self): curs = self.conn.cursor() curs.execute("""select 3.14::decimal(10,2) as pi, @@ -412,6 +412,27 @@ class CursorTests(ConnectingTestCase): self.assertEqual(c.precision, None) self.assertEqual(c.scale, None) + def test_description_extra_attribs(self): + curs = self.conn.cursor() + curs.execute(""" + create table testcol ( + pi decimal(10,2), + hi text) + """) + curs.execute("select oid from pg_class where relname = %s", ('testcol',)) + oid = curs.fetchone()[0] + + curs.execute("insert into testcol values (3.14, 'hello')") + curs.execute("select hi, pi, 42 from testcol") + self.assertEqual(curs.description[0].table_oid, oid) + self.assertEqual(curs.description[0].table_column, 2) + + self.assertEqual(curs.description[1].table_oid, oid) + self.assertEqual(curs.description[1].table_column, 1) + + self.assertEqual(curs.description[2].table_oid, None) + self.assertEqual(curs.description[2].table_column, None) + def test_pickle_description(self): curs = self.conn.cursor() curs.execute('SELECT 1 AS foo') @@ -436,11 +457,24 @@ class CursorTests(ConnectingTestCase): self.assertEqual([(2,), (3,), (4,)], cur2.fetchmany(3)) self.assertEqual([(5,), (6,), (7,)], cur2.fetchall()) - @skip_before_postgres(8, 0) + @skip_before_postgres(8, 2) def test_named_noop_close(self): cur = self.conn.cursor('test') cur.close() + @skip_before_postgres(8, 2) + def test_stolen_named_cursor_close(self): + cur1 = self.conn.cursor() + cur1.execute("DECLARE test CURSOR WITHOUT HOLD " + " FOR SELECT generate_series(1,7)") + cur2 = self.conn.cursor('test') + cur2.close() + + cur1.execute("DECLARE test CURSOR WITHOUT HOLD " + " FOR SELECT generate_series(1,7)") + cur2 = self.conn.cursor('test') + cur2.close() + @skip_before_postgres(8, 0) def test_scroll(self): cur = self.conn.cursor() @@ -491,7 +525,7 @@ class CursorTests(ConnectingTestCase): def test_bad_subclass(self): # check that we get an error message instead of a segfault # for badly written subclasses. - # see http://stackoverflow.com/questions/22019341/ + # see https://stackoverflow.com/questions/22019341/ class StupidCursor(psycopg2.extensions.cursor): def __init__(self, *args, **kwargs): # I am stupid so not calling superclass init diff --git a/tests/test_green.py b/tests/test_green.py index 65f483c..b726c46 100755 --- a/tests/test_green.py +++ b/tests/test_green.py @@ -27,7 +27,7 @@ import psycopg2 import psycopg2.extensions import psycopg2.extras -from .testutils import ConnectingTestCase, slow +from .testutils import ConnectingTestCase, skip_before_postgres, slow class ConnectionStub(object): @@ -111,6 +111,12 @@ class GreenTestCase(ConnectingTestCase): curs.execute("select 1") self.assertEqual(curs.fetchone()[0], 1) + @skip_before_postgres(8, 2) + def test_copy_no_hang(self): + cur = self.conn.cursor() + self.assertRaises(psycopg2.ProgrammingError, + cur.execute, "copy (select 1) to stdout") + class CallbackErrorTestCase(ConnectingTestCase): def setUp(self): diff --git a/tests/test_module.py b/tests/test_module.py index ddd6b02..cf3eb02 100755 --- a/tests/test_module.py +++ b/tests/test_module.py @@ -173,8 +173,8 @@ class ExceptionsTestCase(ConnectingTestCase): 'column_name', 'constraint_name', 'context', 'datatype_name', 'internal_position', 'internal_query', 'message_detail', 'message_hint', 'message_primary', 'schema_name', 'severity', - 'source_file', 'source_function', 'source_line', 'sqlstate', - 'statement_position', 'table_name', ]: + 'severity_nonlocalized', 'source_file', 'source_function', + 'source_line', 'sqlstate', 'statement_position', 'table_name', ]: v = getattr(diag, attr) if v is not None: self.assert_(isinstance(v, str)) @@ -276,6 +276,15 @@ class ExceptionsTestCase(ConnectingTestCase): self.assertEqual(e.diag.constraint_name, "chk_eq1") self.assertEqual(e.diag.datatype_name, None) + @skip_before_postgres(9, 6) + def test_9_6_diagnostics(self): + cur = self.conn.cursor() + try: + cur.execute("select 1 from nosuchtable") + except psycopg2.Error as exc: + e = exc + self.assertEqual(e.diag.severity_nonlocalized, 'ERROR') + def test_pickle(self): import pickle cur = self.conn.cursor() diff --git a/tests/test_quote.py b/tests/test_quote.py index fad6cee..ee72350 100755 --- a/tests/test_quote.py +++ b/tests/test_quote.py @@ -46,8 +46,8 @@ class QuotingTestCase(ConnectingTestCase): The tests also check that no warning is raised ('escape_string_warning' should be on). - http://www.postgresql.org/docs/current/static/sql-syntax-lexical.html#SQL-SYNTAX-STRINGS - http://www.postgresql.org/docs/current/static/runtime-config-compatible.html + https://www.postgresql.org/docs/current/static/sql-syntax-lexical.html#SQL-SYNTAX-STRINGS + https://www.postgresql.org/docs/current/static/runtime-config-compatible.html """ def test_string(self): data = """some data with \t chars diff --git a/tests/test_sql.py b/tests/test_sql.py index 81b22a4..cc9bba2 100755 --- a/tests/test_sql.py +++ b/tests/test_sql.py @@ -24,9 +24,8 @@ import datetime as dt import unittest -from .testutils import (ConnectingTestCase, - skip_before_postgres, skip_before_python, skip_copy_if_green, - StringIO) +from .testutils import ( + ConnectingTestCase, skip_before_postgres, skip_copy_if_green, StringIO) import psycopg2 from psycopg2 import sql @@ -181,26 +180,43 @@ class IdentifierTests(ConnectingTestCase): def test_init(self): self.assert_(isinstance(sql.Identifier('foo'), sql.Identifier)) self.assert_(isinstance(sql.Identifier(u'foo'), sql.Identifier)) + self.assert_(isinstance(sql.Identifier('foo', 'bar', 'baz'), sql.Identifier)) + self.assertRaises(TypeError, sql.Identifier) self.assertRaises(TypeError, sql.Identifier, 10) self.assertRaises(TypeError, sql.Identifier, dt.date(2016, 12, 31)) - def test_string(self): + def test_strings(self): + self.assertEqual(sql.Identifier('foo').strings, ('foo',)) + self.assertEqual(sql.Identifier('foo', 'bar').strings, ('foo', 'bar')) + + # Legacy method self.assertEqual(sql.Identifier('foo').string, 'foo') + self.assertRaises(AttributeError, + getattr, sql.Identifier('foo', 'bar'), 'string') def test_repr(self): obj = sql.Identifier("fo'o") self.assertEqual(repr(obj), 'Identifier("fo\'o")') self.assertEqual(repr(obj), str(obj)) + obj = sql.Identifier("fo'o", 'ba"r') + self.assertEqual(repr(obj), 'Identifier("fo\'o", \'ba"r\')') + self.assertEqual(repr(obj), str(obj)) + def test_eq(self): self.assert_(sql.Identifier('foo') == sql.Identifier('foo')) + self.assert_(sql.Identifier('foo', 'bar') == sql.Identifier('foo', 'bar')) self.assert_(sql.Identifier('foo') != sql.Identifier('bar')) self.assert_(sql.Identifier('foo') != 'foo') self.assert_(sql.Identifier('foo') != sql.SQL('foo')) def test_as_str(self): - self.assertEqual(sql.Identifier('foo').as_string(self.conn), '"foo"') - self.assertEqual(sql.Identifier("fo'o").as_string(self.conn), '"fo\'o"') + self.assertEqual( + sql.Identifier('foo').as_string(self.conn), '"foo"') + self.assertEqual( + sql.Identifier('foo', 'bar').as_string(self.conn), '"foo"."bar"') + self.assertEqual( + sql.Identifier("fo'o", 'ba"r').as_string(self.conn), '"fo\'o"."ba""r"') def test_join(self): self.assert_(not hasattr(sql.Identifier('foo'), 'join')) diff --git a/tests/test_types_basic.py b/tests/test_types_basic.py index 76b9aa3..510bdca 100755 --- a/tests/test_types_basic.py +++ b/tests/test_types_basic.py @@ -167,6 +167,10 @@ class TypesBasicTests(ConnectingTestCase): curs.execute("select col from array_test where id = 2") self.assertEqual(curs.fetchone()[0], []) + # issue #788 (test commented out until issue fixed) + #curs.execute("select null = any(%s)", ([[]], )) + #self.assertFalse(curs.fetchone()[0]) + def testEmptyArrayNoCast(self): s = self.execute("SELECT '{}' AS foo") self.assertEqual(s, '{}') diff --git a/tests/test_types_extras.py b/tests/test_types_extras.py index cda163b..ca31c05 100755 --- a/tests/test_types_extras.py +++ b/tests/test_types_extras.py @@ -1386,6 +1386,52 @@ class RangeTestCase(unittest.TestCase): r = Range(0, 4) self.assertEqual(loads(dumps(r)), r) + def test_str(self): + ''' + Range types should have a short and readable ``str`` implementation. + + Using ``repr`` for all string conversions can be very unreadable for + longer types like ``DateTimeTZRange``. + ''' + from psycopg2.extras import Range + + # Using the "u" prefix to make sure we have the proper return types in + # Python2 + expected = [ + u'(0, 4)', + u'[0, 4]', + u'(0, 4]', + u'[0, 4)', + u'empty', + ] + results = [] + + converter = unicode if sys.version_info < (3, 0) else str + + for bounds in ('()', '[]', '(]', '[)'): + r = Range(0, 4, bounds=bounds) + results.append(converter(r)) + + r = Range(empty=True) + results.append(converter(r)) + self.assertEqual(results, expected) + + def test_str_datetime(self): + ''' + Date-Time ranges should return a human-readable string as well on + string conversion. + ''' + from psycopg2.extras import DateTimeTZRange + from datetime import datetime + from psycopg2.tz import FixedOffsetTimezone + converter = unicode if sys.version_info < (3, 0) else str + tz = FixedOffsetTimezone(-5*60, "EST") + r = DateTimeTZRange(datetime(2010, 1, 1, tzinfo=tz), + datetime(2011, 1, 1, tzinfo=tz)) + expected = u'[2010-01-01 00:00:00-05:00, 2011-01-01 00:00:00-05:00)' + result = converter(r) + self.assertEqual(result, expected) + def skip_if_no_range(f): @wraps(f) diff --git a/tests/test_with.py b/tests/test_with.py index f26f8f9..1e4c518 100755 --- a/tests/test_with.py +++ b/tests/test_with.py @@ -215,7 +215,7 @@ class WithCursorTestCase(WithTestCase): else: self.fail("where is my exception?") - @skip_before_postgres(8, 0) + @skip_before_postgres(8, 2) def test_named_with_noop(self): with self.conn.cursor('named') as cur: pass diff --git a/tests/testutils.py b/tests/testutils.py index 3bb72e2..11138f0 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -50,7 +50,7 @@ else: # Silence warnings caused by the stubbornness of the Python unittest # maintainers -# http://bugs.python.org/issue9424 +# https://bugs.python.org/issue9424 if (not hasattr(unittest.TestCase, 'assert_') or unittest.TestCase.assert_ is not unittest.TestCase.assertTrue): # mavaff... |