diff options
Diffstat (limited to 'pysnmp/proto/secmod')
-rw-r--r-- | pysnmp/proto/secmod/cache.py | 12 | ||||
-rw-r--r-- | pysnmp/proto/secmod/eso/priv/aes192.py | 6 | ||||
-rw-r--r-- | pysnmp/proto/secmod/eso/priv/aes256.py | 6 | ||||
-rw-r--r-- | pysnmp/proto/secmod/eso/priv/aesbase.py | 22 | ||||
-rw-r--r-- | pysnmp/proto/secmod/eso/priv/des3.py | 51 | ||||
-rw-r--r-- | pysnmp/proto/secmod/rfc2576.py | 349 | ||||
-rw-r--r-- | pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py | 38 | ||||
-rw-r--r-- | pysnmp/proto/secmod/rfc3414/auth/hmacsha.py | 37 | ||||
-rw-r--r-- | pysnmp/proto/secmod/rfc3414/localkey.py | 15 | ||||
-rw-r--r-- | pysnmp/proto/secmod/rfc3414/priv/des.py | 53 | ||||
-rw-r--r-- | pysnmp/proto/secmod/rfc3414/service.py | 732 | ||||
-rw-r--r-- | pysnmp/proto/secmod/rfc3826/priv/aes.py | 102 | ||||
-rw-r--r-- | pysnmp/proto/secmod/rfc7860/auth/hmacsha2.py | 64 |
13 files changed, 892 insertions, 595 deletions
diff --git a/pysnmp/proto/secmod/cache.py b/pysnmp/proto/secmod/cache.py index f5843c50..1859bbad 100644 --- a/pysnmp/proto/secmod/cache.py +++ b/pysnmp/proto/secmod/cache.py @@ -21,10 +21,8 @@ class Cache(object): def pop(self, stateReference): if stateReference in self.__cacheEntries: - securityData = self.__cacheEntries[stateReference] - else: - raise error.ProtocolError( - 'Cache miss for stateReference=%s at %s' % (stateReference, self) - ) - del self.__cacheEntries[stateReference] - return securityData + return self.__cacheEntries.pop(stateReference) + + raise error.ProtocolError( + 'Cache miss for stateReference=%s at ' + '%s' % (stateReference, self)) diff --git a/pysnmp/proto/secmod/eso/priv/aes192.py b/pysnmp/proto/secmod/eso/priv/aes192.py index 7df0a0f5..cbbc748e 100644 --- a/pysnmp/proto/secmod/eso/priv/aes192.py +++ b/pysnmp/proto/secmod/eso/priv/aes192.py @@ -14,7 +14,8 @@ class AesBlumenthal192(aesbase.AbstractAesBlumenthal): http://tools.ietf.org/html/draft-blumenthal-aes-usm-04 """ - SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 1) # cusmAESCfb192PrivProtocol + # cusmAESCfb192PrivProtocol + SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 1) KEY_SIZE = 24 @@ -29,5 +30,6 @@ class Aes192(aesbase.AbstractAesReeder): Known to be used by many vendors including Cisco and others. """ - SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 101) # cusmAESCfb192PrivProtocol (non-standard OID) + # cusmAESCfb192PrivProtocol (non-standard OID) + SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 101) KEY_SIZE = 24 diff --git a/pysnmp/proto/secmod/eso/priv/aes256.py b/pysnmp/proto/secmod/eso/priv/aes256.py index 94a255ff..7ca42504 100644 --- a/pysnmp/proto/secmod/eso/priv/aes256.py +++ b/pysnmp/proto/secmod/eso/priv/aes256.py @@ -12,7 +12,8 @@ class AesBlumenthal256(aesbase.AbstractAesBlumenthal): http://tools.ietf.org/html/draft-blumenthal-aes-usm-04 """ - SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 2) # cusmAESCfb256PrivProtocol + # cusmAESCfb256PrivProtocol + SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 2) KEY_SIZE = 32 @@ -27,5 +28,6 @@ class Aes256(aesbase.AbstractAesReeder): Known to be used by many vendors including Cisco and others. """ - SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 102) # cusmAESCfb256PrivProtocol (non-standard OID) + # cusmAESCfb256PrivProtocol (non-standard OID) + SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 102) KEY_SIZE = 32 diff --git a/pysnmp/proto/secmod/eso/priv/aesbase.py b/pysnmp/proto/secmod/eso/priv/aesbase.py index 0c2e5d17..3af30df0 100644 --- a/pysnmp/proto/secmod/eso/priv/aesbase.py +++ b/pysnmp/proto/secmod/eso/priv/aesbase.py @@ -24,19 +24,25 @@ class AbstractAesBlumenthal(aes.Aes): def localizeKey(self, authProtocol, privKey, snmpEngineID): if authProtocol == hmacmd5.HmacMd5.SERVICE_ID: hashAlgo = md5 + elif authProtocol == hmacsha.HmacSha.SERVICE_ID: hashAlgo = sha1 + elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM: hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol] + else: raise error.ProtocolError( - 'Unknown auth protocol %s' % (authProtocol,) - ) + 'Unknown auth protocol %s' % (authProtocol,)) - localPrivKey = localkey.localizeKey(privKey, snmpEngineID, hashAlgo) + localPrivKey = localkey.localizeKey( + privKey, snmpEngineID, hashAlgo) - # now extend this key if too short by repeating steps that includes the hashPassphrase step - for count in range(1, int(ceil(self.KEY_SIZE * 1.0 / len(localPrivKey)))): + # now extend this key if too short by repeating steps that includes + # the hashPassphrase step + rounds = int(ceil(self.KEY_SIZE * 1.0 / len(localPrivKey))) + + for _ in range(1, rounds): localPrivKey += hashAlgo(localPrivKey).digest() return localPrivKey[:self.KEY_SIZE] @@ -64,14 +70,16 @@ class AbstractAesReeder(aes.Aes): def localizeKey(self, authProtocol, privKey, snmpEngineID): if authProtocol == hmacmd5.HmacMd5.SERVICE_ID: hashAlgo = md5 + elif authProtocol == hmacsha.HmacSha.SERVICE_ID: hashAlgo = sha1 + elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM: hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol] + else: raise error.ProtocolError( - 'Unknown auth protocol %s' % (authProtocol,) - ) + 'Unknown auth protocol %s' % (authProtocol,)) localPrivKey = localkey.localizeKey(privKey, snmpEngineID, hashAlgo) diff --git a/pysnmp/proto/secmod/eso/priv/des3.py b/pysnmp/proto/secmod/eso/priv/des3.py index da84475b..8d193084 100644 --- a/pysnmp/proto/secmod/eso/priv/des3.py +++ b/pysnmp/proto/secmod/eso/priv/des3.py @@ -34,7 +34,8 @@ class Des3(base.AbstractEncryptionService): https://tools.ietf.org/html/draft-reeder-snmpv3-usm-3desede-00 """ - SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 2, 3) # usm3DESEDEPrivProtocol + # usm3DESEDEPrivProtocol + SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 2, 3) KEY_SIZE = 32 local_int = random.randrange(0, 0xffffffff) @@ -42,28 +43,34 @@ class Des3(base.AbstractEncryptionService): def hashPassphrase(self, authProtocol, privKey): if authProtocol == hmacmd5.HmacMd5.SERVICE_ID: hashAlgo = md5 + elif authProtocol == hmacsha.HmacSha.SERVICE_ID: hashAlgo = sha1 + elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM: hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol] + else: raise error.ProtocolError( - 'Unknown auth protocol %s' % (authProtocol,) - ) + 'Unknown auth protocol %s' % (authProtocol,)) + return localkey.hashPassphrase(privKey, hashAlgo) # 2.1 def localizeKey(self, authProtocol, privKey, snmpEngineID): if authProtocol == hmacmd5.HmacMd5.SERVICE_ID: hashAlgo = md5 + elif authProtocol == hmacsha.HmacSha.SERVICE_ID: hashAlgo = sha1 + elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM: hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol] + else: raise error.ProtocolError( - 'Unknown auth protocol %s' % (authProtocol,) - ) + 'Unknown auth protocol %s' % (authProtocol,)) + localPrivKey = localkey.localizeKey(privKey, snmpEngineID, hashAlgo) # now extend this key if too short by repeating steps that includes the hashPassphrase step @@ -75,7 +82,7 @@ class Des3(base.AbstractEncryptionService): return localPrivKey[:self.KEY_SIZE] # 5.1.1.1 - def __getEncryptionKey(self, privKey, snmpEngineBoots): + def _getEncryptionKey(self, privKey, snmpEngineBoots): # 5.1.1.1.1 des3Key = privKey[:24] preIV = privKey[24:32] @@ -94,39 +101,46 @@ class Des3(base.AbstractEncryptionService): ] if self.local_int == 0xffffffff: self.local_int = 0 + else: self.local_int += 1 # salt not yet hashed XXX + iv = map(lambda x, y: x ^ y, salt, preIV.asNumbers()) + return (des3Key.asOctets(), univ.OctetString(salt).asOctets(), - univ.OctetString(map(lambda x, y: x ^ y, salt, preIV.asNumbers())).asOctets()) + univ.OctetString(iv).asOctets()) @staticmethod - def __getDecryptionKey(privKey, salt): + def _getDecryptionKey(privKey, salt): + iv = map( + lambda x, y: x ^ y, salt.asNumbers(), privKey[24:32].asNumbers()) + return (privKey[:24].asOctets(), - univ.OctetString(map(lambda x, y: x ^ y, salt.asNumbers(), privKey[24:32].asNumbers())).asOctets()) + univ.OctetString(iv).asOctets()) # 5.1.1.2 def encryptData(self, encryptKey, privParameters, dataToEncrypt): snmpEngineBoots, snmpEngineTime, salt = privParameters - des3Key, salt, iv = self.__getEncryptionKey( + des3Key, salt, iv = self._getEncryptionKey( encryptKey, snmpEngineBoots ) privParameters = univ.OctetString(salt) - plaintext = dataToEncrypt + univ.OctetString((0,) * (8 - len(dataToEncrypt) % 8)).asOctets() + plaintext = dataToEncrypt + plaintext += univ.OctetString( + (0,) * (8 - len(dataToEncrypt) % 8)).asOctets() try: ciphertext = des3.encrypt(plaintext, des3Key, iv) except PysnmpCryptoError: raise error.StatusInformation( - errorIndication=errind.unsupportedPrivProtocol - ) + errorIndication=errind.unsupportedPrivProtocol) return univ.OctetString(ciphertext), privParameters @@ -136,15 +150,13 @@ class Des3(base.AbstractEncryptionService): if len(salt) != 8: raise error.StatusInformation( - errorIndication=errind.decryptionError - ) + errorIndication=errind.decryptionError) - des3Key, iv = self.__getDecryptionKey(decryptKey, salt) + des3Key, iv = self._getDecryptionKey(decryptKey, salt) if len(encryptedData) % 8 != 0: raise error.StatusInformation( - errorIndication=errind.decryptionError - ) + errorIndication=errind.decryptionError) ciphertext = encryptedData.asOctets() @@ -153,7 +165,6 @@ class Des3(base.AbstractEncryptionService): except PysnmpCryptoError: raise error.StatusInformation( - errorIndication=errind.unsupportedPrivProtocol - ) + errorIndication=errind.unsupportedPrivProtocol) return plaintext diff --git a/pysnmp/proto/secmod/rfc2576.py b/pysnmp/proto/secmod/rfc2576.py index ba11b0fe..92b5eac2 100644 --- a/pysnmp/proto/secmod/rfc2576.py +++ b/pysnmp/proto/secmod/rfc2576.py @@ -4,9 +4,6 @@ # Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> # License: http://snmplabs.com/pysnmp/license.html # -from pyasn1.codec.ber import encoder -from pyasn1.error import PyAsn1Error - from pysnmp import debug from pysnmp.carrier.asyncore.dgram import udp from pysnmp.carrier.asyncore.dgram import udp6 @@ -15,6 +12,9 @@ from pysnmp.proto import error from pysnmp.proto.secmod import base from pysnmp.smi.error import NoSuchInstanceError +from pyasn1.codec.ber import encoder +from pyasn1.error import PyAsn1Error + class SnmpV1SecurityModel(base.AbstractSecurityModel): SECURITY_MODEL_ID = 1 @@ -26,60 +26,69 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel): # in here. def __init__(self): - self.__transportBranchId = self.__paramsBranchId = self.__communityBranchId = self.__securityBranchId = -1 + self._transportBranchId = -1 + self._paramsBranchId = -1 + self._communityBranchId = -1 + self._securityBranchId = -1 + base.AbstractSecurityModel.__init__(self) def _sec2com(self, snmpEngine, securityName, contextEngineId, contextName): - snmpTargetParamsSecurityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder + + snmpTargetParamsSecurityName, = mibBuilder.importSymbols( 'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityName') - if self.__paramsBranchId != snmpTargetParamsSecurityName.branchVersionId: - snmpTargetParamsSecurityModel, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + + if self._paramsBranchId != snmpTargetParamsSecurityName.branchVersionId: + snmpTargetParamsSecurityModel, = mibBuilder.importSymbols( 'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityModel') - self.__nameToModelMap = {} + self._nameToModelMap = {} nextMibNode = snmpTargetParamsSecurityName while True: try: - nextMibNode = snmpTargetParamsSecurityName.getNextNode(nextMibNode.name) + nextMibNode = snmpTargetParamsSecurityName.getNextNode( + nextMibNode.name) except NoSuchInstanceError: break instId = nextMibNode.name[len(snmpTargetParamsSecurityName.name):] - mibNode = snmpTargetParamsSecurityModel.getNode(snmpTargetParamsSecurityModel.name + instId) + mibNode = snmpTargetParamsSecurityModel.getNode( + snmpTargetParamsSecurityModel.name + instId) try: - if mibNode.syntax not in self.__nameToModelMap: - self.__nameToModelMap[nextMibNode.syntax] = set() + if mibNode.syntax not in self._nameToModelMap: + self._nameToModelMap[nextMibNode.syntax] = set() - self.__nameToModelMap[nextMibNode.syntax].add(mibNode.syntax) + self._nameToModelMap[nextMibNode.syntax].add(mibNode.syntax) except PyAsn1Error: debug.logger & debug.FLAG_SM and debug.logger( - '_sec2com: table entries %r/%r hashing failed' % ( - nextMibNode.syntax, mibNode.syntax) - ) + '_sec2com: table entries %r/%r hashing ' + 'failed' % (nextMibNode.syntax, mibNode.syntax)) continue - self.__paramsBranchId = snmpTargetParamsSecurityName.branchVersionId + self._paramsBranchId = snmpTargetParamsSecurityName.branchVersionId # invalidate next map as it include this one - self.__securityBranchId = -1 + self._securityBranchId = -1 - snmpCommunityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-COMMUNITY-MIB', - 'snmpCommunityName') - if self.__securityBranchId != snmpCommunityName.branchVersionId: + snmpCommunityName, = mibBuilder.importSymbols( + 'SNMP-COMMUNITY-MIB', 'snmpCommunityName') + + if self._securityBranchId != snmpCommunityName.branchVersionId: (snmpCommunitySecurityName, snmpCommunityContextEngineId, - snmpCommunityContextName) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + snmpCommunityContextName) = mibBuilder.importSymbols( 'SNMP-COMMUNITY-MIB', 'snmpCommunitySecurityName', 'snmpCommunityContextEngineID', 'snmpCommunityContextName' ) - self.__securityMap = {} + self._securityMap = {} nextMibNode = snmpCommunityName @@ -92,54 +101,55 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel): instId = nextMibNode.name[len(snmpCommunityName.name):] - _securityName = snmpCommunitySecurityName.getNode(snmpCommunitySecurityName.name + instId).syntax + _securityName = snmpCommunitySecurityName.getNode( + snmpCommunitySecurityName.name + instId).syntax _contextEngineId = snmpCommunityContextEngineId.getNode( snmpCommunityContextEngineId.name + instId).syntax - _contextName = snmpCommunityContextName.getNode(snmpCommunityContextName.name + instId).syntax + _contextName = snmpCommunityContextName.getNode( + snmpCommunityContextName.name + instId).syntax + + key = _securityName, _contextEngineId, _contextName try: - self.__securityMap[(_securityName, - _contextEngineId, - _contextName)] = nextMibNode.syntax + self._securityMap[key] = nextMibNode.syntax except PyAsn1Error: debug.logger & debug.FLAG_SM and debug.logger( - '_sec2com: table entries %r/%r/%r hashing failed' % ( - _securityName, _contextEngineId, _contextName) - ) + '_sec2com: table entries %r/%r/%r hashing failed' % key) continue - self.__securityBranchId = snmpCommunityName.branchVersionId + self._securityBranchId = snmpCommunityName.branchVersionId debug.logger & debug.FLAG_SM and debug.logger( - '_sec2com: built securityName to communityName map, version %s: %s' % ( - self.__securityBranchId, self.__securityMap)) + '_sec2com: built securityName to communityName map, version ' + '%s: %s' % (self._securityBranchId, self._securityMap)) + + key = securityName, contextEngineId, contextName try: - return self.__securityMap[(securityName, - contextEngineId, - contextName)] + return self._securityMap[key] except KeyError: raise error.StatusInformation( - errorIndication=errind.unknownCommunityName - ) + errorIndication=errind.unknownCommunityName) def _com2sec(self, snmpEngine, communityName, transportInformation): - snmpTargetAddrTAddress, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder + + snmpTargetAddrTAddress, = mibBuilder.importSymbols( 'SNMP-TARGET-MIB', 'snmpTargetAddrTAddress') - if self.__transportBranchId != snmpTargetAddrTAddress.branchVersionId: + + if self._transportBranchId != snmpTargetAddrTAddress.branchVersionId: (SnmpTagValue, snmpTargetAddrTDomain, - snmpTargetAddrTagList) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + snmpTargetAddrTagList) = mibBuilder.importSymbols( 'SNMP-TARGET-MIB', 'SnmpTagValue', 'snmpTargetAddrTDomain', - 'snmpTargetAddrTagList' - ) + 'snmpTargetAddrTagList') - self.__emptyTag = SnmpTagValue('') + self._emptyTag = SnmpTagValue('') - self.__transportToTagMap = {} + self._transportToTagMap = {} nextMibNode = snmpTargetAddrTagList @@ -151,56 +161,64 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel): break instId = nextMibNode.name[len(snmpTargetAddrTagList.name):] - targetAddrTDomain = snmpTargetAddrTDomain.getNode(snmpTargetAddrTDomain.name + instId).syntax - targetAddrTAddress = snmpTargetAddrTAddress.getNode(snmpTargetAddrTAddress.name + instId).syntax + + targetAddrTDomain = snmpTargetAddrTDomain.getNode( + snmpTargetAddrTDomain.name + instId).syntax + targetAddrTAddress = snmpTargetAddrTAddress.getNode( + snmpTargetAddrTAddress.name + instId).syntax targetAddrTDomain = tuple(targetAddrTDomain) - if targetAddrTDomain[:len(udp.SNMP_UDP_DOMAIN)] == udp.SNMP_UDP_DOMAIN: - SnmpUDPAddress, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMPv2-TM', - 'SnmpUDPAddress') + if (targetAddrTDomain[:len(udp.SNMP_UDP_DOMAIN)] == + udp.SNMP_UDP_DOMAIN): + SnmpUDPAddress, = mibBuilder.importSymbols( + 'SNMPv2-TM', 'SnmpUDPAddress') targetAddrTAddress = tuple(SnmpUDPAddress(targetAddrTAddress)) - elif targetAddrTDomain[:len(udp6.SNMP_UDP6_DOMAIN)] == udp6.SNMP_UDP6_DOMAIN: - TransportAddressIPv6, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + + elif (targetAddrTDomain[:len(udp6.SNMP_UDP6_DOMAIN)] == + udp6.SNMP_UDP6_DOMAIN): + TransportAddressIPv6, = mibBuilder.importSymbols( 'TRANSPORT-ADDRESS-MIB', 'TransportAddressIPv6') + targetAddrTAddress = tuple(TransportAddressIPv6(targetAddrTAddress)) targetAddr = targetAddrTDomain, targetAddrTAddress - targetAddrTagList = snmpTargetAddrTagList.getNode(snmpTargetAddrTagList.name + instId).syntax - if targetAddr not in self.__transportToTagMap: - self.__transportToTagMap[targetAddr] = set() + targetAddrTagList = snmpTargetAddrTagList.getNode( + snmpTargetAddrTagList.name + instId).syntax + + if targetAddr not in self._transportToTagMap: + self._transportToTagMap[targetAddr] = set() try: if targetAddrTagList: - self.__transportToTagMap[targetAddr].update( + self._transportToTagMap[targetAddr].update( [SnmpTagValue(x) - for x in targetAddrTagList.asOctets().split()] - ) + for x in targetAddrTagList.asOctets().split()]) else: - self.__transportToTagMap[targetAddr].add(self.__emptyTag) + self._transportToTagMap[targetAddr].add(self._emptyTag) except PyAsn1Error: debug.logger & debug.FLAG_SM and debug.logger( '_com2sec: table entries %r/%r hashing failed' % ( - targetAddr, targetAddrTagList) - ) + targetAddr, targetAddrTagList)) continue - self.__transportBranchId = snmpTargetAddrTAddress.branchVersionId + self._transportBranchId = snmpTargetAddrTAddress.branchVersionId - debug.logger & debug.FLAG_SM and debug.logger('_com2sec: built transport-to-tag map version %s: %s' % ( - self.__transportBranchId, self.__transportToTagMap)) + debug.logger & debug.FLAG_SM and debug.logger( + '_com2sec: built transport-to-tag map version %s: ' + '%s' % (self._transportBranchId, self._transportToTagMap)) - snmpTargetParamsSecurityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + snmpTargetParamsSecurityName, = mibBuilder.importSymbols( 'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityName') - if self.__paramsBranchId != snmpTargetParamsSecurityName.branchVersionId: - snmpTargetParamsSecurityModel, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + if self._paramsBranchId != snmpTargetParamsSecurityName.branchVersionId: + snmpTargetParamsSecurityModel, = mibBuilder.importSymbols( 'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityModel') - self.__nameToModelMap = {} + self._nameToModelMap = {} nextMibNode = snmpTargetParamsSecurityName @@ -213,43 +231,44 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel): instId = nextMibNode.name[len(snmpTargetParamsSecurityName.name):] - mibNode = snmpTargetParamsSecurityModel.getNode(snmpTargetParamsSecurityModel.name + instId) + mibNode = snmpTargetParamsSecurityModel.getNode( + snmpTargetParamsSecurityModel.name + instId) try: - if nextMibNode.syntax not in self.__nameToModelMap: - self.__nameToModelMap[nextMibNode.syntax] = set() + if nextMibNode.syntax not in self._nameToModelMap: + self._nameToModelMap[nextMibNode.syntax] = set() - self.__nameToModelMap[nextMibNode.syntax].add(mibNode.syntax) + self._nameToModelMap[nextMibNode.syntax].add(mibNode.syntax) except PyAsn1Error: debug.logger & debug.FLAG_SM and debug.logger( - '_com2sec: table entries %r/%r hashing failed' % ( - nextMibNode.syntax, mibNode.syntax) - ) + '_com2sec: table entries %r/%r hashing ' + 'failed' % (nextMibNode.syntax, mibNode.syntax)) continue - self.__paramsBranchId = snmpTargetParamsSecurityName.branchVersionId + self._paramsBranchId = snmpTargetParamsSecurityName.branchVersionId # invalidate next map as it include this one - self.__communityBranchId = -1 + self._communityBranchId = -1 debug.logger & debug.FLAG_SM and debug.logger( - '_com2sec: built securityName to securityModel map, version %s: %s' % ( - self.__paramsBranchId, self.__nameToModelMap)) + '_com2sec: built securityName to securityModel map, version ' + '%s: %s' % (self._paramsBranchId, self._nameToModelMap)) + + snmpCommunityName, = mibBuilder.importSymbols( + 'SNMP-COMMUNITY-MIB', 'snmpCommunityName') - snmpCommunityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-COMMUNITY-MIB', - 'snmpCommunityName') - if self.__communityBranchId != snmpCommunityName.branchVersionId: + if self._communityBranchId != snmpCommunityName.branchVersionId: (snmpCommunitySecurityName, snmpCommunityContextEngineId, snmpCommunityContextName, - snmpCommunityTransportTag) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + snmpCommunityTransportTag) = mibBuilder.importSymbols( 'SNMP-COMMUNITY-MIB', 'snmpCommunitySecurityName', 'snmpCommunityContextEngineID', 'snmpCommunityContextName', 'snmpCommunityTransportTag' ) - self.__communityToTagMap = {} - self.__tagAndCommunityToSecurityMap = {} + self._communityToTagMap = {} + self._tagAndCommunityToSecurityMap = {} nextMibNode = snmpCommunityName @@ -262,100 +281,125 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel): instId = nextMibNode.name[len(snmpCommunityName.name):] - securityName = snmpCommunitySecurityName.getNode(snmpCommunitySecurityName.name + instId).syntax + securityName = snmpCommunitySecurityName.getNode( + snmpCommunitySecurityName.name + instId).syntax contextEngineId = snmpCommunityContextEngineId.getNode( snmpCommunityContextEngineId.name + instId).syntax - contextName = snmpCommunityContextName.getNode(snmpCommunityContextName.name + instId).syntax + contextName = snmpCommunityContextName.getNode( + snmpCommunityContextName.name + instId).syntax - transportTag = snmpCommunityTransportTag.getNode(snmpCommunityTransportTag.name + instId).syntax + transportTag = snmpCommunityTransportTag.getNode( + snmpCommunityTransportTag.name + instId).syntax _tagAndCommunity = transportTag, nextMibNode.syntax try: - if _tagAndCommunity not in self.__tagAndCommunityToSecurityMap: - self.__tagAndCommunityToSecurityMap[_tagAndCommunity] = set() + if _tagAndCommunity not in self._tagAndCommunityToSecurityMap: + self._tagAndCommunityToSecurityMap[_tagAndCommunity] = set() - self.__tagAndCommunityToSecurityMap[_tagAndCommunity].add( - (securityName, contextEngineId, contextName) - ) + self._tagAndCommunityToSecurityMap[_tagAndCommunity].add( + (securityName, contextEngineId, contextName)) - if nextMibNode.syntax not in self.__communityToTagMap: - self.__communityToTagMap[nextMibNode.syntax] = set() + if nextMibNode.syntax not in self._communityToTagMap: + self._communityToTagMap[nextMibNode.syntax] = set() - self.__communityToTagMap[nextMibNode.syntax].add(transportTag) + self._communityToTagMap[nextMibNode.syntax].add(transportTag) except PyAsn1Error: debug.logger & debug.FLAG_SM and debug.logger( - '_com2sec: table entries %r/%r hashing failed' % ( - _tagAndCommunity, nextMibNode.syntax) - ) + '_com2sec: table entries %r/%r hashing ' + 'failed' % (_tagAndCommunity, nextMibNode.syntax)) continue - self.__communityBranchId = snmpCommunityName.branchVersionId + self._communityBranchId = snmpCommunityName.branchVersionId debug.logger & debug.FLAG_SM and debug.logger( - '_com2sec: built communityName to tag map (securityModel %s), version %s: %s' % ( - self.SECURITY_MODEL_ID, self.__communityBranchId, self.__communityToTagMap)) + '_com2sec: built communityName to tag map ' + '(securityModel %s), version %s: ' + '%s' % (self.SECURITY_MODEL_ID, self._communityBranchId, + self._communityToTagMap)) debug.logger & debug.FLAG_SM and debug.logger( - '_com2sec: built tag & community to securityName map (securityModel %s), version %s: %s' % ( - self.SECURITY_MODEL_ID, self.__communityBranchId, self.__tagAndCommunityToSecurityMap)) - - if communityName in self.__communityToTagMap: - if transportInformation in self.__transportToTagMap: - tags = self.__transportToTagMap[transportInformation].intersection( - self.__communityToTagMap[communityName]) - elif self.__emptyTag in self.__communityToTagMap[communityName]: - tags = [self.__emptyTag] + '_com2sec: built tag & community to securityName map ' + '(securityModel %s), version %s: ' + '%s' % (self.SECURITY_MODEL_ID, self._communityBranchId, + self._tagAndCommunityToSecurityMap)) + + if communityName in self._communityToTagMap: + if transportInformation in self._transportToTagMap: + tags = self._transportToTagMap[transportInformation].intersection( + self._communityToTagMap[communityName]) + + elif self._emptyTag in self._communityToTagMap[communityName]: + tags = [self._emptyTag] + else: - raise error.StatusInformation(errorIndication=errind.unknownCommunityName) + raise error.StatusInformation( + errorIndication=errind.unknownCommunityName) candidateSecurityNames = [] - for x in [self.__tagAndCommunityToSecurityMap[(t, communityName)] for t in tags]: + securityNamesSets = [ + self._tagAndCommunityToSecurityMap[(t, communityName)] + for t in tags + ] + + for x in securityNamesSets: candidateSecurityNames.extend(list(x)) - # 5.2.1 (row selection in snmpCommunityTable) - # Picks first match but favors entries already in targets table if candidateSecurityNames: - candidateSecurityNames.sort( - key=lambda x, m=self.__nameToModelMap, v=self.SECURITY_MODEL_ID: ( - not int(x[0] in m and v in m[x[0]]), str(x[0])) - ) + candidateSecurityNames.sort(key=self._orderSecurityNames) + chosenSecurityName = candidateSecurityNames[0] # min() + debug.logger & debug.FLAG_SM and debug.logger( - '_com2sec: securityName candidates for communityName \'%s\' are %s; choosing securityName \'%s\'' % ( - communityName, candidateSecurityNames, chosenSecurityName[0])) + '_com2sec: securityName candidates for communityName %s ' + 'are %s; choosing securityName ' + '%s' % (communityName, candidateSecurityNames, + chosenSecurityName[0])) + return chosenSecurityName - raise error.StatusInformation(errorIndication=errind.unknownCommunityName) + raise error.StatusInformation( + errorIndication=errind.unknownCommunityName) + + # 5.2.1 (row selection in snmpCommunityTable) + # Picks first match but favors entries already in targets table + def _orderSecurityNames(self, securityName): + return (not int(securityName[0] in self._nameToModelMap and + self.SECURITY_MODEL_ID in self._nameToModelMap[securityName[0]]), + str(securityName[0])) def generateRequestMsg(self, snmpEngine, messageProcessingModel, globalData, maxMessageSize, securityModel, securityEngineId, securityName, securityLevel, scopedPDU): msg, = globalData + contextEngineId, contextName, pdu = scopedPDU # rfc2576: 5.2.3 - communityName = self._sec2com(snmpEngine, securityName, - contextEngineId, contextName) + communityName = self._sec2com( + snmpEngine, securityName, contextEngineId, contextName) debug.logger & debug.FLAG_SM and debug.logger( - 'generateRequestMsg: using community %r for securityModel %r, securityName %r, contextEngineId %r contextName %r' % ( - communityName, securityModel, securityName, contextEngineId, contextName)) + 'generateRequestMsg: using community %r for securityModel %r, ' + 'securityName %r, contextEngineId %r contextName ' + '%r' % (communityName, securityModel, securityName, + contextEngineId, contextName)) securityParameters = communityName msg.setComponentByPosition(1, securityParameters) msg.setComponentByPosition(2) msg.getComponentByPosition(2).setComponentByType( - pdu.tagSet, pdu, verifyConstraints=False, matchTags=False, matchConstraints=False - ) + pdu.tagSet, pdu, verifyConstraints=False, matchTags=False, + matchConstraints=False) - debug.logger & debug.FLAG_MP and debug.logger('generateRequestMsg: %s' % (msg.prettyPrint(),)) + debug.logger & debug.FLAG_MP and debug.logger( + 'generateRequestMsg: %s' % (msg.prettyPrint(),)) try: return securityParameters, encoder.encode(msg) @@ -363,7 +407,9 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel): except PyAsn1Error as exc: debug.logger & debug.FLAG_MP and debug.logger( 'generateRequestMsg: serialization failure: %s' % exc) - raise error.StatusInformation(errorIndication=errind.serializationError) + + raise error.StatusInformation( + errorIndication=errind.serializationError) def generateResponseMsg(self, snmpEngine, messageProcessingModel, globalData, maxMessageSize, securityModel, @@ -371,21 +417,26 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel): scopedPDU, securityStateReference): # rfc2576: 5.2.2 msg, = globalData + contextEngineId, contextName, pdu = scopedPDU + cachedSecurityData = self._cache.pop(securityStateReference) + communityName = cachedSecurityData['communityName'] debug.logger & debug.FLAG_SM and debug.logger( - 'generateResponseMsg: recovered community %r by securityStateReference %s' % ( - communityName, securityStateReference)) + 'generateResponseMsg: recovered community %r by ' + 'securityStateReference ' + '%s' % (communityName, securityStateReference)) msg.setComponentByPosition(1, communityName) msg.setComponentByPosition(2) msg.getComponentByPosition(2).setComponentByType( - pdu.tagSet, pdu, verifyConstraints=False, matchTags=False, matchConstraints=False - ) + pdu.tagSet, pdu, verifyConstraints=False, matchTags=False, + matchConstraints=False) - debug.logger & debug.FLAG_MP and debug.logger('generateResponseMsg: %s' % (msg.prettyPrint(),)) + debug.logger & debug.FLAG_MP and debug.logger( + 'generateResponseMsg: %s' % (msg.prettyPrint(),)) try: return communityName, encoder.encode(msg) @@ -393,11 +444,14 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel): except PyAsn1Error as exc: debug.logger & debug.FLAG_MP and debug.logger( 'generateResponseMsg: serialization failure: %s' % exc) + raise error.StatusInformation(errorIndication=errind.serializationError) def processIncomingMsg(self, snmpEngine, messageProcessingModel, maxMessageSize, securityParameters, securityModel, securityLevel, wholeMsg, msg): + mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder + # rfc2576: 5.2.1 communityName, transportInformation = securityParameters @@ -407,6 +461,7 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel): snmpEngine.observer.storeExecutionContext( snmpEngine, 'rfc2576.processIncomingMsg:writable', scope ) + snmpEngine.observer.clearExecutionContext( snmpEngine, 'rfc2576.processIncomingMsg:writable' ) @@ -418,16 +473,17 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel): ) except error.StatusInformation: - snmpInBadCommunityNames, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols( + snmpInBadCommunityNames, = mibBuilder.importSymbols( '__SNMPv2-MIB', 'snmpInBadCommunityNames') snmpInBadCommunityNames.syntax += 1 + raise error.StatusInformation( errorIndication=errind.unknownCommunityName, communityName=communityName ) - snmpEngineID, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', - 'snmpEngineID') + snmpEngineID, = mibBuilder.importSymbols( + '__SNMP-FRAMEWORK-MIB', 'snmpEngineID') securityEngineID = snmpEngineID.syntax @@ -440,24 +496,31 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel): contextEngineId=contextEngineId, contextName=contextName) ) + snmpEngine.observer.clearExecutionContext( snmpEngine, 'rfc2576.processIncomingMsg' ) debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: looked up securityName %r securityModel %r contextEngineId %r contextName %r by communityName %r AND transportInformation %r' % ( - securityName, self.SECURITY_MODEL_ID, contextEngineId, contextName, communityName, transportInformation)) + 'processIncomingMsg: looked up securityName %r securityModel %r ' + 'contextEngineId %r contextName %r by communityName %r ' + 'AND transportInformation ' + '%r' % (securityName, self.SECURITY_MODEL_ID, contextEngineId, + contextName, communityName, transportInformation)) stateReference = self._cache.push(communityName=communityName) scopedPDU = (contextEngineId, contextName, msg.getComponentByPosition(2).getComponent()) + maxSizeResponseScopedPDU = maxMessageSize - 128 + securityStateReference = stateReference debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: generated maxSizeResponseScopedPDU %s securityStateReference %s' % ( - maxSizeResponseScopedPDU, securityStateReference)) + 'processIncomingMsg: generated maxSizeResponseScopedPDU ' + '%s securityStateReference ' + '%s' % (maxSizeResponseScopedPDU, securityStateReference)) return (securityEngineID, securityName, scopedPDU, maxSizeResponseScopedPDU, securityStateReference) diff --git a/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py b/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py index df3db63f..c3578682 100644 --- a/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py +++ b/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py @@ -41,11 +41,13 @@ class HmacMd5(base.AbstractAuthenticationService): # should be in the substrate. Also, it pre-sets digest placeholder # so we hash wholeMsg out of the box. # Yes, that's ugly but that's rfc... - l = wholeMsg.find(TWELVE_ZEROS) - if l == -1: + ln = wholeMsg.find(TWELVE_ZEROS) + + if ln == -1: raise error.ProtocolError('Cant locate digest placeholder') - wholeHead = wholeMsg[:l] - wholeTail = wholeMsg[l + 12:] + + wholeHead = wholeMsg[:ln] + wholeTail = wholeMsg[ln + 12:] # 6.3.1.1 @@ -56,15 +58,13 @@ class HmacMd5(base.AbstractAuthenticationService): # 6.3.1.2c k1 = univ.OctetString( - map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD) - ) + map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD)) # 6.3.1.2d --> no-op # 6.3.1.2e k2 = univ.OctetString( - map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD) - ) + map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD)) # 6.3.1.3 # noinspection PyDeprecation,PyCallingNonCallable @@ -83,15 +83,16 @@ class HmacMd5(base.AbstractAuthenticationService): # 6.3.2.1 & 2 if len(authParameters) != 12: raise error.StatusInformation( - errorIndication=errind.authenticationError - ) + errorIndication=errind.authenticationError) # 6.3.2.3 - l = wholeMsg.find(authParameters.asOctets()) - if l == -1: + ln = wholeMsg.find(authParameters.asOctets()) + if ln == -1: raise error.ProtocolError('Cant locate digest in wholeMsg') - wholeHead = wholeMsg[:l] - wholeTail = wholeMsg[l + 12:] + + wholeHead = wholeMsg[:ln] + wholeTail = wholeMsg[ln + 12:] + authenticatedWholeMsg = wholeHead + TWELVE_ZEROS + wholeTail # 6.3.2.4a @@ -101,15 +102,13 @@ class HmacMd5(base.AbstractAuthenticationService): # 6.3.2.4c k1 = univ.OctetString( - map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD) - ) + map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD)) # 6.3.2.4d --> no-op # 6.3.2.4e k2 = univ.OctetString( - map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD) - ) + map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD)) # 6.3.2.5a # noinspection PyDeprecation,PyCallingNonCallable @@ -125,7 +124,6 @@ class HmacMd5(base.AbstractAuthenticationService): # 6.3.2.6 if mac != authParameters: raise error.StatusInformation( - errorIndication=errind.authenticationFailure - ) + errorIndication=errind.authenticationFailure) return authenticatedWholeMsg diff --git a/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py b/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py index 4d5498d1..76c35af0 100644 --- a/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py +++ b/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py @@ -42,11 +42,12 @@ class HmacSha(base.AbstractAuthenticationService): # should be in the substrate. Also, it pre-sets digest placeholder # so we hash wholeMsg out of the box. # Yes, that's ugly but that's rfc... - l = wholeMsg.find(TWELVE_ZEROS) - if l == -1: + ln = wholeMsg.find(TWELVE_ZEROS) + if ln == -1: raise error.ProtocolError('Cant locate digest placeholder') - wholeHead = wholeMsg[:l] - wholeTail = wholeMsg[l + 12:] + + wholeHead = wholeMsg[:ln] + wholeTail = wholeMsg[ln + 12:] # 7.3.1.2a extendedAuthKey = authKey.asNumbers() + FORTY_FOUR_ZEROS @@ -55,15 +56,13 @@ class HmacSha(base.AbstractAuthenticationService): # 7.3.1.2c k1 = univ.OctetString( - map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD) - ) + map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD)) # 7.3.1.2d -- no-op # 7.3.1.2e k2 = univ.OctetString( - map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD) - ) + map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD)) # 7.3.1.3 d1 = sha1(k1.asOctets() + wholeMsg).digest() @@ -80,15 +79,16 @@ class HmacSha(base.AbstractAuthenticationService): # 7.3.2.1 & 2 if len(authParameters) != 12: raise error.StatusInformation( - errorIndication=errind.authenticationError - ) + errorIndication=errind.authenticationError) # 7.3.2.3 - l = wholeMsg.find(authParameters.asOctets()) - if l == -1: + ln = wholeMsg.find(authParameters.asOctets()) + if ln == -1: raise error.ProtocolError('Cant locate digest in wholeMsg') - wholeHead = wholeMsg[:l] - wholeTail = wholeMsg[l + 12:] + + wholeHead = wholeMsg[:ln] + wholeTail = wholeMsg[ln + 12:] + authenticatedWholeMsg = wholeHead + TWELVE_ZEROS + wholeTail # 7.3.2.4a @@ -98,15 +98,13 @@ class HmacSha(base.AbstractAuthenticationService): # 7.3.2.4c k1 = univ.OctetString( - map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD) - ) + map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD)) # 7.3.2.4d --> no-op # 7.3.2.4e k2 = univ.OctetString( - map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD) - ) + map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD)) # 7.3.2.5a d1 = sha1(k1.asOctets() + authenticatedWholeMsg).digest() @@ -120,7 +118,6 @@ class HmacSha(base.AbstractAuthenticationService): # 7.3.2.6 if mac != authParameters: raise error.StatusInformation( - errorIndication=errind.authenticationFailure - ) + errorIndication=errind.authenticationFailure) return authenticatedWholeMsg diff --git a/pysnmp/proto/secmod/rfc3414/localkey.py b/pysnmp/proto/secmod/rfc3414/localkey.py index 144342fa..bc4faa91 100644 --- a/pysnmp/proto/secmod/rfc3414/localkey.py +++ b/pysnmp/proto/secmod/rfc3414/localkey.py @@ -12,9 +12,12 @@ from pyasn1.type import univ def hashPassphrase(passphrase, hashFunc): passphrase = univ.OctetString(passphrase).asOctets() + hasher = hashFunc() + ringBuffer = passphrase * (64 // len(passphrase) + 1) ringBufferLen = len(ringBuffer) + count = 0 mark = 0 @@ -25,9 +28,11 @@ def hashPassphrase(passphrase, hashFunc): mark = e else: - hasher.update( - ringBuffer[mark:ringBufferLen] + ringBuffer[0:e - ringBufferLen] - ) + chunk = ringBuffer[mark:ringBufferLen] + chunk += ringBuffer[0:e - ringBufferLen] + + hasher.update(chunk) + mark = e - ringBufferLen count += 1 @@ -36,11 +41,13 @@ def hashPassphrase(passphrase, hashFunc): def passwordToKey(passphrase, snmpEngineId, hashFunc): - return localizeKey(hashPassphrase(passphrase, hashFunc), snmpEngineId, hashFunc) + return localizeKey( + hashPassphrase(passphrase, hashFunc), snmpEngineId, hashFunc) def localizeKey(passKey, snmpEngineId, hashFunc): passKey = univ.OctetString(passKey).asOctets() + # noinspection PyDeprecation,PyCallingNonCallable return hashFunc(passKey + snmpEngineId.asOctets() + passKey).digest() diff --git a/pysnmp/proto/secmod/rfc3414/priv/des.py b/pysnmp/proto/secmod/rfc3414/priv/des.py index 5128a662..10058aeb 100644 --- a/pysnmp/proto/secmod/rfc3414/priv/des.py +++ b/pysnmp/proto/secmod/rfc3414/priv/des.py @@ -38,32 +38,39 @@ class Des(base.AbstractEncryptionService): def hashPassphrase(self, authProtocol, privKey): if authProtocol == hmacmd5.HmacMd5.SERVICE_ID: hashAlgo = md5 + elif authProtocol == hmacsha.HmacSha.SERVICE_ID: hashAlgo = sha1 + elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM: hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol] + else: raise error.ProtocolError( - 'Unknown auth protocol %s' % (authProtocol,) - ) + 'Unknown auth protocol %s' % (authProtocol,)) + return localkey.hashPassphrase(privKey, hashAlgo) def localizeKey(self, authProtocol, privKey, snmpEngineID): if authProtocol == hmacmd5.HmacMd5.SERVICE_ID: hashAlgo = md5 + elif authProtocol == hmacsha.HmacSha.SERVICE_ID: hashAlgo = sha1 + elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM: hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol] + else: raise error.ProtocolError( - 'Unknown auth protocol %s' % (authProtocol,) - ) + 'Unknown auth protocol %s' % (authProtocol,)) + localPrivKey = localkey.localizeKey(privKey, snmpEngineID, hashAlgo) + return localPrivKey[:self.KEY_SIZE] # 8.1.1.1 - def __getEncryptionKey(self, privKey, snmpEngineBoots): + def _getEncryptionKey(self, privKey, snmpEngineBoots): desKey = privKey[:8] preIV = privKey[8:16] @@ -77,42 +84,47 @@ class Des(base.AbstractEncryptionService): self.local_int >> 16 & 0xff, self.local_int >> 8 & 0xff, self.local_int & 0xff] + if self.local_int == 0xffffffff: self.local_int = 0 + else: self.local_int += 1 + iv = map(lambda x, y: x ^ y, salt, preIV.asNumbers()) + return (desKey.asOctets(), univ.OctetString(salt).asOctets(), - univ.OctetString(map(lambda x, y: x ^ y, salt, preIV.asNumbers())).asOctets()) + univ.OctetString(iv).asOctets()) @staticmethod - def __getDecryptionKey(privKey, salt): - return (privKey[:8].asOctets(), - univ.OctetString(map(lambda x, y: x ^ y, salt.asNumbers(), privKey[8:16].asNumbers())).asOctets()) + def _getDecryptionKey(privKey, salt): + iv = map(lambda x, y: x ^ y, salt.asNumbers(), + privKey[8:16].asNumbers()) + + return privKey[:8].asOctets(), univ.OctetString(iv).asOctets() # 8.2.4.1 def encryptData(self, encryptKey, privParameters, dataToEncrypt): snmpEngineBoots, snmpEngineTime, salt = privParameters # 8.3.1.1 - desKey, salt, iv = self.__getEncryptionKey( - encryptKey, snmpEngineBoots - ) + desKey, salt, iv = self._getEncryptionKey(encryptKey, snmpEngineBoots) # 8.3.1.2 privParameters = univ.OctetString(salt) # 8.1.1.2 - plaintext = dataToEncrypt + univ.OctetString((0,) * (8 - len(dataToEncrypt) % 8)).asOctets() + plaintext = dataToEncrypt + plaintext += univ.OctetString( + (0,) * (8 - len(dataToEncrypt) % 8)).asOctets() try: ciphertext = des.encrypt(plaintext, desKey, iv) except PysnmpCryptoError: raise error.StatusInformation( - errorIndication=errind.unsupportedPrivProtocol - ) + errorIndication=errind.unsupportedPrivProtocol) # 8.3.1.3 & 4 return univ.OctetString(ciphertext), privParameters @@ -124,19 +136,17 @@ class Des(base.AbstractEncryptionService): # 8.3.2.1 if len(salt) != 8: raise error.StatusInformation( - errorIndication=errind.decryptionError - ) + errorIndication=errind.decryptionError) # 8.3.2.2 no-op # 8.3.2.3 - desKey, iv = self.__getDecryptionKey(decryptKey, salt) + desKey, iv = self._getDecryptionKey(decryptKey, salt) # 8.3.2.4 -> 8.1.1.3 if len(encryptedData) % 8 != 0: raise error.StatusInformation( - errorIndication=errind.decryptionError - ) + errorIndication=errind.decryptionError) try: # 8.3.2.6 @@ -144,5 +154,4 @@ class Des(base.AbstractEncryptionService): except PysnmpCryptoError: raise error.StatusInformation( - errorIndication=errind.unsupportedPrivProtocol - ) + errorIndication=errind.unsupportedPrivProtocol) diff --git a/pysnmp/proto/secmod/rfc3414/service.py b/pysnmp/proto/secmod/rfc3414/service.py index f9bf056b..0a8f54e5 100644 --- a/pysnmp/proto/secmod/rfc3414/service.py +++ b/pysnmp/proto/secmod/rfc3414/service.py @@ -38,15 +38,26 @@ from pysnmp.smi.error import NoSuchInstanceError class UsmSecurityParameters(rfc1155.TypeCoercionHackMixIn, univ.Sequence): componentType = namedtype.NamedTypes( - namedtype.NamedType('msgAuthoritativeEngineId', univ.OctetString()), - namedtype.NamedType('msgAuthoritativeEngineBoots', - univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))), - namedtype.NamedType('msgAuthoritativeEngineTime', - univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))), - namedtype.NamedType('msgUserName', - univ.OctetString().subtype(subtypeSpec=constraint.ValueSizeConstraint(0, 32))), - namedtype.NamedType('msgAuthenticationParameters', univ.OctetString()), - namedtype.NamedType('msgPrivacyParameters', univ.OctetString()) + namedtype.NamedType( + 'msgAuthoritativeEngineId', univ.OctetString()), + namedtype.NamedType( + 'msgAuthoritativeEngineBoots', + univ.Integer().subtype( + subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))), + namedtype.NamedType( + 'msgAuthoritativeEngineTime', + univ.Integer().subtype( + subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))), + namedtype.NamedType( + 'msgUserName', + univ.OctetString().subtype( + subtypeSpec=constraint.ValueSizeConstraint(0, 32))), + namedtype.NamedType( + 'msgAuthenticationParameters', + univ.OctetString()), + namedtype.NamedType( + 'msgPrivacyParameters', + univ.OctetString()) ) @@ -56,10 +67,14 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): AUTH_SERVICES = { hmacmd5.HmacMd5.SERVICE_ID: hmacmd5.HmacMd5(), hmacsha.HmacSha.SERVICE_ID: hmacsha.HmacSha(), - hmacsha2.HmacSha2.SHA224_SERVICE_ID: hmacsha2.HmacSha2(hmacsha2.HmacSha2.SHA224_SERVICE_ID), - hmacsha2.HmacSha2.SHA256_SERVICE_ID: hmacsha2.HmacSha2(hmacsha2.HmacSha2.SHA256_SERVICE_ID), - hmacsha2.HmacSha2.SHA384_SERVICE_ID: hmacsha2.HmacSha2(hmacsha2.HmacSha2.SHA384_SERVICE_ID), - hmacsha2.HmacSha2.SHA512_SERVICE_ID: hmacsha2.HmacSha2(hmacsha2.HmacSha2.SHA512_SERVICE_ID), + hmacsha2.HmacSha2.SHA224_SERVICE_ID: hmacsha2.HmacSha2( + hmacsha2.HmacSha2.SHA224_SERVICE_ID), + hmacsha2.HmacSha2.SHA256_SERVICE_ID: hmacsha2.HmacSha2( + hmacsha2.HmacSha2.SHA256_SERVICE_ID), + hmacsha2.HmacSha2.SHA384_SERVICE_ID: hmacsha2.HmacSha2( + hmacsha2.HmacSha2.SHA384_SERVICE_ID), + hmacsha2.HmacSha2.SHA512_SERVICE_ID: hmacsha2.HmacSha2( + hmacsha2.HmacSha2.SHA512_SERVICE_ID), noauth.NoAuth.SERVICE_ID: noauth.NoAuth(), } @@ -76,21 +91,23 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): def __init__(self): AbstractSecurityModel.__init__(self) - self.__securityParametersSpec = UsmSecurityParameters() - self.__timeline = {} - self.__timelineExpQueue = {} - self.__expirationTimer = 0 - self.__paramsBranchId = -1 + self._securityParametersSpec = UsmSecurityParameters() + self._timeline = {} + self._timelineExpQueue = {} + self._expirationTimer = 0 + self._paramsBranchId = -1 - def __sec2usr(self, snmpEngine, securityName, securityEngineID=None): + def _sec2usr(self, snmpEngine, securityName, securityEngineID=None): mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder - usmUserEngineID, = mibBuilder.importSymbols('SNMP-USER-BASED-SM-MIB', - 'usmUserEngineID') - if self.__paramsBranchId != usmUserEngineID.branchVersionId: + + usmUserEngineID, = mibBuilder.importSymbols( + 'SNMP-USER-BASED-SM-MIB', 'usmUserEngineID') + + if self._paramsBranchId != usmUserEngineID.branchVersionId: usmUserName, usmUserSecurityName = mibBuilder.importSymbols( 'SNMP-USER-BASED-SM-MIB', 'usmUserName', 'usmUserSecurityName') - self.__securityToUserMap = {} + self._securityToUserMap = {} nextMibNode = usmUserEngineID @@ -99,108 +116,138 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): nextMibNode = usmUserEngineID.getNextNode(nextMibNode.name) except NoSuchInstanceError: - self.__paramsBranchId = usmUserEngineID.branchVersionId + self._paramsBranchId = usmUserEngineID.branchVersionId + debug.logger & debug.FLAG_SM and debug.logger( - '_sec2usr: built snmpEngineId + securityName to userName map, version %s: %r' % ( - self.__paramsBranchId, self.__securityToUserMap)) + '_sec2usr: built snmpEngineId + securityName to ' + 'userName map, version %s: %r' % ( + self._paramsBranchId, self._securityToUserMap)) break instId = nextMibNode.name[len(usmUserSecurityName.name):] - __engineID = usmUserEngineID.getNode(usmUserEngineID.name + instId).syntax - __userName = usmUserName.getNode(usmUserName.name + instId).syntax - __securityName = usmUserSecurityName.getNode(usmUserSecurityName.name + instId).syntax + _engineID = usmUserEngineID.getNode( + usmUserEngineID.name + instId).syntax + _userName = usmUserName.getNode( + usmUserName.name + instId).syntax + _securityName = usmUserSecurityName.getNode( + usmUserSecurityName.name + instId).syntax - k = __engineID, __securityName + k = _engineID, _securityName # first (lesser) securityName wins - if k not in self.__securityToUserMap: - self.__securityToUserMap[k] = __userName + if k not in self._securityToUserMap: + self._securityToUserMap[k] = _userName if securityEngineID is None: - snmpEngineID, = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID') + snmpEngineID, = mibBuilder.importSymbols( + '__SNMP-FRAMEWORK-MIB', 'snmpEngineID') securityEngineID = snmpEngineID.syntax try: - userName = self.__securityToUserMap[(securityEngineID, securityName)] + userName = self._securityToUserMap[(securityEngineID, securityName)] + except KeyError: debug.logger & debug.FLAG_SM and debug.logger( - '_sec2usr: no entry exists for snmpEngineId %r, securityName %r' % (securityEngineID, securityName)) + '_sec2usr: no entry exists for snmpEngineId %r, securityName ' + '%r' % (securityEngineID, securityName)) raise NoSuchInstanceError() # emulate MIB lookup debug.logger & debug.FLAG_SM and debug.logger( - '_sec2usr: using userName %r for snmpEngineId %r, securityName %r' % ( - userName, securityEngineID, securityName)) + '_sec2usr: using userName %r for snmpEngineId %r, securityName ' + '%r' % (userName, securityEngineID, securityName)) return userName @staticmethod - def __getUserInfo(mibInstrumController, securityEngineID, userName): - usmUserEntry, = mibInstrumController.mibBuilder.importSymbols( - 'SNMP-USER-BASED-SM-MIB', 'usmUserEntry' - ) + def _getUserInfo(mibInstrumController, securityEngineID, userName): + mibBuilder = mibInstrumController.mibBuilder + + usmUserEntry, = mibBuilder.importSymbols( + 'SNMP-USER-BASED-SM-MIB', 'usmUserEntry') + tblIdx = usmUserEntry.getInstIdFromIndices(securityEngineID, userName) + # Get userName & securityName usmUserName = usmUserEntry.getNode(usmUserEntry.name + (2,) + tblIdx).syntax usmUserSecurityName = usmUserEntry.getNode(usmUserEntry.name + (3,) + tblIdx).syntax + # Get protocols usmUserAuthProtocol = usmUserEntry.getNode(usmUserEntry.name + (5,) + tblIdx).syntax usmUserPrivProtocol = usmUserEntry.getNode(usmUserEntry.name + (8,) + tblIdx).syntax + # Get keys - pysnmpUsmKeyEntry, = mibInstrumController.mibBuilder.importSymbols( - 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry' - ) - pysnmpUsmKeyAuthLocalized = pysnmpUsmKeyEntry.getNode(pysnmpUsmKeyEntry.name + (1,) + tblIdx).syntax - pysnmpUsmKeyPrivLocalized = pysnmpUsmKeyEntry.getNode(pysnmpUsmKeyEntry.name + (2,) + tblIdx).syntax + pysnmpUsmKeyEntry, = mibBuilder.importSymbols( + 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry') + + pysnmpUsmKeyAuthLocalized = pysnmpUsmKeyEntry.getNode( + pysnmpUsmKeyEntry.name + (1,) + tblIdx).syntax + pysnmpUsmKeyPrivLocalized = pysnmpUsmKeyEntry.getNode( + pysnmpUsmKeyEntry.name + (2,) + tblIdx).syntax + return (usmUserName, usmUserSecurityName, usmUserAuthProtocol, pysnmpUsmKeyAuthLocalized, usmUserPrivProtocol, pysnmpUsmKeyPrivLocalized) - def __cloneUserInfo(self, snmpEngine, securityEngineID, userName): + def _cloneUserInfo(self, snmpEngine, securityEngineID, userName): mibInstrumController = snmpEngine.msgAndPduDsp.mibInstrumController + mibBuilder = mibInstrumController.mibBuilder + + snmpEngineID, = mibBuilder.importSymbols( + '__SNMP-FRAMEWORK-MIB', 'snmpEngineID') - snmpEngineID, = mibInstrumController.mibBuilder.importSymbols( - '__SNMP-FRAMEWORK-MIB', 'snmpEngineID' - ) # Proto entry - usmUserEntry, = mibInstrumController.mibBuilder.importSymbols( - 'SNMP-USER-BASED-SM-MIB', 'usmUserEntry' - ) + usmUserEntry, = mibBuilder.importSymbols( + 'SNMP-USER-BASED-SM-MIB', 'usmUserEntry') + tblIdx1 = usmUserEntry.getInstIdFromIndices( - snmpEngineID.syntax, userName - ) + snmpEngineID.syntax, userName) + # Get proto protocols - usmUserName = usmUserEntry.getNode(usmUserEntry.name + (2,) + tblIdx1) - usmUserSecurityName = usmUserEntry.getNode(usmUserEntry.name + (3,) + tblIdx1) - usmUserCloneFrom = usmUserEntry.getNode(usmUserEntry.name + (4,) + tblIdx1) - usmUserAuthProtocol = usmUserEntry.getNode(usmUserEntry.name + (5,) + tblIdx1) - usmUserPrivProtocol = usmUserEntry.getNode(usmUserEntry.name + (8,) + tblIdx1) + usmUserName = usmUserEntry.getNode( + usmUserEntry.name + (2,) + tblIdx1) + usmUserSecurityName = usmUserEntry.getNode( + usmUserEntry.name + (3,) + tblIdx1) + usmUserCloneFrom = usmUserEntry.getNode( + usmUserEntry.name + (4,) + tblIdx1) + usmUserAuthProtocol = usmUserEntry.getNode( + usmUserEntry.name + (5,) + tblIdx1) + usmUserPrivProtocol = usmUserEntry.getNode( + usmUserEntry.name + (8,) + tblIdx1) + # Get proto keys - pysnmpUsmKeyEntry, = mibInstrumController.mibBuilder.importSymbols( - 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry' - ) - pysnmpUsmKeyAuth = pysnmpUsmKeyEntry.getNode(pysnmpUsmKeyEntry.name + (3,) + tblIdx1) - pysnmpUsmKeyPriv = pysnmpUsmKeyEntry.getNode(pysnmpUsmKeyEntry.name + (4,) + tblIdx1) + pysnmpUsmKeyEntry, = mibBuilder.importSymbols( + 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry') + + pysnmpUsmKeyAuth = pysnmpUsmKeyEntry.getNode( + pysnmpUsmKeyEntry.name + (3,) + tblIdx1) + pysnmpUsmKeyPriv = pysnmpUsmKeyEntry.getNode( + pysnmpUsmKeyEntry.name + (4,) + tblIdx1) # Create new row from proto values - tblIdx2 = usmUserEntry.getInstIdFromIndices(securityEngineID, userName) + tblIdx2 = usmUserEntry.getInstIdFromIndices( + securityEngineID, userName) # New inactive row mibInstrumController.writeMibObjects( - (usmUserEntry.name + (13,) + tblIdx2, 5), snmpEngine=snmpEngine - ) + (usmUserEntry.name + (13,) + tblIdx2, 5), snmpEngine=snmpEngine) # Set user&securityNames - usmUserEntry.getNode(usmUserEntry.name + (2,) + tblIdx2).syntax = usmUserName.syntax - usmUserEntry.getNode(usmUserEntry.name + (3,) + tblIdx2).syntax = usmUserSecurityName.syntax + usmUserEntry.getNode( + usmUserEntry.name + (2,) + tblIdx2).syntax = usmUserName.syntax + usmUserEntry.getNode( + usmUserEntry.name + (3,) + tblIdx2).syntax = usmUserSecurityName.syntax # Store a reference to original row - usmUserEntry.getNode(usmUserEntry.name + (4,) + tblIdx2).syntax = usmUserCloneFrom.syntax.clone(tblIdx1) + usmUserEntry.getNode( + usmUserEntry.name + (4,) + tblIdx2).syntax = usmUserCloneFrom.syntax.clone(tblIdx1) # Set protocols - usmUserEntry.getNode(usmUserEntry.name + (5,) + tblIdx2).syntax = usmUserAuthProtocol.syntax - usmUserEntry.getNode(usmUserEntry.name + (8,) + tblIdx2).syntax = usmUserPrivProtocol.syntax + usmUserEntry.getNode( + usmUserEntry.name + (5,) + tblIdx2).syntax = usmUserAuthProtocol.syntax + usmUserEntry.getNode( + usmUserEntry.name + (8,) + tblIdx2).syntax = usmUserPrivProtocol.syntax # Activate row mibInstrumController.writeMibObjects( @@ -208,98 +255,124 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): ) # Localize and set keys - pysnmpUsmKeyEntry, = mibInstrumController.mibBuilder.importSymbols( - 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry' - ) + pysnmpUsmKeyEntry, = mibBuilder.importSymbols( + 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry') pysnmpUsmKeyAuthLocalized = pysnmpUsmKeyEntry.getNode( - pysnmpUsmKeyEntry.name + (1,) + tblIdx2 - ) + pysnmpUsmKeyEntry.name + (1,) + tblIdx2) + if usmUserAuthProtocol.syntax in self.AUTH_SERVICES: localizeKey = self.AUTH_SERVICES[usmUserAuthProtocol.syntax].localizeKey - localAuthKey = localizeKey(pysnmpUsmKeyAuth.syntax, - securityEngineID) + localAuthKey = localizeKey( + pysnmpUsmKeyAuth.syntax, securityEngineID) + else: raise error.StatusInformation( - errorIndication=errind.unsupportedAuthProtocol - ) + errorIndication=errind.unsupportedAuthProtocol) + if localAuthKey is not None: pysnmpUsmKeyAuthLocalized.syntax = pysnmpUsmKeyAuthLocalized.syntax.clone(localAuthKey) + pysnmpUsmKeyPrivLocalized = pysnmpUsmKeyEntry.getNode( - pysnmpUsmKeyEntry.name + (2,) + tblIdx2 - ) + pysnmpUsmKeyEntry.name + (2,) + tblIdx2) + if usmUserPrivProtocol.syntax in self.PRIV_SERVICES: localizeKey = self.PRIV_SERVICES[usmUserPrivProtocol.syntax].localizeKey - localPrivKey = localizeKey(usmUserAuthProtocol.syntax, - pysnmpUsmKeyPriv.syntax, - securityEngineID) + localPrivKey = localizeKey( + usmUserAuthProtocol.syntax, pysnmpUsmKeyPriv.syntax, + securityEngineID) + else: - raise error.StatusInformation(errorIndication=errind.unsupportedPrivProtocol) + raise error.StatusInformation( + errorIndication=errind.unsupportedPrivProtocol) + if localPrivKey is not None: pysnmpUsmKeyPrivLocalized.syntax = pysnmpUsmKeyPrivLocalized.syntax.clone(localPrivKey) + return (usmUserName.syntax, usmUserSecurityName.syntax, usmUserAuthProtocol.syntax, pysnmpUsmKeyAuthLocalized.syntax, usmUserPrivProtocol.syntax, pysnmpUsmKeyPrivLocalized.syntax) - def __generateRequestOrResponseMsg(self, snmpEngine, - messageProcessingModel, - globalData, maxMessageSize, - securityModel, securityEngineID, - securityName, securityLevel, - scopedPDU, securityStateReference): + def _generateRequestOrResponseMsg(self, snmpEngine, + messageProcessingModel, + globalData, maxMessageSize, + securityModel, securityEngineID, + securityName, securityLevel, + scopedPDU, securityStateReference): + mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder - snmpEngineID = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax + + snmpEngineID = mibBuilder.importSymbols( + '__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax # 3.1.1 if securityStateReference is not None: # 3.1.1a cachedSecurityData = self._cache.pop(securityStateReference) + usmUserName = cachedSecurityData['msgUserName'] + if 'usmUserSecurityName' in cachedSecurityData: usmUserSecurityName = cachedSecurityData['usmUserSecurityName'] + else: usmUserSecurityName = usmUserName + if 'usmUserAuthProtocol' in cachedSecurityData: usmUserAuthProtocol = cachedSecurityData['usmUserAuthProtocol'] + else: usmUserAuthProtocol = noauth.NoAuth.SERVICE_ID + if 'usmUserAuthKeyLocalized' in cachedSecurityData: usmUserAuthKeyLocalized = cachedSecurityData['usmUserAuthKeyLocalized'] + else: usmUserAuthKeyLocalized = None + if 'usmUserPrivProtocol' in cachedSecurityData: usmUserPrivProtocol = cachedSecurityData['usmUserPrivProtocol'] + else: usmUserPrivProtocol = nopriv.NoPriv.SERVICE_ID + if 'usmUserPrivKeyLocalized' in cachedSecurityData: usmUserPrivKeyLocalized = cachedSecurityData['usmUserPrivKeyLocalized'] + else: usmUserPrivKeyLocalized = None + securityEngineID = snmpEngineID - debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: user info read from cache') + + debug.logger & debug.FLAG_SM and debug.logger( + '__generateRequestOrResponseMsg: user info read from cache') + elif securityName: # 3.1.1b try: (usmUserName, usmUserSecurityName, usmUserAuthProtocol, usmUserAuthKeyLocalized, usmUserPrivProtocol, - usmUserPrivKeyLocalized) = self.__getUserInfo( + usmUserPrivKeyLocalized) = self._getUserInfo( snmpEngine.msgAndPduDsp.mibInstrumController, securityEngineID, - self.__sec2usr(snmpEngine, securityName, securityEngineID) + self._sec2usr(snmpEngine, securityName, securityEngineID) ) - debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: read user info') + debug.logger & debug.FLAG_SM and debug.logger( + '__generateRequestOrResponseMsg: read user info') except NoSuchInstanceError: - pysnmpUsmDiscovery, = mibBuilder.importSymbols('__PYSNMP-USM-MIB', 'pysnmpUsmDiscovery') + pysnmpUsmDiscovery, = mibBuilder.importSymbols( + '__PYSNMP-USM-MIB', 'pysnmpUsmDiscovery') + reportUnknownName = not pysnmpUsmDiscovery.syntax + if not reportUnknownName: try: (usmUserName, usmUserSecurityName, usmUserAuthProtocol, usmUserAuthKeyLocalized, usmUserPrivProtocol, - usmUserPrivKeyLocalized) = self.__cloneUserInfo( - snmpEngine, - securityEngineID, - self.__sec2usr(snmpEngine, securityName) + usmUserPrivKeyLocalized) = self._cloneUserInfo( + snmpEngine, securityEngineID, + self._sec2usr(snmpEngine, securityName) ) except NoSuchInstanceError: @@ -307,31 +380,40 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): if reportUnknownName: raise error.StatusInformation( - errorIndication=errind.unknownSecurityName - ) + errorIndication=errind.unknownSecurityName) - debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: clone user info') + debug.logger & debug.FLAG_SM and debug.logger( + '__generateRequestOrResponseMsg: clone user info') except PyAsn1Error as exc: debug.logger & debug.FLAG_SM and debug.logger( '__generateRequestOrResponseMsg: %s' % exc) - snmpInGenErrs, = mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInGenErrs') + + snmpInGenErrs, = mibBuilder.importSymbols( + '__SNMPv2-MIB', 'snmpInGenErrs') snmpInGenErrs.syntax += 1 - raise error.StatusInformation( - errorIndication=errind.invalidMsg - ) + + raise error.StatusInformation(errorIndication=errind.invalidMsg) + else: # empty username used for engineID discovery usmUserName = usmUserSecurityName = null usmUserAuthProtocol = noauth.NoAuth.SERVICE_ID usmUserPrivProtocol = nopriv.NoPriv.SERVICE_ID + usmUserAuthKeyLocalized = usmUserPrivKeyLocalized = None - debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: use empty USM data') + + debug.logger & debug.FLAG_SM and debug.logger( + '__generateRequestOrResponseMsg: use empty USM data') # noinspection PyUnboundLocalVariable debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: local usmUserName %r usmUserSecurityName %r usmUserAuthProtocol %s usmUserPrivProtocol %s securityEngineID %r securityName %r' % ( - usmUserName, usmUserSecurityName, usmUserAuthProtocol, usmUserPrivProtocol, securityEngineID, securityName)) + '__generateRequestOrResponseMsg: local usmUserName %r ' + 'usmUserSecurityName %r usmUserAuthProtocol %s ' + 'usmUserPrivProtocol %s securityEngineID %r ' + 'securityName %r' % ( + usmUserName, usmUserSecurityName, usmUserAuthProtocol, + usmUserPrivProtocol, securityEngineID, securityName)) msg = globalData @@ -340,22 +422,21 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): if (usmUserAuthProtocol == noauth.NoAuth.SERVICE_ID or usmUserPrivProtocol == nopriv.NoPriv.SERVICE_ID): raise error.StatusInformation( - errorIndication=errind.unsupportedSecurityLevel - ) + errorIndication=errind.unsupportedSecurityLevel) # 3.1.3 if securityLevel == 3 or securityLevel == 2: if usmUserAuthProtocol == noauth.NoAuth.SERVICE_ID: raise error.StatusInformation( - errorIndication=errind.unsupportedSecurityLevel - ) + errorIndication=errind.unsupportedSecurityLevel) - securityParameters = self.__securityParametersSpec + securityParameters = self._securityParametersSpec scopedPDUData = msg.setComponentByPosition(3).getComponentByPosition(3) + scopedPDUData.setComponentByPosition( - 0, scopedPDU, verifyConstraints=False, matchTags=False, matchConstraints=False - ) + 0, scopedPDU, verifyConstraints=False, matchTags=False, + matchConstraints=False) snmpEngineBoots = snmpEngineTime = 0 @@ -364,161 +445,180 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): # 3.1.6.b if pdu.tagSet in rfc3411.UNCONFIRMED_CLASS_PDUS: - (snmpEngineBoots, - snmpEngineTime) = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots', 'snmpEngineTime') + snmpEngineBoots, snmpEngineTime = mibBuilder.importSymbols( + '__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots', + 'snmpEngineTime') snmpEngineBoots = snmpEngineBoots.syntax snmpEngineTime = snmpEngineTime.syntax.clone() debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: read snmpEngineBoots, snmpEngineTime from LCD') + '__generateRequestOrResponseMsg: read snmpEngineBoots, ' + 'snmpEngineTime from LCD') # 3.1.6a - elif securityEngineID in self.__timeline: + elif securityEngineID in self._timeline: (snmpEngineBoots, snmpEngineTime, latestReceivedEngineTime, - latestUpdateTimestamp) = self.__timeline[securityEngineID] + latestUpdateTimestamp) = self._timeline[securityEngineID] debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: read snmpEngineBoots, snmpEngineTime from timeline') + '__generateRequestOrResponseMsg: read snmpEngineBoots, ' + 'snmpEngineTime from timeline') # 3.1.6.c else: debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: assuming zero snmpEngineBoots, snmpEngineTime') + '__generateRequestOrResponseMsg: assuming zero ' + 'snmpEngineBoots, snmpEngineTime') debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: use snmpEngineBoots %s snmpEngineTime %s for securityEngineID %r' % ( + '__generateRequestOrResponseMsg: use snmpEngineBoots %s ' + 'snmpEngineTime %s for securityEngineID %r' % ( snmpEngineBoots, snmpEngineTime, securityEngineID)) # 3.1.4a if securityLevel == 3: if usmUserPrivProtocol in self.PRIV_SERVICES: privHandler = self.PRIV_SERVICES[usmUserPrivProtocol] + else: raise error.StatusInformation( - errorIndication=errind.encryptionError - ) + errorIndication=errind.encryptionError) debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: scopedPDU %s' % scopedPDU.prettyPrint()) + '__generateRequestOrResponseMsg: scopedPDU ' + '%s' % scopedPDU.prettyPrint()) try: dataToEncrypt = encoder.encode(scopedPDU) except PyAsn1Error as exc: debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: scopedPDU serialization error: %s' % exc) + '__generateRequestOrResponseMsg: scopedPDU serialization ' + 'error: %s' % exc) raise error.StatusInformation( - errorIndication=errind.serializationError - ) + errorIndication=errind.serializationError) debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: scopedPDU encoded into %s' % debug.hexdump(dataToEncrypt)) + '__generateRequestOrResponseMsg: scopedPDU encoded into ' + '%s' % debug.hexdump(dataToEncrypt)) # noinspection PyUnboundLocalVariable - (encryptedData, - privParameters) = privHandler.encryptData( + encryptedData, privParameters = privHandler.encryptData( usmUserPrivKeyLocalized, - (snmpEngineBoots, snmpEngineTime, None), dataToEncrypt - ) + (snmpEngineBoots, snmpEngineTime, None), + dataToEncrypt) securityParameters.setComponentByPosition( - 5, privParameters, verifyConstraints=False, matchTags=False, matchConstraints=False - ) + 5, privParameters, verifyConstraints=False, matchTags=False, + matchConstraints=False) + scopedPDUData.setComponentByPosition( - 1, encryptedData, verifyConstraints=False, matchTags=False, matchConstraints=False - ) + 1, encryptedData, verifyConstraints=False, matchTags=False, + matchConstraints=False) debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: scopedPDU ciphered into %s' % debug.hexdump(encryptedData)) + '__generateRequestOrResponseMsg: scopedPDU ciphered into ' + '%s' % debug.hexdump(encryptedData)) # 3.1.4b elif securityLevel == 1 or securityLevel == 2: securityParameters.setComponentByPosition(5, '') - debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: %s' % scopedPDUData.prettyPrint()) + debug.logger & debug.FLAG_SM and debug.logger( + '__generateRequestOrResponseMsg: %s' % scopedPDUData.prettyPrint()) # 3.1.5 securityParameters.setComponentByPosition( - 0, securityEngineID, verifyConstraints=False, matchTags=False, matchConstraints=False - ) + 0, securityEngineID, verifyConstraints=False, matchTags=False, + matchConstraints=False) + securityParameters.setComponentByPosition( - 1, snmpEngineBoots, verifyConstraints=False, matchTags=False, matchConstraints=False - ) + 1, snmpEngineBoots, verifyConstraints=False, matchTags=False, + matchConstraints=False) + securityParameters.setComponentByPosition( - 2, snmpEngineTime, verifyConstraints=False, matchTags=False, matchConstraints=False - ) + 2, snmpEngineTime, verifyConstraints=False, matchTags=False, + matchConstraints=False) # 3.1.7 securityParameters.setComponentByPosition( - 3, usmUserName, verifyConstraints=False, matchTags=False, matchConstraints=False - ) + 3, usmUserName, verifyConstraints=False, matchTags=False, + matchConstraints=False) # 3.1.8a if securityLevel == 3 or securityLevel == 2: if usmUserAuthProtocol in self.AUTH_SERVICES: authHandler = self.AUTH_SERVICES[usmUserAuthProtocol] + else: raise error.StatusInformation( - errorIndication=errind.authenticationFailure - ) + errorIndication=errind.authenticationFailure) # extra-wild hack to facilitate BER substrate in-place re-write securityParameters.setComponentByPosition( - 4, '\x00' * authHandler.digestLength - ) + 4, '\x00' * authHandler.digestLength) debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: %s' % (securityParameters.prettyPrint(),)) + '__generateRequestOrResponseMsg: ' + '%s' % (securityParameters.prettyPrint(),)) try: - msg.setComponentByPosition(2, encoder.encode(securityParameters), verifyConstraints=False) + msg.setComponentByPosition( + 2, encoder.encode(securityParameters), + verifyConstraints=False) except PyAsn1Error as exc: debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: securityParameters serialization error: %s' % exc) + '__generateRequestOrResponseMsg: securityParameters ' + 'serialization error: %s' % exc) + raise error.StatusInformation( - errorIndication=errind.serializationError - ) + errorIndication=errind.serializationError) debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: auth outgoing msg: %s' % msg.prettyPrint()) + '__generateRequestOrResponseMsg: auth outgoing msg: ' + '%s' % msg.prettyPrint()) try: wholeMsg = encoder.encode(msg) except PyAsn1Error as exc: debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: msg serialization error: %s' % exc) + '__generateRequestOrResponseMsg: msg serialization ' + 'error: %s' % exc) + raise error.StatusInformation( - errorIndication=errind.serializationError - ) + errorIndication=errind.serializationError) # noinspection PyUnboundLocalVariable authenticatedWholeMsg = authHandler.authenticateOutgoingMsg( - usmUserAuthKeyLocalized, wholeMsg - ) + usmUserAuthKeyLocalized, wholeMsg) # 3.1.8b else: securityParameters.setComponentByPosition( - 4, '', verifyConstraints=False, matchTags=False, matchConstraints=False - ) + 4, '', verifyConstraints=False, matchTags=False, + matchConstraints=False) debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: %s' % (securityParameters.prettyPrint(),)) + '__generateRequestOrResponseMsg: ' + '%s' % (securityParameters.prettyPrint(),)) try: - msg.setComponentByPosition(2, encoder.encode(securityParameters), verifyConstraints=False, matchTags=False, matchConstraints=False) + msg.setComponentByPosition( + 2, encoder.encode(securityParameters), verifyConstraints=False, + matchTags=False, matchConstraints=False) except PyAsn1Error as exc: debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: secutiryParameters serialization error: %s' % exc) + '__generateRequestOrResponseMsg: securityParameters ' + 'serialization error: %s' % exc) + raise error.StatusInformation( - errorIndication=errind.serializationError - ) + errorIndication=errind.serializationError) try: debug.logger & debug.FLAG_SM and debug.logger( @@ -527,13 +627,16 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): except PyAsn1Error as exc: debug.logger & debug.FLAG_SM and debug.logger( - '__generateRequestOrResponseMsg: msg serialization error: %s' % exc) + '__generateRequestOrResponseMsg: msg serialization error: ' + '%s' % exc) + raise error.StatusInformation( - errorIndication=errind.serializationError - ) + errorIndication=errind.serializationError) - debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: %s outgoing msg: %s' % ( - securityLevel > 1 and "authenticated" or "plain", debug.hexdump(authenticatedWholeMsg))) + debug.logger & debug.FLAG_SM and debug.logger( + '__generateRequestOrResponseMsg: %s outgoing msg: ' + '%s' % (securityLevel > 1 and "authenticated" or + "plain", debug.hexdump(authenticatedWholeMsg))) # 3.1.9 return msg.getComponentByPosition(2), authenticatedWholeMsg @@ -542,88 +645,99 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): globalData, maxMessageSize, securityModel, securityEngineID, securityName, securityLevel, scopedPDU): - return self.__generateRequestOrResponseMsg(snmpEngine, - messageProcessingModel, - globalData, - maxMessageSize, - securityModel, - securityEngineID, - securityName, - securityLevel, - scopedPDU, - None) + + return self._generateRequestOrResponseMsg( + snmpEngine, messageProcessingModel, globalData, + maxMessageSize, securityModel, securityEngineID, + securityName, securityLevel, scopedPDU, None) def generateResponseMsg(self, snmpEngine, messageProcessingModel, globalData, maxMessageSize, securityModel, securityEngineID, securityName, securityLevel, scopedPDU, securityStateReference): - return self.__generateRequestOrResponseMsg( + + return self._generateRequestOrResponseMsg( snmpEngine, messageProcessingModel, globalData, maxMessageSize, securityModel, securityEngineID, - securityName, securityLevel, scopedPDU, securityStateReference + securityName, securityLevel, scopedPDU, + securityStateReference ) # 3.2 def processIncomingMsg(self, snmpEngine, messageProcessingModel, maxMessageSize, securityParameters, securityModel, securityLevel, wholeMsg, msg): + mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder # 3.2.9 -- moved up here to be able to report # maxSizeResponseScopedPDU on error # (48 - maximum SNMPv3 header length) - maxSizeResponseScopedPDU = int(maxMessageSize) - len(securityParameters) - 48 + maxSizeResponseScopedPDU = (int(maxMessageSize) - + len(securityParameters) - 48) debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: securityParameters %s' % debug.hexdump(securityParameters)) + 'processIncomingMsg: securityParameters ' + '%s' % debug.hexdump(securityParameters)) # 3.2.1 securityParameters, rest = decoder.decode( - securityParameters, asn1Spec=self.__securityParametersSpec - ) + securityParameters, asn1Spec=self._securityParametersSpec) - debug.logger & debug.FLAG_SM and debug.logger('processIncomingMsg: %s' % (securityParameters.prettyPrint(),)) + debug.logger & debug.FLAG_SM and debug.logger( + 'processIncomingMsg: %s' % (securityParameters.prettyPrint(),)) if eoo.endOfOctets.isSameTypeWith(securityParameters): raise error.StatusInformation(errorIndication=errind.parseError) # 3.2.2 msgAuthoritativeEngineId = securityParameters.getComponentByPosition(0) + securityStateReference = self._cache.push( msgUserName=securityParameters.getComponentByPosition(3) ) debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: cache write securityStateReference %s by msgUserName %s' % ( - securityStateReference, securityParameters.getComponentByPosition(3))) + 'processIncomingMsg: cache write securityStateReference %s by ' + 'msgUserName %s' % (securityStateReference, + securityParameters.getComponentByPosition(3))) scopedPduData = msg.getComponentByPosition(3) # Used for error reporting - contextEngineId = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax + contextEngineId = mibBuilder.importSymbols( + '__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax contextName = null - snmpEngineID = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax + snmpEngineID = mibBuilder.importSymbols( + '__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax # 3.2.3 if (msgAuthoritativeEngineId != snmpEngineID and - msgAuthoritativeEngineId not in self.__timeline): + msgAuthoritativeEngineId not in self._timeline): + if (msgAuthoritativeEngineId and 4 < len(msgAuthoritativeEngineId) < 33): # 3.2.3a - cloned user when request was sent debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: non-synchronized securityEngineID %r' % (msgAuthoritativeEngineId,)) + 'processIncomingMsg: non-synchronized securityEngineID ' + '%r' % (msgAuthoritativeEngineId,)) + else: # 3.2.3b debug.logger & debug.FLAG_SM and debug.logger( 'processIncomingMsg: peer requested snmpEngineID discovery') + usmStatsUnknownEngineIDs, = mibBuilder.importSymbols( '__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownEngineIDs') usmStatsUnknownEngineIDs.syntax += 1 + debug.logger & debug.FLAG_SM and debug.logger( 'processIncomingMsg: null or malformed msgAuthoritativeEngineId') + pysnmpUsmDiscoverable, = mibBuilder.importSymbols( '__PYSNMP-USM-MIB', 'pysnmpUsmDiscoverable') + if pysnmpUsmDiscoverable.syntax: debug.logger & debug.FLAG_SM and debug.logger( 'processIncomingMsg: starting snmpEngineID discovery procedure') @@ -631,10 +745,11 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): # Report original contextName if scopedPduData.getName() != 'plaintext': debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: scopedPduData not plaintext %s' % scopedPduData.prettyPrint()) + 'processIncomingMsg: scopedPduData not plaintext ' + '%s' % scopedPduData.prettyPrint()) + raise error.StatusInformation( - errorIndication=errind.unknownEngineID - ) + errorIndication=errind.unknownEngineID) # 7.2.6.a.1 scopedPdu = scopedPduData.getComponent() @@ -653,16 +768,18 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): maxSizeResponseScopedPDU=maxSizeResponseScopedPDU ) else: - debug.logger & debug.FLAG_SM and debug.logger('processIncomingMsg: will not discover EngineID') + debug.logger & debug.FLAG_SM and debug.logger( + 'processIncomingMsg: will not discover EngineID') + # free securityStateReference XXX raise error.StatusInformation( - errorIndication=errind.unknownEngineID - ) + errorIndication=errind.unknownEngineID) msgUserName = securityParameters.getComponentByPosition(3) debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: read from securityParams msgAuthoritativeEngineId %r msgUserName %r' % ( + 'processIncomingMsg: read from securityParams ' + 'msgAuthoritativeEngineId %r msgUserName %r' % ( msgAuthoritativeEngineId, msgUserName)) if msgUserName: @@ -673,16 +790,17 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): usmUserAuthProtocol, usmUserAuthKeyLocalized, usmUserPrivProtocol, - usmUserPrivKeyLocalized) = self.__getUserInfo( + usmUserPrivKeyLocalized) = self._getUserInfo( snmpEngine.msgAndPduDsp.mibInstrumController, msgAuthoritativeEngineId, msgUserName ) - debug.logger & debug.FLAG_SM and debug.logger('processIncomingMsg: read user info from LCD') + debug.logger & debug.FLAG_SM and debug.logger( + 'processIncomingMsg: read user info from LCD') except NoSuchInstanceError: debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: unknown securityEngineID %r msgUserName %r' % ( - msgAuthoritativeEngineId, msgUserName)) + 'processIncomingMsg: unknown securityEngineID %r ' + 'msgUserName %r' % (msgAuthoritativeEngineId, msgUserName)) usmStatsUnknownUserNames, = mibBuilder.importSymbols( '__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownUserNames') @@ -701,10 +819,15 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): ) except PyAsn1Error as exc: - debug.logger & debug.FLAG_SM and debug.logger('processIncomingMsg: %s' % exc) - snmpInGenErrs, = mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInGenErrs') + debug.logger & debug.FLAG_SM and debug.logger( + 'processIncomingMsg: %s' % exc) + + snmpInGenErrs, = mibBuilder.importSymbols( + '__SNMPv2-MIB', 'snmpInGenErrs') snmpInGenErrs.syntax += 1 + raise error.StatusInformation(errorIndication=errind.invalidMsg) + else: # empty username used for engineID discovery usmUserName = usmUserSecurityName = null @@ -713,11 +836,14 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): usmUserAuthKeyLocalized = usmUserPrivKeyLocalized = None debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: now have usmUserName %r usmUserSecurityName %r usmUserAuthProtocol %r usmUserPrivProtocol %r for msgUserName %r' % ( - usmUserName, usmUserSecurityName, usmUserAuthProtocol, usmUserPrivProtocol, msgUserName)) + 'processIncomingMsg: now have usmUserName %r usmUserSecurityName ' + '%r usmUserAuthProtocol %r usmUserPrivProtocol %r for msgUserName ' + '%r' % (usmUserName, usmUserSecurityName, usmUserAuthProtocol, + usmUserPrivProtocol, msgUserName)) # 3.2.11 (moved up here to let Reports be authenticated & encrypted) self._cache.pop(securityStateReference) + securityStateReference = self._cache.push( msgUserName=securityParameters.getComponentByPosition(3), usmUserSecurityName=usmUserSecurityName, @@ -743,21 +869,24 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): privKey=usmUserPrivKeyLocalized) ) snmpEngine.observer.clearExecutionContext( - snmpEngine, 'rfc3414.processIncomingMsg' - ) + snmpEngine, 'rfc3414.processIncomingMsg') # 3.2.5 if msgAuthoritativeEngineId == snmpEngineID: # Authoritative SNMP engine: make sure securityLevel is sufficient badSecIndication = None + if securityLevel == 3: if usmUserAuthProtocol == noauth.NoAuth.SERVICE_ID: badSecIndication = 'authPriv wanted while auth not expected' + if usmUserPrivProtocol == nopriv.NoPriv.SERVICE_ID: badSecIndication = 'authPriv wanted while priv not expected' + elif securityLevel == 2: if usmUserAuthProtocol == noauth.NoAuth.SERVICE_ID: badSecIndication = 'authNoPriv wanted while auth not expected' + if usmUserPrivProtocol != nopriv.NoPriv.SERVICE_ID: # 4 (discovery phase always uses authenticated messages) if msgAuthoritativeEngineBoots or msgAuthoritativeEngineTime: @@ -766,15 +895,20 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): elif securityLevel == 1: if usmUserAuthProtocol != noauth.NoAuth.SERVICE_ID: badSecIndication = 'noAuthNoPriv wanted while auth expected' + if usmUserPrivProtocol != nopriv.NoPriv.SERVICE_ID: badSecIndication = 'noAuthNoPriv wanted while priv expected' + if badSecIndication: usmStatsUnsupportedSecLevels, = mibBuilder.importSymbols( '__SNMP-USER-BASED-SM-MIB', 'usmStatsUnsupportedSecLevels') + usmStatsUnsupportedSecLevels.syntax += 1 + debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: reporting inappropriate security level for user %s: %s' % ( - msgUserName, badSecIndication)) + 'processIncomingMsg: reporting inappropriate security ' + 'level for user %s: %s' % (msgUserName, badSecIndication)) + raise error.StatusInformation( errorIndication=errind.unsupportedSecurityLevel, oid=usmStatsUnsupportedSecLevels.name, @@ -791,22 +925,23 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): if securityLevel == 3 or securityLevel == 2: if usmUserAuthProtocol in self.AUTH_SERVICES: authHandler = self.AUTH_SERVICES[usmUserAuthProtocol] + else: raise error.StatusInformation( - errorIndication=errind.authenticationFailure - ) + errorIndication=errind.authenticationFailure) try: authHandler.authenticateIncomingMsg( usmUserAuthKeyLocalized, securityParameters.getComponentByPosition(4), - wholeMsg - ) + wholeMsg) except error.StatusInformation: usmStatsWrongDigests, = mibBuilder.importSymbols( '__SNMP-USER-BASED-SM-MIB', 'usmStatsWrongDigests') + usmStatsWrongDigests.syntax += 1 + raise error.StatusInformation( errorIndication=errind.authenticationFailure, oid=usmStatsWrongDigests.name, @@ -819,62 +954,81 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): maxSizeResponseScopedPDU=maxSizeResponseScopedPDU ) - debug.logger & debug.FLAG_SM and debug.logger('processIncomingMsg: incoming msg authenticated') + debug.logger & debug.FLAG_SM and debug.logger( + 'processIncomingMsg: incoming msg authenticated') # synchronize time with authed peer - self.__timeline[msgAuthoritativeEngineId] = ( + self._timeline[msgAuthoritativeEngineId] = ( securityParameters.getComponentByPosition(1), securityParameters.getComponentByPosition(2), securityParameters.getComponentByPosition(2), int(time.time()) ) - timerResolution = snmpEngine.transportDispatcher is None and 1.0 or snmpEngine.transportDispatcher.getTimerResolution() - expireAt = int(self.__expirationTimer + 300 / timerResolution) - if expireAt not in self.__timelineExpQueue: - self.__timelineExpQueue[expireAt] = [] - self.__timelineExpQueue[expireAt].append(msgAuthoritativeEngineId) + timerResolution = (snmpEngine.transportDispatcher is None and 1.0 or + snmpEngine.transportDispatcher.getTimerResolution()) + + expireAt = int(self._expirationTimer + 300 / timerResolution) + + if expireAt not in self._timelineExpQueue: + self._timelineExpQueue[expireAt] = [] + + self._timelineExpQueue[expireAt].append(msgAuthoritativeEngineId) debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: store timeline for securityEngineID %r' % (msgAuthoritativeEngineId,)) + 'processIncomingMsg: store timeline for securityEngineID ' + '%r' % (msgAuthoritativeEngineId,)) # 3.2.7 if securityLevel == 3 or securityLevel == 2: if msgAuthoritativeEngineId == snmpEngineID: # Authoritative SNMP engine: use local notion (SF bug #1649032) - (snmpEngineBoots, - snmpEngineTime) = mibBuilder.importSymbols( + snmpEngineBoots, snmpEngineTime = mibBuilder.importSymbols( '__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots', 'snmpEngineTime') + snmpEngineBoots = snmpEngineBoots.syntax snmpEngineTime = snmpEngineTime.syntax.clone() + idleTime = 0 + debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: read snmpEngineBoots (%s), snmpEngineTime (%s) from LCD' % ( + 'processIncomingMsg: read snmpEngineBoots (%s), ' + 'snmpEngineTime (%s) from LCD' % ( snmpEngineBoots, snmpEngineTime)) + else: # Non-authoritative SNMP engine: use cached estimates - if msgAuthoritativeEngineId in self.__timeline: + if msgAuthoritativeEngineId in self._timeline: (snmpEngineBoots, snmpEngineTime, latestReceivedEngineTime, - latestUpdateTimestamp) = self.__timeline[ + latestUpdateTimestamp) = self._timeline[ msgAuthoritativeEngineId ] + # time passed since last talk with this SNMP engine idleTime = int(time.time()) - latestUpdateTimestamp + debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: read timeline snmpEngineBoots %s snmpEngineTime %s for msgAuthoritativeEngineId %r, idle time %s secs' % ( - snmpEngineBoots, snmpEngineTime, msgAuthoritativeEngineId, idleTime)) + 'processIncomingMsg: read timeline snmpEngineBoots %s ' + 'snmpEngineTime %s for msgAuthoritativeEngineId %r, ' + 'idle time %s secs' % (snmpEngineBoots, snmpEngineTime, + msgAuthoritativeEngineId, + idleTime)) else: raise error.ProtocolError('Peer SNMP engine info missing') # 3.2.7a if msgAuthoritativeEngineId == snmpEngineID: + if (snmpEngineBoots == 2147483647 or snmpEngineBoots != msgAuthoritativeEngineBoots or - abs(idleTime + int(snmpEngineTime) - int(msgAuthoritativeEngineTime)) > 150): + (abs(idleTime + int(snmpEngineTime) + - int(msgAuthoritativeEngineTime))) > 150): + usmStatsNotInTimeWindows, = mibBuilder.importSymbols( '__SNMP-USER-BASED-SM-MIB', 'usmStatsNotInTimeWindows') usmStatsNotInTimeWindows.syntax += 1 + raise error.StatusInformation( errorIndication=errind.notInTimeWindow, oid=usmStatsNotInTimeWindows.name, @@ -886,6 +1040,7 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): msgUserName=msgUserName, maxSizeResponseScopedPDU=maxSizeResponseScopedPDU ) + # 3.2.7b else: # 3.2.7b.1 @@ -893,48 +1048,60 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): if (msgAuthoritativeEngineBoots > snmpEngineBoots or msgAuthoritativeEngineBoots == snmpEngineBoots and msgAuthoritativeEngineTime > latestReceivedEngineTime): - self.__timeline[msgAuthoritativeEngineId] = ( + self._timeline[msgAuthoritativeEngineId] = ( msgAuthoritativeEngineBoots, msgAuthoritativeEngineTime, msgAuthoritativeEngineTime, int(time.time()) ) - timerResolution = snmpEngine.transportDispatcher is None and 1.0 or snmpEngine.transportDispatcher.getTimerResolution() - expireAt = int(self.__expirationTimer + 300 / timerResolution) - if expireAt not in self.__timelineExpQueue: - self.__timelineExpQueue[expireAt] = [] - self.__timelineExpQueue[expireAt].append(msgAuthoritativeEngineId) + timerResolution = ( + snmpEngine.transportDispatcher is None and 1.0 or + snmpEngine.transportDispatcher.getTimerResolution()) + + expireAt = int(self._expirationTimer + 300 / timerResolution) + + if expireAt not in self._timelineExpQueue: + self._timelineExpQueue[expireAt] = [] + + self._timelineExpQueue[expireAt].append(msgAuthoritativeEngineId) debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: stored timeline msgAuthoritativeEngineBoots %s msgAuthoritativeEngineTime %s for msgAuthoritativeEngineId %r' % ( - msgAuthoritativeEngineBoots, msgAuthoritativeEngineTime, msgAuthoritativeEngineId)) + 'processIncomingMsg: stored timeline ' + 'msgAuthoritativeEngineBoots %s ' + 'msgAuthoritativeEngineTime %s for ' + 'msgAuthoritativeEngineId ' + '%r' % (msgAuthoritativeEngineBoots, + msgAuthoritativeEngineTime, + msgAuthoritativeEngineId)) # 3.2.7b.2 if (snmpEngineBoots == 2147483647 or msgAuthoritativeEngineBoots < snmpEngineBoots or msgAuthoritativeEngineBoots == snmpEngineBoots and - abs(idleTime + int(snmpEngineTime) - int(msgAuthoritativeEngineTime)) > 150): + (abs(idleTime + int(snmpEngineTime) + - int(msgAuthoritativeEngineTime))) > 150): + raise error.StatusInformation( errorIndication=errind.notInTimeWindow, - msgUserName=msgUserName - ) + msgUserName=msgUserName) # 3.2.8a if securityLevel == 3: if usmUserPrivProtocol in self.PRIV_SERVICES: privHandler = self.PRIV_SERVICES[usmUserPrivProtocol] + else: raise error.StatusInformation( errorIndication=errind.decryptionError, - msgUserName=msgUserName - ) + msgUserName=msgUserName) + encryptedPDU = scopedPduData.getComponentByPosition(1) + if encryptedPDU is None: # no ciphertext raise error.StatusInformation( errorIndication=errind.decryptionError, - msgUserName=msgUserName - ) + msgUserName=msgUserName) try: decryptedData = privHandler.decryptData( @@ -942,15 +1109,17 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): (securityParameters.getComponentByPosition(1), securityParameters.getComponentByPosition(2), securityParameters.getComponentByPosition(5)), - encryptedPDU - ) + encryptedPDU) + debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: PDU deciphered into %s' % debug.hexdump(decryptedData)) + 'processIncomingMsg: PDU deciphered into ' + '%s' % debug.hexdump(decryptedData)) except error.StatusInformation: usmStatsDecryptionErrors, = mibBuilder.importSymbols( '__SNMP-USER-BASED-SM-MIB', 'usmStatsDecryptionErrors') usmStatsDecryptionErrors.syntax += 1 + raise error.StatusInformation( errorIndication=errind.decryptionError, oid=usmStatsDecryptionErrors.name, @@ -962,31 +1131,32 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): msgUserName=msgUserName, maxSizeResponseScopedPDU=maxSizeResponseScopedPDU ) - scopedPduSpec = scopedPduData.setComponentByPosition(0).getComponentByPosition(0) + scopedPduSpec = scopedPduData.setComponentByPosition( + 0).getComponentByPosition(0) + try: scopedPDU, rest = decoder.decode(decryptedData, asn1Spec=scopedPduSpec) except PyAsn1Error as exc: debug.logger & debug.FLAG_SM and debug.logger( 'processIncomingMsg: scopedPDU decoder failed %s' % exc) + raise error.StatusInformation( errorIndication=errind.decryptionError, - msgUserName=msgUserName - ) + msgUserName=msgUserName) if eoo.endOfOctets.isSameTypeWith(scopedPDU): raise error.StatusInformation( errorIndication=errind.decryptionError, - msgUserName=msgUserName - ) + msgUserName=msgUserName) + else: # 3.2.8b scopedPDU = scopedPduData.getComponentByPosition(0) if scopedPDU is None: # no plaintext raise error.StatusInformation( errorIndication=errind.decryptionError, - msgUserName=msgUserName - ) + msgUserName=msgUserName) debug.logger & debug.FLAG_SM and debug.logger( 'processIncomingMsg: scopedPDU decoded %s' % scopedPDU.prettyPrint()) @@ -995,7 +1165,8 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): securityName = usmUserSecurityName debug.logger & debug.FLAG_SM and debug.logger( - 'processIncomingMsg: cached msgUserName %s info by securityStateReference %s' % ( + 'processIncomingMsg: cached msgUserName %s info by ' + 'securityStateReference %s' % ( msgUserName, securityStateReference)) # Delayed to include details @@ -1003,6 +1174,7 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): usmStatsUnknownUserNames, = mibBuilder.importSymbols( '__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownUserNames') usmStatsUnknownUserNames.syntax += 1 + raise error.StatusInformation( errorIndication=errind.unknownSecurityName, oid=usmStatsUnknownUserNames.name, @@ -1021,14 +1193,18 @@ class SnmpUSMSecurityModel(AbstractSecurityModel): return (msgAuthoritativeEngineId, securityName, scopedPDU, maxSizeResponseScopedPDU, securityStateReference) - def __expireTimelineInfo(self): - if self.__expirationTimer in self.__timelineExpQueue: - for engineIdKey in self.__timelineExpQueue[self.__expirationTimer]: - if engineIdKey in self.__timeline: - del self.__timeline[engineIdKey] - debug.logger & debug.FLAG_SM and debug.logger('__expireTimelineInfo: expiring %r' % (engineIdKey,)) - del self.__timelineExpQueue[self.__expirationTimer] - self.__expirationTimer += 1 + def _expireTimelineInfo(self): + if self._expirationTimer in self._timelineExpQueue: + + for engineIdKey in self._timelineExpQueue[self._expirationTimer]: + if engineIdKey in self._timeline: + del self._timeline[engineIdKey] + debug.logger & debug.FLAG_SM and debug.logger( + '__expireTimelineInfo: expiring %r' % (engineIdKey,)) + + del self._timelineExpQueue[self._expirationTimer] + + self._expirationTimer += 1 def receiveTimerTick(self, snmpEngine, timeNow): - self.__expireTimelineInfo() + self._expireTimelineInfo() diff --git a/pysnmp/proto/secmod/rfc3826/priv/aes.py b/pysnmp/proto/secmod/rfc3826/priv/aes.py index 56f69280..d000f8ed 100644 --- a/pysnmp/proto/secmod/rfc3826/priv/aes.py +++ b/pysnmp/proto/secmod/rfc3826/priv/aes.py @@ -38,66 +38,85 @@ class Aes(base.AbstractEncryptionService): local_int = random.randrange(0, 0xffffffffffffffff) # 3.1.2.1 - def __getEncryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime): - salt = [self.local_int >> 56 & 0xff, - self.local_int >> 48 & 0xff, - self.local_int >> 40 & 0xff, - self.local_int >> 32 & 0xff, - self.local_int >> 24 & 0xff, - self.local_int >> 16 & 0xff, - self.local_int >> 8 & 0xff, - self.local_int & 0xff] + def _getEncryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime): + salt = [ + self.local_int >> 56 & 0xff, + self.local_int >> 48 & 0xff, + self.local_int >> 40 & 0xff, + self.local_int >> 32 & 0xff, + self.local_int >> 24 & 0xff, + self.local_int >> 16 & 0xff, + self.local_int >> 8 & 0xff, + self.local_int & 0xff + ] if self.local_int == 0xffffffffffffffff: self.local_int = 0 + else: self.local_int += 1 - return self.__getDecryptionKey(privKey, snmpEngineBoots, snmpEngineTime, salt) + ( - univ.OctetString(salt).asOctets(),) + key, iv = self._getDecryptionKey( + privKey, snmpEngineBoots, snmpEngineTime, salt) + + return key, iv, univ.OctetString(salt).asOctets() + + def _getDecryptionKey(self, privKey, snmpEngineBoots, + snmpEngineTime, salt): - def __getDecryptionKey(self, privKey, snmpEngineBoots, - snmpEngineTime, salt): snmpEngineBoots, snmpEngineTime, salt = ( - int(snmpEngineBoots), int(snmpEngineTime), salt - ) + int(snmpEngineBoots), int(snmpEngineTime), salt) + + iv = [ + snmpEngineBoots >> 24 & 0xff, + snmpEngineBoots >> 16 & 0xff, + snmpEngineBoots >> 8 & 0xff, + snmpEngineBoots & 0xff, + snmpEngineTime >> 24 & 0xff, + snmpEngineTime >> 16 & 0xff, + snmpEngineTime >> 8 & 0xff, + snmpEngineTime & 0xff + ] - iv = [snmpEngineBoots >> 24 & 0xff, - snmpEngineBoots >> 16 & 0xff, - snmpEngineBoots >> 8 & 0xff, - snmpEngineBoots & 0xff, - snmpEngineTime >> 24 & 0xff, - snmpEngineTime >> 16 & 0xff, - snmpEngineTime >> 8 & 0xff, - snmpEngineTime & 0xff] + salt + iv += salt - return privKey[:self.KEY_SIZE].asOctets(), univ.OctetString(iv).asOctets() + key = privKey[:self.KEY_SIZE].asOctets() + iv = univ.OctetString(iv).asOctets() + + return key, iv def hashPassphrase(self, authProtocol, privKey): if authProtocol == hmacmd5.HmacMd5.SERVICE_ID: hashAlgo = md5 + elif authProtocol == hmacsha.HmacSha.SERVICE_ID: hashAlgo = sha1 + elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM: hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol] + else: raise error.ProtocolError( - 'Unknown auth protocol %s' % (authProtocol,) - ) + 'Unknown auth protocol %s' % (authProtocol,)) + return localkey.hashPassphrase(privKey, hashAlgo) def localizeKey(self, authProtocol, privKey, snmpEngineID): if authProtocol == hmacmd5.HmacMd5.SERVICE_ID: hashAlgo = md5 + elif authProtocol == hmacsha.HmacSha.SERVICE_ID: hashAlgo = sha1 + elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM: hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol] + else: raise error.ProtocolError( - 'Unknown auth protocol %s' % (authProtocol,) - ) + 'Unknown auth protocol %s' % (authProtocol,)) + localPrivKey = localkey.localizeKey(privKey, snmpEngineID, hashAlgo) + return localPrivKey[:self.KEY_SIZE] # 3.2.4.1 @@ -105,21 +124,20 @@ class Aes(base.AbstractEncryptionService): snmpEngineBoots, snmpEngineTime, salt = privParameters # 3.3.1.1 - aesKey, iv, salt = self.__getEncryptionKey( - encryptKey, snmpEngineBoots, snmpEngineTime - ) + aesKey, iv, salt = self._getEncryptionKey( + encryptKey, snmpEngineBoots, snmpEngineTime) # 3.3.1.3 # PyCrypto seems to require padding - dataToEncrypt = dataToEncrypt + univ.OctetString((0,) * (16 - len(dataToEncrypt) % 16)).asOctets() + padding = univ.OctetString((0,) * (16 - len(dataToEncrypt) % 16)) + dataToEncrypt += padding try: - ciphertext = aes.encrypt(dataToEncrypt, aesKey, iv) + ciphertext = aes.encrypt(dataToEncrypt.asOctets(), aesKey, iv) except PysnmpCryptoError: raise error.StatusInformation( - errorIndication=errind.unsupportedPrivProtocol - ) + errorIndication=errind.unsupportedPrivProtocol) # 3.3.1.4 return univ.OctetString(ciphertext), univ.OctetString(salt) @@ -131,16 +149,15 @@ class Aes(base.AbstractEncryptionService): # 3.3.2.1 if len(salt) != 8: raise error.StatusInformation( - errorIndication=errind.decryptionError - ) + errorIndication=errind.decryptionError) # 3.3.2.3 - aesKey, iv = self.__getDecryptionKey( - decryptKey, snmpEngineBoots, snmpEngineTime, salt - ) + aesKey, iv = self._getDecryptionKey( + decryptKey, snmpEngineBoots, snmpEngineTime, salt) # PyCrypto seems to require padding - encryptedData = encryptedData + univ.OctetString((0,) * (16 - len(encryptedData) % 16)).asOctets() + padding = univ.OctetString((0,) * (16 - len(encryptedData) % 16)) + encryptedData += padding try: # 3.3.2.4-6 @@ -148,5 +165,4 @@ class Aes(base.AbstractEncryptionService): except PysnmpCryptoError: raise error.StatusInformation( - errorIndication=errind.unsupportedPrivProtocol - ) + errorIndication=errind.unsupportedPrivProtocol) diff --git a/pysnmp/proto/secmod/rfc7860/auth/hmacsha2.py b/pysnmp/proto/secmod/rfc7860/auth/hmacsha2.py index 11dd79b3..267db682 100644 --- a/pysnmp/proto/secmod/rfc7860/auth/hmacsha2.py +++ b/pysnmp/proto/secmod/rfc7860/auth/hmacsha2.py @@ -19,19 +19,26 @@ except ImportError: sha224 = sha256 = sha384 = sha512 = NotAvailable() -from pyasn1.type import univ from pysnmp.proto.secmod.rfc3414.auth import base from pysnmp.proto.secmod.rfc3414 import localkey from pysnmp.proto import errind, error +from pyasn1.type import univ # 7.2.4 class HmacSha2(base.AbstractAuthenticationService): - SHA224_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 4) # usmHMAC128SHA224AuthProtocol - SHA256_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 5) # usmHMAC192SHA256AuthProtocol - SHA384_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 6) # usmHMAC256SHA384AuthProtocol - SHA512_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 7) # usmHMAC384SHA512AuthProtocol + # usmHMAC128SHA224AuthProtocol + SHA224_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 4) + + # usmHMAC192SHA256AuthProtocol + SHA256_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 5) + + # usmHMAC256SHA384AuthProtocol + SHA384_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 6) + + # usmHMAC384SHA512AuthProtocol + SHA512_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 7) KEY_LENGTH = { SHA224_SERVICE_ID: 28, @@ -59,39 +66,43 @@ class HmacSha2(base.AbstractAuthenticationService): def __init__(self, oid): if oid not in self.HASH_ALGORITHM: - raise error.ProtocolError('No SHA-2 authentication algorithm %s available' % (oid,)) - self.__hashAlgo = self.HASH_ALGORITHM[oid] - self.__digestLength = self.DIGEST_LENGTH[oid] - self.__placeHolder = univ.OctetString((0,) * self.__digestLength).asOctets() + raise error.ProtocolError( + 'No SHA-2 authentication algorithm %s available' % (oid,)) + + self._hashAlgo = self.HASH_ALGORITHM[oid] + self._digestLength = self.DIGEST_LENGTH[oid] + self._placeHolder = univ.OctetString( + (0,) * self._digestLength).asOctets() def hashPassphrase(self, authKey): - return localkey.hashPassphrase(authKey, self.__hashAlgo) + return localkey.hashPassphrase(authKey, self._hashAlgo) def localizeKey(self, authKey, snmpEngineID): - return localkey.localizeKey(authKey, snmpEngineID, self.__hashAlgo) + return localkey.localizeKey(authKey, snmpEngineID, self._hashAlgo) @property def digestLength(self): - return self.__digestLength + return self._digestLength # 7.3.1 def authenticateOutgoingMsg(self, authKey, wholeMsg): # 7.3.1.1 - location = wholeMsg.find(self.__placeHolder) + location = wholeMsg.find(self._placeHolder) if location == -1: - raise error.ProtocolError('Can\'t locate digest placeholder') + raise error.ProtocolError('Cannot locate digest placeholder') + wholeHead = wholeMsg[:location] - wholeTail = wholeMsg[location + self.__digestLength:] + wholeTail = wholeMsg[location + self._digestLength:] # 7.3.1.2, 7.3.1.3 try: - mac = hmac.new(authKey.asOctets(), wholeMsg, self.__hashAlgo) + mac = hmac.new(authKey.asOctets(), wholeMsg, self._hashAlgo) except errind.ErrorIndication as exc: raise error.StatusInformation(errorIndication=exc) # 7.3.1.4 - mac = mac.digest()[:self.__digestLength] + mac = mac.digest()[:self._digestLength] # 7.3.1.5 & 6 return wholeHead + mac + wholeTail @@ -99,33 +110,32 @@ class HmacSha2(base.AbstractAuthenticationService): # 7.3.2 def authenticateIncomingMsg(self, authKey, authParameters, wholeMsg): # 7.3.2.1 & 2 - if len(authParameters) != self.__digestLength: + if len(authParameters) != self._digestLength: raise error.StatusInformation( - errorIndication=errind.authenticationError - ) + errorIndication=errind.authenticationError) # 7.3.2.3 location = wholeMsg.find(authParameters.asOctets()) if location == -1: - raise error.ProtocolError('Can\'t locate digest in wholeMsg') + raise error.ProtocolError('Cannot locate digest in wholeMsg') + wholeHead = wholeMsg[:location] - wholeTail = wholeMsg[location + self.__digestLength:] - authenticatedWholeMsg = wholeHead + self.__placeHolder + wholeTail + wholeTail = wholeMsg[location + self._digestLength:] + authenticatedWholeMsg = wholeHead + self._placeHolder + wholeTail # 7.3.2.4 try: - mac = hmac.new(authKey.asOctets(), authenticatedWholeMsg, self.__hashAlgo) + mac = hmac.new(authKey.asOctets(), authenticatedWholeMsg, self._hashAlgo) except errind.ErrorIndication as exc: raise error.StatusInformation(errorIndication=exc) # 7.3.2.5 - mac = mac.digest()[:self.__digestLength] + mac = mac.digest()[:self._digestLength] # 7.3.2.6 if mac != authParameters: raise error.StatusInformation( - errorIndication=errind.authenticationFailure - ) + errorIndication=errind.authenticationFailure) return authenticatedWholeMsg |