summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSybren A. Stüvel <sybren@stuvel.eu>2016-03-17 15:04:04 +0100
committerSybren A. Stüvel <sybren@stuvel.eu>2016-03-17 15:16:23 +0100
commit83a81107342ba414064703e5919978782d6bee01 (patch)
treebb1963b7a03849ff4523accab2bbda49d0846611
parent5d6603258881710fdd7aac866f6ec7445c864d50 (diff)
downloadrsa-git-83a81107342ba414064703e5919978782d6bee01.tar.gz
Removed deprecated functionality.
The following modules have been removed: - rsa._version133 - rsa._version200 - rsa.bigfile - rsa.varblock The encrypt/decrypt-bigfile CLI commands have also been removed.
-rw-r--r--rsa/_version133.py441
-rw-r--r--rsa/_version200.py513
-rw-r--r--rsa/bigfile.py135
-rw-r--r--rsa/cli.py99
-rw-r--r--rsa/pkcs1.py26
-rw-r--r--rsa/varblock.py179
-rwxr-xr-xsetup.py2
-rw-r--r--tests/test_bigfile.py73
-rw-r--r--tests/test_varblock.py88
9 files changed, 24 insertions, 1532 deletions
diff --git a/rsa/_version133.py b/rsa/_version133.py
deleted file mode 100644
index ff03b45..0000000
--- a/rsa/_version133.py
+++ /dev/null
@@ -1,441 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Deprecated version of the RSA module
-
-.. deprecated:: 2.0
-
- This submodule is deprecated and will be completely removed as of version 4.0.
-
-Module for calculating large primes, and RSA encryption, decryption,
-signing and verification. Includes generating public and private keys.
-
-WARNING: this code implements the mathematics of RSA. It is not suitable for
-real-world secure cryptography purposes. It has not been reviewed by a security
-expert. It does not include padding of data. There are many ways in which the
-output of this module, when used without any modification, can be sucessfully
-attacked.
-"""
-
-__author__ = "Sybren Stuvel, Marloes de Boer and Ivo Tamboer"
-__date__ = "2010-02-05"
-__version__ = '1.3.3'
-
-# NOTE: Python's modulo can return negative numbers. We compensate for
-# this behaviour using the abs() function
-
-try:
- import cPickle as pickle
-except ImportError:
- import pickle
-from pickle import dumps, loads
-import base64
-import math
-import os
-import random
-import sys
-import types
-import zlib
-
-from rsa._compat import byte
-
-# Display a warning that this insecure version is imported.
-import warnings
-warnings.warn('Insecure version of the RSA module is imported as %s, be careful'
- % __name__)
-warnings.warn('This submodule is deprecated and will be completely removed as of version 4.0.',
- DeprecationWarning)
-
-
-def gcd(p, q):
- """Returns the greatest common divisor of p and q
-
-
- >>> gcd(42, 6)
- 6
- """
- if p<q: return gcd(q, p)
- if q == 0: return p
- return gcd(q, abs(p%q))
-
-def bytes2int(bytes):
- """Converts a list of bytes or a string to an integer
- """
-
- if not (type(bytes) is types.ListType or type(bytes) is types.StringType):
- raise TypeError("You must pass a string or a list")
-
- # Convert byte stream to integer
- integer = 0
- for byte in bytes:
- integer *= 256
- if type(byte) is types.StringType: byte = ord(byte)
- integer += byte
-
- return integer
-
-def int2bytes(number):
- """Converts a number to a string of bytes
- """
-
- if not (type(number) is types.LongType or type(number) is types.IntType):
- raise TypeError("You must pass a long or an int")
-
- string = ""
-
- while number > 0:
- string = "%s%s" % (byte(number & 0xFF), string)
- number /= 256
-
- return string
-
-def fast_exponentiation(a, p, n):
- """Calculates r = a^p mod n
- """
- result = a % n
- remainders = []
- while p != 1:
- remainders.append(p & 1)
- p = p >> 1
- while remainders:
- rem = remainders.pop()
- result = ((a ** rem) * result ** 2) % n
- return result
-
-def read_random_int(nbits):
- """Reads a random integer of approximately nbits bits rounded up
- to whole bytes"""
-
- nbytes = ceil(nbits/8.)
- randomdata = os.urandom(nbytes)
- return bytes2int(randomdata)
-
-def ceil(x):
- """ceil(x) -> int(math.ceil(x))"""
-
- return int(math.ceil(x))
-
-def randint(minvalue, maxvalue):
- """Returns a random integer x with minvalue <= x <= maxvalue"""
-
- # Safety - get a lot of random data even if the range is fairly
- # small
- min_nbits = 32
-
- # The range of the random numbers we need to generate
- range = maxvalue - minvalue
-
- # Which is this number of bytes
- rangebytes = ceil(math.log(range, 2) / 8.)
-
- # Convert to bits, but make sure it's always at least min_nbits*2
- rangebits = max(rangebytes * 8, min_nbits * 2)
-
- # Take a random number of bits between min_nbits and rangebits
- nbits = random.randint(min_nbits, rangebits)
-
- return (read_random_int(nbits) % range) + minvalue
-
-def fermat_little_theorem(p):
- """Returns 1 if p may be prime, and something else if p definitely
- is not prime"""
-
- a = randint(1, p-1)
- return fast_exponentiation(a, p-1, p)
-
-def jacobi(a, b):
- """Calculates the value of the Jacobi symbol (a/b)
- """
-
- if a % b == 0:
- return 0
- result = 1
- while a > 1:
- if a & 1:
- if ((a-1)*(b-1) >> 2) & 1:
- result = -result
- b, a = a, b % a
- else:
- if ((b ** 2 - 1) >> 3) & 1:
- result = -result
- a = a >> 1
- return result
-
-def jacobi_witness(x, n):
- """Returns False if n is an Euler pseudo-prime with base x, and
- True otherwise.
- """
-
- j = jacobi(x, n) % n
- f = fast_exponentiation(x, (n-1)/2, n)
-
- if j == f: return False
- return True
-
-def randomized_primality_testing(n, k):
- """Calculates whether n is composite (which is always correct) or
- prime (which is incorrect with error probability 2**-k)
-
- Returns False if the number if composite, and True if it's
- probably prime.
- """
-
- q = 0.5 # Property of the jacobi_witness function
-
- # t = int(math.ceil(k / math.log(1/q, 2)))
- t = ceil(k / math.log(1/q, 2))
- for i in range(t+1):
- x = randint(1, n-1)
- if jacobi_witness(x, n): return False
-
- return True
-
-def is_prime(number):
- """Returns True if the number is prime, and False otherwise.
- """
-
- """
- if not fermat_little_theorem(number) == 1:
- # Not prime, according to Fermat's little theorem
- return False
- """
-
- if randomized_primality_testing(number, 5):
- # Prime, according to Jacobi
- return True
-
- # Not prime
- return False
-
-
-def getprime(nbits):
- """Returns a prime number of max. 'math.ceil(nbits/8)*8' bits. In
- other words: nbits is rounded up to whole bytes.
- """
-
- nbytes = int(math.ceil(nbits/8.))
-
- while True:
- integer = read_random_int(nbits)
-
- # Make sure it's odd
- integer |= 1
-
- # Test for primeness
- if is_prime(integer): break
-
- # Retry if not prime
-
- return integer
-
-def are_relatively_prime(a, b):
- """Returns True if a and b are relatively prime, and False if they
- are not.
- """
-
- d = gcd(a, b)
- return (d == 1)
-
-def find_p_q(nbits):
- """Returns a tuple of two different primes of nbits bits"""
-
- p = getprime(nbits)
- while True:
- q = getprime(nbits)
- if not q == p: break
-
- return (p, q)
-
-def extended_euclid_gcd(a, b):
- """Returns a tuple (d, i, j) such that d = gcd(a, b) = ia + jb
- """
-
- if b == 0:
- return (a, 1, 0)
-
- q = abs(a % b)
- r = long(a / b)
- (d, k, l) = extended_euclid_gcd(b, q)
-
- return (d, l, k - l*r)
-
-# Main function: calculate encryption and decryption keys
-def calculate_keys(p, q, nbits):
- """Calculates an encryption and a decryption key for p and q, and
- returns them as a tuple (e, d)"""
-
- n = p * q
- phi_n = (p-1) * (q-1)
-
- while True:
- # Make sure e has enough bits so we ensure "wrapping" through
- # modulo n
- e = getprime(max(8, nbits/2))
- if are_relatively_prime(e, n) and are_relatively_prime(e, phi_n): break
-
- (d, i, j) = extended_euclid_gcd(e, phi_n)
-
- if not d == 1:
- raise Exception("e (%d) and phi_n (%d) are not relatively prime" % (e, phi_n))
-
- if not (e * i) % phi_n == 1:
- raise Exception("e (%d) and i (%d) are not mult. inv. modulo phi_n (%d)" % (e, i, phi_n))
-
- return (e, i)
-
-
-def gen_keys(nbits):
- """Generate RSA keys of nbits bits. Returns (p, q, e, d).
-
- Note: this can take a long time, depending on the key size.
- """
-
- while True:
- (p, q) = find_p_q(nbits)
- (e, d) = calculate_keys(p, q, nbits)
-
- # For some reason, d is sometimes negative. We don't know how
- # to fix it (yet), so we keep trying until everything is shiny
- if d > 0: break
-
- return (p, q, e, d)
-
-def gen_pubpriv_keys(nbits):
- """Generates public and private keys, and returns them as (pub,
- priv).
-
- The public key consists of a dict {e: ..., , n: ....). The private
- key consists of a dict {d: ...., p: ...., q: ....).
- """
-
- (p, q, e, d) = gen_keys(nbits)
-
- return ( {'e': e, 'n': p*q}, {'d': d, 'p': p, 'q': q} )
-
-def encrypt_int(message, ekey, n):
- """Encrypts a message using encryption key 'ekey', working modulo
- n"""
-
- if type(message) is types.IntType:
- return encrypt_int(long(message), ekey, n)
-
- if not type(message) is types.LongType:
- raise TypeError("You must pass a long or an int")
-
- if message > 0 and \
- math.floor(math.log(message, 2)) > math.floor(math.log(n, 2)):
- raise OverflowError("The message is too long")
-
- return fast_exponentiation(message, ekey, n)
-
-def decrypt_int(cyphertext, dkey, n):
- """Decrypts a cypher text using the decryption key 'dkey', working
- modulo n"""
-
- return encrypt_int(cyphertext, dkey, n)
-
-def sign_int(message, dkey, n):
- """Signs 'message' using key 'dkey', working modulo n"""
-
- return decrypt_int(message, dkey, n)
-
-def verify_int(signed, ekey, n):
- """verifies 'signed' using key 'ekey', working modulo n"""
-
- return encrypt_int(signed, ekey, n)
-
-def picklechops(chops):
- """Pickles and base64encodes it's argument chops"""
-
- value = zlib.compress(dumps(chops))
- encoded = base64.encodestring(value)
- return encoded.strip()
-
-def unpicklechops(string):
- """base64decodes and unpickes it's argument string into chops"""
-
- return loads(zlib.decompress(base64.decodestring(string)))
-
-def chopstring(message, key, n, funcref):
- """Splits 'message' into chops that are at most as long as n,
- converts these into integers, and calls funcref(integer, key, n)
- for each chop.
-
- Used by 'encrypt' and 'sign'.
- """
-
- msglen = len(message)
- mbits = msglen * 8
- nbits = int(math.floor(math.log(n, 2)))
- nbytes = nbits / 8
- blocks = msglen / nbytes
-
- if msglen % nbytes > 0:
- blocks += 1
-
- cypher = []
-
- for bindex in range(blocks):
- offset = bindex * nbytes
- block = message[offset:offset+nbytes]
- value = bytes2int(block)
- cypher.append(funcref(value, key, n))
-
- return picklechops(cypher)
-
-def gluechops(chops, key, n, funcref):
- """Glues chops back together into a string. calls
- funcref(integer, key, n) for each chop.
-
- Used by 'decrypt' and 'verify'.
- """
- message = ""
-
- chops = unpicklechops(chops)
-
- for cpart in chops:
- mpart = funcref(cpart, key, n)
- message += int2bytes(mpart)
-
- return message
-
-def encrypt(message, key):
- """Encrypts a string 'message' with the public key 'key'"""
-
- return chopstring(message, key['e'], key['n'], encrypt_int)
-
-def sign(message, key):
- """Signs a string 'message' with the private key 'key'"""
-
- return chopstring(message, key['d'], key['p']*key['q'], decrypt_int)
-
-def decrypt(cypher, key):
- """Decrypts a cypher with the private key 'key'"""
-
- return gluechops(cypher, key['d'], key['p']*key['q'], decrypt_int)
-
-def verify(cypher, key):
- """Verifies a cypher with the public key 'key'"""
-
- return gluechops(cypher, key['e'], key['n'], encrypt_int)
-
-# Do doctest if we're not imported
-if __name__ == "__main__":
- import doctest
- doctest.testmod()
-
-__all__ = ["gen_pubpriv_keys", "encrypt", "decrypt", "sign", "verify"]
-
diff --git a/rsa/_version200.py b/rsa/_version200.py
deleted file mode 100644
index 1a16949..0000000
--- a/rsa/_version200.py
+++ /dev/null
@@ -1,513 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Deprecated version of the RSA module
-
-.. deprecated:: 3.0
-
- This submodule is deprecated and will be completely removed as of version 4.0.
-
-"""
-
-__author__ = "Sybren Stuvel, Marloes de Boer, Ivo Tamboer, and Barry Mead"
-__date__ = "2010-02-08"
-__version__ = '2.0'
-
-import math
-import os
-import random
-import sys
-import types
-from rsa._compat import byte
-
-# Display a warning that this insecure version is imported.
-import warnings
-warnings.warn('Insecure version of the RSA module is imported as %s' % __name__)
-warnings.warn('This submodule is deprecated and will be completely removed as of version 4.0.',
- DeprecationWarning)
-
-
-def bit_size(number):
- """Returns the number of bits required to hold a specific long number"""
-
- return int(math.ceil(math.log(number,2)))
-
-def gcd(p, q):
- """Returns the greatest common divisor of p and q
- >>> gcd(48, 180)
- 12
- """
- # Iterateive Version is faster and uses much less stack space
- while q != 0:
- if p < q: (p,q) = (q,p)
- (p,q) = (q, p % q)
- return p
-
-
-def bytes2int(bytes):
- r"""Converts a list of bytes or a string to an integer
- """
-
- if not (type(bytes) is types.ListType or type(bytes) is types.StringType):
- raise TypeError("You must pass a string or a list")
-
- # Convert byte stream to integer
- integer = 0
- for byte in bytes:
- integer *= 256
- if type(byte) is types.StringType: byte = ord(byte)
- integer += byte
-
- return integer
-
-def int2bytes(number):
- """
- Converts a number to a string of bytes
- """
-
- if not (type(number) is types.LongType or type(number) is types.IntType):
- raise TypeError("You must pass a long or an int")
-
- string = ""
-
- while number > 0:
- string = "%s%s" % (byte(number & 0xFF), string)
- number /= 256
-
- return string
-
-def to64(number):
- """Converts a number in the range of 0 to 63 into base 64 digit
- character in the range of '0'-'9', 'A'-'Z', 'a'-'z','-','_'.
- """
-
- if not (type(number) is types.LongType or type(number) is types.IntType):
- raise TypeError("You must pass a long or an int")
-
- if 0 <= number <= 9: #00-09 translates to '0' - '9'
- return byte(number + 48)
-
- if 10 <= number <= 35:
- return byte(number + 55) #10-35 translates to 'A' - 'Z'
-
- if 36 <= number <= 61:
- return byte(number + 61) #36-61 translates to 'a' - 'z'
-
- if number == 62: # 62 translates to '-' (minus)
- return byte(45)
-
- if number == 63: # 63 translates to '_' (underscore)
- return byte(95)
-
- raise ValueError('Invalid Base64 value: %i' % number)
-
-
-def from64(number):
- """Converts an ordinal character value in the range of
- 0-9,A-Z,a-z,-,_ to a number in the range of 0-63.
- """
-
- if not (type(number) is types.LongType or type(number) is types.IntType):
- raise TypeError("You must pass a long or an int")
-
- if 48 <= number <= 57: #ord('0') - ord('9') translates to 0-9
- return(number - 48)
-
- if 65 <= number <= 90: #ord('A') - ord('Z') translates to 10-35
- return(number - 55)
-
- if 97 <= number <= 122: #ord('a') - ord('z') translates to 36-61
- return(number - 61)
-
- if number == 45: #ord('-') translates to 62
- return(62)
-
- if number == 95: #ord('_') translates to 63
- return(63)
-
- raise ValueError('Invalid Base64 value: %i' % number)
-
-
-def int2str64(number):
- """Converts a number to a string of base64 encoded characters in
- the range of '0'-'9','A'-'Z,'a'-'z','-','_'.
- """
-
- if not (type(number) is types.LongType or type(number) is types.IntType):
- raise TypeError("You must pass a long or an int")
-
- string = ""
-
- while number > 0:
- string = "%s%s" % (to64(number & 0x3F), string)
- number /= 64
-
- return string
-
-
-def str642int(string):
- """Converts a base64 encoded string into an integer.
- The chars of this string in in the range '0'-'9','A'-'Z','a'-'z','-','_'
- """
-
- if not (type(string) is types.ListType or type(string) is types.StringType):
- raise TypeError("You must pass a string or a list")
-
- integer = 0
- for byte in string:
- integer *= 64
- if type(byte) is types.StringType: byte = ord(byte)
- integer += from64(byte)
-
- return integer
-
-def read_random_int(nbits):
- """Reads a random integer of approximately nbits bits rounded up
- to whole bytes"""
-
- nbytes = int(math.ceil(nbits/8.))
- randomdata = os.urandom(nbytes)
- return bytes2int(randomdata)
-
-def randint(minvalue, maxvalue):
- """Returns a random integer x with minvalue <= x <= maxvalue"""
-
- # Safety - get a lot of random data even if the range is fairly
- # small
- min_nbits = 32
-
- # The range of the random numbers we need to generate
- range = (maxvalue - minvalue) + 1
-
- # Which is this number of bytes
- rangebytes = ((bit_size(range) + 7) / 8)
-
- # Convert to bits, but make sure it's always at least min_nbits*2
- rangebits = max(rangebytes * 8, min_nbits * 2)
-
- # Take a random number of bits between min_nbits and rangebits
- nbits = random.randint(min_nbits, rangebits)
-
- return (read_random_int(nbits) % range) + minvalue
-
-def jacobi(a, b):
- """Calculates the value of the Jacobi symbol (a/b)
- where both a and b are positive integers, and b is odd
- """
-
- if a == 0: return 0
- result = 1
- while a > 1:
- if a & 1:
- if ((a-1)*(b-1) >> 2) & 1:
- result = -result
- a, b = b % a, a
- else:
- if (((b * b) - 1) >> 3) & 1:
- result = -result
- a >>= 1
- if a == 0: return 0
- return result
-
-def jacobi_witness(x, n):
- """Returns False if n is an Euler pseudo-prime with base x, and
- True otherwise.
- """
-
- j = jacobi(x, n) % n
- f = pow(x, (n-1)/2, n)
-
- if j == f: return False
- return True
-
-def randomized_primality_testing(n, k):
- """Calculates whether n is composite (which is always correct) or
- prime (which is incorrect with error probability 2**-k)
-
- Returns False if the number is composite, and True if it's
- probably prime.
- """
-
- # 50% of Jacobi-witnesses can report compositness of non-prime numbers
-
- for i in range(k):
- x = randint(1, n-1)
- if jacobi_witness(x, n): return False
-
- return True
-
-def is_prime(number):
- """Returns True if the number is prime, and False otherwise.
- """
-
- if randomized_primality_testing(number, 6):
- # Prime, according to Jacobi
- return True
-
- # Not prime
- return False
-
-
-def getprime(nbits):
- """Returns a prime number of max. 'math.ceil(nbits/8)*8' bits. In
- other words: nbits is rounded up to whole bytes.
- """
-
- while True:
- integer = read_random_int(nbits)
-
- # Make sure it's odd
- integer |= 1
-
- # Test for primeness
- if is_prime(integer): break
-
- # Retry if not prime
-
- return integer
-
-def are_relatively_prime(a, b):
- """Returns True if a and b are relatively prime, and False if they
- are not.
-
- >>> are_relatively_prime(2, 3)
- 1
- >>> are_relatively_prime(2, 4)
- 0
- """
-
- d = gcd(a, b)
- return (d == 1)
-
-def find_p_q(nbits):
- """Returns a tuple of two different primes of nbits bits"""
- pbits = nbits + (nbits/16) #Make sure that p and q aren't too close
- qbits = nbits - (nbits/16) #or the factoring programs can factor n
- p = getprime(pbits)
- while True:
- q = getprime(qbits)
- #Make sure p and q are different.
- if not q == p: break
- return (p, q)
-
-def extended_gcd(a, b):
- """Returns a tuple (r, i, j) such that r = gcd(a, b) = ia + jb
- """
- # r = gcd(a,b) i = multiplicitive inverse of a mod b
- # or j = multiplicitive inverse of b mod a
- # Neg return values for i or j are made positive mod b or a respectively
- # Iterateive Version is faster and uses much less stack space
- x = 0
- y = 1
- lx = 1
- ly = 0
- oa = a #Remember original a/b to remove
- ob = b #negative values from return results
- while b != 0:
- q = long(a/b)
- (a, b) = (b, a % b)
- (x, lx) = ((lx - (q * x)),x)
- (y, ly) = ((ly - (q * y)),y)
- if (lx < 0): lx += ob #If neg wrap modulo orignal b
- if (ly < 0): ly += oa #If neg wrap modulo orignal a
- return (a, lx, ly) #Return only positive values
-
-# Main function: calculate encryption and decryption keys
-def calculate_keys(p, q, nbits):
- """Calculates an encryption and a decryption key for p and q, and
- returns them as a tuple (e, d)"""
-
- n = p * q
- phi_n = (p-1) * (q-1)
-
- while True:
- # Make sure e has enough bits so we ensure "wrapping" through
- # modulo n
- e = max(65537,getprime(nbits/4))
- if are_relatively_prime(e, n) and are_relatively_prime(e, phi_n): break
-
- (d, i, j) = extended_gcd(e, phi_n)
-
- if not d == 1:
- raise Exception("e (%d) and phi_n (%d) are not relatively prime" % (e, phi_n))
- if (i < 0):
- raise Exception("New extended_gcd shouldn't return negative values")
- if not (e * i) % phi_n == 1:
- raise Exception("e (%d) and i (%d) are not mult. inv. modulo phi_n (%d)" % (e, i, phi_n))
-
- return (e, i)
-
-
-def gen_keys(nbits):
- """Generate RSA keys of nbits bits. Returns (p, q, e, d).
-
- Note: this can take a long time, depending on the key size.
- """
-
- (p, q) = find_p_q(nbits)
- (e, d) = calculate_keys(p, q, nbits)
-
- return (p, q, e, d)
-
-def newkeys(nbits):
- """Generates public and private keys, and returns them as (pub,
- priv).
-
- The public key consists of a dict {e: ..., , n: ....). The private
- key consists of a dict {d: ...., p: ...., q: ....).
- """
- nbits = max(9,nbits) # Don't let nbits go below 9 bits
- (p, q, e, d) = gen_keys(nbits)
-
- return ( {'e': e, 'n': p*q}, {'d': d, 'p': p, 'q': q} )
-
-def encrypt_int(message, ekey, n):
- """Encrypts a message using encryption key 'ekey', working modulo n"""
-
- if type(message) is types.IntType:
- message = long(message)
-
- if not type(message) is types.LongType:
- raise TypeError("You must pass a long or int")
-
- if message < 0 or message > n:
- raise OverflowError("The message is too long")
-
- #Note: Bit exponents start at zero (bit counts start at 1) this is correct
- safebit = bit_size(n) - 2 #compute safe bit (MSB - 1)
- message += (1 << safebit) #add safebit to ensure folding
-
- return pow(message, ekey, n)
-
-def decrypt_int(cyphertext, dkey, n):
- """Decrypts a cypher text using the decryption key 'dkey', working
- modulo n"""
-
- message = pow(cyphertext, dkey, n)
-
- safebit = bit_size(n) - 2 #compute safe bit (MSB - 1)
- message -= (1 << safebit) #remove safebit before decode
-
- return message
-
-def encode64chops(chops):
- """base64encodes chops and combines them into a ',' delimited string"""
-
- chips = [] #chips are character chops
-
- for value in chops:
- chips.append(int2str64(value))
-
- #delimit chops with comma
- encoded = ','.join(chips)
-
- return encoded
-
-def decode64chops(string):
- """base64decodes and makes a ',' delimited string into chops"""
-
- chips = string.split(',') #split chops at commas
-
- chops = []
-
- for string in chips: #make char chops (chips) into chops
- chops.append(str642int(string))
-
- return chops
-
-def chopstring(message, key, n, funcref):
- """Chops the 'message' into integers that fit into n,
- leaving room for a safebit to be added to ensure that all
- messages fold during exponentiation. The MSB of the number n
- is not independant modulo n (setting it could cause overflow), so
- use the next lower bit for the safebit. Therefore reserve 2-bits
- in the number n for non-data bits. Calls specified encryption
- function for each chop.
-
- Used by 'encrypt' and 'sign'.
- """
-
- msglen = len(message)
- mbits = msglen * 8
- #Set aside 2-bits so setting of safebit won't overflow modulo n.
- nbits = bit_size(n) - 2 # leave room for safebit
- nbytes = nbits / 8
- blocks = msglen / nbytes
-
- if msglen % nbytes > 0:
- blocks += 1
-
- cypher = []
-
- for bindex in range(blocks):
- offset = bindex * nbytes
- block = message[offset:offset+nbytes]
- value = bytes2int(block)
- cypher.append(funcref(value, key, n))
-
- return encode64chops(cypher) #Encode encrypted ints to base64 strings
-
-def gluechops(string, key, n, funcref):
- """Glues chops back together into a string. calls
- funcref(integer, key, n) for each chop.
-
- Used by 'decrypt' and 'verify'.
- """
- message = ""
-
- chops = decode64chops(string) #Decode base64 strings into integer chops
-
- for cpart in chops:
- mpart = funcref(cpart, key, n) #Decrypt each chop
- message += int2bytes(mpart) #Combine decrypted strings into a msg
-
- return message
-
-def encrypt(message, key):
- """Encrypts a string 'message' with the public key 'key'"""
- if 'n' not in key:
- raise Exception("You must use the public key with encrypt")
-
- return chopstring(message, key['e'], key['n'], encrypt_int)
-
-def sign(message, key):
- """Signs a string 'message' with the private key 'key'"""
- if 'p' not in key:
- raise Exception("You must use the private key with sign")
-
- return chopstring(message, key['d'], key['p']*key['q'], encrypt_int)
-
-def decrypt(cypher, key):
- """Decrypts a string 'cypher' with the private key 'key'"""
- if 'p' not in key:
- raise Exception("You must use the private key with decrypt")
-
- return gluechops(cypher, key['d'], key['p']*key['q'], decrypt_int)
-
-def verify(cypher, key):
- """Verifies a string 'cypher' with the public key 'key'"""
- if 'n' not in key:
- raise Exception("You must use the public key with verify")
-
- return gluechops(cypher, key['e'], key['n'], decrypt_int)
-
-# Do doctest if we're not imported
-if __name__ == "__main__":
- import doctest
- doctest.testmod()
-
-__all__ = ["newkeys", "encrypt", "decrypt", "sign", "verify"]
-
diff --git a/rsa/bigfile.py b/rsa/bigfile.py
deleted file mode 100644
index 3a09716..0000000
--- a/rsa/bigfile.py
+++ /dev/null
@@ -1,135 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Large file support
-
-.. deprecated:: 3.4
-
- The VARBLOCK format is NOT recommended for general use, has been deprecated since
- Python-RSA 3.4, and will be removed in a future release. It's vulnerable to a
- number of attacks:
-
- 1. decrypt/encrypt_bigfile() does not implement `Authenticated encryption`_ nor
- uses MACs to verify messages before decrypting public key encrypted messages.
-
- 2. decrypt/encrypt_bigfile() does not use hybrid encryption (it uses plain RSA)
- and has no method for chaining, so block reordering is possible.
-
- See `issue #19 on Github`_ for more information.
-
-.. _Authenticated encryption: https://en.wikipedia.org/wiki/Authenticated_encryption
-.. _issue #19 on Github: https://github.com/sybrenstuvel/python-rsa/issues/13
-
-
-This module contains functions to:
-
- - break a file into smaller blocks, and encrypt them, and store the
- encrypted blocks in another file.
-
- - take such an encrypted files, decrypt its blocks, and reconstruct the
- original file.
-
-The encrypted file format is as follows, where || denotes byte concatenation:
-
- FILE := VERSION || BLOCK || BLOCK ...
-
- BLOCK := LENGTH || DATA
-
- LENGTH := varint-encoded length of the subsequent data. Varint comes from
- Google Protobuf, and encodes an integer into a variable number of bytes.
- Each byte uses the 7 lowest bits to encode the value. The highest bit set
- to 1 indicates the next byte is also part of the varint. The last byte will
- have this bit set to 0.
-
-This file format is called the VARBLOCK format, in line with the varint format
-used to denote the block sizes.
-
-"""
-
-import warnings
-
-from rsa import key, common, pkcs1, varblock
-from rsa._compat import byte
-
-
-def encrypt_bigfile(infile, outfile, pub_key):
- """Encrypts a file, writing it to 'outfile' in VARBLOCK format.
-
- .. deprecated:: 3.4
- This function was deprecated in Python-RSA version 3.4 due to security issues
- in the VARBLOCK format. See the documentation_ for more information.
-
- .. _documentation: https://stuvel.eu/python-rsa-doc/usage.html#working-with-big-files
-
- :param infile: file-like object to read the cleartext from
- :param outfile: file-like object to write the crypto in VARBLOCK format to
- :param pub_key: :py:class:`rsa.PublicKey` to encrypt with
-
- """
-
- warnings.warn("The 'rsa.bigfile.encrypt_bigfile' function was deprecated in Python-RSA version "
- "3.4 due to security issues in the VARBLOCK format. See "
- "https://stuvel.eu/python-rsa-doc/usage.html#working-with-big-files "
- "for more information.",
- DeprecationWarning, stacklevel=2)
-
- if not isinstance(pub_key, key.PublicKey):
- raise TypeError('Public key required, but got %r' % pub_key)
-
- key_bytes = common.bit_size(pub_key.n) // 8
- blocksize = key_bytes - 11 # keep space for PKCS#1 padding
-
- # Write the version number to the VARBLOCK file
- outfile.write(byte(varblock.VARBLOCK_VERSION))
-
- # Encrypt and write each block
- for block in varblock.yield_fixedblocks(infile, blocksize):
- crypto = pkcs1.encrypt(block, pub_key)
-
- varblock.write_varint(outfile, len(crypto))
- outfile.write(crypto)
-
-
-def decrypt_bigfile(infile, outfile, priv_key):
- """Decrypts an encrypted VARBLOCK file, writing it to 'outfile'
-
- .. deprecated:: 3.4
- This function was deprecated in Python-RSA version 3.4 due to security issues
- in the VARBLOCK format. See the documentation_ for more information.
-
- .. _documentation: https://stuvel.eu/python-rsa-doc/usage.html#working-with-big-files
-
- :param infile: file-like object to read the crypto in VARBLOCK format from
- :param outfile: file-like object to write the cleartext to
- :param priv_key: :py:class:`rsa.PrivateKey` to decrypt with
-
- """
-
- warnings.warn("The 'rsa.bigfile.decrypt_bigfile' function was deprecated in Python-RSA version "
- "3.4 due to security issues in the VARBLOCK format. See "
- "https://stuvel.eu/python-rsa-doc/usage.html#working-with-big-files "
- "for more information.",
- DeprecationWarning, stacklevel=2)
-
- if not isinstance(priv_key, key.PrivateKey):
- raise TypeError('Private key required, but got %r' % priv_key)
-
- for block in varblock.yield_varblocks(infile):
- cleartext = pkcs1.decrypt(block, priv_key)
- outfile.write(cleartext)
-
-
-__all__ = ['encrypt_bigfile', 'decrypt_bigfile']
diff --git a/rsa/cli.py b/rsa/cli.py
index 3a21878..9415b97 100644
--- a/rsa/cli.py
+++ b/rsa/cli.py
@@ -26,7 +26,6 @@ import sys
from optparse import OptionParser
import rsa
-import rsa.bigfile
import rsa.pkcs1
HASH_METHODS = sorted(rsa.pkcs1.HASH_METHODS.keys())
@@ -198,8 +197,7 @@ class EncryptOperation(CryptoOperation):
keyname = 'public'
description = ('Encrypts a file. The file must be shorter than the key '
- 'length in order to be encrypted. For larger files, use the '
- 'pyrsa-encrypt-bigfile command.')
+ 'length in order to be encrypted.')
operation = 'encrypt'
operation_past = 'encrypted'
operation_progressive = 'encrypting'
@@ -215,8 +213,7 @@ class DecryptOperation(CryptoOperation):
keyname = 'private'
description = ('Decrypts a file. The original file must be shorter than '
- 'the key length in order to have been encrypted. For larger '
- 'files, use the pyrsa-decrypt-bigfile command.')
+ 'the key length in order to have been encrypted.')
operation = 'decrypt'
operation_past = 'decrypted'
operation_progressive = 'decrypting'
@@ -285,99 +282,7 @@ class VerifyOperation(CryptoOperation):
print('Verification OK', file=sys.stderr)
-class BigfileOperation(CryptoOperation):
- """CryptoOperation that doesn't read the entire file into memory."""
-
- def __init__(self):
- CryptoOperation.__init__(self)
-
- self.file_objects = []
-
- def __del__(self):
- """Closes any open file handles."""
-
- for fobj in self.file_objects:
- fobj.close()
-
- def __call__(self):
- """Runs the program."""
-
- (cli, cli_args) = self.parse_cli()
-
- key = self.read_key(cli_args[0], cli.keyform)
-
- # Get the file handles
- infile = self.get_infile(cli.input)
- outfile = self.get_outfile(cli.output)
-
- # Call the operation
- print(self.operation_progressive.title(), file=sys.stderr)
- self.perform_operation(infile, outfile, key, cli_args)
-
- def get_infile(self, inname):
- """Returns the input file object"""
-
- if inname:
- print('Reading input from %s' % inname, file=sys.stderr)
- fobj = open(inname, 'rb')
- self.file_objects.append(fobj)
- else:
- print('Reading input from stdin', file=sys.stderr)
- fobj = sys.stdin
-
- return fobj
-
- def get_outfile(self, outname):
- """Returns the output file object"""
-
- if outname:
- print('Will write output to %s' % outname, file=sys.stderr)
- fobj = open(outname, 'wb')
- self.file_objects.append(fobj)
- else:
- print('Will write output to stdout', file=sys.stderr)
- fobj = sys.stdout
-
- return fobj
-
-
-class EncryptBigfileOperation(BigfileOperation):
- """Encrypts a file to VARBLOCK format."""
-
- keyname = 'public'
- description = ('Encrypts a file to an encrypted VARBLOCK file. The file '
- 'can be larger than the key length, but the output file is only '
- 'compatible with Python-RSA.')
- operation = 'encrypt'
- operation_past = 'encrypted'
- operation_progressive = 'encrypting'
-
- def perform_operation(self, infile, outfile, pub_key, cli_args=None):
- """Encrypts files to VARBLOCK."""
-
- return rsa.bigfile.encrypt_bigfile(infile, outfile, pub_key)
-
-
-class DecryptBigfileOperation(BigfileOperation):
- """Decrypts a file in VARBLOCK format."""
-
- keyname = 'private'
- description = ('Decrypts an encrypted VARBLOCK file that was encrypted '
- 'with pyrsa-encrypt-bigfile')
- operation = 'decrypt'
- operation_past = 'decrypted'
- operation_progressive = 'decrypting'
- key_class = rsa.PrivateKey
-
- def perform_operation(self, infile, outfile, priv_key, cli_args=None):
- """Decrypts a VARBLOCK file."""
-
- return rsa.bigfile.decrypt_bigfile(infile, outfile, priv_key)
-
-
encrypt = EncryptOperation()
decrypt = DecryptOperation()
sign = SignOperation()
verify = VerifyOperation()
-encrypt_bigfile = EncryptBigfileOperation()
-decrypt_bigfile = DecryptBigfileOperation()
diff --git a/rsa/pkcs1.py b/rsa/pkcs1.py
index 28f0dc5..fdbf093 100644
--- a/rsa/pkcs1.py
+++ b/rsa/pkcs1.py
@@ -317,6 +317,27 @@ def verify(message, signature, pub_key):
return True
+def yield_fixedblocks(infile, blocksize):
+ """Generator, yields each block of ``blocksize`` bytes in the input file.
+
+ :param infile: file to read and separate in blocks.
+ :param blocksize: block size in bytes.
+ :returns: a generator that yields the contents of each block
+ """
+
+ while True:
+ block = infile.read(blocksize)
+
+ read_bytes = len(block)
+ if read_bytes == 0:
+ break
+
+ yield block
+
+ if read_bytes < blocksize:
+ break
+
+
def _hash(message, method_name):
"""Returns the message digest.
@@ -335,11 +356,8 @@ def _hash(message, method_name):
hasher = method()
if hasattr(message, 'read') and hasattr(message.read, '__call__'):
- # Late import to prevent DeprecationWarnings.
- from . import varblock
-
# read as 1K blocks
- for block in varblock.yield_fixedblocks(message, 1024):
+ for block in yield_fixedblocks(message, 1024):
hasher.update(block)
else:
# hash the message object itself.
diff --git a/rsa/varblock.py b/rsa/varblock.py
deleted file mode 100644
index 1c8d839..0000000
--- a/rsa/varblock.py
+++ /dev/null
@@ -1,179 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""VARBLOCK file support
-
-.. deprecated:: 3.4
-
- The VARBLOCK format is NOT recommended for general use, has been deprecated since
- Python-RSA 3.4, and will be removed in a future release. It's vulnerable to a
- number of attacks:
-
- 1. decrypt/encrypt_bigfile() does not implement `Authenticated encryption`_ nor
- uses MACs to verify messages before decrypting public key encrypted messages.
-
- 2. decrypt/encrypt_bigfile() does not use hybrid encryption (it uses plain RSA)
- and has no method for chaining, so block reordering is possible.
-
- See `issue #19 on Github`_ for more information.
-
-.. _Authenticated encryption: https://en.wikipedia.org/wiki/Authenticated_encryption
-.. _issue #19 on Github: https://github.com/sybrenstuvel/python-rsa/issues/13
-
-
-The VARBLOCK file format is as follows, where || denotes byte concatenation:
-
- FILE := VERSION || BLOCK || BLOCK ...
-
- BLOCK := LENGTH || DATA
-
- LENGTH := varint-encoded length of the subsequent data. Varint comes from
- Google Protobuf, and encodes an integer into a variable number of bytes.
- Each byte uses the 7 lowest bits to encode the value. The highest bit set
- to 1 indicates the next byte is also part of the varint. The last byte will
- have this bit set to 0.
-
-This file format is called the VARBLOCK format, in line with the varint format
-used to denote the block sizes.
-
-"""
-
-import warnings
-
-from rsa._compat import byte, b
-
-ZERO_BYTE = b('\x00')
-VARBLOCK_VERSION = 1
-
-warnings.warn("The 'rsa.varblock' module was deprecated in Python-RSA version "
- "3.4 due to security issues in the VARBLOCK format. See "
- "https://github.com/sybrenstuvel/python-rsa/issues/13 for more information.",
- DeprecationWarning)
-
-
-def read_varint(infile):
- """Reads a varint from the file.
-
- When the first byte to be read indicates EOF, (0, 0) is returned. When an
- EOF occurs when at least one byte has been read, an EOFError exception is
- raised.
-
- :param infile: the file-like object to read from. It should have a read()
- method.
- :returns: (varint, length), the read varint and the number of read bytes.
- """
-
- varint = 0
- read_bytes = 0
-
- while True:
- char = infile.read(1)
- if len(char) == 0:
- if read_bytes == 0:
- return 0, 0
- raise EOFError('EOF while reading varint, value is %i so far' %
- varint)
-
- byte = ord(char)
- varint += (byte & 0x7F) << (7 * read_bytes)
-
- read_bytes += 1
-
- if not byte & 0x80:
- return varint, read_bytes
-
-
-def write_varint(outfile, value):
- """Writes a varint to a file.
-
- :param outfile: the file-like object to write to. It should have a write()
- method.
- :returns: the number of written bytes.
- """
-
- # there is a big difference between 'write the value 0' (this case) and
- # 'there is nothing left to write' (the false-case of the while loop)
-
- if value == 0:
- outfile.write(ZERO_BYTE)
- return 1
-
- written_bytes = 0
- while value > 0:
- to_write = value & 0x7f
- value >>= 7
-
- if value > 0:
- to_write |= 0x80
-
- outfile.write(byte(to_write))
- written_bytes += 1
-
- return written_bytes
-
-
-def yield_varblocks(infile):
- """Generator, yields each block in the input file.
-
- :param infile: file to read, is expected to have the VARBLOCK format as
- described in the module's docstring.
- @yields the contents of each block.
- """
-
- # Check the version number
- first_char = infile.read(1)
- if len(first_char) == 0:
- raise EOFError('Unable to read VARBLOCK version number')
-
- version = ord(first_char)
- if version != VARBLOCK_VERSION:
- raise ValueError('VARBLOCK version %i not supported' % version)
-
- while True:
- (block_size, read_bytes) = read_varint(infile)
-
- # EOF at block boundary, that's fine.
- if read_bytes == 0 and block_size == 0:
- break
-
- block = infile.read(block_size)
-
- read_size = len(block)
- if read_size != block_size:
- raise EOFError('Block size is %i, but could read only %i bytes' %
- (block_size, read_size))
-
- yield block
-
-
-def yield_fixedblocks(infile, blocksize):
- """Generator, yields each block of ``blocksize`` bytes in the input file.
-
- :param infile: file to read and separate in blocks.
- :returns: a generator that yields the contents of each block
- """
-
- while True:
- block = infile.read(blocksize)
-
- read_bytes = len(block)
- if read_bytes == 0:
- break
-
- yield block
-
- if read_bytes < blocksize:
- break
diff --git a/setup.py b/setup.py
index a4d1d41..ee2968f 100755
--- a/setup.py
+++ b/setup.py
@@ -54,8 +54,6 @@ if __name__ == '__main__':
'pyrsa-decrypt = rsa.cli:decrypt',
'pyrsa-sign = rsa.cli:sign',
'pyrsa-verify = rsa.cli:verify',
- 'pyrsa-encrypt-bigfile = rsa.cli:encrypt_bigfile',
- 'pyrsa-decrypt-bigfile = rsa.cli:decrypt_bigfile',
]},
)
diff --git a/tests/test_bigfile.py b/tests/test_bigfile.py
deleted file mode 100644
index 70278dc..0000000
--- a/tests/test_bigfile.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Tests block operations."""
-
-from rsa._compat import b
-
-try:
- from StringIO import StringIO as BytesIO
-except ImportError:
- from io import BytesIO
-import unittest
-
-import rsa
-from rsa import bigfile, varblock, pkcs1
-
-
-class BigfileTest(unittest.TestCase):
- def test_encrypt_decrypt_bigfile(self):
- # Expected block size + 11 bytes padding
- pub_key, priv_key = rsa.newkeys((6 + 11) * 8)
-
- # Encrypt the file
- message = b('123456Sybren')
- infile = BytesIO(message)
- outfile = BytesIO()
-
- bigfile.encrypt_bigfile(infile, outfile, pub_key)
-
- # Test
- crypto = outfile.getvalue()
-
- cryptfile = BytesIO(crypto)
- clearfile = BytesIO()
-
- bigfile.decrypt_bigfile(cryptfile, clearfile, priv_key)
- self.assertEquals(clearfile.getvalue(), message)
-
- # We have 2x6 bytes in the message, so that should result in two
- # bigfile.
- cryptfile.seek(0)
- varblocks = list(varblock.yield_varblocks(cryptfile))
- self.assertEqual(2, len(varblocks))
-
- def test_sign_verify_bigfile(self):
- # Large enough to store MD5-sum and ASN.1 code for MD5
- pub_key, priv_key = rsa.newkeys((34 + 11) * 8)
-
- # Sign the file
- msgfile = BytesIO(b('123456Sybren'))
- signature = pkcs1.sign(msgfile, priv_key, 'MD5')
-
- # Check the signature
- msgfile.seek(0)
- self.assertTrue(pkcs1.verify(msgfile, signature, pub_key))
-
- # Alter the message, re-check
- msgfile = BytesIO(b('123456sybren'))
- self.assertRaises(pkcs1.VerificationError,
- pkcs1.verify, msgfile, signature, pub_key)
diff --git a/tests/test_varblock.py b/tests/test_varblock.py
deleted file mode 100644
index d1c3730..0000000
--- a/tests/test_varblock.py
+++ /dev/null
@@ -1,88 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Tests varblock operations."""
-
-try:
- from StringIO import StringIO as BytesIO
-except ImportError:
- from io import BytesIO
-import unittest
-
-from rsa._compat import b
-from rsa import varblock
-
-
-class VarintTest(unittest.TestCase):
- def test_read_varint(self):
- encoded = b('\xac\x02crummy')
- infile = BytesIO(encoded)
-
- (decoded, read) = varblock.read_varint(infile)
-
- # Test the returned values
- self.assertEqual(300, decoded)
- self.assertEqual(2, read)
-
- # The rest of the file should be untouched
- self.assertEqual(b('crummy'), infile.read())
-
- def test_read_zero(self):
- encoded = b('\x00crummy')
- infile = BytesIO(encoded)
-
- (decoded, read) = varblock.read_varint(infile)
-
- # Test the returned values
- self.assertEqual(0, decoded)
- self.assertEqual(1, read)
-
- # The rest of the file should be untouched
- self.assertEqual(b('crummy'), infile.read())
-
- def test_write_varint(self):
- expected = b('\xac\x02')
- outfile = BytesIO()
-
- written = varblock.write_varint(outfile, 300)
-
- # Test the returned values
- self.assertEqual(expected, outfile.getvalue())
- self.assertEqual(2, written)
-
- def test_write_zero(self):
- outfile = BytesIO()
- written = varblock.write_varint(outfile, 0)
-
- # Test the returned values
- self.assertEqual(b('\x00'), outfile.getvalue())
- self.assertEqual(1, written)
-
-
-class VarblockTest(unittest.TestCase):
- def test_yield_varblock(self):
- infile = BytesIO(b('\x01\x0512345\x06Sybren'))
-
- varblocks = list(varblock.yield_varblocks(infile))
- self.assertEqual([b('12345'), b('Sybren')], varblocks)
-
-
-class FixedblockTest(unittest.TestCase):
- def test_yield_fixedblock(self):
- infile = BytesIO(b('123456Sybren'))
-
- fixedblocks = list(varblock.yield_fixedblocks(infile, 6))
- self.assertEqual([b('123456'), b('Sybren')], fixedblocks)