From e993a986388b0ec44b109a03edd34937a51aa9a1 Mon Sep 17 00:00:00 2001 From: Jeff Forcier Date: Mon, 16 Jan 2023 19:03:36 -0500 Subject: s/%/fstrings/g Except in one spot where it was too complicated to bother for now XD --- paramiko/_winapi.py | 2 +- paramiko/win_pageant.py | 2 +- setup_helper.py | 2 +- tests/test_client.py | 12 +++--- tests/test_kex_gss.py | 2 +- tests/test_sftp.py | 22 +++++------ tests/test_sftp_big.py | 99 ++++++++++++++++++++++++------------------------- tests/test_ssh_gss.py | 2 +- 8 files changed, 71 insertions(+), 72 deletions(-) diff --git a/paramiko/_winapi.py b/paramiko/_winapi.py index ab2a71e6..96284b69 100644 --- a/paramiko/_winapi.py +++ b/paramiko/_winapi.py @@ -172,7 +172,7 @@ class MemoryMap(object): assert isinstance(msg, bytes) n = len(msg) if self.pos + n >= self.length: # A little safety. - raise ValueError("Refusing to write %d bytes" % n) + raise ValueError(f"Refusing to write {n} bytes") dest = self.view + self.pos length = ctypes.c_size_t(n) ctypes.windll.kernel32.RtlMoveMemory(dest, msg, length) diff --git a/paramiko/win_pageant.py b/paramiko/win_pageant.py index 780824c1..5a085939 100644 --- a/paramiko/win_pageant.py +++ b/paramiko/win_pageant.py @@ -84,7 +84,7 @@ def _query_pageant(msg): return None # create a name for the mmap - map_name = "PageantRequest%08x" % thread.get_ident() + map_name = f"PageantRequest{thread.get_ident():08x}" pymap = _winapi.MemoryMap( map_name, _AGENT_MAX_MSGLEN, _winapi.get_security_attributes_for_user() diff --git a/setup_helper.py b/setup_helper.py index 8671fd63..f290ea3f 100644 --- a/setup_helper.py +++ b/setup_helper.py @@ -116,7 +116,7 @@ def make_tarball( mode = "w:" + tarfile_compress_flag.get(compress, "") mkpath(os.path.dirname(archive_name), dry_run=dry_run) - log.info("Creating tar file %s with mode %s" % (archive_name, mode)) + log.info(f"Creating tar file {archive_name} with mode {mode}") uid = _get_uid(owner) gid = _get_gid(group) diff --git a/tests/test_client.py b/tests/test_client.py index 3cf991e4..3ff59b87 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -202,7 +202,7 @@ class ClientTest(unittest.TestCase): # Client setup self.tc = SSHClient() self.tc.get_host_keys().add( - "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key + f"[{self.addr}]:{self.port}", "ssh-rsa", public_host_key ) # Actual connection @@ -390,7 +390,7 @@ class SSHClientTest(ClientTest): verify that SSHClient's AutoAddPolicy works. """ threading.Thread(target=self._run).start() - hostname = "[%s]:%d" % (self.addr, self.port) + hostname = f"[{self.addr}]:{self.port}" key_file = _support("test_ecdsa_256.key") public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) @@ -424,7 +424,7 @@ class SSHClientTest(ClientTest): client = SSHClient() assert len(client.get_host_keys()) == 0 - host_id = "[%s]:%d" % (self.addr, self.port) + host_id = f"[{self.addr}]:{self.port}" client.get_host_keys().add(host_id, "ssh-rsa", public_host_key) assert len(client.get_host_keys()) == 1 @@ -523,7 +523,7 @@ class SSHClientTest(ClientTest): self.tc = SSHClient() self.tc.get_host_keys().add( - "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key + f"[{self.addr}]:{self.port}", "ssh-rsa", public_host_key ) # Connect with a half second banner timeout. kwargs = dict(self.connect_kwargs, banner_timeout=0.5) @@ -619,7 +619,7 @@ class SSHClientTest(ClientTest): def _client_host_key_bad(self, host_key): threading.Thread(target=self._run).start() - hostname = "[%s]:%d" % (self.addr, self.port) + hostname = f"[{self.addr}]:{self.port}" self.tc = SSHClient() self.tc.set_missing_host_key_policy(paramiko.WarningPolicy()) @@ -635,7 +635,7 @@ class SSHClientTest(ClientTest): def _client_host_key_good(self, ktype, kfile): threading.Thread(target=self._run).start() - hostname = "[%s]:%d" % (self.addr, self.port) + hostname = f"[{self.addr}]:{self.port}" self.tc = SSHClient() self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index 26659ae3..d4868f4a 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -101,7 +101,7 @@ class GSSKexTest(KerberosTestCase): self.tc = paramiko.SSHClient() self.tc.get_host_keys().add( - "[%s]:%d" % (self.hostname, self.port), "ssh-rsa", public_host_key + f"[{self.hostname}]:{self.port}", "ssh-rsa", public_host_key ) self.tc.connect( self.hostname, diff --git a/tests/test_sftp.py b/tests/test_sftp.py index bcda5441..5767c00b 100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -775,18 +775,18 @@ class TestSFTP(object): def test_non_utf8_data(self, sftp): """Test write() and read() of non utf8 data""" try: - with sftp.open("%s/nonutf8data" % sftp.FOLDER, "w") as f: + with sftp.open(f"{sftp.FOLDER}/nonutf8data", "w") as f: f.write(NON_UTF8_DATA) - with sftp.open("%s/nonutf8data" % sftp.FOLDER, "r") as f: + with sftp.open(f"{sftp.FOLDER}/nonutf8data", "r") as f: data = f.read() assert data == NON_UTF8_DATA - with sftp.open("%s/nonutf8data" % sftp.FOLDER, "wb") as f: + with sftp.open(f"{sftp.FOLDER}/nonutf8data", "wb") as f: f.write(NON_UTF8_DATA) - with sftp.open("%s/nonutf8data" % sftp.FOLDER, "rb") as f: + with sftp.open(f"{sftp.FOLDER}/nonutf8data", "rb") as f: data = f.read() assert data == NON_UTF8_DATA finally: - sftp.remove("%s/nonutf8data" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/nonutf8data") @requireNonAsciiLocale("LC_TIME") def test_sftp_attributes_locale_time(self, sftp): @@ -807,26 +807,26 @@ class TestSFTP(object): """Test write() using a buffer instance.""" data = 3 * b"A potentially large block of data to chunk up.\n" try: - with sftp.open("%s/write_buffer" % sftp.FOLDER, "wb") as f: + with sftp.open(f"{sftp.FOLDER}/write_buffer", "wb") as f: for offset in range(0, len(data), 8): f.write(buffer(data, offset, 8)) # noqa - with sftp.open("%s/write_buffer" % sftp.FOLDER, "rb") as f: + with sftp.open(f"{sftp.FOLDER}/write_buffer", "rb") as f: assert f.read() == data finally: - sftp.remove("%s/write_buffer" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/write_buffer") @needs_builtin("memoryview") def test_write_memoryview(self, sftp): """Test write() using a memoryview instance.""" data = 3 * b"A potentially large block of data to chunk up.\n" try: - with sftp.open("%s/write_memoryview" % sftp.FOLDER, "wb") as f: + with sftp.open(f"{sftp.FOLDER}/write_memoryview", "wb") as f: view = memoryview(data) for offset in range(0, len(data), 8): f.write(view[offset : offset + 8]) - with sftp.open("%s/write_memoryview" % sftp.FOLDER, "rb") as f: + with sftp.open(f"{sftp.FOLDER}/write_memoryview", "rb") as f: assert f.read() == data finally: - sftp.remove("%s/write_memoryview" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/write_memoryview") diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index 4643bcaa..d2d30f24 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -42,24 +42,23 @@ class TestBigSFTP(object): numfiles = 100 try: for i in range(numfiles): - with sftp.open( - "%s/file%d.txt" % (sftp.FOLDER, i), "w", 1 - ) as f: - f.write("this is file #%d.\n" % i) - sftp.chmod("%s/file%d.txt" % (sftp.FOLDER, i), o660) + target = f"{sftp.FOLDER}/file{i}.txt" + with sftp.open(target, "w", 1) as f: + f.write(f"this is file #{i}.\n") + sftp.chmod(target, o660) # now make sure every file is there, by creating a list of filenmes # and reading them in random order. numlist = list(range(numfiles)) while len(numlist) > 0: r = numlist[random.randint(0, len(numlist) - 1)] - with sftp.open("%s/file%d.txt" % (sftp.FOLDER, r)) as f: - assert f.readline() == "this is file #%d.\n" % r + with sftp.open(f"{sftp.FOLDER}/file{r}.txt") as f: + assert f.readline() == f"this is file #{r}.\n" numlist.remove(r) finally: for i in range(numfiles): try: - sftp.remove("%s/file%d.txt" % (sftp.FOLDER, i)) + sftp.remove(f"{sftp.FOLDER}/file{i}.txt") except: pass @@ -70,7 +69,7 @@ class TestBigSFTP(object): kblob = 1024 * b"x" start = time.time() try: - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "w") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "w") as f: for n in range(1024): f.write(kblob) if n % 128 == 0: @@ -78,21 +77,21 @@ class TestBigSFTP(object): sys.stderr.write(" ") assert ( - sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024 + sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024 ) end = time.time() - sys.stderr.write("%ds " % round(end - start)) + sys.stderr.write(f"{round(end - start)}s") start = time.time() - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "r") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "r") as f: for n in range(1024): data = f.read(1024) assert data == kblob end = time.time() - sys.stderr.write("%ds " % round(end - start)) + sys.stderr.write(f"{round(end - start)}s") finally: - sftp.remove("%s/hongry.txt" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/hongry.txt") def test_big_file_pipelined(self, sftp): """ @@ -101,7 +100,7 @@ class TestBigSFTP(object): kblob = bytes().join([struct.pack(">H", n) for n in range(512)]) start = time.time() try: - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "wb") as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -110,13 +109,13 @@ class TestBigSFTP(object): sys.stderr.write(" ") assert ( - sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024 + sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024 ) end = time.time() - sys.stderr.write("%ds " % round(end - start)) + sys.stderr.write(f"{round(end - start)}s") start = time.time() - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "rb") as f: file_size = f.stat().st_size f.prefetch(file_size) @@ -135,14 +134,14 @@ class TestBigSFTP(object): n += chunk end = time.time() - sys.stderr.write("%ds " % round(end - start)) + sys.stderr.write(f"{round(end - start)}s") finally: - sftp.remove("%s/hongry.txt" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/hongry.txt") def test_prefetch_seek(self, sftp): kblob = bytes().join([struct.pack(">H", n) for n in range(512)]) try: - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "wb") as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -151,14 +150,14 @@ class TestBigSFTP(object): sys.stderr.write(" ") assert ( - sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024 + sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024 ) start = time.time() k2blob = kblob + kblob chunk = 793 for i in range(10): - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "rb") as f: file_size = f.stat().st_size f.prefetch(file_size) base_offset = (512 * 1024) + 17 * random.randint( @@ -175,14 +174,14 @@ class TestBigSFTP(object): assert data == k2blob[n_offset : n_offset + chunk] offset += chunk end = time.time() - sys.stderr.write("%ds " % round(end - start)) + sys.stderr.write(f"{round(end - start)}s") finally: - sftp.remove("%s/hongry.txt" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/hongry.txt") def test_readv_seek(self, sftp): kblob = bytes().join([struct.pack(">H", n) for n in range(512)]) try: - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "wb") as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -191,14 +190,14 @@ class TestBigSFTP(object): sys.stderr.write(" ") assert ( - sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024 + sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024 ) start = time.time() k2blob = kblob + kblob chunk = 793 for i in range(10): - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "rb") as f: base_offset = (512 * 1024) + 17 * random.randint( 1000, 2000 ) @@ -215,9 +214,9 @@ class TestBigSFTP(object): n_offset = offset % 1024 assert next(ret) == k2blob[n_offset : n_offset + chunk] end = time.time() - sys.stderr.write("%ds " % round(end - start)) + sys.stderr.write(f"{round(end - start)}s") finally: - sftp.remove("%s/hongry.txt" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/hongry.txt") def test_lots_of_prefetching(self, sftp): """ @@ -226,7 +225,7 @@ class TestBigSFTP(object): """ kblob = 1024 * b"x" try: - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "w") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "w") as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -235,14 +234,14 @@ class TestBigSFTP(object): sys.stderr.write(" ") assert ( - sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024 + sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024 ) for i in range(10): - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "r") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "r") as f: file_size = f.stat().st_size f.prefetch(file_size) - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "r") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "r") as f: file_size = f.stat().st_size f.prefetch(file_size) for n in range(1024): @@ -252,7 +251,7 @@ class TestBigSFTP(object): sys.stderr.write(".") sys.stderr.write(" ") finally: - sftp.remove("%s/hongry.txt" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/hongry.txt") def test_prefetch_readv(self, sftp): """ @@ -260,7 +259,7 @@ class TestBigSFTP(object): """ kblob = bytes().join([struct.pack(">H", n) for n in range(512)]) try: - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "wb") as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -269,10 +268,10 @@ class TestBigSFTP(object): sys.stderr.write(" ") assert ( - sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024 + sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024 ) - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "rb") as f: file_size = f.stat().st_size f.prefetch(file_size) data = f.read(1024) @@ -293,7 +292,7 @@ class TestBigSFTP(object): sys.stderr.write(" ") finally: - sftp.remove("%s/hongry.txt" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/hongry.txt") def test_large_readv(self, sftp): """ @@ -302,7 +301,7 @@ class TestBigSFTP(object): """ kblob = bytes().join([struct.pack(">H", n) for n in range(512)]) try: - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "wb") as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -311,10 +310,10 @@ class TestBigSFTP(object): sys.stderr.write(" ") assert ( - sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024 + sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024 ) - with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f: + with sftp.open(f"{sftp.FOLDER}/hongry.txt", "rb") as f: data = list(f.readv([(23 * 1024, 128 * 1024)])) assert len(data) == 1 data = data[0] @@ -322,7 +321,7 @@ class TestBigSFTP(object): sys.stderr.write(" ") finally: - sftp.remove("%s/hongry.txt" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/hongry.txt") def test_big_file_big_buffer(self, sftp): """ @@ -331,15 +330,15 @@ class TestBigSFTP(object): mblob = 1024 * 1024 * "x" try: with sftp.open( - "%s/hongry.txt" % sftp.FOLDER, "w", 128 * 1024 + f"{sftp.FOLDER}/hongry.txt", "w", 128 * 1024 ) as f: f.write(mblob) assert ( - sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024 + sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024 ) finally: - sftp.remove("%s/hongry.txt" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/hongry.txt") def test_big_file_renegotiate(self, sftp): """ @@ -350,19 +349,19 @@ class TestBigSFTP(object): k32blob = 32 * 1024 * "x" try: with sftp.open( - "%s/hongry.txt" % sftp.FOLDER, "w", 128 * 1024 + f"{sftp.FOLDER}/hongry.txt", "w", 128 * 1024 ) as f: for i in range(32): f.write(k32blob) assert ( - sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024 + sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024 ) assert t.H != t.session_id # try to read it too. with sftp.open( - "%s/hongry.txt" % sftp.FOLDER, "r", 128 * 1024 + f"{sftp.FOLDER}/hongry.txt", "r", 128 * 1024 ) as f: file_size = f.stat().st_size f.prefetch(file_size) @@ -370,5 +369,5 @@ class TestBigSFTP(object): while total < 1024 * 1024: total += len(f.read(32 * 1024)) finally: - sftp.remove("%s/hongry.txt" % sftp.FOLDER) + sftp.remove(f"{sftp.FOLDER}/hongry.txt") t.packetizer.REKEY_BYTES = pow(2, 30) diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index 4d171854..cff72f7a 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -106,7 +106,7 @@ class GSSAuthTest(KerberosTestCase): self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.WarningPolicy()) self.tc.get_host_keys().add( - "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key + f"[{self.addr}]:{self.port}", "ssh-rsa", public_host_key ) self.tc.connect( hostname=self.addr, -- cgit v1.2.1