diff options
Diffstat (limited to 'pysnmp')
-rw-r--r-- | pysnmp/crypto/__init__.py | 67 | ||||
-rw-r--r-- | pysnmp/crypto/aes.py | 39 | ||||
-rw-r--r-- | pysnmp/crypto/des.py | 30 | ||||
-rw-r--r-- | pysnmp/crypto/des3.py | 39 | ||||
-rw-r--r-- | pysnmp/proto/secmod/eso/priv/des3.py | 24 | ||||
-rw-r--r-- | pysnmp/proto/secmod/rfc3414/priv/des.py | 22 | ||||
-rw-r--r-- | pysnmp/proto/secmod/rfc3826/priv/aes.py | 23 |
7 files changed, 184 insertions, 60 deletions
diff --git a/pysnmp/crypto/__init__.py b/pysnmp/crypto/__init__.py new file mode 100644 index 00000000..1ed4488d --- /dev/null +++ b/pysnmp/crypto/__init__.py @@ -0,0 +1,67 @@ +"""Backend-independent cryptographic implementations to allow migration to pyca/cryptography +without immediately dropping support for legacy minor Python versions. +""" +from pysnmp.proto import errind, error +CRYPTOGRPAHY = 'cryptography' +CRYPTODOME = 'Cryptodome' +try: + import cryptography + backend = CRYPTOGRPAHY +except ImportError: + try: + import Cryptodome + backend = CRYPTODOME + except ImportError: + backend = None + + +def raise_backend_error(*args, **kwargs): + raise error.StatusInformation( + errorIndication=errind.decryptionError + ) + + +def _cryptodome_encrypt(cipher_factory, plaintext, key, iv): + """""" + encryptor = cipher_factory(key, iv) + return encryptor.encrypt(plaintext) + + +def _cryptodome_decrypt(cipher_factory, ciphertext, key, iv): + """""" + decryptor = cipher_factory(key, iv) + return decryptor.decrypt(ciphertext) + + +def _cryptography_encrypt(cipher_factory, plaintext, key, iv): + """""" + encryptor = cipher_factory(key, iv).encryptor() + return encryptor.update(plaintext) + encryptor.finalize() + + +def _cryptography_decrypt(cipher_factory, ciphertext, key, iv): + """""" + decryptor = cipher_factory(key, iv).decryptor() + return decryptor.update(ciphertext) + decryptor.finalize() + + +_DECRYPT_MAP = { + CRYPTOGRPAHY: _cryptography_decrypt, + CRYPTODOME: _cryptodome_decrypt, + None: raise_backend_error +} +_ENCRYPT_MAP = { + CRYPTOGRPAHY: _cryptography_encrypt, + CRYPTODOME: _cryptodome_encrypt, + None: raise_backend_error +} + + +def generic_encrypt(cipher_factory_map, plaintext, key, iv): + """""" + return _ENCRYPT_MAP[backend](cipher_factory_map[backend], plaintext, key, iv) + + +def generic_decrypt(cipher_factory_map, plaintext, key, iv): + """""" + return _DECRYPT_MAP[backend](cipher_factory_map[backend], plaintext, key, iv) diff --git a/pysnmp/crypto/aes.py b/pysnmp/crypto/aes.py new file mode 100644 index 00000000..44ed831a --- /dev/null +++ b/pysnmp/crypto/aes.py @@ -0,0 +1,39 @@ +"""""" +from pysnmp.crypto import backend, CRYPTODOME, CRYPTOGRPAHY, generic_decrypt, generic_encrypt, raise_backend_error + +if backend == CRYPTOGRPAHY: + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes +elif backend == CRYPTODOME: + from Cryptodome.Cipher import AES + + +def _cryptodome_cipher(key, iv): + """""" + return AES.new(key, AES.MODE_CFB, iv, segment_size=128) + + +def _cryptography_cipher(key, iv): + """""" + return Cipher( + algorithm=algorithms.AES(key), + mode=modes.CFB(iv), + backend=default_backend() + ) + + +_CIPHER_FACTORY_MAP = { + CRYPTOGRPAHY: _cryptography_cipher, + CRYPTODOME: _cryptodome_cipher, + None: raise_backend_error +} + + +def encrypt(plaintext, key, iv): + """""" + return generic_encrypt(_CIPHER_FACTORY_MAP, plaintext, key, iv) + + +def decrypt(plaintext, key, iv): + """""" + return generic_decrypt(_CIPHER_FACTORY_MAP, plaintext, key, iv) diff --git a/pysnmp/crypto/des.py b/pysnmp/crypto/des.py new file mode 100644 index 00000000..1e6aba01 --- /dev/null +++ b/pysnmp/crypto/des.py @@ -0,0 +1,30 @@ +"""""" +from pysnmp.crypto import backend, CRYPTODOME, CRYPTOGRPAHY, des3, generic_decrypt, generic_encrypt, raise_backend_error + +if backend == CRYPTODOME: + from Cryptodome.Cipher import DES + + +def _cryptodome_cipher(key, iv): + """""" + return DES.new(key, DES.MODE_CBC, iv) + + +_CIPHER_FACTORY_MAP = { + CRYPTODOME: _cryptodome_cipher, + None: raise_backend_error +} + + +def encrypt(plaintext, key, iv): + """""" + if backend == CRYPTOGRPAHY: + return des3.encrypt(plaintext, key * 3, iv) + return generic_encrypt(_CIPHER_FACTORY_MAP, plaintext, key, iv) + + +def decrypt(plaintext, key, iv): + """""" + if backend == CRYPTOGRPAHY: + return des3.decrypt(plaintext, key * 3, iv) + return generic_decrypt(_CIPHER_FACTORY_MAP, plaintext, key, iv) diff --git a/pysnmp/crypto/des3.py b/pysnmp/crypto/des3.py new file mode 100644 index 00000000..2f9b8d6e --- /dev/null +++ b/pysnmp/crypto/des3.py @@ -0,0 +1,39 @@ +"""""" +from pysnmp.crypto import backend, CRYPTODOME, CRYPTOGRPAHY, generic_decrypt, generic_encrypt, raise_backend_error + +if backend == CRYPTOGRPAHY: + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes +elif backend == CRYPTODOME: + from Cryptodome.Cipher import DES3 + + +def _cryptodome_cipher(key, iv): + """""" + return DES3.new(key, DES3.MODE_CBC, iv) + + +def _cryptography_cipher(key, iv): + """""" + return Cipher( + algorithm=algorithms.TripleDES(key), + mode=modes.CBC(iv), + backend=default_backend() + ) + + +_CIPHER_FACTORY_MAP = { + CRYPTOGRPAHY: _cryptography_cipher, + CRYPTODOME: _cryptodome_cipher, + None: raise_backend_error +} + + +def encrypt(plaintext, key, iv): + """""" + return generic_encrypt(_CIPHER_FACTORY_MAP, plaintext, key, iv) + + +def decrypt(plaintext, key, iv): + """""" + return generic_decrypt(_CIPHER_FACTORY_MAP, plaintext, key, iv) diff --git a/pysnmp/proto/secmod/eso/priv/des3.py b/pysnmp/proto/secmod/eso/priv/des3.py index 426df633..2edfa7a7 100644 --- a/pysnmp/proto/secmod/eso/priv/des3.py +++ b/pysnmp/proto/secmod/eso/priv/des3.py @@ -5,6 +5,7 @@ # License: http://snmplabs.com/pysnmp/license.html # import random +from pysnmp.crypto.des3 import decrypt, encrypt from pysnmp.proto.secmod.rfc3414.priv import base from pysnmp.proto.secmod.rfc3414.auth import hmacmd5, hmacsha from pysnmp.proto.secmod.rfc3414 import localkey @@ -12,7 +13,6 @@ from pysnmp.proto.secmod.rfc7860.auth import hmacsha2 from pysnmp.proto import errind, error from pyasn1.type import univ from pyasn1.compat.octets import null -from math import ceil try: from hashlib import md5, sha1 @@ -23,11 +23,6 @@ except ImportError: md5 = md5.new sha1 = sha.new -try: - from Cryptodome.Cipher import DES3 -except ImportError: - DES3 = None - random.seed() @@ -113,32 +108,21 @@ class Des3(base.AbstractEncryptionService): # 5.1.1.2 def encryptData(self, encryptKey, privParameters, dataToEncrypt): - if DES3 is None: - raise error.StatusInformation( - errorIndication=errind.encryptionError - ) - snmpEngineBoots, snmpEngineTime, salt = privParameters des3Key, salt, iv = self.__getEncryptionKey( encryptKey, snmpEngineBoots ) - des3Obj = DES3.new(des3Key, DES3.MODE_CBC, iv) - privParameters = univ.OctetString(salt) plaintext = dataToEncrypt + univ.OctetString((0,) * (8 - len(dataToEncrypt) % 8)).asOctets() - ciphertext = des3Obj.encrypt(plaintext) + ciphertext = encrypt(plaintext, des3Key, iv) return univ.OctetString(ciphertext), privParameters # 5.1.1.3 def decryptData(self, decryptKey, privParameters, encryptedData): - if DES3 is None: - raise error.StatusInformation( - errorIndication=errind.decryptionError - ) snmpEngineBoots, snmpEngineTime, salt = privParameters if len(salt) != 8: @@ -153,9 +137,7 @@ class Des3(base.AbstractEncryptionService): errorIndication=errind.decryptionError ) - des3Obj = DES3.new(des3Key, DES3.MODE_CBC, iv) - ciphertext = encryptedData.asOctets() - plaintext = des3Obj.decrypt(ciphertext) + plaintext = decrypt(ciphertext, des3Key, iv) return plaintext diff --git a/pysnmp/proto/secmod/rfc3414/priv/des.py b/pysnmp/proto/secmod/rfc3414/priv/des.py index b66889e2..7a46e2af 100644 --- a/pysnmp/proto/secmod/rfc3414/priv/des.py +++ b/pysnmp/proto/secmod/rfc3414/priv/des.py @@ -5,6 +5,7 @@ # License: http://snmplabs.com/pysnmp/license.html # import random +from pysnmp.crypto.des import decrypt, encrypt from pysnmp.proto.secmod.rfc3414.priv import base from pysnmp.proto.secmod.rfc3414.auth import hmacmd5, hmacsha from pysnmp.proto.secmod.rfc3414 import localkey @@ -14,10 +15,6 @@ from pyasn1.type import univ from sys import version_info try: - from Cryptodome.Cipher import DES -except ImportError: - DES = None -try: from hashlib import md5, sha1 except ImportError: import md5 @@ -98,11 +95,6 @@ class Des(base.AbstractEncryptionService): # 8.2.4.1 def encryptData(self, encryptKey, privParameters, dataToEncrypt): - if DES is None: - raise error.StatusInformation( - errorIndication=errind.encryptionError - ) - snmpEngineBoots, snmpEngineTime, salt = privParameters # 8.3.1.1 @@ -114,20 +106,14 @@ class Des(base.AbstractEncryptionService): privParameters = univ.OctetString(salt) # 8.1.1.2 - desObj = DES.new(desKey, DES.MODE_CBC, iv) plaintext = dataToEncrypt + univ.OctetString((0,) * (8 - len(dataToEncrypt) % 8)).asOctets() - ciphertext = desObj.encrypt(plaintext) + ciphertext = encrypt(plaintext, desKey, iv) # 8.3.1.3 & 4 return univ.OctetString(ciphertext), privParameters # 8.2.4.2 def decryptData(self, decryptKey, privParameters, encryptedData): - if DES is None: - raise error.StatusInformation( - errorIndication=errind.decryptionError - ) - snmpEngineBoots, snmpEngineTime, salt = privParameters # 8.3.2.1 @@ -147,7 +133,5 @@ class Des(base.AbstractEncryptionService): errorIndication=errind.decryptionError ) - desObj = DES.new(desKey, DES.MODE_CBC, iv) - # 8.3.2.6 - return desObj.decrypt(encryptedData.asOctets()) + return decrypt(encryptedData.asOctets(), desKey, iv) diff --git a/pysnmp/proto/secmod/rfc3826/priv/aes.py b/pysnmp/proto/secmod/rfc3826/priv/aes.py index c702a418..6ee351ce 100644 --- a/pysnmp/proto/secmod/rfc3826/priv/aes.py +++ b/pysnmp/proto/secmod/rfc3826/priv/aes.py @@ -6,6 +6,7 @@ # import random from pyasn1.type import univ +from pysnmp.crypto.aes import decrypt, encrypt from pysnmp.proto.secmod.rfc3414.priv import base from pysnmp.proto.secmod.rfc3414.auth import hmacmd5, hmacsha from pysnmp.proto.secmod.rfc7860.auth import hmacsha2 @@ -13,10 +14,6 @@ from pysnmp.proto.secmod.rfc3414 import localkey from pysnmp.proto import errind, error try: - from Cryptodome.Cipher import AES -except ImportError: - AES = None -try: from hashlib import md5, sha1 except ImportError: import md5 @@ -102,11 +99,6 @@ class Aes(base.AbstractEncryptionService): # 3.2.4.1 def encryptData(self, encryptKey, privParameters, dataToEncrypt): - if AES is None: - raise error.StatusInformation( - errorIndication=errind.encryptionError - ) - snmpEngineBoots, snmpEngineTime, salt = privParameters # 3.3.1.1 @@ -115,23 +107,16 @@ class Aes(base.AbstractEncryptionService): ) # 3.3.1.3 - aesObj = AES.new(aesKey, AES.MODE_CFB, iv, segment_size=128) - # PyCrypto seems to require padding dataToEncrypt = dataToEncrypt + univ.OctetString((0,) * (16 - len(dataToEncrypt) % 16)).asOctets() - ciphertext = aesObj.encrypt(dataToEncrypt) + ciphertext = encrypt(dataToEncrypt, aesKey, iv) # 3.3.1.4 return univ.OctetString(ciphertext), univ.OctetString(salt) # 3.2.4.2 def decryptData(self, decryptKey, privParameters, encryptedData): - if AES is None: - raise error.StatusInformation( - errorIndication=errind.decryptionError - ) - snmpEngineBoots, snmpEngineTime, salt = privParameters # 3.3.2.1 @@ -145,10 +130,8 @@ class Aes(base.AbstractEncryptionService): decryptKey, snmpEngineBoots, snmpEngineTime, salt ) - aesObj = AES.new(aesKey, AES.MODE_CFB, iv, segment_size=128) - # PyCrypto seems to require padding encryptedData = encryptedData + univ.OctetString((0,) * (16 - len(encryptedData) % 16)).asOctets() # 3.3.2.4-6 - return aesObj.decrypt(encryptedData.asOctets()) + return decrypt(encryptedData.asOctets(), aesKey, iv) |