summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/dbapi20.py62
-rwxr-xr-xtests/test_connection.py10
-rwxr-xr-xtests/test_copy.py10
-rwxr-xr-xtests/test_cursor.py19
-rwxr-xr-xtests/test_dates.py2
-rwxr-xr-xtests/test_extras_dictcursor.py2
-rwxr-xr-xtests/test_green.py4
-rwxr-xr-xtests/test_lobject.py2
-rwxr-xr-xtests/test_module.py10
-rwxr-xr-xtests/test_notify.py2
-rwxr-xr-xtests/test_quote.py3
-rwxr-xr-xtests/test_replication.py6
-rwxr-xr-xtests/test_types_extras.py10
-rw-r--r--tests/testconfig.py10
-rw-r--r--tests/testutils.py17
15 files changed, 78 insertions, 91 deletions
diff --git a/tests/dbapi20.py b/tests/dbapi20.py
index b96af09..5a98b4a 100644
--- a/tests/dbapi20.py
+++ b/tests/dbapi20.py
@@ -101,10 +101,10 @@ class DatabaseAPI20Test(unittest.TestCase):
connect_kw_args = {} # Keyword arguments for connect
table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables
- ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix
- ddl2 = 'create table %sbarflys (name varchar(20))' % table_prefix
- xddl1 = 'drop table %sbooze' % table_prefix
- xddl2 = 'drop table %sbarflys' % table_prefix
+ ddl1 = f'create table {table_prefix}booze (name varchar(20))'
+ ddl2 = f'create table {table_prefix}barflys (name varchar(20))'
+ xddl1 = f'drop table {table_prefix}booze'
+ xddl2 = f'drop table {table_prefix}barflys'
lowerfunc = 'lower' # Name of stored procedure to convert string->lowercase
@@ -265,7 +265,7 @@ class DatabaseAPI20Test(unittest.TestCase):
cur1.execute("insert into %sbooze values ('Victoria Bitter')" % (
self.table_prefix
))
- cur2.execute("select name from %sbooze" % self.table_prefix)
+ cur2.execute(f"select name from {self.table_prefix}booze")
booze = cur2.fetchall()
self.assertEqual(len(booze),1)
self.assertEqual(len(booze[0]),1)
@@ -282,7 +282,7 @@ class DatabaseAPI20Test(unittest.TestCase):
'cursor.description should be none after executing a '
'statement that can return no rows (such as DDL)'
)
- cur.execute('select name from %sbooze' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}booze')
self.assertEqual(len(cur.description),1,
'cursor.description describes too many columns'
)
@@ -322,7 +322,7 @@ class DatabaseAPI20Test(unittest.TestCase):
'cursor.rowcount should == number or rows inserted, or '
'set to -1 after executing an insert statement'
)
- cur.execute("select name from %sbooze" % self.table_prefix)
+ cur.execute(f"select name from {self.table_prefix}booze")
self.failUnless(cur.rowcount in (-1,1),
'cursor.rowcount should == number of rows returned, or '
'set to -1 after executing a select statement'
@@ -385,24 +385,22 @@ class DatabaseAPI20Test(unittest.TestCase):
def _paraminsert(self,cur):
self.executeDDL1(cur)
- cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
- self.table_prefix
- ))
+ cur.execute(f"insert into {self.table_prefix}booze values ('Victoria Bitter')")
self.failUnless(cur.rowcount in (-1,1))
if self.driver.paramstyle == 'qmark':
cur.execute(
- 'insert into %sbooze values (?)' % self.table_prefix,
+ f'insert into {self.table_prefix}booze values (?)',
("Cooper's",)
)
elif self.driver.paramstyle == 'numeric':
cur.execute(
- 'insert into %sbooze values (:1)' % self.table_prefix,
+ f'insert into {self.table_prefix}booze values (:1)',
("Cooper's",)
)
elif self.driver.paramstyle == 'named':
cur.execute(
- 'insert into %sbooze values (:beer)' % self.table_prefix,
+ f'insert into {self.table_prefix}booze values (:beer)',
{'beer':"Cooper's"}
)
elif self.driver.paramstyle == 'format':
@@ -412,14 +410,14 @@ class DatabaseAPI20Test(unittest.TestCase):
)
elif self.driver.paramstyle == 'pyformat':
cur.execute(
- 'insert into %sbooze values (%%(beer)s)' % self.table_prefix,
+ f"insert into %sbooze values (%{self.table_prefix['beer']})",
{'beer':"Cooper's"}
)
else:
self.fail('Invalid paramstyle')
self.failUnless(cur.rowcount in (-1,1))
- cur.execute('select name from %sbooze' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}booze')
res = cur.fetchall()
self.assertEqual(len(res),2,'cursor.fetchall returned too few rows')
beers = [res[0][0],res[1][0]]
@@ -442,17 +440,17 @@ class DatabaseAPI20Test(unittest.TestCase):
margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ]
if self.driver.paramstyle == 'qmark':
cur.executemany(
- 'insert into %sbooze values (?)' % self.table_prefix,
+ f'insert into {self.table_prefix}booze values (?)',
largs
)
elif self.driver.paramstyle == 'numeric':
cur.executemany(
- 'insert into %sbooze values (:1)' % self.table_prefix,
+ f'insert into {self.table_prefix}booze values (:1)',
largs
)
elif self.driver.paramstyle == 'named':
cur.executemany(
- 'insert into %sbooze values (:beer)' % self.table_prefix,
+ f'insert into {self.table_prefix}booze values (:beer)',
margs
)
elif self.driver.paramstyle == 'format':
@@ -462,9 +460,7 @@ class DatabaseAPI20Test(unittest.TestCase):
)
elif self.driver.paramstyle == 'pyformat':
cur.executemany(
- 'insert into %sbooze values (%%(beer)s)' % (
- self.table_prefix
- ),
+ f"insert into %sbooze values (%{self.table_prefix['beer']})",
margs
)
else:
@@ -473,7 +469,7 @@ class DatabaseAPI20Test(unittest.TestCase):
'insert using cursor.executemany set cursor.rowcount to '
'incorrect value %r' % cur.rowcount
)
- cur.execute('select name from %sbooze' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}booze')
res = cur.fetchall()
self.assertEqual(len(res),2,
'cursor.fetchall retrieved incorrect number of rows'
@@ -499,7 +495,7 @@ class DatabaseAPI20Test(unittest.TestCase):
self.executeDDL1(cur)
self.assertRaises(self.driver.Error,cur.fetchone)
- cur.execute('select name from %sbooze' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}booze')
self.assertEqual(cur.fetchone(),None,
'cursor.fetchone should return None if a query retrieves '
'no rows'
@@ -513,7 +509,7 @@ class DatabaseAPI20Test(unittest.TestCase):
))
self.assertRaises(self.driver.Error,cur.fetchone)
- cur.execute('select name from %sbooze' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}booze')
r = cur.fetchone()
self.assertEqual(len(r),1,
'cursor.fetchone should have retrieved a single row'
@@ -560,7 +556,7 @@ class DatabaseAPI20Test(unittest.TestCase):
for sql in self._populate():
cur.execute(sql)
- cur.execute('select name from %sbooze' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}booze')
r = cur.fetchmany()
self.assertEqual(len(r),1,
'cursor.fetchmany retrieved incorrect number of rows, '
@@ -584,7 +580,7 @@ class DatabaseAPI20Test(unittest.TestCase):
# Same as above, using cursor.arraysize
cur.arraysize=4
- cur.execute('select name from %sbooze' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}booze')
r = cur.fetchmany() # Should get 4 rows
self.assertEqual(len(r),4,
'cursor.arraysize not being honoured by fetchmany'
@@ -596,7 +592,7 @@ class DatabaseAPI20Test(unittest.TestCase):
self.failUnless(cur.rowcount in (-1,6))
cur.arraysize=6
- cur.execute('select name from %sbooze' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}booze')
rows = cur.fetchmany() # Should get all rows
self.failUnless(cur.rowcount in (-1,6))
self.assertEqual(len(rows),6)
@@ -618,7 +614,7 @@ class DatabaseAPI20Test(unittest.TestCase):
self.failUnless(cur.rowcount in (-1,6))
self.executeDDL2(cur)
- cur.execute('select name from %sbarflys' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}barflys')
r = cur.fetchmany() # Should get empty sequence
self.assertEqual(len(r),0,
'cursor.fetchmany should return an empty sequence if '
@@ -646,7 +642,7 @@ class DatabaseAPI20Test(unittest.TestCase):
# after executing a a statement that cannot return rows
self.assertRaises(self.driver.Error,cur.fetchall)
- cur.execute('select name from %sbooze' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}booze')
rows = cur.fetchall()
self.failUnless(cur.rowcount in (-1,len(self.samples)))
self.assertEqual(len(rows),len(self.samples),
@@ -667,7 +663,7 @@ class DatabaseAPI20Test(unittest.TestCase):
self.failUnless(cur.rowcount in (-1,len(self.samples)))
self.executeDDL2(cur)
- cur.execute('select name from %sbarflys' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}barflys')
rows = cur.fetchall()
self.failUnless(cur.rowcount in (-1,0))
self.assertEqual(len(rows),0,
@@ -686,7 +682,7 @@ class DatabaseAPI20Test(unittest.TestCase):
for sql in self._populate():
cur.execute(sql)
- cur.execute('select name from %sbooze' % self.table_prefix)
+ cur.execute(f'select name from {self.table_prefix}booze')
rows1 = cur.fetchone()
rows23 = cur.fetchmany(2)
rows4 = cur.fetchone()
@@ -803,8 +799,8 @@ class DatabaseAPI20Test(unittest.TestCase):
try:
cur = con.cursor()
self.executeDDL1(cur)
- cur.execute('insert into %sbooze values (NULL)' % self.table_prefix)
- cur.execute('select name from %sbooze' % self.table_prefix)
+ cur.execute(f'insert into {self.table_prefix}booze values (NULL)')
+ cur.execute(f'select name from {self.table_prefix}booze')
r = cur.fetchall()
self.assertEqual(len(r),1)
self.assertEqual(len(r[0]),1)
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 5fb7670..dc1d446 100755
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -383,8 +383,7 @@ class ConnectionTests(ConnectingTestCase):
dir = tempfile.mkdtemp()
try:
with open(os.path.join(dir, "mptest.py"), 'w') as f:
- f.write("""\
-import time
+ f.write(f"""import time
import psycopg2
def thread():
@@ -396,7 +395,7 @@ def thread():
def process():
time.sleep(0.2)
-""".format(dsn=dsn))
+""")
script = ("""\
import sys
@@ -1732,8 +1731,7 @@ class SignalTestCase(ConnectingTestCase):
""")
def _test_bug_551(self, query):
- script = ("""\
-import os
+ script = f"""import os
import sys
import time
import signal
@@ -1766,7 +1764,7 @@ t.start()
while True:
cur.execute({query!r}, ("Hello, world!",))
-""".format(dsn=dsn, query=query))
+"""
proc = sp.Popen([sys.executable, '-c', script],
stdout=sp.PIPE, stderr=sp.PIPE)
diff --git a/tests/test_copy.py b/tests/test_copy.py
index e48e3bd..a8e8ff9 100755
--- a/tests/test_copy.py
+++ b/tests/test_copy.py
@@ -317,8 +317,7 @@ class CopyTests(ConnectingTestCase):
@slow
def test_copy_from_segfault(self):
# issue #219
- script = ("""\
-import psycopg2
+ script = f"""import psycopg2
conn = psycopg2.connect({dsn!r})
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
@@ -327,7 +326,7 @@ try:
except psycopg2.ProgrammingError:
pass
conn.close()
-""".format(dsn=dsn))
+"""
proc = Popen([sys.executable, '-c', script])
proc.communicate()
@@ -336,8 +335,7 @@ conn.close()
@slow
def test_copy_to_segfault(self):
# issue #219
- script = ("""\
-import psycopg2
+ script = f"""import psycopg2
conn = psycopg2.connect({dsn!r})
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
@@ -346,7 +344,7 @@ try:
except psycopg2.ProgrammingError:
pass
conn.close()
-""".format(dsn=dsn))
+"""
proc = Popen([sys.executable, '-c', script], stdout=PIPE)
proc.communicate()
diff --git a/tests/test_cursor.py b/tests/test_cursor.py
index 1b5b620..391af56 100755
--- a/tests/test_cursor.py
+++ b/tests/test_cursor.py
@@ -88,21 +88,21 @@ class CursorTests(ConnectingTestCase):
return s
# unicode query with non-ascii data
- cur.execute("SELECT '%s';" % snowman)
+ cur.execute(f"SELECT '{snowman}';")
self.assertEqual(snowman.encode('utf8'), b(cur.fetchone()[0]))
- self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
- cur.mogrify("SELECT '%s';" % snowman))
+ self.assertQuotedEqual(f"SELECT '{snowman}';".encode('utf8'),
+ cur.mogrify(f"SELECT '{snowman}';"))
# unicode args
cur.execute("SELECT %s;", (snowman,))
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
- self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
+ self.assertQuotedEqual(f"SELECT '{snowman}';".encode('utf8'),
cur.mogrify("SELECT %s;", (snowman,)))
# unicode query and args
cur.execute("SELECT %s;", (snowman,))
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
- self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
+ self.assertQuotedEqual(f"SELECT '{snowman}';".encode('utf8'),
cur.mogrify("SELECT %s;", (snowman,)))
def test_mogrify_decimal_explodes(self):
@@ -282,12 +282,12 @@ class CursorTests(ConnectingTestCase):
cur = self.conn.cursor()
# Set up the temporary function
- cur.execute('''
- CREATE FUNCTION {}({} INT)
+ cur.execute(f'''
+ CREATE FUNCTION {procname}({escaped_paramname} INT)
RETURNS INT AS
'SELECT $1 * $1'
LANGUAGE SQL
- '''.format(procname, escaped_paramname))
+ ''')
# Make sure callproc works right
cur.callproc(procname, {paramname: 2})
@@ -573,8 +573,7 @@ class NamedCursorTests(ConnectingTestCase):
time.sleep(0.2)
t2 = next(i)[0]
self.assert_((t2 - t1).microseconds * 1e-6 < 0.1,
- "named cursor records fetched in 2 roundtrips (delta: %s)"
- % (t2 - t1))
+ f"named cursor records fetched in 2 roundtrips (delta: {t2 - t1})")
@skip_before_postgres(8, 0)
def test_iter_named_cursor_default_itersize(self):
diff --git a/tests/test_dates.py b/tests/test_dates.py
index fb50abe..29c37b0 100755
--- a/tests/test_dates.py
+++ b/tests/test_dates.py
@@ -379,7 +379,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
cur)
def f(val):
- cur.execute("select '%s'::text" % val)
+ cur.execute(f"select '{val}'::text")
return cur.fetchone()[0]
self.assertRaises(OverflowError, f, '100000000000000000:00:00')
diff --git a/tests/test_extras_dictcursor.py b/tests/test_extras_dictcursor.py
index e52ad4a..4c63db7 100755
--- a/tests/test_extras_dictcursor.py
+++ b/tests/test_extras_dictcursor.py
@@ -621,7 +621,7 @@ class NamedTupleCursorTest(ConnectingTestCase):
recs = []
curs = self.conn.cursor()
for i in range(10):
- curs.execute("select 1 as f%s" % i)
+ curs.execute(f"select 1 as f{i}")
recs.append(curs.fetchone())
# Still in cache
diff --git a/tests/test_green.py b/tests/test_green.py
index e4e93a6..fa9f298 100755
--- a/tests/test_green.py
+++ b/tests/test_green.py
@@ -137,7 +137,7 @@ class GreenTestCase(ConnectingTestCase):
elif state == POLL_WRITE:
select.select([], [conn.fileno()], [], 0.1)
else:
- raise conn.OperationalError("bad state from poll: %s" % state)
+ raise conn.OperationalError(f"bad state from poll: {state}")
stub = self.set_stub_wait_callback(self.conn, wait)
cur = self.conn.cursor()
@@ -182,7 +182,7 @@ class CallbackErrorTestCase(ConnectingTestCase):
elif state == POLL_WRITE:
select.select([], [conn.fileno()], [])
else:
- raise conn.OperationalError("bad state from poll: %s" % state)
+ raise conn.OperationalError(f"bad state from poll: {state}")
except KeyboardInterrupt:
conn.cancel()
# the loop will be broken by a server error
diff --git a/tests/test_lobject.py b/tests/test_lobject.py
index 6088b28..8d94656 100755
--- a/tests/test_lobject.py
+++ b/tests/test_lobject.py
@@ -207,7 +207,7 @@ class LargeObjectTests(LargeObjectTestCase):
data1 = lo.read()
# avoid dumping megacraps in the console in case of error
self.assert_(data == data1,
- "{!r}... != {!r}...".format(data[:100], data1[:100]))
+ f"{data[:100]!r}... != {data1[:100]!r}...")
def test_seek_tell(self):
lo = self.conn.lobject()
diff --git a/tests/test_module.py b/tests/test_module.py
index dc96e23..717cf50 100755
--- a/tests/test_module.py
+++ b/tests/test_module.py
@@ -120,7 +120,7 @@ class ConnectTestCase(unittest.TestCase):
def test_int_port_param(self):
psycopg2.connect(database='sony', port=6543)
- dsn = " %s " % self.args[0]
+ dsn = f" {self.args[0]} "
self.assert_(" dbname=sony " in dsn, dsn)
self.assert_(" port=6543 " in dsn, dsn)
@@ -327,12 +327,12 @@ class TestExtensionModule(unittest.TestCase):
pkgdir = os.path.dirname(psycopg2.__file__)
pardir = os.path.dirname(pkgdir)
self.assert_(pardir in sys.path)
- script = ("""
+ script = f"""
import sys
-sys.path.remove({!r})
-sys.path.insert(0, {!r})
+sys.path.remove({pardir!r})
+sys.path.insert(0, {pkgdir!r})
import _psycopg
-""".format(pardir, pkgdir))
+"""
proc = Popen([sys.executable, '-c', script])
proc.communicate()
diff --git a/tests/test_notify.py b/tests/test_notify.py
index f6c514b..a3586f8 100755
--- a/tests/test_notify.py
+++ b/tests/test_notify.py
@@ -56,7 +56,7 @@ class NotifiesTests(ConnectingTestCase):
if payload is None:
payload = ''
else:
- payload = ", %r" % payload
+ payload = f", {payload!r}"
script = ("""\
import time
diff --git a/tests/test_quote.py b/tests/test_quote.py
index 7489b3f..98ec494 100755
--- a/tests/test_quote.py
+++ b/tests/test_quote.py
@@ -98,8 +98,7 @@ class QuotingTestCase(ConnectingTestCase):
server_encoding = curs.fetchone()[0]
if server_encoding != "UTF8":
return self.skipTest(
- "Unicode test skipped since server encoding is %s"
- % server_encoding)
+ f"Unicode test skipped since server encoding is {server_encoding}")
data = """some data with \t chars
to escape into, 'quotes', \u20ac euro sign and \\ a backslash too.
diff --git a/tests/test_replication.py b/tests/test_replication.py
index 657fa60..569ed39 100755
--- a/tests/test_replication.py
+++ b/tests/test_replication.py
@@ -244,9 +244,9 @@ class AsyncReplicationTest(ReplicationTestCase):
def consume(msg):
# just check the methods
- "{}: {}".format(cur.io_timestamp, repr(msg))
- "{}: {}".format(cur.feedback_timestamp, repr(msg))
- "{}: {}".format(cur.wal_end, repr(msg))
+ f"{cur.io_timestamp}: {repr(msg)}"
+ f"{cur.feedback_timestamp}: {repr(msg)}"
+ f"{cur.wal_end}: {repr(msg)}"
self.msg_count += 1
if self.msg_count > 3:
diff --git a/tests/test_types_extras.py b/tests/test_types_extras.py
index 942785c..c653020 100755
--- a/tests/test_types_extras.py
+++ b/tests/test_types_extras.py
@@ -711,7 +711,7 @@ class AdaptTypeTestCase(ConnectingTestCase):
def _create_type(self, name, fields):
curs = self.conn.cursor()
try:
- curs.execute("drop type %s cascade;" % name)
+ curs.execute(f"drop type {name} cascade;")
except psycopg2.ProgrammingError:
self.conn.rollback()
@@ -1300,14 +1300,14 @@ class RangeCasterTestCase(ConnectingTestCase):
def test_cast_null(self):
cur = self.conn.cursor()
for type in self.builtin_ranges:
- cur.execute("select NULL::%s" % type)
+ cur.execute(f"select NULL::{type}")
r = cur.fetchone()[0]
self.assertEqual(r, None)
def test_cast_empty(self):
cur = self.conn.cursor()
for type in self.builtin_ranges:
- cur.execute("select 'empty'::%s" % type)
+ cur.execute(f"select 'empty'::{type}")
r = cur.fetchone()[0]
self.assert_(isinstance(r, Range), type)
self.assert_(r.isempty)
@@ -1315,7 +1315,7 @@ class RangeCasterTestCase(ConnectingTestCase):
def test_cast_inf(self):
cur = self.conn.cursor()
for type in self.builtin_ranges:
- cur.execute("select '(,)'::%s" % type)
+ cur.execute(f"select '(,)'::{type}")
r = cur.fetchone()[0]
self.assert_(isinstance(r, Range), type)
self.assert_(not r.isempty)
@@ -1325,7 +1325,7 @@ class RangeCasterTestCase(ConnectingTestCase):
def test_cast_numbers(self):
cur = self.conn.cursor()
for type in ('int4range', 'int8range'):
- cur.execute("select '(10,20)'::%s" % type)
+ cur.execute(f"select '(10,20)'::{type}")
r = cur.fetchone()[0]
self.assert_(isinstance(r, NumericRange))
self.assert_(not r.isempty)
diff --git a/tests/testconfig.py b/tests/testconfig.py
index 511f1fb..ed6132a 100644
--- a/tests/testconfig.py
+++ b/tests/testconfig.py
@@ -23,15 +23,15 @@ if green:
psycopg2.extensions.set_wait_callback(wait_callback)
# Construct a DSN to connect to the test database:
-dsn = 'dbname=%s' % dbname
+dsn = f'dbname={dbname}'
if dbhost is not None:
- dsn += ' host=%s' % dbhost
+ dsn += f' host={dbhost}'
if dbport is not None:
- dsn += ' port=%s' % dbport
+ dsn += f' port={dbport}'
if dbuser is not None:
- dsn += ' user=%s' % dbuser
+ dsn += f' user={dbuser}'
if dbpass is not None:
- dsn += ' password=%s' % dbpass
+ dsn += f' password={dbpass}'
# Don't run replication tests if REPL_DSN is not set, default to normal DSN if
# set to empty string.
diff --git a/tests/testutils.py b/tests/testutils.py
index be6098c..007fda2 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -100,8 +100,7 @@ class ConnectingTestCase(unittest.TestCase):
self._conns
except AttributeError as e:
raise AttributeError(
- "%s (did you forget to call ConnectingTestCase.setUp()?)"
- % e)
+ f"{e} (did you forget to call ConnectingTestCase.setUp()?)")
if 'dsn' in kwargs:
conninfo = kwargs.pop('dsn')
@@ -134,7 +133,7 @@ class ConnectingTestCase(unittest.TestCase):
# Otherwise we tried to run some bad operation in the connection
# (e.g. bug #482) and we'd rather know that.
if e.pgcode is None:
- return self.skipTest("replication db not configured: %s" % e)
+ return self.skipTest(f"replication db not configured: {e}")
else:
raise
@@ -324,7 +323,7 @@ def skip_after_libpq(*ver):
v = libpq_version()
decorator = unittest.skipIf(
v >= int("%d%02d%02d" % ver),
- "skipped because libpq %s" % v,
+ f"skipped because libpq {v}",
)
return decorator(cls)
return skip_after_libpq_
@@ -335,8 +334,7 @@ def skip_before_python(*ver):
def skip_before_python_(cls):
decorator = unittest.skipIf(
sys.version_info[:len(ver)] < ver,
- "skipped because Python %s"
- % ".".join(map(str, sys.version_info[:len(ver)])),
+ f"skipped because Python {'.'.join(map(str, sys.version_info[:len(ver)]))}",
)
return decorator(cls)
return skip_before_python_
@@ -347,8 +345,7 @@ def skip_from_python(*ver):
def skip_from_python_(cls):
decorator = unittest.skipIf(
sys.version_info[:len(ver)] >= ver,
- "skipped because Python %s"
- % ".".join(map(str, sys.version_info[:len(ver)])),
+ f"skipped because Python {'.'.join(map(str, sys.version_info[:len(ver)]))}",
)
return decorator(cls)
return skip_from_python_
@@ -415,7 +412,7 @@ def crdb_version(conn, __crdb_version=[]):
m = re.search(r"\bv(\d+)\.(\d+)\.(\d+)", sver)
if not m:
raise ValueError(
- "can't parse CockroachDB version from %s" % sver)
+ f"can't parse CockroachDB version from {sver}")
ver = int(m.group(1)) * 10000 + int(m.group(2)) * 100 + int(m.group(3))
__crdb_version.append(ver)
@@ -439,7 +436,7 @@ def skip_if_crdb(reason, conn=None, version=None):
"""
if not isinstance(reason, str):
- raise TypeError("reason should be a string, got %r instead" % reason)
+ raise TypeError(f"reason should be a string, got {reason!r} instead")
if conn is not None:
ver = crdb_version(conn)