summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2019-03-16 19:53:45 +0000
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2019-03-16 19:53:45 +0000
commit4ace9544ff815874ea7824eaf52a026276f5c154 (patch)
tree01ad6655d7a2f6cc691ea259df14bb864a7ae376
parentd90ad8627d0a7244abeee7cd3a9418d529dc1428 (diff)
parente8135ee2cf511aa855c62fc074f414617666df43 (diff)
downloadpsycopg2-4ace9544ff815874ea7824eaf52a026276f5c154.tar.gz
Merge branch 'fix-imports'
-rwxr-xr-xtests/test_async.py10
-rwxr-xr-xtests/test_connection.py22
-rwxr-xr-xtests/test_copy.py20
-rwxr-xr-xtests/test_cursor.py4
-rwxr-xr-xtests/test_dates.py29
-rwxr-xr-xtests/test_errcodes.py8
-rwxr-xr-xtests/test_green.py2
-rwxr-xr-xtests/test_ipaddress.py12
-rwxr-xr-xtests/test_module.py3
-rwxr-xr-xtests/test_replication.py2
-rwxr-xr-xtests/test_types_basic.py9
-rwxr-xr-xtests/test_types_extras.py7
-rw-r--r--tests/testutils.py27
13 files changed, 54 insertions, 101 deletions
diff --git a/tests/test_async.py b/tests/test_async.py
index 894d810..057deac 100755
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -23,15 +23,15 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
+import gc
+import time
import unittest
-from .testutils import skip_before_postgres, slow
+import warnings
import psycopg2
from psycopg2 import extensions as ext
-import time
-
-from .testutils import ConnectingTestCase, StringIO
+from .testutils import ConnectingTestCase, StringIO, skip_before_postgres, slow
class PollableStub(object):
@@ -338,7 +338,6 @@ class AsyncTests(ConnectingTestCase):
# on high load on linux: probably because the kernel has more
# buffers ready. A warning may be useful during development,
# but an error is bad during regression testing.
- import warnings
warnings.warn("sending a large query didn't trigger block on write.")
def test_sync_poll(self):
@@ -427,7 +426,6 @@ class AsyncTests(ConnectingTestCase):
self.assert_(self.conn.notices)
def test_async_cursor_gone(self):
- import gc
cur = self.conn.cursor()
cur.execute("select 42;")
del cur
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 1c35e8b..0ad7d37 100755
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -35,11 +35,8 @@ from operator import attrgetter
from weakref import ref
import psycopg2
-import psycopg2.errorcodes
+import psycopg2.extras
from psycopg2 import extensions as ext
-from psycopg2 import ProgrammingError
-from psycopg2.extensions import Xid
-from psycopg2.extras import RealDictConnection
from .testutils import (
unittest, skip_if_no_superuser, skip_before_postgres,
@@ -296,7 +293,6 @@ class ConnectionTests(ConnectingTestCase):
self.assert_(not notices, "%d notices raised" % len(notices))
def test_connect_cursor_factory(self):
- import psycopg2.extras
conn = self.connect(cursor_factory=psycopg2.extras.DictCursor)
cur = conn.cursor()
cur.execute("select 1 as a")
@@ -369,7 +365,7 @@ class ParseDsnTestCase(ConnectingTestCase):
dict(user='tester', password='secret', dbname='test'),
"simple DSN parsed")
- self.assertRaises(ProgrammingError, ext.parse_dsn,
+ self.assertRaises(psycopg2.ProgrammingError, ext.parse_dsn,
"dbname=test 2 user=tester password=secret")
self.assertEqual(
@@ -383,7 +379,7 @@ class ParseDsnTestCase(ConnectingTestCase):
try:
# unterminated quote after dbname:
ext.parse_dsn("dbname='test 2 user=tester password=secret")
- except ProgrammingError as e:
+ except psycopg2.ProgrammingError as e:
raised = True
self.assertTrue(str(e).find('secret') < 0,
"DSN was not exposed in error message")
@@ -1124,27 +1120,27 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn.tpc_rollback(xid)
def test_xid_construction(self):
- x1 = Xid(74, 'foo', 'bar')
+ x1 = ext.Xid(74, 'foo', 'bar')
self.assertEqual(74, x1.format_id)
self.assertEqual('foo', x1.gtrid)
self.assertEqual('bar', x1.bqual)
def test_xid_from_string(self):
- x2 = Xid.from_string('42_Z3RyaWQ=_YnF1YWw=')
+ x2 = ext.Xid.from_string('42_Z3RyaWQ=_YnF1YWw=')
self.assertEqual(42, x2.format_id)
self.assertEqual('gtrid', x2.gtrid)
self.assertEqual('bqual', x2.bqual)
- x3 = Xid.from_string('99_xxx_yyy')
+ x3 = ext.Xid.from_string('99_xxx_yyy')
self.assertEqual(None, x3.format_id)
self.assertEqual('99_xxx_yyy', x3.gtrid)
self.assertEqual(None, x3.bqual)
def test_xid_to_string(self):
- x1 = Xid.from_string('42_Z3RyaWQ=_YnF1YWw=')
+ x1 = ext.Xid.from_string('42_Z3RyaWQ=_YnF1YWw=')
self.assertEqual(str(x1), '42_Z3RyaWQ=_YnF1YWw=')
- x2 = Xid.from_string('99_xxx_yyy')
+ x2 = ext.Xid.from_string('99_xxx_yyy')
self.assertEqual(str(x2), '99_xxx_yyy')
def test_xid_unicode(self):
@@ -1180,7 +1176,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertRaises(psycopg2.ProgrammingError, cnn.cancel)
def test_tpc_recover_non_dbapi_connection(self):
- cnn = self.connect(connection_factory=RealDictConnection)
+ cnn = self.connect(connection_factory=psycopg2.extras.RealDictConnection)
cnn.tpc_begin('dict-connection')
cnn.tpc_prepare()
cnn.reset()
diff --git a/tests/test_copy.py b/tests/test_copy.py
index e800aed..05513f5 100755
--- a/tests/test_copy.py
+++ b/tests/test_copy.py
@@ -22,6 +22,7 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
+import io
import sys
import string
import unittest
@@ -31,17 +32,11 @@ from subprocess import Popen, PIPE
import psycopg2
import psycopg2.extensions
-from .testutils import skip_copy_if_green
+from .testutils import skip_copy_if_green, TextIOBase
from .testconfig import dsn
-if sys.version_info[0] < 3:
- _base = object
-else:
- from io import TextIOBase as _base
-
-
-class MinimalRead(_base):
+class MinimalRead(TextIOBase):
"""A file wrapper exposing the minimal interface to copy from."""
def __init__(self, f):
self.f = f
@@ -53,7 +48,7 @@ class MinimalRead(_base):
return self.f.readline()
-class MinimalWrite(_base):
+class MinimalWrite(TextIOBase):
"""A file wrapper exposing the minimal interface to copy to."""
def __init__(self, f):
self.f = f
@@ -148,7 +143,6 @@ class CopyTests(ConnectingTestCase):
curs.execute('insert into tcopy values (%s, %s)',
(42, abin))
- import io
f = io.StringIO()
curs.copy_to(f, 'tcopy', columns=('data',))
f.seek(0)
@@ -170,7 +164,6 @@ class CopyTests(ConnectingTestCase):
curs.execute('insert into tcopy values (%s, %s)',
(42, abin))
- import io
f = io.BytesIO()
curs.copy_to(f, 'tcopy', columns=('data',))
f.seek(0)
@@ -190,7 +183,6 @@ class CopyTests(ConnectingTestCase):
+ list(range(160, 256))).decode('latin1')
about = abin.replace('\\', '\\\\')
- import io
f = io.StringIO()
f.write(about)
f.seek(0)
@@ -351,7 +343,7 @@ conn.close()
self.assertEqual(0, proc.returncode)
def test_copy_from_propagate_error(self):
- class BrokenRead(_base):
+ class BrokenRead(TextIOBase):
def read(self, size):
return 1 / 0
@@ -368,7 +360,7 @@ conn.close()
self.assert_('ZeroDivisionError' in str(e))
def test_copy_to_propagate_error(self):
- class BrokenWrite(_base):
+ class BrokenWrite(TextIOBase):
def write(self, data):
return 1 / 0
diff --git a/tests/test_cursor.py b/tests/test_cursor.py
index 675699c..47b41e6 100755
--- a/tests/test_cursor.py
+++ b/tests/test_cursor.py
@@ -22,6 +22,8 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
+import gc
+import sys
import time
import ctypes
import pickle
@@ -115,7 +117,6 @@ class CursorTests(ConnectingTestCase):
# more than once from a dict.
cur = self.conn.cursor()
foo = (lambda x: x)('foo') * 10
- import sys
nref1 = sys.getrefcount(foo)
cur.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': foo})
nref2 = sys.getrefcount(foo)
@@ -168,7 +169,6 @@ class CursorTests(ConnectingTestCase):
curs = self.conn.cursor()
w = ref(curs)
del curs
- import gc
gc.collect()
self.assert_(w() is None)
diff --git a/tests/test_dates.py b/tests/test_dates.py
index 0c1c504..5829822 100755
--- a/tests/test_dates.py
+++ b/tests/test_dates.py
@@ -23,14 +23,20 @@
# License for more details.
import math
+import pickle
from datetime import date, datetime, time, timedelta
import psycopg2
-import psycopg2.tz
from psycopg2.tz import FixedOffsetTimezone, ZERO
import unittest
from .testutils import ConnectingTestCase, skip_before_postgres
+try:
+ from mx.DateTime import Date, Time, DateTime, DateTimeDeltaFrom
+except ImportError:
+ # Tests will be skipped
+ pass
+
def total_seconds(d):
"""Return total number of seconds of a timedelta as a float."""
@@ -278,7 +284,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(None, dt.tzinfo)
def test_type_roundtrip_datetimetz(self):
- tz = psycopg2.tz.FixedOffsetTimezone(8 * 60)
+ tz = FixedOffsetTimezone(8 * 60)
dt1 = datetime(2010, 5, 3, 10, 20, 30, tzinfo=tz)
dt2 = self._test_type_roundtrip(dt1)
self.assertNotEqual(None, dt2.tzinfo)
@@ -289,7 +295,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(None, tm.tzinfo)
def test_type_roundtrip_timetz(self):
- tz = psycopg2.tz.FixedOffsetTimezone(8 * 60)
+ tz = FixedOffsetTimezone(8 * 60)
tm1 = time(10, 20, 30, tzinfo=tz)
tm2 = self._test_type_roundtrip(tm1)
self.assertNotEqual(None, tm2.tzinfo)
@@ -487,7 +493,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
def test_parse_time_timezone(self):
# Time zone information is ignored.
- from mx.DateTime import Time
expected = Time(13, 30, 29)
self.assertEqual(expected, self.TIME("13:30:29+01", self.curs))
self.assertEqual(expected, self.TIME("13:30:29-01", self.curs))
@@ -498,7 +503,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
def test_parse_datetime_timezone(self):
# Time zone information is ignored.
- from mx.DateTime import DateTime
expected = DateTime(2007, 1, 1, 13, 30, 29)
self.assertEqual(
expected, self.DATETIME("2007-01-01 13:30:29+01", self.curs))
@@ -522,19 +526,16 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(value.second, 5)
def test_adapt_time(self):
- from mx.DateTime import Time
value = self.execute('select (%s)::time::text',
[Time(13, 30, 29)])
self.assertEqual(value, '13:30:29')
def test_adapt_datetime(self):
- from mx.DateTime import DateTime
value = self.execute('select (%s)::timestamp::text',
[DateTime(2007, 1, 1, 13, 30, 29.123456)])
self.assertEqual(value, '2007-01-01 13:30:29.123456')
def test_adapt_bc_datetime(self):
- from mx.DateTime import DateTime
value = self.execute('select (%s)::timestamp::text',
[DateTime(-41, 1, 1, 13, 30, 29.123456)])
# microsecs for BC timestamps look not available in PG < 8.4
@@ -544,7 +545,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
'0042-01-01 13:30:29 BC'), value)
def test_adapt_timedelta(self):
- from mx.DateTime import DateTimeDeltaFrom
value = self.execute('select extract(epoch from (%s)::interval)',
[DateTimeDeltaFrom(days=42,
seconds=45296.123456)])
@@ -553,7 +553,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(int(round((value - seconds) * 1000000)), 123456)
def test_adapt_negative_timedelta(self):
- from mx.DateTime import DateTimeDeltaFrom
value = self.execute('select extract(epoch from (%s)::interval)',
[DateTimeDeltaFrom(days=-42,
seconds=45296.123456)])
@@ -571,35 +570,27 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(type(o1[0]), type(o2[0]))
def test_type_roundtrip_date(self):
- from mx.DateTime import Date
self._test_type_roundtrip(Date(2010, 5, 3))
def test_type_roundtrip_datetime(self):
- from mx.DateTime import DateTime
self._test_type_roundtrip(DateTime(2010, 5, 3, 10, 20, 30))
def test_type_roundtrip_time(self):
- from mx.DateTime import Time
self._test_type_roundtrip(Time(10, 20, 30))
def test_type_roundtrip_interval(self):
- from mx.DateTime import DateTimeDeltaFrom
self._test_type_roundtrip(DateTimeDeltaFrom(seconds=30))
def test_type_roundtrip_date_array(self):
- from mx.DateTime import Date
self._test_type_roundtrip_array(Date(2010, 5, 3))
def test_type_roundtrip_datetime_array(self):
- from mx.DateTime import DateTime
self._test_type_roundtrip_array(DateTime(2010, 5, 3, 10, 20, 30))
def test_type_roundtrip_time_array(self):
- from mx.DateTime import Time
self._test_type_roundtrip_array(Time(10, 20, 30))
def test_type_roundtrip_interval_array(self):
- from mx.DateTime import DateTimeDeltaFrom
self._test_type_roundtrip_array(DateTimeDeltaFrom(seconds=30))
@@ -659,8 +650,6 @@ class FixedOffsetTimezoneTests(unittest.TestCase):
def test_pickle(self):
# ticket #135
- import pickle
-
tz11 = FixedOffsetTimezone(60)
tz12 = FixedOffsetTimezone(120)
for proto in [-1, 0, 1, 2]:
diff --git a/tests/test_errcodes.py b/tests/test_errcodes.py
index c510f72..676d12d 100755
--- a/tests/test_errcodes.py
+++ b/tests/test_errcodes.py
@@ -25,14 +25,6 @@
import unittest
from .testutils import ConnectingTestCase, slow, reload
-try:
- reload
-except NameError:
- try:
- from importlib import reload
- except ImportError:
- from imp import reload
-
from threading import Thread
from psycopg2 import errorcodes
diff --git a/tests/test_green.py b/tests/test_green.py
index 0f45efb..76ee1c8 100755
--- a/tests/test_green.py
+++ b/tests/test_green.py
@@ -24,6 +24,7 @@
import select
import unittest
+import warnings
import psycopg2
import psycopg2.extensions
import psycopg2.extras
@@ -81,7 +82,6 @@ class GreenTestCase(ConnectingTestCase):
# on high load on linux: probably because the kernel has more
# buffers ready. A warning may be useful during development,
# but an error is bad during regression testing.
- import warnings
warnings.warn("sending a large query didn't trigger block on write.")
def test_error_in_callback(self):
diff --git a/tests/test_ipaddress.py b/tests/test_ipaddress.py
index ec67866..bf39421 100755
--- a/tests/test_ipaddress.py
+++ b/tests/test_ipaddress.py
@@ -23,16 +23,15 @@ import psycopg2
import psycopg2.extras
try:
- import ipaddress
+ import ipaddress as ip
except ImportError:
# Python 2
- ipaddress = None
+ ip = None
-@unittest.skipIf(ipaddress is None, "'ipaddress' module not available")
+@unittest.skipIf(ip is None, "'ipaddress' module not available")
class NetworkingTestCase(testutils.ConnectingTestCase):
def test_inet_cast(self):
- import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
@@ -51,7 +50,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
@testutils.skip_before_postgres(8, 2)
def test_inet_array_cast(self):
- import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
@@ -63,7 +61,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
self.assert_(isinstance(l[2], ip.IPv6Interface), l)
def test_inet_adapt(self):
- import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
@@ -74,7 +71,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128')
def test_cidr_cast(self):
- import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
@@ -93,7 +89,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
@testutils.skip_before_postgres(8, 2)
def test_cidr_array_cast(self):
- import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::cidr[]")
@@ -105,7 +100,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
self.assert_(isinstance(l[2], ip.IPv6Network), l)
def test_cidr_adapt(self):
- import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
diff --git a/tests/test_module.py b/tests/test_module.py
index c9b9c6b..fd8ae46 100755
--- a/tests/test_module.py
+++ b/tests/test_module.py
@@ -25,6 +25,7 @@
import gc
import os
import sys
+import pickle
from subprocess import Popen
from weakref import ref
@@ -285,7 +286,6 @@ class ExceptionsTestCase(ConnectingTestCase):
self.assertEqual(e.diag.severity_nonlocalized, 'ERROR')
def test_pickle(self):
- import pickle
cur = self.conn.cursor()
try:
cur.execute("select * from nonexist")
@@ -300,7 +300,6 @@ class ExceptionsTestCase(ConnectingTestCase):
def test_pickle_connection_error(self):
# segfaults on psycopg 2.5.1 - see ticket #170
- import pickle
try:
psycopg2.connect('dbname=nosuchdatabasemate')
except psycopg2.Error as exc:
diff --git a/tests/test_replication.py b/tests/test_replication.py
index 7cb4327..697d53c 100755
--- a/tests/test_replication.py
+++ b/tests/test_replication.py
@@ -22,6 +22,7 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
+import time
from select import select
import psycopg2
@@ -47,7 +48,6 @@ class ReplicationTestCase(ConnectingTestCase):
# first close all connections, as they might keep the slot(s) active
super(ReplicationTestCase, self).tearDown()
- import time
time.sleep(0.025) # sometimes the slot is still active, wait a little
if self._slots:
diff --git a/tests/test_types_basic.py b/tests/test_types_basic.py
index ff1dba0..13d733f 100755
--- a/tests/test_types_basic.py
+++ b/tests/test_types_basic.py
@@ -22,8 +22,10 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
+import string
import ctypes
import decimal
+import datetime
import platform
import sys
@@ -155,7 +157,6 @@ class TypesBasicTests(ConnectingTestCase):
def testEmptyArrayRegression(self):
# ticket #42
- import datetime
curs = self.conn.cursor()
curs.execute(
"create table array_test "
@@ -478,9 +479,6 @@ class AdaptSubclassTest(unittest.TestCase):
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
def test_conform_subclass_precedence(self):
-
- import psycopg2.extensions as ext
-
class foo(tuple):
def __conform__(self, proto):
return self
@@ -488,7 +486,7 @@ class AdaptSubclassTest(unittest.TestCase):
def getquoted(self):
return 'bar'
- self.assertEqual(ext.adapt(foo((1, 2, 3))).getquoted(), 'bar')
+ self.assertEqual(adapt(foo((1, 2, 3))).getquoted(), 'bar')
@unittest.skipIf(
@@ -556,7 +554,6 @@ class ByteaParserTest(unittest.TestCase):
self.assertEqual(rv, bytes(range(256)))
def test_escaped_mixed(self):
- import string
buf = ''.join(("\\%03o" % i) for i in range(32))
buf += string.ascii_letters
buf += ''.join('\\' + c for c in string.ascii_letters)
diff --git a/tests/test_types_extras.py b/tests/test_types_extras.py
index a10d476..66d4ddf 100755
--- a/tests/test_types_extras.py
+++ b/tests/test_types_extras.py
@@ -16,6 +16,8 @@
import re
import sys
+import json
+import uuid
import warnings
from decimal import Decimal
from datetime import date, datetime
@@ -32,7 +34,7 @@ import psycopg2.extensions as ext
from psycopg2._json import _get_json_oids
from psycopg2.extras import (
CompositeCaster, DateRange, DateTimeRange, DateTimeTZRange, HstoreAdapter,
- Inet, Json, NumericRange, Range, RealDictConnection, json,
+ Inet, Json, NumericRange, Range, RealDictConnection,
register_composite, register_hstore, register_range,
)
from psycopg2.tz import FixedOffsetTimezone
@@ -48,7 +50,6 @@ class TypesExtrasTests(ConnectingTestCase):
@skip_if_no_uuid
def testUUID(self):
- import uuid
psycopg2.extras.register_uuid()
u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350')
s = self.execute("SELECT %s AS foo", (u,))
@@ -59,7 +60,6 @@ class TypesExtrasTests(ConnectingTestCase):
@skip_if_no_uuid
def testUUIDARRAY(self):
- import uuid
psycopg2.extras.register_uuid()
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'),
uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')]
@@ -1019,7 +1019,6 @@ def skip_if_no_jsonb_type(f):
class JsonbTestCase(ConnectingTestCase):
@staticmethod
def myloads(s):
- import json
rv = json.loads(s)
rv['test'] = 1
return rv
diff --git a/tests/testutils.py b/tests/testutils.py
index 8dab1b6..959b3dd 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -32,23 +32,28 @@ import platform
import unittest
from functools import wraps
from ctypes.util import find_library
-from .testconfig import dsn, repl_dsn
-from psycopg2 import ProgrammingError
+
+import psycopg2
+import psycopg2.errors
+import psycopg2.extensions
from psycopg2.compat import text_type
-from .testconfig import green
+from .testconfig import green, dsn, repl_dsn
# Python 2/3 compatibility
if sys.version_info[0] == 2:
# Python 2
from StringIO import StringIO
+ TextIOBase = object
long = long
reload = reload
unichr = unichr
+
else:
# Python 3
from io import StringIO # noqa
+ from io import TextIOBase # noqa
from importlib import reload # noqa
long = int
unichr = chr
@@ -116,7 +121,6 @@ class ConnectingTestCase(unittest.TestCase):
conninfo = kwargs.pop('dsn')
else:
conninfo = dsn
- import psycopg2
conn = psycopg2.connect(conninfo, **kwargs)
self._conns.append(conn)
return conn
@@ -135,7 +139,6 @@ class ConnectingTestCase(unittest.TestCase):
if 'dsn' not in kwargs:
kwargs['dsn'] = repl_dsn
- import psycopg2
try:
conn = self.connect(**kwargs)
if conn.async_ == 1:
@@ -164,7 +167,6 @@ class ConnectingTestCase(unittest.TestCase):
# for use with async connections only
def wait(self, cur_or_conn):
- import psycopg2.extensions
pollable = cur_or_conn
if not hasattr(pollable, 'poll'):
pollable = cur_or_conn.connection
@@ -221,7 +223,7 @@ def decorate_all_tests(obj, *decorators):
@decorate_all_tests
def skip_if_no_uuid(f):
- """Decorator to skip a test if uuid is not supported by Py/PG."""
+ """Decorator to skip a test if uuid is not supported by PG."""
@wraps(f)
def skip_if_no_uuid_(self):
try:
@@ -248,7 +250,7 @@ def skip_if_tpc_disabled(f):
cur = cnn.cursor()
try:
cur.execute("SHOW max_prepared_transactions;")
- except ProgrammingError:
+ except psycopg2.ProgrammingError:
return self.skipTest(
"server too old: two phase transactions not supported.")
else:
@@ -306,7 +308,6 @@ def skip_after_postgres(*ver):
def libpq_version():
- import psycopg2
v = psycopg2.__libpq_version__
if v >= 90100:
v = min(v, psycopg2.extensions.libpq_version())
@@ -372,12 +373,8 @@ def skip_if_no_superuser(f):
def skip_if_no_superuser_(self):
try:
return f(self)
- except ProgrammingError as e:
- import psycopg2.errorcodes
- if e.pgcode == psycopg2.errorcodes.INSUFFICIENT_PRIVILEGE:
- self.skipTest("skipped because not superuser")
- else:
- raise
+ except psycopg2.errors.InsufficientPrivilege:
+ self.skipTest("skipped because not superuser")
return skip_if_no_superuser_