summaryrefslogtreecommitdiff
path: root/pysnmp/proto/secmod
diff options
context:
space:
mode:
Diffstat (limited to 'pysnmp/proto/secmod')
-rw-r--r--pysnmp/proto/secmod/cache.py12
-rw-r--r--pysnmp/proto/secmod/eso/priv/aes192.py6
-rw-r--r--pysnmp/proto/secmod/eso/priv/aes256.py6
-rw-r--r--pysnmp/proto/secmod/eso/priv/aesbase.py22
-rw-r--r--pysnmp/proto/secmod/eso/priv/des3.py51
-rw-r--r--pysnmp/proto/secmod/rfc2576.py349
-rw-r--r--pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py38
-rw-r--r--pysnmp/proto/secmod/rfc3414/auth/hmacsha.py37
-rw-r--r--pysnmp/proto/secmod/rfc3414/localkey.py15
-rw-r--r--pysnmp/proto/secmod/rfc3414/priv/des.py53
-rw-r--r--pysnmp/proto/secmod/rfc3414/service.py732
-rw-r--r--pysnmp/proto/secmod/rfc3826/priv/aes.py102
-rw-r--r--pysnmp/proto/secmod/rfc7860/auth/hmacsha2.py64
13 files changed, 892 insertions, 595 deletions
diff --git a/pysnmp/proto/secmod/cache.py b/pysnmp/proto/secmod/cache.py
index f5843c50..1859bbad 100644
--- a/pysnmp/proto/secmod/cache.py
+++ b/pysnmp/proto/secmod/cache.py
@@ -21,10 +21,8 @@ class Cache(object):
def pop(self, stateReference):
if stateReference in self.__cacheEntries:
- securityData = self.__cacheEntries[stateReference]
- else:
- raise error.ProtocolError(
- 'Cache miss for stateReference=%s at %s' % (stateReference, self)
- )
- del self.__cacheEntries[stateReference]
- return securityData
+ return self.__cacheEntries.pop(stateReference)
+
+ raise error.ProtocolError(
+ 'Cache miss for stateReference=%s at '
+ '%s' % (stateReference, self))
diff --git a/pysnmp/proto/secmod/eso/priv/aes192.py b/pysnmp/proto/secmod/eso/priv/aes192.py
index 7df0a0f5..cbbc748e 100644
--- a/pysnmp/proto/secmod/eso/priv/aes192.py
+++ b/pysnmp/proto/secmod/eso/priv/aes192.py
@@ -14,7 +14,8 @@ class AesBlumenthal192(aesbase.AbstractAesBlumenthal):
http://tools.ietf.org/html/draft-blumenthal-aes-usm-04
"""
- SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 1) # cusmAESCfb192PrivProtocol
+ # cusmAESCfb192PrivProtocol
+ SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 1)
KEY_SIZE = 24
@@ -29,5 +30,6 @@ class Aes192(aesbase.AbstractAesReeder):
Known to be used by many vendors including Cisco and others.
"""
- SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 101) # cusmAESCfb192PrivProtocol (non-standard OID)
+ # cusmAESCfb192PrivProtocol (non-standard OID)
+ SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 101)
KEY_SIZE = 24
diff --git a/pysnmp/proto/secmod/eso/priv/aes256.py b/pysnmp/proto/secmod/eso/priv/aes256.py
index 94a255ff..7ca42504 100644
--- a/pysnmp/proto/secmod/eso/priv/aes256.py
+++ b/pysnmp/proto/secmod/eso/priv/aes256.py
@@ -12,7 +12,8 @@ class AesBlumenthal256(aesbase.AbstractAesBlumenthal):
http://tools.ietf.org/html/draft-blumenthal-aes-usm-04
"""
- SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 2) # cusmAESCfb256PrivProtocol
+ # cusmAESCfb256PrivProtocol
+ SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 2)
KEY_SIZE = 32
@@ -27,5 +28,6 @@ class Aes256(aesbase.AbstractAesReeder):
Known to be used by many vendors including Cisco and others.
"""
- SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 102) # cusmAESCfb256PrivProtocol (non-standard OID)
+ # cusmAESCfb256PrivProtocol (non-standard OID)
+ SERVICE_ID = (1, 3, 6, 1, 4, 1, 9, 12, 6, 1, 102)
KEY_SIZE = 32
diff --git a/pysnmp/proto/secmod/eso/priv/aesbase.py b/pysnmp/proto/secmod/eso/priv/aesbase.py
index 0c2e5d17..3af30df0 100644
--- a/pysnmp/proto/secmod/eso/priv/aesbase.py
+++ b/pysnmp/proto/secmod/eso/priv/aesbase.py
@@ -24,19 +24,25 @@ class AbstractAesBlumenthal(aes.Aes):
def localizeKey(self, authProtocol, privKey, snmpEngineID):
if authProtocol == hmacmd5.HmacMd5.SERVICE_ID:
hashAlgo = md5
+
elif authProtocol == hmacsha.HmacSha.SERVICE_ID:
hashAlgo = sha1
+
elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM:
hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol]
+
else:
raise error.ProtocolError(
- 'Unknown auth protocol %s' % (authProtocol,)
- )
+ 'Unknown auth protocol %s' % (authProtocol,))
- localPrivKey = localkey.localizeKey(privKey, snmpEngineID, hashAlgo)
+ localPrivKey = localkey.localizeKey(
+ privKey, snmpEngineID, hashAlgo)
- # now extend this key if too short by repeating steps that includes the hashPassphrase step
- for count in range(1, int(ceil(self.KEY_SIZE * 1.0 / len(localPrivKey)))):
+ # now extend this key if too short by repeating steps that includes
+ # the hashPassphrase step
+ rounds = int(ceil(self.KEY_SIZE * 1.0 / len(localPrivKey)))
+
+ for _ in range(1, rounds):
localPrivKey += hashAlgo(localPrivKey).digest()
return localPrivKey[:self.KEY_SIZE]
@@ -64,14 +70,16 @@ class AbstractAesReeder(aes.Aes):
def localizeKey(self, authProtocol, privKey, snmpEngineID):
if authProtocol == hmacmd5.HmacMd5.SERVICE_ID:
hashAlgo = md5
+
elif authProtocol == hmacsha.HmacSha.SERVICE_ID:
hashAlgo = sha1
+
elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM:
hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol]
+
else:
raise error.ProtocolError(
- 'Unknown auth protocol %s' % (authProtocol,)
- )
+ 'Unknown auth protocol %s' % (authProtocol,))
localPrivKey = localkey.localizeKey(privKey, snmpEngineID, hashAlgo)
diff --git a/pysnmp/proto/secmod/eso/priv/des3.py b/pysnmp/proto/secmod/eso/priv/des3.py
index da84475b..8d193084 100644
--- a/pysnmp/proto/secmod/eso/priv/des3.py
+++ b/pysnmp/proto/secmod/eso/priv/des3.py
@@ -34,7 +34,8 @@ class Des3(base.AbstractEncryptionService):
https://tools.ietf.org/html/draft-reeder-snmpv3-usm-3desede-00
"""
- SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 2, 3) # usm3DESEDEPrivProtocol
+ # usm3DESEDEPrivProtocol
+ SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 2, 3)
KEY_SIZE = 32
local_int = random.randrange(0, 0xffffffff)
@@ -42,28 +43,34 @@ class Des3(base.AbstractEncryptionService):
def hashPassphrase(self, authProtocol, privKey):
if authProtocol == hmacmd5.HmacMd5.SERVICE_ID:
hashAlgo = md5
+
elif authProtocol == hmacsha.HmacSha.SERVICE_ID:
hashAlgo = sha1
+
elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM:
hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol]
+
else:
raise error.ProtocolError(
- 'Unknown auth protocol %s' % (authProtocol,)
- )
+ 'Unknown auth protocol %s' % (authProtocol,))
+
return localkey.hashPassphrase(privKey, hashAlgo)
# 2.1
def localizeKey(self, authProtocol, privKey, snmpEngineID):
if authProtocol == hmacmd5.HmacMd5.SERVICE_ID:
hashAlgo = md5
+
elif authProtocol == hmacsha.HmacSha.SERVICE_ID:
hashAlgo = sha1
+
elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM:
hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol]
+
else:
raise error.ProtocolError(
- 'Unknown auth protocol %s' % (authProtocol,)
- )
+ 'Unknown auth protocol %s' % (authProtocol,))
+
localPrivKey = localkey.localizeKey(privKey, snmpEngineID, hashAlgo)
# now extend this key if too short by repeating steps that includes the hashPassphrase step
@@ -75,7 +82,7 @@ class Des3(base.AbstractEncryptionService):
return localPrivKey[:self.KEY_SIZE]
# 5.1.1.1
- def __getEncryptionKey(self, privKey, snmpEngineBoots):
+ def _getEncryptionKey(self, privKey, snmpEngineBoots):
# 5.1.1.1.1
des3Key = privKey[:24]
preIV = privKey[24:32]
@@ -94,39 +101,46 @@ class Des3(base.AbstractEncryptionService):
]
if self.local_int == 0xffffffff:
self.local_int = 0
+
else:
self.local_int += 1
# salt not yet hashed XXX
+ iv = map(lambda x, y: x ^ y, salt, preIV.asNumbers())
+
return (des3Key.asOctets(),
univ.OctetString(salt).asOctets(),
- univ.OctetString(map(lambda x, y: x ^ y, salt, preIV.asNumbers())).asOctets())
+ univ.OctetString(iv).asOctets())
@staticmethod
- def __getDecryptionKey(privKey, salt):
+ def _getDecryptionKey(privKey, salt):
+ iv = map(
+ lambda x, y: x ^ y, salt.asNumbers(), privKey[24:32].asNumbers())
+
return (privKey[:24].asOctets(),
- univ.OctetString(map(lambda x, y: x ^ y, salt.asNumbers(), privKey[24:32].asNumbers())).asOctets())
+ univ.OctetString(iv).asOctets())
# 5.1.1.2
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
snmpEngineBoots, snmpEngineTime, salt = privParameters
- des3Key, salt, iv = self.__getEncryptionKey(
+ des3Key, salt, iv = self._getEncryptionKey(
encryptKey, snmpEngineBoots
)
privParameters = univ.OctetString(salt)
- plaintext = dataToEncrypt + univ.OctetString((0,) * (8 - len(dataToEncrypt) % 8)).asOctets()
+ plaintext = dataToEncrypt
+ plaintext += univ.OctetString(
+ (0,) * (8 - len(dataToEncrypt) % 8)).asOctets()
try:
ciphertext = des3.encrypt(plaintext, des3Key, iv)
except PysnmpCryptoError:
raise error.StatusInformation(
- errorIndication=errind.unsupportedPrivProtocol
- )
+ errorIndication=errind.unsupportedPrivProtocol)
return univ.OctetString(ciphertext), privParameters
@@ -136,15 +150,13 @@ class Des3(base.AbstractEncryptionService):
if len(salt) != 8:
raise error.StatusInformation(
- errorIndication=errind.decryptionError
- )
+ errorIndication=errind.decryptionError)
- des3Key, iv = self.__getDecryptionKey(decryptKey, salt)
+ des3Key, iv = self._getDecryptionKey(decryptKey, salt)
if len(encryptedData) % 8 != 0:
raise error.StatusInformation(
- errorIndication=errind.decryptionError
- )
+ errorIndication=errind.decryptionError)
ciphertext = encryptedData.asOctets()
@@ -153,7 +165,6 @@ class Des3(base.AbstractEncryptionService):
except PysnmpCryptoError:
raise error.StatusInformation(
- errorIndication=errind.unsupportedPrivProtocol
- )
+ errorIndication=errind.unsupportedPrivProtocol)
return plaintext
diff --git a/pysnmp/proto/secmod/rfc2576.py b/pysnmp/proto/secmod/rfc2576.py
index ba11b0fe..92b5eac2 100644
--- a/pysnmp/proto/secmod/rfc2576.py
+++ b/pysnmp/proto/secmod/rfc2576.py
@@ -4,9 +4,6 @@
# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
# License: http://snmplabs.com/pysnmp/license.html
#
-from pyasn1.codec.ber import encoder
-from pyasn1.error import PyAsn1Error
-
from pysnmp import debug
from pysnmp.carrier.asyncore.dgram import udp
from pysnmp.carrier.asyncore.dgram import udp6
@@ -15,6 +12,9 @@ from pysnmp.proto import error
from pysnmp.proto.secmod import base
from pysnmp.smi.error import NoSuchInstanceError
+from pyasn1.codec.ber import encoder
+from pyasn1.error import PyAsn1Error
+
class SnmpV1SecurityModel(base.AbstractSecurityModel):
SECURITY_MODEL_ID = 1
@@ -26,60 +26,69 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
# in here.
def __init__(self):
- self.__transportBranchId = self.__paramsBranchId = self.__communityBranchId = self.__securityBranchId = -1
+ self._transportBranchId = -1
+ self._paramsBranchId = -1
+ self._communityBranchId = -1
+ self._securityBranchId = -1
+
base.AbstractSecurityModel.__init__(self)
def _sec2com(self, snmpEngine, securityName, contextEngineId, contextName):
- snmpTargetParamsSecurityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder
+
+ snmpTargetParamsSecurityName, = mibBuilder.importSymbols(
'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityName')
- if self.__paramsBranchId != snmpTargetParamsSecurityName.branchVersionId:
- snmpTargetParamsSecurityModel, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+
+ if self._paramsBranchId != snmpTargetParamsSecurityName.branchVersionId:
+ snmpTargetParamsSecurityModel, = mibBuilder.importSymbols(
'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityModel')
- self.__nameToModelMap = {}
+ self._nameToModelMap = {}
nextMibNode = snmpTargetParamsSecurityName
while True:
try:
- nextMibNode = snmpTargetParamsSecurityName.getNextNode(nextMibNode.name)
+ nextMibNode = snmpTargetParamsSecurityName.getNextNode(
+ nextMibNode.name)
except NoSuchInstanceError:
break
instId = nextMibNode.name[len(snmpTargetParamsSecurityName.name):]
- mibNode = snmpTargetParamsSecurityModel.getNode(snmpTargetParamsSecurityModel.name + instId)
+ mibNode = snmpTargetParamsSecurityModel.getNode(
+ snmpTargetParamsSecurityModel.name + instId)
try:
- if mibNode.syntax not in self.__nameToModelMap:
- self.__nameToModelMap[nextMibNode.syntax] = set()
+ if mibNode.syntax not in self._nameToModelMap:
+ self._nameToModelMap[nextMibNode.syntax] = set()
- self.__nameToModelMap[nextMibNode.syntax].add(mibNode.syntax)
+ self._nameToModelMap[nextMibNode.syntax].add(mibNode.syntax)
except PyAsn1Error:
debug.logger & debug.FLAG_SM and debug.logger(
- '_sec2com: table entries %r/%r hashing failed' % (
- nextMibNode.syntax, mibNode.syntax)
- )
+ '_sec2com: table entries %r/%r hashing '
+ 'failed' % (nextMibNode.syntax, mibNode.syntax))
continue
- self.__paramsBranchId = snmpTargetParamsSecurityName.branchVersionId
+ self._paramsBranchId = snmpTargetParamsSecurityName.branchVersionId
# invalidate next map as it include this one
- self.__securityBranchId = -1
+ self._securityBranchId = -1
- snmpCommunityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-COMMUNITY-MIB',
- 'snmpCommunityName')
- if self.__securityBranchId != snmpCommunityName.branchVersionId:
+ snmpCommunityName, = mibBuilder.importSymbols(
+ 'SNMP-COMMUNITY-MIB', 'snmpCommunityName')
+
+ if self._securityBranchId != snmpCommunityName.branchVersionId:
(snmpCommunitySecurityName,
snmpCommunityContextEngineId,
- snmpCommunityContextName) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ snmpCommunityContextName) = mibBuilder.importSymbols(
'SNMP-COMMUNITY-MIB', 'snmpCommunitySecurityName',
'snmpCommunityContextEngineID', 'snmpCommunityContextName'
)
- self.__securityMap = {}
+ self._securityMap = {}
nextMibNode = snmpCommunityName
@@ -92,54 +101,55 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
instId = nextMibNode.name[len(snmpCommunityName.name):]
- _securityName = snmpCommunitySecurityName.getNode(snmpCommunitySecurityName.name + instId).syntax
+ _securityName = snmpCommunitySecurityName.getNode(
+ snmpCommunitySecurityName.name + instId).syntax
_contextEngineId = snmpCommunityContextEngineId.getNode(
snmpCommunityContextEngineId.name + instId).syntax
- _contextName = snmpCommunityContextName.getNode(snmpCommunityContextName.name + instId).syntax
+ _contextName = snmpCommunityContextName.getNode(
+ snmpCommunityContextName.name + instId).syntax
+
+ key = _securityName, _contextEngineId, _contextName
try:
- self.__securityMap[(_securityName,
- _contextEngineId,
- _contextName)] = nextMibNode.syntax
+ self._securityMap[key] = nextMibNode.syntax
except PyAsn1Error:
debug.logger & debug.FLAG_SM and debug.logger(
- '_sec2com: table entries %r/%r/%r hashing failed' % (
- _securityName, _contextEngineId, _contextName)
- )
+ '_sec2com: table entries %r/%r/%r hashing failed' % key)
continue
- self.__securityBranchId = snmpCommunityName.branchVersionId
+ self._securityBranchId = snmpCommunityName.branchVersionId
debug.logger & debug.FLAG_SM and debug.logger(
- '_sec2com: built securityName to communityName map, version %s: %s' % (
- self.__securityBranchId, self.__securityMap))
+ '_sec2com: built securityName to communityName map, version '
+ '%s: %s' % (self._securityBranchId, self._securityMap))
+
+ key = securityName, contextEngineId, contextName
try:
- return self.__securityMap[(securityName,
- contextEngineId,
- contextName)]
+ return self._securityMap[key]
except KeyError:
raise error.StatusInformation(
- errorIndication=errind.unknownCommunityName
- )
+ errorIndication=errind.unknownCommunityName)
def _com2sec(self, snmpEngine, communityName, transportInformation):
- snmpTargetAddrTAddress, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder
+
+ snmpTargetAddrTAddress, = mibBuilder.importSymbols(
'SNMP-TARGET-MIB', 'snmpTargetAddrTAddress')
- if self.__transportBranchId != snmpTargetAddrTAddress.branchVersionId:
+
+ if self._transportBranchId != snmpTargetAddrTAddress.branchVersionId:
(SnmpTagValue, snmpTargetAddrTDomain,
- snmpTargetAddrTagList) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ snmpTargetAddrTagList) = mibBuilder.importSymbols(
'SNMP-TARGET-MIB', 'SnmpTagValue', 'snmpTargetAddrTDomain',
- 'snmpTargetAddrTagList'
- )
+ 'snmpTargetAddrTagList')
- self.__emptyTag = SnmpTagValue('')
+ self._emptyTag = SnmpTagValue('')
- self.__transportToTagMap = {}
+ self._transportToTagMap = {}
nextMibNode = snmpTargetAddrTagList
@@ -151,56 +161,64 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
break
instId = nextMibNode.name[len(snmpTargetAddrTagList.name):]
- targetAddrTDomain = snmpTargetAddrTDomain.getNode(snmpTargetAddrTDomain.name + instId).syntax
- targetAddrTAddress = snmpTargetAddrTAddress.getNode(snmpTargetAddrTAddress.name + instId).syntax
+
+ targetAddrTDomain = snmpTargetAddrTDomain.getNode(
+ snmpTargetAddrTDomain.name + instId).syntax
+ targetAddrTAddress = snmpTargetAddrTAddress.getNode(
+ snmpTargetAddrTAddress.name + instId).syntax
targetAddrTDomain = tuple(targetAddrTDomain)
- if targetAddrTDomain[:len(udp.SNMP_UDP_DOMAIN)] == udp.SNMP_UDP_DOMAIN:
- SnmpUDPAddress, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMPv2-TM',
- 'SnmpUDPAddress')
+ if (targetAddrTDomain[:len(udp.SNMP_UDP_DOMAIN)] ==
+ udp.SNMP_UDP_DOMAIN):
+ SnmpUDPAddress, = mibBuilder.importSymbols(
+ 'SNMPv2-TM', 'SnmpUDPAddress')
targetAddrTAddress = tuple(SnmpUDPAddress(targetAddrTAddress))
- elif targetAddrTDomain[:len(udp6.SNMP_UDP6_DOMAIN)] == udp6.SNMP_UDP6_DOMAIN:
- TransportAddressIPv6, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+
+ elif (targetAddrTDomain[:len(udp6.SNMP_UDP6_DOMAIN)] ==
+ udp6.SNMP_UDP6_DOMAIN):
+ TransportAddressIPv6, = mibBuilder.importSymbols(
'TRANSPORT-ADDRESS-MIB', 'TransportAddressIPv6')
+
targetAddrTAddress = tuple(TransportAddressIPv6(targetAddrTAddress))
targetAddr = targetAddrTDomain, targetAddrTAddress
- targetAddrTagList = snmpTargetAddrTagList.getNode(snmpTargetAddrTagList.name + instId).syntax
- if targetAddr not in self.__transportToTagMap:
- self.__transportToTagMap[targetAddr] = set()
+ targetAddrTagList = snmpTargetAddrTagList.getNode(
+ snmpTargetAddrTagList.name + instId).syntax
+
+ if targetAddr not in self._transportToTagMap:
+ self._transportToTagMap[targetAddr] = set()
try:
if targetAddrTagList:
- self.__transportToTagMap[targetAddr].update(
+ self._transportToTagMap[targetAddr].update(
[SnmpTagValue(x)
- for x in targetAddrTagList.asOctets().split()]
- )
+ for x in targetAddrTagList.asOctets().split()])
else:
- self.__transportToTagMap[targetAddr].add(self.__emptyTag)
+ self._transportToTagMap[targetAddr].add(self._emptyTag)
except PyAsn1Error:
debug.logger & debug.FLAG_SM and debug.logger(
'_com2sec: table entries %r/%r hashing failed' % (
- targetAddr, targetAddrTagList)
- )
+ targetAddr, targetAddrTagList))
continue
- self.__transportBranchId = snmpTargetAddrTAddress.branchVersionId
+ self._transportBranchId = snmpTargetAddrTAddress.branchVersionId
- debug.logger & debug.FLAG_SM and debug.logger('_com2sec: built transport-to-tag map version %s: %s' % (
- self.__transportBranchId, self.__transportToTagMap))
+ debug.logger & debug.FLAG_SM and debug.logger(
+ '_com2sec: built transport-to-tag map version %s: '
+ '%s' % (self._transportBranchId, self._transportToTagMap))
- snmpTargetParamsSecurityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ snmpTargetParamsSecurityName, = mibBuilder.importSymbols(
'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityName')
- if self.__paramsBranchId != snmpTargetParamsSecurityName.branchVersionId:
- snmpTargetParamsSecurityModel, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ if self._paramsBranchId != snmpTargetParamsSecurityName.branchVersionId:
+ snmpTargetParamsSecurityModel, = mibBuilder.importSymbols(
'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityModel')
- self.__nameToModelMap = {}
+ self._nameToModelMap = {}
nextMibNode = snmpTargetParamsSecurityName
@@ -213,43 +231,44 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
instId = nextMibNode.name[len(snmpTargetParamsSecurityName.name):]
- mibNode = snmpTargetParamsSecurityModel.getNode(snmpTargetParamsSecurityModel.name + instId)
+ mibNode = snmpTargetParamsSecurityModel.getNode(
+ snmpTargetParamsSecurityModel.name + instId)
try:
- if nextMibNode.syntax not in self.__nameToModelMap:
- self.__nameToModelMap[nextMibNode.syntax] = set()
+ if nextMibNode.syntax not in self._nameToModelMap:
+ self._nameToModelMap[nextMibNode.syntax] = set()
- self.__nameToModelMap[nextMibNode.syntax].add(mibNode.syntax)
+ self._nameToModelMap[nextMibNode.syntax].add(mibNode.syntax)
except PyAsn1Error:
debug.logger & debug.FLAG_SM and debug.logger(
- '_com2sec: table entries %r/%r hashing failed' % (
- nextMibNode.syntax, mibNode.syntax)
- )
+ '_com2sec: table entries %r/%r hashing '
+ 'failed' % (nextMibNode.syntax, mibNode.syntax))
continue
- self.__paramsBranchId = snmpTargetParamsSecurityName.branchVersionId
+ self._paramsBranchId = snmpTargetParamsSecurityName.branchVersionId
# invalidate next map as it include this one
- self.__communityBranchId = -1
+ self._communityBranchId = -1
debug.logger & debug.FLAG_SM and debug.logger(
- '_com2sec: built securityName to securityModel map, version %s: %s' % (
- self.__paramsBranchId, self.__nameToModelMap))
+ '_com2sec: built securityName to securityModel map, version '
+ '%s: %s' % (self._paramsBranchId, self._nameToModelMap))
+
+ snmpCommunityName, = mibBuilder.importSymbols(
+ 'SNMP-COMMUNITY-MIB', 'snmpCommunityName')
- snmpCommunityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-COMMUNITY-MIB',
- 'snmpCommunityName')
- if self.__communityBranchId != snmpCommunityName.branchVersionId:
+ if self._communityBranchId != snmpCommunityName.branchVersionId:
(snmpCommunitySecurityName, snmpCommunityContextEngineId,
snmpCommunityContextName,
- snmpCommunityTransportTag) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ snmpCommunityTransportTag) = mibBuilder.importSymbols(
'SNMP-COMMUNITY-MIB', 'snmpCommunitySecurityName',
'snmpCommunityContextEngineID', 'snmpCommunityContextName',
'snmpCommunityTransportTag'
)
- self.__communityToTagMap = {}
- self.__tagAndCommunityToSecurityMap = {}
+ self._communityToTagMap = {}
+ self._tagAndCommunityToSecurityMap = {}
nextMibNode = snmpCommunityName
@@ -262,100 +281,125 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
instId = nextMibNode.name[len(snmpCommunityName.name):]
- securityName = snmpCommunitySecurityName.getNode(snmpCommunitySecurityName.name + instId).syntax
+ securityName = snmpCommunitySecurityName.getNode(
+ snmpCommunitySecurityName.name + instId).syntax
contextEngineId = snmpCommunityContextEngineId.getNode(
snmpCommunityContextEngineId.name + instId).syntax
- contextName = snmpCommunityContextName.getNode(snmpCommunityContextName.name + instId).syntax
+ contextName = snmpCommunityContextName.getNode(
+ snmpCommunityContextName.name + instId).syntax
- transportTag = snmpCommunityTransportTag.getNode(snmpCommunityTransportTag.name + instId).syntax
+ transportTag = snmpCommunityTransportTag.getNode(
+ snmpCommunityTransportTag.name + instId).syntax
_tagAndCommunity = transportTag, nextMibNode.syntax
try:
- if _tagAndCommunity not in self.__tagAndCommunityToSecurityMap:
- self.__tagAndCommunityToSecurityMap[_tagAndCommunity] = set()
+ if _tagAndCommunity not in self._tagAndCommunityToSecurityMap:
+ self._tagAndCommunityToSecurityMap[_tagAndCommunity] = set()
- self.__tagAndCommunityToSecurityMap[_tagAndCommunity].add(
- (securityName, contextEngineId, contextName)
- )
+ self._tagAndCommunityToSecurityMap[_tagAndCommunity].add(
+ (securityName, contextEngineId, contextName))
- if nextMibNode.syntax not in self.__communityToTagMap:
- self.__communityToTagMap[nextMibNode.syntax] = set()
+ if nextMibNode.syntax not in self._communityToTagMap:
+ self._communityToTagMap[nextMibNode.syntax] = set()
- self.__communityToTagMap[nextMibNode.syntax].add(transportTag)
+ self._communityToTagMap[nextMibNode.syntax].add(transportTag)
except PyAsn1Error:
debug.logger & debug.FLAG_SM and debug.logger(
- '_com2sec: table entries %r/%r hashing failed' % (
- _tagAndCommunity, nextMibNode.syntax)
- )
+ '_com2sec: table entries %r/%r hashing '
+ 'failed' % (_tagAndCommunity, nextMibNode.syntax))
continue
- self.__communityBranchId = snmpCommunityName.branchVersionId
+ self._communityBranchId = snmpCommunityName.branchVersionId
debug.logger & debug.FLAG_SM and debug.logger(
- '_com2sec: built communityName to tag map (securityModel %s), version %s: %s' % (
- self.SECURITY_MODEL_ID, self.__communityBranchId, self.__communityToTagMap))
+ '_com2sec: built communityName to tag map '
+ '(securityModel %s), version %s: '
+ '%s' % (self.SECURITY_MODEL_ID, self._communityBranchId,
+ self._communityToTagMap))
debug.logger & debug.FLAG_SM and debug.logger(
- '_com2sec: built tag & community to securityName map (securityModel %s), version %s: %s' % (
- self.SECURITY_MODEL_ID, self.__communityBranchId, self.__tagAndCommunityToSecurityMap))
-
- if communityName in self.__communityToTagMap:
- if transportInformation in self.__transportToTagMap:
- tags = self.__transportToTagMap[transportInformation].intersection(
- self.__communityToTagMap[communityName])
- elif self.__emptyTag in self.__communityToTagMap[communityName]:
- tags = [self.__emptyTag]
+ '_com2sec: built tag & community to securityName map '
+ '(securityModel %s), version %s: '
+ '%s' % (self.SECURITY_MODEL_ID, self._communityBranchId,
+ self._tagAndCommunityToSecurityMap))
+
+ if communityName in self._communityToTagMap:
+ if transportInformation in self._transportToTagMap:
+ tags = self._transportToTagMap[transportInformation].intersection(
+ self._communityToTagMap[communityName])
+
+ elif self._emptyTag in self._communityToTagMap[communityName]:
+ tags = [self._emptyTag]
+
else:
- raise error.StatusInformation(errorIndication=errind.unknownCommunityName)
+ raise error.StatusInformation(
+ errorIndication=errind.unknownCommunityName)
candidateSecurityNames = []
- for x in [self.__tagAndCommunityToSecurityMap[(t, communityName)] for t in tags]:
+ securityNamesSets = [
+ self._tagAndCommunityToSecurityMap[(t, communityName)]
+ for t in tags
+ ]
+
+ for x in securityNamesSets:
candidateSecurityNames.extend(list(x))
- # 5.2.1 (row selection in snmpCommunityTable)
- # Picks first match but favors entries already in targets table
if candidateSecurityNames:
- candidateSecurityNames.sort(
- key=lambda x, m=self.__nameToModelMap, v=self.SECURITY_MODEL_ID: (
- not int(x[0] in m and v in m[x[0]]), str(x[0]))
- )
+ candidateSecurityNames.sort(key=self._orderSecurityNames)
+
chosenSecurityName = candidateSecurityNames[0] # min()
+
debug.logger & debug.FLAG_SM and debug.logger(
- '_com2sec: securityName candidates for communityName \'%s\' are %s; choosing securityName \'%s\'' % (
- communityName, candidateSecurityNames, chosenSecurityName[0]))
+ '_com2sec: securityName candidates for communityName %s '
+ 'are %s; choosing securityName '
+ '%s' % (communityName, candidateSecurityNames,
+ chosenSecurityName[0]))
+
return chosenSecurityName
- raise error.StatusInformation(errorIndication=errind.unknownCommunityName)
+ raise error.StatusInformation(
+ errorIndication=errind.unknownCommunityName)
+
+ # 5.2.1 (row selection in snmpCommunityTable)
+ # Picks first match but favors entries already in targets table
+ def _orderSecurityNames(self, securityName):
+ return (not int(securityName[0] in self._nameToModelMap and
+ self.SECURITY_MODEL_ID in self._nameToModelMap[securityName[0]]),
+ str(securityName[0]))
def generateRequestMsg(self, snmpEngine, messageProcessingModel,
globalData, maxMessageSize, securityModel,
securityEngineId, securityName, securityLevel,
scopedPDU):
msg, = globalData
+
contextEngineId, contextName, pdu = scopedPDU
# rfc2576: 5.2.3
- communityName = self._sec2com(snmpEngine, securityName,
- contextEngineId, contextName)
+ communityName = self._sec2com(
+ snmpEngine, securityName, contextEngineId, contextName)
debug.logger & debug.FLAG_SM and debug.logger(
- 'generateRequestMsg: using community %r for securityModel %r, securityName %r, contextEngineId %r contextName %r' % (
- communityName, securityModel, securityName, contextEngineId, contextName))
+ 'generateRequestMsg: using community %r for securityModel %r, '
+ 'securityName %r, contextEngineId %r contextName '
+ '%r' % (communityName, securityModel, securityName,
+ contextEngineId, contextName))
securityParameters = communityName
msg.setComponentByPosition(1, securityParameters)
msg.setComponentByPosition(2)
msg.getComponentByPosition(2).setComponentByType(
- pdu.tagSet, pdu, verifyConstraints=False, matchTags=False, matchConstraints=False
- )
+ pdu.tagSet, pdu, verifyConstraints=False, matchTags=False,
+ matchConstraints=False)
- debug.logger & debug.FLAG_MP and debug.logger('generateRequestMsg: %s' % (msg.prettyPrint(),))
+ debug.logger & debug.FLAG_MP and debug.logger(
+ 'generateRequestMsg: %s' % (msg.prettyPrint(),))
try:
return securityParameters, encoder.encode(msg)
@@ -363,7 +407,9 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
except PyAsn1Error as exc:
debug.logger & debug.FLAG_MP and debug.logger(
'generateRequestMsg: serialization failure: %s' % exc)
- raise error.StatusInformation(errorIndication=errind.serializationError)
+
+ raise error.StatusInformation(
+ errorIndication=errind.serializationError)
def generateResponseMsg(self, snmpEngine, messageProcessingModel,
globalData, maxMessageSize, securityModel,
@@ -371,21 +417,26 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
scopedPDU, securityStateReference):
# rfc2576: 5.2.2
msg, = globalData
+
contextEngineId, contextName, pdu = scopedPDU
+
cachedSecurityData = self._cache.pop(securityStateReference)
+
communityName = cachedSecurityData['communityName']
debug.logger & debug.FLAG_SM and debug.logger(
- 'generateResponseMsg: recovered community %r by securityStateReference %s' % (
- communityName, securityStateReference))
+ 'generateResponseMsg: recovered community %r by '
+ 'securityStateReference '
+ '%s' % (communityName, securityStateReference))
msg.setComponentByPosition(1, communityName)
msg.setComponentByPosition(2)
msg.getComponentByPosition(2).setComponentByType(
- pdu.tagSet, pdu, verifyConstraints=False, matchTags=False, matchConstraints=False
- )
+ pdu.tagSet, pdu, verifyConstraints=False, matchTags=False,
+ matchConstraints=False)
- debug.logger & debug.FLAG_MP and debug.logger('generateResponseMsg: %s' % (msg.prettyPrint(),))
+ debug.logger & debug.FLAG_MP and debug.logger(
+ 'generateResponseMsg: %s' % (msg.prettyPrint(),))
try:
return communityName, encoder.encode(msg)
@@ -393,11 +444,14 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
except PyAsn1Error as exc:
debug.logger & debug.FLAG_MP and debug.logger(
'generateResponseMsg: serialization failure: %s' % exc)
+
raise error.StatusInformation(errorIndication=errind.serializationError)
def processIncomingMsg(self, snmpEngine, messageProcessingModel,
maxMessageSize, securityParameters, securityModel,
securityLevel, wholeMsg, msg):
+ mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder
+
# rfc2576: 5.2.1
communityName, transportInformation = securityParameters
@@ -407,6 +461,7 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
snmpEngine.observer.storeExecutionContext(
snmpEngine, 'rfc2576.processIncomingMsg:writable', scope
)
+
snmpEngine.observer.clearExecutionContext(
snmpEngine, 'rfc2576.processIncomingMsg:writable'
)
@@ -418,16 +473,17 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
)
except error.StatusInformation:
- snmpInBadCommunityNames, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ snmpInBadCommunityNames, = mibBuilder.importSymbols(
'__SNMPv2-MIB', 'snmpInBadCommunityNames')
snmpInBadCommunityNames.syntax += 1
+
raise error.StatusInformation(
errorIndication=errind.unknownCommunityName,
communityName=communityName
)
- snmpEngineID, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB',
- 'snmpEngineID')
+ snmpEngineID, = mibBuilder.importSymbols(
+ '__SNMP-FRAMEWORK-MIB', 'snmpEngineID')
securityEngineID = snmpEngineID.syntax
@@ -440,24 +496,31 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
contextEngineId=contextEngineId,
contextName=contextName)
)
+
snmpEngine.observer.clearExecutionContext(
snmpEngine, 'rfc2576.processIncomingMsg'
)
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: looked up securityName %r securityModel %r contextEngineId %r contextName %r by communityName %r AND transportInformation %r' % (
- securityName, self.SECURITY_MODEL_ID, contextEngineId, contextName, communityName, transportInformation))
+ 'processIncomingMsg: looked up securityName %r securityModel %r '
+ 'contextEngineId %r contextName %r by communityName %r '
+ 'AND transportInformation '
+ '%r' % (securityName, self.SECURITY_MODEL_ID, contextEngineId,
+ contextName, communityName, transportInformation))
stateReference = self._cache.push(communityName=communityName)
scopedPDU = (contextEngineId, contextName,
msg.getComponentByPosition(2).getComponent())
+
maxSizeResponseScopedPDU = maxMessageSize - 128
+
securityStateReference = stateReference
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: generated maxSizeResponseScopedPDU %s securityStateReference %s' % (
- maxSizeResponseScopedPDU, securityStateReference))
+ 'processIncomingMsg: generated maxSizeResponseScopedPDU '
+ '%s securityStateReference '
+ '%s' % (maxSizeResponseScopedPDU, securityStateReference))
return (securityEngineID, securityName, scopedPDU,
maxSizeResponseScopedPDU, securityStateReference)
diff --git a/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py b/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py
index df3db63f..c3578682 100644
--- a/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py
+++ b/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py
@@ -41,11 +41,13 @@ class HmacMd5(base.AbstractAuthenticationService):
# should be in the substrate. Also, it pre-sets digest placeholder
# so we hash wholeMsg out of the box.
# Yes, that's ugly but that's rfc...
- l = wholeMsg.find(TWELVE_ZEROS)
- if l == -1:
+ ln = wholeMsg.find(TWELVE_ZEROS)
+
+ if ln == -1:
raise error.ProtocolError('Cant locate digest placeholder')
- wholeHead = wholeMsg[:l]
- wholeTail = wholeMsg[l + 12:]
+
+ wholeHead = wholeMsg[:ln]
+ wholeTail = wholeMsg[ln + 12:]
# 6.3.1.1
@@ -56,15 +58,13 @@ class HmacMd5(base.AbstractAuthenticationService):
# 6.3.1.2c
k1 = univ.OctetString(
- map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD)
- )
+ map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD))
# 6.3.1.2d --> no-op
# 6.3.1.2e
k2 = univ.OctetString(
- map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD)
- )
+ map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD))
# 6.3.1.3
# noinspection PyDeprecation,PyCallingNonCallable
@@ -83,15 +83,16 @@ class HmacMd5(base.AbstractAuthenticationService):
# 6.3.2.1 & 2
if len(authParameters) != 12:
raise error.StatusInformation(
- errorIndication=errind.authenticationError
- )
+ errorIndication=errind.authenticationError)
# 6.3.2.3
- l = wholeMsg.find(authParameters.asOctets())
- if l == -1:
+ ln = wholeMsg.find(authParameters.asOctets())
+ if ln == -1:
raise error.ProtocolError('Cant locate digest in wholeMsg')
- wholeHead = wholeMsg[:l]
- wholeTail = wholeMsg[l + 12:]
+
+ wholeHead = wholeMsg[:ln]
+ wholeTail = wholeMsg[ln + 12:]
+
authenticatedWholeMsg = wholeHead + TWELVE_ZEROS + wholeTail
# 6.3.2.4a
@@ -101,15 +102,13 @@ class HmacMd5(base.AbstractAuthenticationService):
# 6.3.2.4c
k1 = univ.OctetString(
- map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD)
- )
+ map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD))
# 6.3.2.4d --> no-op
# 6.3.2.4e
k2 = univ.OctetString(
- map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD)
- )
+ map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD))
# 6.3.2.5a
# noinspection PyDeprecation,PyCallingNonCallable
@@ -125,7 +124,6 @@ class HmacMd5(base.AbstractAuthenticationService):
# 6.3.2.6
if mac != authParameters:
raise error.StatusInformation(
- errorIndication=errind.authenticationFailure
- )
+ errorIndication=errind.authenticationFailure)
return authenticatedWholeMsg
diff --git a/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py b/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py
index 4d5498d1..76c35af0 100644
--- a/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py
+++ b/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py
@@ -42,11 +42,12 @@ class HmacSha(base.AbstractAuthenticationService):
# should be in the substrate. Also, it pre-sets digest placeholder
# so we hash wholeMsg out of the box.
# Yes, that's ugly but that's rfc...
- l = wholeMsg.find(TWELVE_ZEROS)
- if l == -1:
+ ln = wholeMsg.find(TWELVE_ZEROS)
+ if ln == -1:
raise error.ProtocolError('Cant locate digest placeholder')
- wholeHead = wholeMsg[:l]
- wholeTail = wholeMsg[l + 12:]
+
+ wholeHead = wholeMsg[:ln]
+ wholeTail = wholeMsg[ln + 12:]
# 7.3.1.2a
extendedAuthKey = authKey.asNumbers() + FORTY_FOUR_ZEROS
@@ -55,15 +56,13 @@ class HmacSha(base.AbstractAuthenticationService):
# 7.3.1.2c
k1 = univ.OctetString(
- map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD)
- )
+ map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD))
# 7.3.1.2d -- no-op
# 7.3.1.2e
k2 = univ.OctetString(
- map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD)
- )
+ map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD))
# 7.3.1.3
d1 = sha1(k1.asOctets() + wholeMsg).digest()
@@ -80,15 +79,16 @@ class HmacSha(base.AbstractAuthenticationService):
# 7.3.2.1 & 2
if len(authParameters) != 12:
raise error.StatusInformation(
- errorIndication=errind.authenticationError
- )
+ errorIndication=errind.authenticationError)
# 7.3.2.3
- l = wholeMsg.find(authParameters.asOctets())
- if l == -1:
+ ln = wholeMsg.find(authParameters.asOctets())
+ if ln == -1:
raise error.ProtocolError('Cant locate digest in wholeMsg')
- wholeHead = wholeMsg[:l]
- wholeTail = wholeMsg[l + 12:]
+
+ wholeHead = wholeMsg[:ln]
+ wholeTail = wholeMsg[ln + 12:]
+
authenticatedWholeMsg = wholeHead + TWELVE_ZEROS + wholeTail
# 7.3.2.4a
@@ -98,15 +98,13 @@ class HmacSha(base.AbstractAuthenticationService):
# 7.3.2.4c
k1 = univ.OctetString(
- map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD)
- )
+ map(lambda x, y: x ^ y, extendedAuthKey, self.IPAD))
# 7.3.2.4d --> no-op
# 7.3.2.4e
k2 = univ.OctetString(
- map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD)
- )
+ map(lambda x, y: x ^ y, extendedAuthKey, self.OPAD))
# 7.3.2.5a
d1 = sha1(k1.asOctets() + authenticatedWholeMsg).digest()
@@ -120,7 +118,6 @@ class HmacSha(base.AbstractAuthenticationService):
# 7.3.2.6
if mac != authParameters:
raise error.StatusInformation(
- errorIndication=errind.authenticationFailure
- )
+ errorIndication=errind.authenticationFailure)
return authenticatedWholeMsg
diff --git a/pysnmp/proto/secmod/rfc3414/localkey.py b/pysnmp/proto/secmod/rfc3414/localkey.py
index 144342fa..bc4faa91 100644
--- a/pysnmp/proto/secmod/rfc3414/localkey.py
+++ b/pysnmp/proto/secmod/rfc3414/localkey.py
@@ -12,9 +12,12 @@ from pyasn1.type import univ
def hashPassphrase(passphrase, hashFunc):
passphrase = univ.OctetString(passphrase).asOctets()
+
hasher = hashFunc()
+
ringBuffer = passphrase * (64 // len(passphrase) + 1)
ringBufferLen = len(ringBuffer)
+
count = 0
mark = 0
@@ -25,9 +28,11 @@ def hashPassphrase(passphrase, hashFunc):
mark = e
else:
- hasher.update(
- ringBuffer[mark:ringBufferLen] + ringBuffer[0:e - ringBufferLen]
- )
+ chunk = ringBuffer[mark:ringBufferLen]
+ chunk += ringBuffer[0:e - ringBufferLen]
+
+ hasher.update(chunk)
+
mark = e - ringBufferLen
count += 1
@@ -36,11 +41,13 @@ def hashPassphrase(passphrase, hashFunc):
def passwordToKey(passphrase, snmpEngineId, hashFunc):
- return localizeKey(hashPassphrase(passphrase, hashFunc), snmpEngineId, hashFunc)
+ return localizeKey(
+ hashPassphrase(passphrase, hashFunc), snmpEngineId, hashFunc)
def localizeKey(passKey, snmpEngineId, hashFunc):
passKey = univ.OctetString(passKey).asOctets()
+
# noinspection PyDeprecation,PyCallingNonCallable
return hashFunc(passKey + snmpEngineId.asOctets() + passKey).digest()
diff --git a/pysnmp/proto/secmod/rfc3414/priv/des.py b/pysnmp/proto/secmod/rfc3414/priv/des.py
index 5128a662..10058aeb 100644
--- a/pysnmp/proto/secmod/rfc3414/priv/des.py
+++ b/pysnmp/proto/secmod/rfc3414/priv/des.py
@@ -38,32 +38,39 @@ class Des(base.AbstractEncryptionService):
def hashPassphrase(self, authProtocol, privKey):
if authProtocol == hmacmd5.HmacMd5.SERVICE_ID:
hashAlgo = md5
+
elif authProtocol == hmacsha.HmacSha.SERVICE_ID:
hashAlgo = sha1
+
elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM:
hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol]
+
else:
raise error.ProtocolError(
- 'Unknown auth protocol %s' % (authProtocol,)
- )
+ 'Unknown auth protocol %s' % (authProtocol,))
+
return localkey.hashPassphrase(privKey, hashAlgo)
def localizeKey(self, authProtocol, privKey, snmpEngineID):
if authProtocol == hmacmd5.HmacMd5.SERVICE_ID:
hashAlgo = md5
+
elif authProtocol == hmacsha.HmacSha.SERVICE_ID:
hashAlgo = sha1
+
elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM:
hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol]
+
else:
raise error.ProtocolError(
- 'Unknown auth protocol %s' % (authProtocol,)
- )
+ 'Unknown auth protocol %s' % (authProtocol,))
+
localPrivKey = localkey.localizeKey(privKey, snmpEngineID, hashAlgo)
+
return localPrivKey[:self.KEY_SIZE]
# 8.1.1.1
- def __getEncryptionKey(self, privKey, snmpEngineBoots):
+ def _getEncryptionKey(self, privKey, snmpEngineBoots):
desKey = privKey[:8]
preIV = privKey[8:16]
@@ -77,42 +84,47 @@ class Des(base.AbstractEncryptionService):
self.local_int >> 16 & 0xff,
self.local_int >> 8 & 0xff,
self.local_int & 0xff]
+
if self.local_int == 0xffffffff:
self.local_int = 0
+
else:
self.local_int += 1
+ iv = map(lambda x, y: x ^ y, salt, preIV.asNumbers())
+
return (desKey.asOctets(),
univ.OctetString(salt).asOctets(),
- univ.OctetString(map(lambda x, y: x ^ y, salt, preIV.asNumbers())).asOctets())
+ univ.OctetString(iv).asOctets())
@staticmethod
- def __getDecryptionKey(privKey, salt):
- return (privKey[:8].asOctets(),
- univ.OctetString(map(lambda x, y: x ^ y, salt.asNumbers(), privKey[8:16].asNumbers())).asOctets())
+ def _getDecryptionKey(privKey, salt):
+ iv = map(lambda x, y: x ^ y, salt.asNumbers(),
+ privKey[8:16].asNumbers())
+
+ return privKey[:8].asOctets(), univ.OctetString(iv).asOctets()
# 8.2.4.1
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
snmpEngineBoots, snmpEngineTime, salt = privParameters
# 8.3.1.1
- desKey, salt, iv = self.__getEncryptionKey(
- encryptKey, snmpEngineBoots
- )
+ desKey, salt, iv = self._getEncryptionKey(encryptKey, snmpEngineBoots)
# 8.3.1.2
privParameters = univ.OctetString(salt)
# 8.1.1.2
- plaintext = dataToEncrypt + univ.OctetString((0,) * (8 - len(dataToEncrypt) % 8)).asOctets()
+ plaintext = dataToEncrypt
+ plaintext += univ.OctetString(
+ (0,) * (8 - len(dataToEncrypt) % 8)).asOctets()
try:
ciphertext = des.encrypt(plaintext, desKey, iv)
except PysnmpCryptoError:
raise error.StatusInformation(
- errorIndication=errind.unsupportedPrivProtocol
- )
+ errorIndication=errind.unsupportedPrivProtocol)
# 8.3.1.3 & 4
return univ.OctetString(ciphertext), privParameters
@@ -124,19 +136,17 @@ class Des(base.AbstractEncryptionService):
# 8.3.2.1
if len(salt) != 8:
raise error.StatusInformation(
- errorIndication=errind.decryptionError
- )
+ errorIndication=errind.decryptionError)
# 8.3.2.2 no-op
# 8.3.2.3
- desKey, iv = self.__getDecryptionKey(decryptKey, salt)
+ desKey, iv = self._getDecryptionKey(decryptKey, salt)
# 8.3.2.4 -> 8.1.1.3
if len(encryptedData) % 8 != 0:
raise error.StatusInformation(
- errorIndication=errind.decryptionError
- )
+ errorIndication=errind.decryptionError)
try:
# 8.3.2.6
@@ -144,5 +154,4 @@ class Des(base.AbstractEncryptionService):
except PysnmpCryptoError:
raise error.StatusInformation(
- errorIndication=errind.unsupportedPrivProtocol
- )
+ errorIndication=errind.unsupportedPrivProtocol)
diff --git a/pysnmp/proto/secmod/rfc3414/service.py b/pysnmp/proto/secmod/rfc3414/service.py
index f9bf056b..0a8f54e5 100644
--- a/pysnmp/proto/secmod/rfc3414/service.py
+++ b/pysnmp/proto/secmod/rfc3414/service.py
@@ -38,15 +38,26 @@ from pysnmp.smi.error import NoSuchInstanceError
class UsmSecurityParameters(rfc1155.TypeCoercionHackMixIn, univ.Sequence):
componentType = namedtype.NamedTypes(
- namedtype.NamedType('msgAuthoritativeEngineId', univ.OctetString()),
- namedtype.NamedType('msgAuthoritativeEngineBoots',
- univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))),
- namedtype.NamedType('msgAuthoritativeEngineTime',
- univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))),
- namedtype.NamedType('msgUserName',
- univ.OctetString().subtype(subtypeSpec=constraint.ValueSizeConstraint(0, 32))),
- namedtype.NamedType('msgAuthenticationParameters', univ.OctetString()),
- namedtype.NamedType('msgPrivacyParameters', univ.OctetString())
+ namedtype.NamedType(
+ 'msgAuthoritativeEngineId', univ.OctetString()),
+ namedtype.NamedType(
+ 'msgAuthoritativeEngineBoots',
+ univ.Integer().subtype(
+ subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))),
+ namedtype.NamedType(
+ 'msgAuthoritativeEngineTime',
+ univ.Integer().subtype(
+ subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))),
+ namedtype.NamedType(
+ 'msgUserName',
+ univ.OctetString().subtype(
+ subtypeSpec=constraint.ValueSizeConstraint(0, 32))),
+ namedtype.NamedType(
+ 'msgAuthenticationParameters',
+ univ.OctetString()),
+ namedtype.NamedType(
+ 'msgPrivacyParameters',
+ univ.OctetString())
)
@@ -56,10 +67,14 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
AUTH_SERVICES = {
hmacmd5.HmacMd5.SERVICE_ID: hmacmd5.HmacMd5(),
hmacsha.HmacSha.SERVICE_ID: hmacsha.HmacSha(),
- hmacsha2.HmacSha2.SHA224_SERVICE_ID: hmacsha2.HmacSha2(hmacsha2.HmacSha2.SHA224_SERVICE_ID),
- hmacsha2.HmacSha2.SHA256_SERVICE_ID: hmacsha2.HmacSha2(hmacsha2.HmacSha2.SHA256_SERVICE_ID),
- hmacsha2.HmacSha2.SHA384_SERVICE_ID: hmacsha2.HmacSha2(hmacsha2.HmacSha2.SHA384_SERVICE_ID),
- hmacsha2.HmacSha2.SHA512_SERVICE_ID: hmacsha2.HmacSha2(hmacsha2.HmacSha2.SHA512_SERVICE_ID),
+ hmacsha2.HmacSha2.SHA224_SERVICE_ID: hmacsha2.HmacSha2(
+ hmacsha2.HmacSha2.SHA224_SERVICE_ID),
+ hmacsha2.HmacSha2.SHA256_SERVICE_ID: hmacsha2.HmacSha2(
+ hmacsha2.HmacSha2.SHA256_SERVICE_ID),
+ hmacsha2.HmacSha2.SHA384_SERVICE_ID: hmacsha2.HmacSha2(
+ hmacsha2.HmacSha2.SHA384_SERVICE_ID),
+ hmacsha2.HmacSha2.SHA512_SERVICE_ID: hmacsha2.HmacSha2(
+ hmacsha2.HmacSha2.SHA512_SERVICE_ID),
noauth.NoAuth.SERVICE_ID: noauth.NoAuth(),
}
@@ -76,21 +91,23 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
def __init__(self):
AbstractSecurityModel.__init__(self)
- self.__securityParametersSpec = UsmSecurityParameters()
- self.__timeline = {}
- self.__timelineExpQueue = {}
- self.__expirationTimer = 0
- self.__paramsBranchId = -1
+ self._securityParametersSpec = UsmSecurityParameters()
+ self._timeline = {}
+ self._timelineExpQueue = {}
+ self._expirationTimer = 0
+ self._paramsBranchId = -1
- def __sec2usr(self, snmpEngine, securityName, securityEngineID=None):
+ def _sec2usr(self, snmpEngine, securityName, securityEngineID=None):
mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder
- usmUserEngineID, = mibBuilder.importSymbols('SNMP-USER-BASED-SM-MIB',
- 'usmUserEngineID')
- if self.__paramsBranchId != usmUserEngineID.branchVersionId:
+
+ usmUserEngineID, = mibBuilder.importSymbols(
+ 'SNMP-USER-BASED-SM-MIB', 'usmUserEngineID')
+
+ if self._paramsBranchId != usmUserEngineID.branchVersionId:
usmUserName, usmUserSecurityName = mibBuilder.importSymbols(
'SNMP-USER-BASED-SM-MIB', 'usmUserName', 'usmUserSecurityName')
- self.__securityToUserMap = {}
+ self._securityToUserMap = {}
nextMibNode = usmUserEngineID
@@ -99,108 +116,138 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
nextMibNode = usmUserEngineID.getNextNode(nextMibNode.name)
except NoSuchInstanceError:
- self.__paramsBranchId = usmUserEngineID.branchVersionId
+ self._paramsBranchId = usmUserEngineID.branchVersionId
+
debug.logger & debug.FLAG_SM and debug.logger(
- '_sec2usr: built snmpEngineId + securityName to userName map, version %s: %r' % (
- self.__paramsBranchId, self.__securityToUserMap))
+ '_sec2usr: built snmpEngineId + securityName to '
+ 'userName map, version %s: %r' % (
+ self._paramsBranchId, self._securityToUserMap))
break
instId = nextMibNode.name[len(usmUserSecurityName.name):]
- __engineID = usmUserEngineID.getNode(usmUserEngineID.name + instId).syntax
- __userName = usmUserName.getNode(usmUserName.name + instId).syntax
- __securityName = usmUserSecurityName.getNode(usmUserSecurityName.name + instId).syntax
+ _engineID = usmUserEngineID.getNode(
+ usmUserEngineID.name + instId).syntax
+ _userName = usmUserName.getNode(
+ usmUserName.name + instId).syntax
+ _securityName = usmUserSecurityName.getNode(
+ usmUserSecurityName.name + instId).syntax
- k = __engineID, __securityName
+ k = _engineID, _securityName
# first (lesser) securityName wins
- if k not in self.__securityToUserMap:
- self.__securityToUserMap[k] = __userName
+ if k not in self._securityToUserMap:
+ self._securityToUserMap[k] = _userName
if securityEngineID is None:
- snmpEngineID, = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')
+ snmpEngineID, = mibBuilder.importSymbols(
+ '__SNMP-FRAMEWORK-MIB', 'snmpEngineID')
securityEngineID = snmpEngineID.syntax
try:
- userName = self.__securityToUserMap[(securityEngineID, securityName)]
+ userName = self._securityToUserMap[(securityEngineID, securityName)]
+
except KeyError:
debug.logger & debug.FLAG_SM and debug.logger(
- '_sec2usr: no entry exists for snmpEngineId %r, securityName %r' % (securityEngineID, securityName))
+ '_sec2usr: no entry exists for snmpEngineId %r, securityName '
+ '%r' % (securityEngineID, securityName))
raise NoSuchInstanceError() # emulate MIB lookup
debug.logger & debug.FLAG_SM and debug.logger(
- '_sec2usr: using userName %r for snmpEngineId %r, securityName %r' % (
- userName, securityEngineID, securityName))
+ '_sec2usr: using userName %r for snmpEngineId %r, securityName '
+ '%r' % (userName, securityEngineID, securityName))
return userName
@staticmethod
- def __getUserInfo(mibInstrumController, securityEngineID, userName):
- usmUserEntry, = mibInstrumController.mibBuilder.importSymbols(
- 'SNMP-USER-BASED-SM-MIB', 'usmUserEntry'
- )
+ def _getUserInfo(mibInstrumController, securityEngineID, userName):
+ mibBuilder = mibInstrumController.mibBuilder
+
+ usmUserEntry, = mibBuilder.importSymbols(
+ 'SNMP-USER-BASED-SM-MIB', 'usmUserEntry')
+
tblIdx = usmUserEntry.getInstIdFromIndices(securityEngineID, userName)
+
# Get userName & securityName
usmUserName = usmUserEntry.getNode(usmUserEntry.name + (2,) + tblIdx).syntax
usmUserSecurityName = usmUserEntry.getNode(usmUserEntry.name + (3,) + tblIdx).syntax
+
# Get protocols
usmUserAuthProtocol = usmUserEntry.getNode(usmUserEntry.name + (5,) + tblIdx).syntax
usmUserPrivProtocol = usmUserEntry.getNode(usmUserEntry.name + (8,) + tblIdx).syntax
+
# Get keys
- pysnmpUsmKeyEntry, = mibInstrumController.mibBuilder.importSymbols(
- 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry'
- )
- pysnmpUsmKeyAuthLocalized = pysnmpUsmKeyEntry.getNode(pysnmpUsmKeyEntry.name + (1,) + tblIdx).syntax
- pysnmpUsmKeyPrivLocalized = pysnmpUsmKeyEntry.getNode(pysnmpUsmKeyEntry.name + (2,) + tblIdx).syntax
+ pysnmpUsmKeyEntry, = mibBuilder.importSymbols(
+ 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry')
+
+ pysnmpUsmKeyAuthLocalized = pysnmpUsmKeyEntry.getNode(
+ pysnmpUsmKeyEntry.name + (1,) + tblIdx).syntax
+ pysnmpUsmKeyPrivLocalized = pysnmpUsmKeyEntry.getNode(
+ pysnmpUsmKeyEntry.name + (2,) + tblIdx).syntax
+
return (usmUserName, usmUserSecurityName, usmUserAuthProtocol,
pysnmpUsmKeyAuthLocalized, usmUserPrivProtocol,
pysnmpUsmKeyPrivLocalized)
- def __cloneUserInfo(self, snmpEngine, securityEngineID, userName):
+ def _cloneUserInfo(self, snmpEngine, securityEngineID, userName):
mibInstrumController = snmpEngine.msgAndPduDsp.mibInstrumController
+ mibBuilder = mibInstrumController.mibBuilder
+
+ snmpEngineID, = mibBuilder.importSymbols(
+ '__SNMP-FRAMEWORK-MIB', 'snmpEngineID')
- snmpEngineID, = mibInstrumController.mibBuilder.importSymbols(
- '__SNMP-FRAMEWORK-MIB', 'snmpEngineID'
- )
# Proto entry
- usmUserEntry, = mibInstrumController.mibBuilder.importSymbols(
- 'SNMP-USER-BASED-SM-MIB', 'usmUserEntry'
- )
+ usmUserEntry, = mibBuilder.importSymbols(
+ 'SNMP-USER-BASED-SM-MIB', 'usmUserEntry')
+
tblIdx1 = usmUserEntry.getInstIdFromIndices(
- snmpEngineID.syntax, userName
- )
+ snmpEngineID.syntax, userName)
+
# Get proto protocols
- usmUserName = usmUserEntry.getNode(usmUserEntry.name + (2,) + tblIdx1)
- usmUserSecurityName = usmUserEntry.getNode(usmUserEntry.name + (3,) + tblIdx1)
- usmUserCloneFrom = usmUserEntry.getNode(usmUserEntry.name + (4,) + tblIdx1)
- usmUserAuthProtocol = usmUserEntry.getNode(usmUserEntry.name + (5,) + tblIdx1)
- usmUserPrivProtocol = usmUserEntry.getNode(usmUserEntry.name + (8,) + tblIdx1)
+ usmUserName = usmUserEntry.getNode(
+ usmUserEntry.name + (2,) + tblIdx1)
+ usmUserSecurityName = usmUserEntry.getNode(
+ usmUserEntry.name + (3,) + tblIdx1)
+ usmUserCloneFrom = usmUserEntry.getNode(
+ usmUserEntry.name + (4,) + tblIdx1)
+ usmUserAuthProtocol = usmUserEntry.getNode(
+ usmUserEntry.name + (5,) + tblIdx1)
+ usmUserPrivProtocol = usmUserEntry.getNode(
+ usmUserEntry.name + (8,) + tblIdx1)
+
# Get proto keys
- pysnmpUsmKeyEntry, = mibInstrumController.mibBuilder.importSymbols(
- 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry'
- )
- pysnmpUsmKeyAuth = pysnmpUsmKeyEntry.getNode(pysnmpUsmKeyEntry.name + (3,) + tblIdx1)
- pysnmpUsmKeyPriv = pysnmpUsmKeyEntry.getNode(pysnmpUsmKeyEntry.name + (4,) + tblIdx1)
+ pysnmpUsmKeyEntry, = mibBuilder.importSymbols(
+ 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry')
+
+ pysnmpUsmKeyAuth = pysnmpUsmKeyEntry.getNode(
+ pysnmpUsmKeyEntry.name + (3,) + tblIdx1)
+ pysnmpUsmKeyPriv = pysnmpUsmKeyEntry.getNode(
+ pysnmpUsmKeyEntry.name + (4,) + tblIdx1)
# Create new row from proto values
- tblIdx2 = usmUserEntry.getInstIdFromIndices(securityEngineID, userName)
+ tblIdx2 = usmUserEntry.getInstIdFromIndices(
+ securityEngineID, userName)
# New inactive row
mibInstrumController.writeMibObjects(
- (usmUserEntry.name + (13,) + tblIdx2, 5), snmpEngine=snmpEngine
- )
+ (usmUserEntry.name + (13,) + tblIdx2, 5), snmpEngine=snmpEngine)
# Set user&securityNames
- usmUserEntry.getNode(usmUserEntry.name + (2,) + tblIdx2).syntax = usmUserName.syntax
- usmUserEntry.getNode(usmUserEntry.name + (3,) + tblIdx2).syntax = usmUserSecurityName.syntax
+ usmUserEntry.getNode(
+ usmUserEntry.name + (2,) + tblIdx2).syntax = usmUserName.syntax
+ usmUserEntry.getNode(
+ usmUserEntry.name + (3,) + tblIdx2).syntax = usmUserSecurityName.syntax
# Store a reference to original row
- usmUserEntry.getNode(usmUserEntry.name + (4,) + tblIdx2).syntax = usmUserCloneFrom.syntax.clone(tblIdx1)
+ usmUserEntry.getNode(
+ usmUserEntry.name + (4,) + tblIdx2).syntax = usmUserCloneFrom.syntax.clone(tblIdx1)
# Set protocols
- usmUserEntry.getNode(usmUserEntry.name + (5,) + tblIdx2).syntax = usmUserAuthProtocol.syntax
- usmUserEntry.getNode(usmUserEntry.name + (8,) + tblIdx2).syntax = usmUserPrivProtocol.syntax
+ usmUserEntry.getNode(
+ usmUserEntry.name + (5,) + tblIdx2).syntax = usmUserAuthProtocol.syntax
+ usmUserEntry.getNode(
+ usmUserEntry.name + (8,) + tblIdx2).syntax = usmUserPrivProtocol.syntax
# Activate row
mibInstrumController.writeMibObjects(
@@ -208,98 +255,124 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
)
# Localize and set keys
- pysnmpUsmKeyEntry, = mibInstrumController.mibBuilder.importSymbols(
- 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry'
- )
+ pysnmpUsmKeyEntry, = mibBuilder.importSymbols(
+ 'PYSNMP-USM-MIB', 'pysnmpUsmKeyEntry')
pysnmpUsmKeyAuthLocalized = pysnmpUsmKeyEntry.getNode(
- pysnmpUsmKeyEntry.name + (1,) + tblIdx2
- )
+ pysnmpUsmKeyEntry.name + (1,) + tblIdx2)
+
if usmUserAuthProtocol.syntax in self.AUTH_SERVICES:
localizeKey = self.AUTH_SERVICES[usmUserAuthProtocol.syntax].localizeKey
- localAuthKey = localizeKey(pysnmpUsmKeyAuth.syntax,
- securityEngineID)
+ localAuthKey = localizeKey(
+ pysnmpUsmKeyAuth.syntax, securityEngineID)
+
else:
raise error.StatusInformation(
- errorIndication=errind.unsupportedAuthProtocol
- )
+ errorIndication=errind.unsupportedAuthProtocol)
+
if localAuthKey is not None:
pysnmpUsmKeyAuthLocalized.syntax = pysnmpUsmKeyAuthLocalized.syntax.clone(localAuthKey)
+
pysnmpUsmKeyPrivLocalized = pysnmpUsmKeyEntry.getNode(
- pysnmpUsmKeyEntry.name + (2,) + tblIdx2
- )
+ pysnmpUsmKeyEntry.name + (2,) + tblIdx2)
+
if usmUserPrivProtocol.syntax in self.PRIV_SERVICES:
localizeKey = self.PRIV_SERVICES[usmUserPrivProtocol.syntax].localizeKey
- localPrivKey = localizeKey(usmUserAuthProtocol.syntax,
- pysnmpUsmKeyPriv.syntax,
- securityEngineID)
+ localPrivKey = localizeKey(
+ usmUserAuthProtocol.syntax, pysnmpUsmKeyPriv.syntax,
+ securityEngineID)
+
else:
- raise error.StatusInformation(errorIndication=errind.unsupportedPrivProtocol)
+ raise error.StatusInformation(
+ errorIndication=errind.unsupportedPrivProtocol)
+
if localPrivKey is not None:
pysnmpUsmKeyPrivLocalized.syntax = pysnmpUsmKeyPrivLocalized.syntax.clone(localPrivKey)
+
return (usmUserName.syntax, usmUserSecurityName.syntax,
usmUserAuthProtocol.syntax, pysnmpUsmKeyAuthLocalized.syntax,
usmUserPrivProtocol.syntax, pysnmpUsmKeyPrivLocalized.syntax)
- def __generateRequestOrResponseMsg(self, snmpEngine,
- messageProcessingModel,
- globalData, maxMessageSize,
- securityModel, securityEngineID,
- securityName, securityLevel,
- scopedPDU, securityStateReference):
+ def _generateRequestOrResponseMsg(self, snmpEngine,
+ messageProcessingModel,
+ globalData, maxMessageSize,
+ securityModel, securityEngineID,
+ securityName, securityLevel,
+ scopedPDU, securityStateReference):
+
mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder
- snmpEngineID = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
+
+ snmpEngineID = mibBuilder.importSymbols(
+ '__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
# 3.1.1
if securityStateReference is not None:
# 3.1.1a
cachedSecurityData = self._cache.pop(securityStateReference)
+
usmUserName = cachedSecurityData['msgUserName']
+
if 'usmUserSecurityName' in cachedSecurityData:
usmUserSecurityName = cachedSecurityData['usmUserSecurityName']
+
else:
usmUserSecurityName = usmUserName
+
if 'usmUserAuthProtocol' in cachedSecurityData:
usmUserAuthProtocol = cachedSecurityData['usmUserAuthProtocol']
+
else:
usmUserAuthProtocol = noauth.NoAuth.SERVICE_ID
+
if 'usmUserAuthKeyLocalized' in cachedSecurityData:
usmUserAuthKeyLocalized = cachedSecurityData['usmUserAuthKeyLocalized']
+
else:
usmUserAuthKeyLocalized = None
+
if 'usmUserPrivProtocol' in cachedSecurityData:
usmUserPrivProtocol = cachedSecurityData['usmUserPrivProtocol']
+
else:
usmUserPrivProtocol = nopriv.NoPriv.SERVICE_ID
+
if 'usmUserPrivKeyLocalized' in cachedSecurityData:
usmUserPrivKeyLocalized = cachedSecurityData['usmUserPrivKeyLocalized']
+
else:
usmUserPrivKeyLocalized = None
+
securityEngineID = snmpEngineID
- debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: user info read from cache')
+
+ debug.logger & debug.FLAG_SM and debug.logger(
+ '__generateRequestOrResponseMsg: user info read from cache')
+
elif securityName:
# 3.1.1b
try:
(usmUserName, usmUserSecurityName, usmUserAuthProtocol,
usmUserAuthKeyLocalized, usmUserPrivProtocol,
- usmUserPrivKeyLocalized) = self.__getUserInfo(
+ usmUserPrivKeyLocalized) = self._getUserInfo(
snmpEngine.msgAndPduDsp.mibInstrumController,
securityEngineID,
- self.__sec2usr(snmpEngine, securityName, securityEngineID)
+ self._sec2usr(snmpEngine, securityName, securityEngineID)
)
- debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: read user info')
+ debug.logger & debug.FLAG_SM and debug.logger(
+ '__generateRequestOrResponseMsg: read user info')
except NoSuchInstanceError:
- pysnmpUsmDiscovery, = mibBuilder.importSymbols('__PYSNMP-USM-MIB', 'pysnmpUsmDiscovery')
+ pysnmpUsmDiscovery, = mibBuilder.importSymbols(
+ '__PYSNMP-USM-MIB', 'pysnmpUsmDiscovery')
+
reportUnknownName = not pysnmpUsmDiscovery.syntax
+
if not reportUnknownName:
try:
(usmUserName, usmUserSecurityName,
usmUserAuthProtocol, usmUserAuthKeyLocalized,
usmUserPrivProtocol,
- usmUserPrivKeyLocalized) = self.__cloneUserInfo(
- snmpEngine,
- securityEngineID,
- self.__sec2usr(snmpEngine, securityName)
+ usmUserPrivKeyLocalized) = self._cloneUserInfo(
+ snmpEngine, securityEngineID,
+ self._sec2usr(snmpEngine, securityName)
)
except NoSuchInstanceError:
@@ -307,31 +380,40 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
if reportUnknownName:
raise error.StatusInformation(
- errorIndication=errind.unknownSecurityName
- )
+ errorIndication=errind.unknownSecurityName)
- debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: clone user info')
+ debug.logger & debug.FLAG_SM and debug.logger(
+ '__generateRequestOrResponseMsg: clone user info')
except PyAsn1Error as exc:
debug.logger & debug.FLAG_SM and debug.logger(
'__generateRequestOrResponseMsg: %s' % exc)
- snmpInGenErrs, = mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInGenErrs')
+
+ snmpInGenErrs, = mibBuilder.importSymbols(
+ '__SNMPv2-MIB', 'snmpInGenErrs')
snmpInGenErrs.syntax += 1
- raise error.StatusInformation(
- errorIndication=errind.invalidMsg
- )
+
+ raise error.StatusInformation(errorIndication=errind.invalidMsg)
+
else:
# empty username used for engineID discovery
usmUserName = usmUserSecurityName = null
usmUserAuthProtocol = noauth.NoAuth.SERVICE_ID
usmUserPrivProtocol = nopriv.NoPriv.SERVICE_ID
+
usmUserAuthKeyLocalized = usmUserPrivKeyLocalized = None
- debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: use empty USM data')
+
+ debug.logger & debug.FLAG_SM and debug.logger(
+ '__generateRequestOrResponseMsg: use empty USM data')
# noinspection PyUnboundLocalVariable
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: local usmUserName %r usmUserSecurityName %r usmUserAuthProtocol %s usmUserPrivProtocol %s securityEngineID %r securityName %r' % (
- usmUserName, usmUserSecurityName, usmUserAuthProtocol, usmUserPrivProtocol, securityEngineID, securityName))
+ '__generateRequestOrResponseMsg: local usmUserName %r '
+ 'usmUserSecurityName %r usmUserAuthProtocol %s '
+ 'usmUserPrivProtocol %s securityEngineID %r '
+ 'securityName %r' % (
+ usmUserName, usmUserSecurityName, usmUserAuthProtocol,
+ usmUserPrivProtocol, securityEngineID, securityName))
msg = globalData
@@ -340,22 +422,21 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
if (usmUserAuthProtocol == noauth.NoAuth.SERVICE_ID or
usmUserPrivProtocol == nopriv.NoPriv.SERVICE_ID):
raise error.StatusInformation(
- errorIndication=errind.unsupportedSecurityLevel
- )
+ errorIndication=errind.unsupportedSecurityLevel)
# 3.1.3
if securityLevel == 3 or securityLevel == 2:
if usmUserAuthProtocol == noauth.NoAuth.SERVICE_ID:
raise error.StatusInformation(
- errorIndication=errind.unsupportedSecurityLevel
- )
+ errorIndication=errind.unsupportedSecurityLevel)
- securityParameters = self.__securityParametersSpec
+ securityParameters = self._securityParametersSpec
scopedPDUData = msg.setComponentByPosition(3).getComponentByPosition(3)
+
scopedPDUData.setComponentByPosition(
- 0, scopedPDU, verifyConstraints=False, matchTags=False, matchConstraints=False
- )
+ 0, scopedPDU, verifyConstraints=False, matchTags=False,
+ matchConstraints=False)
snmpEngineBoots = snmpEngineTime = 0
@@ -364,161 +445,180 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
# 3.1.6.b
if pdu.tagSet in rfc3411.UNCONFIRMED_CLASS_PDUS:
- (snmpEngineBoots,
- snmpEngineTime) = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots', 'snmpEngineTime')
+ snmpEngineBoots, snmpEngineTime = mibBuilder.importSymbols(
+ '__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots',
+ 'snmpEngineTime')
snmpEngineBoots = snmpEngineBoots.syntax
snmpEngineTime = snmpEngineTime.syntax.clone()
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: read snmpEngineBoots, snmpEngineTime from LCD')
+ '__generateRequestOrResponseMsg: read snmpEngineBoots, '
+ 'snmpEngineTime from LCD')
# 3.1.6a
- elif securityEngineID in self.__timeline:
+ elif securityEngineID in self._timeline:
(snmpEngineBoots,
snmpEngineTime,
latestReceivedEngineTime,
- latestUpdateTimestamp) = self.__timeline[securityEngineID]
+ latestUpdateTimestamp) = self._timeline[securityEngineID]
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: read snmpEngineBoots, snmpEngineTime from timeline')
+ '__generateRequestOrResponseMsg: read snmpEngineBoots, '
+ 'snmpEngineTime from timeline')
# 3.1.6.c
else:
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: assuming zero snmpEngineBoots, snmpEngineTime')
+ '__generateRequestOrResponseMsg: assuming zero '
+ 'snmpEngineBoots, snmpEngineTime')
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: use snmpEngineBoots %s snmpEngineTime %s for securityEngineID %r' % (
+ '__generateRequestOrResponseMsg: use snmpEngineBoots %s '
+ 'snmpEngineTime %s for securityEngineID %r' % (
snmpEngineBoots, snmpEngineTime, securityEngineID))
# 3.1.4a
if securityLevel == 3:
if usmUserPrivProtocol in self.PRIV_SERVICES:
privHandler = self.PRIV_SERVICES[usmUserPrivProtocol]
+
else:
raise error.StatusInformation(
- errorIndication=errind.encryptionError
- )
+ errorIndication=errind.encryptionError)
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: scopedPDU %s' % scopedPDU.prettyPrint())
+ '__generateRequestOrResponseMsg: scopedPDU '
+ '%s' % scopedPDU.prettyPrint())
try:
dataToEncrypt = encoder.encode(scopedPDU)
except PyAsn1Error as exc:
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: scopedPDU serialization error: %s' % exc)
+ '__generateRequestOrResponseMsg: scopedPDU serialization '
+ 'error: %s' % exc)
raise error.StatusInformation(
- errorIndication=errind.serializationError
- )
+ errorIndication=errind.serializationError)
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: scopedPDU encoded into %s' % debug.hexdump(dataToEncrypt))
+ '__generateRequestOrResponseMsg: scopedPDU encoded into '
+ '%s' % debug.hexdump(dataToEncrypt))
# noinspection PyUnboundLocalVariable
- (encryptedData,
- privParameters) = privHandler.encryptData(
+ encryptedData, privParameters = privHandler.encryptData(
usmUserPrivKeyLocalized,
- (snmpEngineBoots, snmpEngineTime, None), dataToEncrypt
- )
+ (snmpEngineBoots, snmpEngineTime, None),
+ dataToEncrypt)
securityParameters.setComponentByPosition(
- 5, privParameters, verifyConstraints=False, matchTags=False, matchConstraints=False
- )
+ 5, privParameters, verifyConstraints=False, matchTags=False,
+ matchConstraints=False)
+
scopedPDUData.setComponentByPosition(
- 1, encryptedData, verifyConstraints=False, matchTags=False, matchConstraints=False
- )
+ 1, encryptedData, verifyConstraints=False, matchTags=False,
+ matchConstraints=False)
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: scopedPDU ciphered into %s' % debug.hexdump(encryptedData))
+ '__generateRequestOrResponseMsg: scopedPDU ciphered into '
+ '%s' % debug.hexdump(encryptedData))
# 3.1.4b
elif securityLevel == 1 or securityLevel == 2:
securityParameters.setComponentByPosition(5, '')
- debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: %s' % scopedPDUData.prettyPrint())
+ debug.logger & debug.FLAG_SM and debug.logger(
+ '__generateRequestOrResponseMsg: %s' % scopedPDUData.prettyPrint())
# 3.1.5
securityParameters.setComponentByPosition(
- 0, securityEngineID, verifyConstraints=False, matchTags=False, matchConstraints=False
- )
+ 0, securityEngineID, verifyConstraints=False, matchTags=False,
+ matchConstraints=False)
+
securityParameters.setComponentByPosition(
- 1, snmpEngineBoots, verifyConstraints=False, matchTags=False, matchConstraints=False
- )
+ 1, snmpEngineBoots, verifyConstraints=False, matchTags=False,
+ matchConstraints=False)
+
securityParameters.setComponentByPosition(
- 2, snmpEngineTime, verifyConstraints=False, matchTags=False, matchConstraints=False
- )
+ 2, snmpEngineTime, verifyConstraints=False, matchTags=False,
+ matchConstraints=False)
# 3.1.7
securityParameters.setComponentByPosition(
- 3, usmUserName, verifyConstraints=False, matchTags=False, matchConstraints=False
- )
+ 3, usmUserName, verifyConstraints=False, matchTags=False,
+ matchConstraints=False)
# 3.1.8a
if securityLevel == 3 or securityLevel == 2:
if usmUserAuthProtocol in self.AUTH_SERVICES:
authHandler = self.AUTH_SERVICES[usmUserAuthProtocol]
+
else:
raise error.StatusInformation(
- errorIndication=errind.authenticationFailure
- )
+ errorIndication=errind.authenticationFailure)
# extra-wild hack to facilitate BER substrate in-place re-write
securityParameters.setComponentByPosition(
- 4, '\x00' * authHandler.digestLength
- )
+ 4, '\x00' * authHandler.digestLength)
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: %s' % (securityParameters.prettyPrint(),))
+ '__generateRequestOrResponseMsg: '
+ '%s' % (securityParameters.prettyPrint(),))
try:
- msg.setComponentByPosition(2, encoder.encode(securityParameters), verifyConstraints=False)
+ msg.setComponentByPosition(
+ 2, encoder.encode(securityParameters),
+ verifyConstraints=False)
except PyAsn1Error as exc:
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: securityParameters serialization error: %s' % exc)
+ '__generateRequestOrResponseMsg: securityParameters '
+ 'serialization error: %s' % exc)
+
raise error.StatusInformation(
- errorIndication=errind.serializationError
- )
+ errorIndication=errind.serializationError)
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: auth outgoing msg: %s' % msg.prettyPrint())
+ '__generateRequestOrResponseMsg: auth outgoing msg: '
+ '%s' % msg.prettyPrint())
try:
wholeMsg = encoder.encode(msg)
except PyAsn1Error as exc:
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: msg serialization error: %s' % exc)
+ '__generateRequestOrResponseMsg: msg serialization '
+ 'error: %s' % exc)
+
raise error.StatusInformation(
- errorIndication=errind.serializationError
- )
+ errorIndication=errind.serializationError)
# noinspection PyUnboundLocalVariable
authenticatedWholeMsg = authHandler.authenticateOutgoingMsg(
- usmUserAuthKeyLocalized, wholeMsg
- )
+ usmUserAuthKeyLocalized, wholeMsg)
# 3.1.8b
else:
securityParameters.setComponentByPosition(
- 4, '', verifyConstraints=False, matchTags=False, matchConstraints=False
- )
+ 4, '', verifyConstraints=False, matchTags=False,
+ matchConstraints=False)
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: %s' % (securityParameters.prettyPrint(),))
+ '__generateRequestOrResponseMsg: '
+ '%s' % (securityParameters.prettyPrint(),))
try:
- msg.setComponentByPosition(2, encoder.encode(securityParameters), verifyConstraints=False, matchTags=False, matchConstraints=False)
+ msg.setComponentByPosition(
+ 2, encoder.encode(securityParameters), verifyConstraints=False,
+ matchTags=False, matchConstraints=False)
except PyAsn1Error as exc:
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: secutiryParameters serialization error: %s' % exc)
+ '__generateRequestOrResponseMsg: securityParameters '
+ 'serialization error: %s' % exc)
+
raise error.StatusInformation(
- errorIndication=errind.serializationError
- )
+ errorIndication=errind.serializationError)
try:
debug.logger & debug.FLAG_SM and debug.logger(
@@ -527,13 +627,16 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
except PyAsn1Error as exc:
debug.logger & debug.FLAG_SM and debug.logger(
- '__generateRequestOrResponseMsg: msg serialization error: %s' % exc)
+ '__generateRequestOrResponseMsg: msg serialization error: '
+ '%s' % exc)
+
raise error.StatusInformation(
- errorIndication=errind.serializationError
- )
+ errorIndication=errind.serializationError)
- debug.logger & debug.FLAG_SM and debug.logger('__generateRequestOrResponseMsg: %s outgoing msg: %s' % (
- securityLevel > 1 and "authenticated" or "plain", debug.hexdump(authenticatedWholeMsg)))
+ debug.logger & debug.FLAG_SM and debug.logger(
+ '__generateRequestOrResponseMsg: %s outgoing msg: '
+ '%s' % (securityLevel > 1 and "authenticated" or
+ "plain", debug.hexdump(authenticatedWholeMsg)))
# 3.1.9
return msg.getComponentByPosition(2), authenticatedWholeMsg
@@ -542,88 +645,99 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
globalData, maxMessageSize, securityModel,
securityEngineID, securityName, securityLevel,
scopedPDU):
- return self.__generateRequestOrResponseMsg(snmpEngine,
- messageProcessingModel,
- globalData,
- maxMessageSize,
- securityModel,
- securityEngineID,
- securityName,
- securityLevel,
- scopedPDU,
- None)
+
+ return self._generateRequestOrResponseMsg(
+ snmpEngine, messageProcessingModel, globalData,
+ maxMessageSize, securityModel, securityEngineID,
+ securityName, securityLevel, scopedPDU, None)
def generateResponseMsg(self, snmpEngine, messageProcessingModel,
globalData, maxMessageSize, securityModel,
securityEngineID, securityName, securityLevel,
scopedPDU, securityStateReference):
- return self.__generateRequestOrResponseMsg(
+
+ return self._generateRequestOrResponseMsg(
snmpEngine, messageProcessingModel, globalData,
maxMessageSize, securityModel, securityEngineID,
- securityName, securityLevel, scopedPDU, securityStateReference
+ securityName, securityLevel, scopedPDU,
+ securityStateReference
)
# 3.2
def processIncomingMsg(self, snmpEngine, messageProcessingModel,
maxMessageSize, securityParameters,
securityModel, securityLevel, wholeMsg, msg):
+
mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder
# 3.2.9 -- moved up here to be able to report
# maxSizeResponseScopedPDU on error
# (48 - maximum SNMPv3 header length)
- maxSizeResponseScopedPDU = int(maxMessageSize) - len(securityParameters) - 48
+ maxSizeResponseScopedPDU = (int(maxMessageSize) -
+ len(securityParameters) - 48)
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: securityParameters %s' % debug.hexdump(securityParameters))
+ 'processIncomingMsg: securityParameters '
+ '%s' % debug.hexdump(securityParameters))
# 3.2.1
securityParameters, rest = decoder.decode(
- securityParameters, asn1Spec=self.__securityParametersSpec
- )
+ securityParameters, asn1Spec=self._securityParametersSpec)
- debug.logger & debug.FLAG_SM and debug.logger('processIncomingMsg: %s' % (securityParameters.prettyPrint(),))
+ debug.logger & debug.FLAG_SM and debug.logger(
+ 'processIncomingMsg: %s' % (securityParameters.prettyPrint(),))
if eoo.endOfOctets.isSameTypeWith(securityParameters):
raise error.StatusInformation(errorIndication=errind.parseError)
# 3.2.2
msgAuthoritativeEngineId = securityParameters.getComponentByPosition(0)
+
securityStateReference = self._cache.push(
msgUserName=securityParameters.getComponentByPosition(3)
)
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: cache write securityStateReference %s by msgUserName %s' % (
- securityStateReference, securityParameters.getComponentByPosition(3)))
+ 'processIncomingMsg: cache write securityStateReference %s by '
+ 'msgUserName %s' % (securityStateReference,
+ securityParameters.getComponentByPosition(3)))
scopedPduData = msg.getComponentByPosition(3)
# Used for error reporting
- contextEngineId = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
+ contextEngineId = mibBuilder.importSymbols(
+ '__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
contextName = null
- snmpEngineID = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
+ snmpEngineID = mibBuilder.importSymbols(
+ '__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
# 3.2.3
if (msgAuthoritativeEngineId != snmpEngineID and
- msgAuthoritativeEngineId not in self.__timeline):
+ msgAuthoritativeEngineId not in self._timeline):
+
if (msgAuthoritativeEngineId and
4 < len(msgAuthoritativeEngineId) < 33):
# 3.2.3a - cloned user when request was sent
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: non-synchronized securityEngineID %r' % (msgAuthoritativeEngineId,))
+ 'processIncomingMsg: non-synchronized securityEngineID '
+ '%r' % (msgAuthoritativeEngineId,))
+
else:
# 3.2.3b
debug.logger & debug.FLAG_SM and debug.logger(
'processIncomingMsg: peer requested snmpEngineID discovery')
+
usmStatsUnknownEngineIDs, = mibBuilder.importSymbols(
'__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownEngineIDs')
usmStatsUnknownEngineIDs.syntax += 1
+
debug.logger & debug.FLAG_SM and debug.logger(
'processIncomingMsg: null or malformed msgAuthoritativeEngineId')
+
pysnmpUsmDiscoverable, = mibBuilder.importSymbols(
'__PYSNMP-USM-MIB', 'pysnmpUsmDiscoverable')
+
if pysnmpUsmDiscoverable.syntax:
debug.logger & debug.FLAG_SM and debug.logger(
'processIncomingMsg: starting snmpEngineID discovery procedure')
@@ -631,10 +745,11 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
# Report original contextName
if scopedPduData.getName() != 'plaintext':
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: scopedPduData not plaintext %s' % scopedPduData.prettyPrint())
+ 'processIncomingMsg: scopedPduData not plaintext '
+ '%s' % scopedPduData.prettyPrint())
+
raise error.StatusInformation(
- errorIndication=errind.unknownEngineID
- )
+ errorIndication=errind.unknownEngineID)
# 7.2.6.a.1
scopedPdu = scopedPduData.getComponent()
@@ -653,16 +768,18 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
maxSizeResponseScopedPDU=maxSizeResponseScopedPDU
)
else:
- debug.logger & debug.FLAG_SM and debug.logger('processIncomingMsg: will not discover EngineID')
+ debug.logger & debug.FLAG_SM and debug.logger(
+ 'processIncomingMsg: will not discover EngineID')
+
# free securityStateReference XXX
raise error.StatusInformation(
- errorIndication=errind.unknownEngineID
- )
+ errorIndication=errind.unknownEngineID)
msgUserName = securityParameters.getComponentByPosition(3)
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: read from securityParams msgAuthoritativeEngineId %r msgUserName %r' % (
+ 'processIncomingMsg: read from securityParams '
+ 'msgAuthoritativeEngineId %r msgUserName %r' % (
msgAuthoritativeEngineId, msgUserName))
if msgUserName:
@@ -673,16 +790,17 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
usmUserAuthProtocol,
usmUserAuthKeyLocalized,
usmUserPrivProtocol,
- usmUserPrivKeyLocalized) = self.__getUserInfo(
+ usmUserPrivKeyLocalized) = self._getUserInfo(
snmpEngine.msgAndPduDsp.mibInstrumController,
msgAuthoritativeEngineId, msgUserName
)
- debug.logger & debug.FLAG_SM and debug.logger('processIncomingMsg: read user info from LCD')
+ debug.logger & debug.FLAG_SM and debug.logger(
+ 'processIncomingMsg: read user info from LCD')
except NoSuchInstanceError:
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: unknown securityEngineID %r msgUserName %r' % (
- msgAuthoritativeEngineId, msgUserName))
+ 'processIncomingMsg: unknown securityEngineID %r '
+ 'msgUserName %r' % (msgAuthoritativeEngineId, msgUserName))
usmStatsUnknownUserNames, = mibBuilder.importSymbols(
'__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownUserNames')
@@ -701,10 +819,15 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
)
except PyAsn1Error as exc:
- debug.logger & debug.FLAG_SM and debug.logger('processIncomingMsg: %s' % exc)
- snmpInGenErrs, = mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInGenErrs')
+ debug.logger & debug.FLAG_SM and debug.logger(
+ 'processIncomingMsg: %s' % exc)
+
+ snmpInGenErrs, = mibBuilder.importSymbols(
+ '__SNMPv2-MIB', 'snmpInGenErrs')
snmpInGenErrs.syntax += 1
+
raise error.StatusInformation(errorIndication=errind.invalidMsg)
+
else:
# empty username used for engineID discovery
usmUserName = usmUserSecurityName = null
@@ -713,11 +836,14 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
usmUserAuthKeyLocalized = usmUserPrivKeyLocalized = None
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: now have usmUserName %r usmUserSecurityName %r usmUserAuthProtocol %r usmUserPrivProtocol %r for msgUserName %r' % (
- usmUserName, usmUserSecurityName, usmUserAuthProtocol, usmUserPrivProtocol, msgUserName))
+ 'processIncomingMsg: now have usmUserName %r usmUserSecurityName '
+ '%r usmUserAuthProtocol %r usmUserPrivProtocol %r for msgUserName '
+ '%r' % (usmUserName, usmUserSecurityName, usmUserAuthProtocol,
+ usmUserPrivProtocol, msgUserName))
# 3.2.11 (moved up here to let Reports be authenticated & encrypted)
self._cache.pop(securityStateReference)
+
securityStateReference = self._cache.push(
msgUserName=securityParameters.getComponentByPosition(3),
usmUserSecurityName=usmUserSecurityName,
@@ -743,21 +869,24 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
privKey=usmUserPrivKeyLocalized)
)
snmpEngine.observer.clearExecutionContext(
- snmpEngine, 'rfc3414.processIncomingMsg'
- )
+ snmpEngine, 'rfc3414.processIncomingMsg')
# 3.2.5
if msgAuthoritativeEngineId == snmpEngineID:
# Authoritative SNMP engine: make sure securityLevel is sufficient
badSecIndication = None
+
if securityLevel == 3:
if usmUserAuthProtocol == noauth.NoAuth.SERVICE_ID:
badSecIndication = 'authPriv wanted while auth not expected'
+
if usmUserPrivProtocol == nopriv.NoPriv.SERVICE_ID:
badSecIndication = 'authPriv wanted while priv not expected'
+
elif securityLevel == 2:
if usmUserAuthProtocol == noauth.NoAuth.SERVICE_ID:
badSecIndication = 'authNoPriv wanted while auth not expected'
+
if usmUserPrivProtocol != nopriv.NoPriv.SERVICE_ID:
# 4 (discovery phase always uses authenticated messages)
if msgAuthoritativeEngineBoots or msgAuthoritativeEngineTime:
@@ -766,15 +895,20 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
elif securityLevel == 1:
if usmUserAuthProtocol != noauth.NoAuth.SERVICE_ID:
badSecIndication = 'noAuthNoPriv wanted while auth expected'
+
if usmUserPrivProtocol != nopriv.NoPriv.SERVICE_ID:
badSecIndication = 'noAuthNoPriv wanted while priv expected'
+
if badSecIndication:
usmStatsUnsupportedSecLevels, = mibBuilder.importSymbols(
'__SNMP-USER-BASED-SM-MIB', 'usmStatsUnsupportedSecLevels')
+
usmStatsUnsupportedSecLevels.syntax += 1
+
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: reporting inappropriate security level for user %s: %s' % (
- msgUserName, badSecIndication))
+ 'processIncomingMsg: reporting inappropriate security '
+ 'level for user %s: %s' % (msgUserName, badSecIndication))
+
raise error.StatusInformation(
errorIndication=errind.unsupportedSecurityLevel,
oid=usmStatsUnsupportedSecLevels.name,
@@ -791,22 +925,23 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
if securityLevel == 3 or securityLevel == 2:
if usmUserAuthProtocol in self.AUTH_SERVICES:
authHandler = self.AUTH_SERVICES[usmUserAuthProtocol]
+
else:
raise error.StatusInformation(
- errorIndication=errind.authenticationFailure
- )
+ errorIndication=errind.authenticationFailure)
try:
authHandler.authenticateIncomingMsg(
usmUserAuthKeyLocalized,
securityParameters.getComponentByPosition(4),
- wholeMsg
- )
+ wholeMsg)
except error.StatusInformation:
usmStatsWrongDigests, = mibBuilder.importSymbols(
'__SNMP-USER-BASED-SM-MIB', 'usmStatsWrongDigests')
+
usmStatsWrongDigests.syntax += 1
+
raise error.StatusInformation(
errorIndication=errind.authenticationFailure,
oid=usmStatsWrongDigests.name,
@@ -819,62 +954,81 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
maxSizeResponseScopedPDU=maxSizeResponseScopedPDU
)
- debug.logger & debug.FLAG_SM and debug.logger('processIncomingMsg: incoming msg authenticated')
+ debug.logger & debug.FLAG_SM and debug.logger(
+ 'processIncomingMsg: incoming msg authenticated')
# synchronize time with authed peer
- self.__timeline[msgAuthoritativeEngineId] = (
+ self._timeline[msgAuthoritativeEngineId] = (
securityParameters.getComponentByPosition(1),
securityParameters.getComponentByPosition(2),
securityParameters.getComponentByPosition(2),
int(time.time())
)
- timerResolution = snmpEngine.transportDispatcher is None and 1.0 or snmpEngine.transportDispatcher.getTimerResolution()
- expireAt = int(self.__expirationTimer + 300 / timerResolution)
- if expireAt not in self.__timelineExpQueue:
- self.__timelineExpQueue[expireAt] = []
- self.__timelineExpQueue[expireAt].append(msgAuthoritativeEngineId)
+ timerResolution = (snmpEngine.transportDispatcher is None and 1.0 or
+ snmpEngine.transportDispatcher.getTimerResolution())
+
+ expireAt = int(self._expirationTimer + 300 / timerResolution)
+
+ if expireAt not in self._timelineExpQueue:
+ self._timelineExpQueue[expireAt] = []
+
+ self._timelineExpQueue[expireAt].append(msgAuthoritativeEngineId)
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: store timeline for securityEngineID %r' % (msgAuthoritativeEngineId,))
+ 'processIncomingMsg: store timeline for securityEngineID '
+ '%r' % (msgAuthoritativeEngineId,))
# 3.2.7
if securityLevel == 3 or securityLevel == 2:
if msgAuthoritativeEngineId == snmpEngineID:
# Authoritative SNMP engine: use local notion (SF bug #1649032)
- (snmpEngineBoots,
- snmpEngineTime) = mibBuilder.importSymbols(
+ snmpEngineBoots, snmpEngineTime = mibBuilder.importSymbols(
'__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots', 'snmpEngineTime')
+
snmpEngineBoots = snmpEngineBoots.syntax
snmpEngineTime = snmpEngineTime.syntax.clone()
+
idleTime = 0
+
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: read snmpEngineBoots (%s), snmpEngineTime (%s) from LCD' % (
+ 'processIncomingMsg: read snmpEngineBoots (%s), '
+ 'snmpEngineTime (%s) from LCD' % (
snmpEngineBoots, snmpEngineTime))
+
else:
# Non-authoritative SNMP engine: use cached estimates
- if msgAuthoritativeEngineId in self.__timeline:
+ if msgAuthoritativeEngineId in self._timeline:
(snmpEngineBoots, snmpEngineTime,
latestReceivedEngineTime,
- latestUpdateTimestamp) = self.__timeline[
+ latestUpdateTimestamp) = self._timeline[
msgAuthoritativeEngineId
]
+
# time passed since last talk with this SNMP engine
idleTime = int(time.time()) - latestUpdateTimestamp
+
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: read timeline snmpEngineBoots %s snmpEngineTime %s for msgAuthoritativeEngineId %r, idle time %s secs' % (
- snmpEngineBoots, snmpEngineTime, msgAuthoritativeEngineId, idleTime))
+ 'processIncomingMsg: read timeline snmpEngineBoots %s '
+ 'snmpEngineTime %s for msgAuthoritativeEngineId %r, '
+ 'idle time %s secs' % (snmpEngineBoots, snmpEngineTime,
+ msgAuthoritativeEngineId,
+ idleTime))
else:
raise error.ProtocolError('Peer SNMP engine info missing')
# 3.2.7a
if msgAuthoritativeEngineId == snmpEngineID:
+
if (snmpEngineBoots == 2147483647 or
snmpEngineBoots != msgAuthoritativeEngineBoots or
- abs(idleTime + int(snmpEngineTime) - int(msgAuthoritativeEngineTime)) > 150):
+ (abs(idleTime + int(snmpEngineTime)
+ - int(msgAuthoritativeEngineTime))) > 150):
+
usmStatsNotInTimeWindows, = mibBuilder.importSymbols(
'__SNMP-USER-BASED-SM-MIB', 'usmStatsNotInTimeWindows')
usmStatsNotInTimeWindows.syntax += 1
+
raise error.StatusInformation(
errorIndication=errind.notInTimeWindow,
oid=usmStatsNotInTimeWindows.name,
@@ -886,6 +1040,7 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
msgUserName=msgUserName,
maxSizeResponseScopedPDU=maxSizeResponseScopedPDU
)
+
# 3.2.7b
else:
# 3.2.7b.1
@@ -893,48 +1048,60 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
if (msgAuthoritativeEngineBoots > snmpEngineBoots or
msgAuthoritativeEngineBoots == snmpEngineBoots and
msgAuthoritativeEngineTime > latestReceivedEngineTime):
- self.__timeline[msgAuthoritativeEngineId] = (
+ self._timeline[msgAuthoritativeEngineId] = (
msgAuthoritativeEngineBoots,
msgAuthoritativeEngineTime,
msgAuthoritativeEngineTime,
int(time.time())
)
- timerResolution = snmpEngine.transportDispatcher is None and 1.0 or snmpEngine.transportDispatcher.getTimerResolution()
- expireAt = int(self.__expirationTimer + 300 / timerResolution)
- if expireAt not in self.__timelineExpQueue:
- self.__timelineExpQueue[expireAt] = []
- self.__timelineExpQueue[expireAt].append(msgAuthoritativeEngineId)
+ timerResolution = (
+ snmpEngine.transportDispatcher is None and 1.0 or
+ snmpEngine.transportDispatcher.getTimerResolution())
+
+ expireAt = int(self._expirationTimer + 300 / timerResolution)
+
+ if expireAt not in self._timelineExpQueue:
+ self._timelineExpQueue[expireAt] = []
+
+ self._timelineExpQueue[expireAt].append(msgAuthoritativeEngineId)
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: stored timeline msgAuthoritativeEngineBoots %s msgAuthoritativeEngineTime %s for msgAuthoritativeEngineId %r' % (
- msgAuthoritativeEngineBoots, msgAuthoritativeEngineTime, msgAuthoritativeEngineId))
+ 'processIncomingMsg: stored timeline '
+ 'msgAuthoritativeEngineBoots %s '
+ 'msgAuthoritativeEngineTime %s for '
+ 'msgAuthoritativeEngineId '
+ '%r' % (msgAuthoritativeEngineBoots,
+ msgAuthoritativeEngineTime,
+ msgAuthoritativeEngineId))
# 3.2.7b.2
if (snmpEngineBoots == 2147483647 or
msgAuthoritativeEngineBoots < snmpEngineBoots or
msgAuthoritativeEngineBoots == snmpEngineBoots and
- abs(idleTime + int(snmpEngineTime) - int(msgAuthoritativeEngineTime)) > 150):
+ (abs(idleTime + int(snmpEngineTime)
+ - int(msgAuthoritativeEngineTime))) > 150):
+
raise error.StatusInformation(
errorIndication=errind.notInTimeWindow,
- msgUserName=msgUserName
- )
+ msgUserName=msgUserName)
# 3.2.8a
if securityLevel == 3:
if usmUserPrivProtocol in self.PRIV_SERVICES:
privHandler = self.PRIV_SERVICES[usmUserPrivProtocol]
+
else:
raise error.StatusInformation(
errorIndication=errind.decryptionError,
- msgUserName=msgUserName
- )
+ msgUserName=msgUserName)
+
encryptedPDU = scopedPduData.getComponentByPosition(1)
+
if encryptedPDU is None: # no ciphertext
raise error.StatusInformation(
errorIndication=errind.decryptionError,
- msgUserName=msgUserName
- )
+ msgUserName=msgUserName)
try:
decryptedData = privHandler.decryptData(
@@ -942,15 +1109,17 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
(securityParameters.getComponentByPosition(1),
securityParameters.getComponentByPosition(2),
securityParameters.getComponentByPosition(5)),
- encryptedPDU
- )
+ encryptedPDU)
+
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: PDU deciphered into %s' % debug.hexdump(decryptedData))
+ 'processIncomingMsg: PDU deciphered into '
+ '%s' % debug.hexdump(decryptedData))
except error.StatusInformation:
usmStatsDecryptionErrors, = mibBuilder.importSymbols(
'__SNMP-USER-BASED-SM-MIB', 'usmStatsDecryptionErrors')
usmStatsDecryptionErrors.syntax += 1
+
raise error.StatusInformation(
errorIndication=errind.decryptionError,
oid=usmStatsDecryptionErrors.name,
@@ -962,31 +1131,32 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
msgUserName=msgUserName,
maxSizeResponseScopedPDU=maxSizeResponseScopedPDU
)
- scopedPduSpec = scopedPduData.setComponentByPosition(0).getComponentByPosition(0)
+ scopedPduSpec = scopedPduData.setComponentByPosition(
+ 0).getComponentByPosition(0)
+
try:
scopedPDU, rest = decoder.decode(decryptedData, asn1Spec=scopedPduSpec)
except PyAsn1Error as exc:
debug.logger & debug.FLAG_SM and debug.logger(
'processIncomingMsg: scopedPDU decoder failed %s' % exc)
+
raise error.StatusInformation(
errorIndication=errind.decryptionError,
- msgUserName=msgUserName
- )
+ msgUserName=msgUserName)
if eoo.endOfOctets.isSameTypeWith(scopedPDU):
raise error.StatusInformation(
errorIndication=errind.decryptionError,
- msgUserName=msgUserName
- )
+ msgUserName=msgUserName)
+
else:
# 3.2.8b
scopedPDU = scopedPduData.getComponentByPosition(0)
if scopedPDU is None: # no plaintext
raise error.StatusInformation(
errorIndication=errind.decryptionError,
- msgUserName=msgUserName
- )
+ msgUserName=msgUserName)
debug.logger & debug.FLAG_SM and debug.logger(
'processIncomingMsg: scopedPDU decoded %s' % scopedPDU.prettyPrint())
@@ -995,7 +1165,8 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
securityName = usmUserSecurityName
debug.logger & debug.FLAG_SM and debug.logger(
- 'processIncomingMsg: cached msgUserName %s info by securityStateReference %s' % (
+ 'processIncomingMsg: cached msgUserName %s info by '
+ 'securityStateReference %s' % (
msgUserName, securityStateReference))
# Delayed to include details
@@ -1003,6 +1174,7 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
usmStatsUnknownUserNames, = mibBuilder.importSymbols(
'__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownUserNames')
usmStatsUnknownUserNames.syntax += 1
+
raise error.StatusInformation(
errorIndication=errind.unknownSecurityName,
oid=usmStatsUnknownUserNames.name,
@@ -1021,14 +1193,18 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
return (msgAuthoritativeEngineId, securityName, scopedPDU,
maxSizeResponseScopedPDU, securityStateReference)
- def __expireTimelineInfo(self):
- if self.__expirationTimer in self.__timelineExpQueue:
- for engineIdKey in self.__timelineExpQueue[self.__expirationTimer]:
- if engineIdKey in self.__timeline:
- del self.__timeline[engineIdKey]
- debug.logger & debug.FLAG_SM and debug.logger('__expireTimelineInfo: expiring %r' % (engineIdKey,))
- del self.__timelineExpQueue[self.__expirationTimer]
- self.__expirationTimer += 1
+ def _expireTimelineInfo(self):
+ if self._expirationTimer in self._timelineExpQueue:
+
+ for engineIdKey in self._timelineExpQueue[self._expirationTimer]:
+ if engineIdKey in self._timeline:
+ del self._timeline[engineIdKey]
+ debug.logger & debug.FLAG_SM and debug.logger(
+ '__expireTimelineInfo: expiring %r' % (engineIdKey,))
+
+ del self._timelineExpQueue[self._expirationTimer]
+
+ self._expirationTimer += 1
def receiveTimerTick(self, snmpEngine, timeNow):
- self.__expireTimelineInfo()
+ self._expireTimelineInfo()
diff --git a/pysnmp/proto/secmod/rfc3826/priv/aes.py b/pysnmp/proto/secmod/rfc3826/priv/aes.py
index 56f69280..d000f8ed 100644
--- a/pysnmp/proto/secmod/rfc3826/priv/aes.py
+++ b/pysnmp/proto/secmod/rfc3826/priv/aes.py
@@ -38,66 +38,85 @@ class Aes(base.AbstractEncryptionService):
local_int = random.randrange(0, 0xffffffffffffffff)
# 3.1.2.1
- def __getEncryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime):
- salt = [self.local_int >> 56 & 0xff,
- self.local_int >> 48 & 0xff,
- self.local_int >> 40 & 0xff,
- self.local_int >> 32 & 0xff,
- self.local_int >> 24 & 0xff,
- self.local_int >> 16 & 0xff,
- self.local_int >> 8 & 0xff,
- self.local_int & 0xff]
+ def _getEncryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime):
+ salt = [
+ self.local_int >> 56 & 0xff,
+ self.local_int >> 48 & 0xff,
+ self.local_int >> 40 & 0xff,
+ self.local_int >> 32 & 0xff,
+ self.local_int >> 24 & 0xff,
+ self.local_int >> 16 & 0xff,
+ self.local_int >> 8 & 0xff,
+ self.local_int & 0xff
+ ]
if self.local_int == 0xffffffffffffffff:
self.local_int = 0
+
else:
self.local_int += 1
- return self.__getDecryptionKey(privKey, snmpEngineBoots, snmpEngineTime, salt) + (
- univ.OctetString(salt).asOctets(),)
+ key, iv = self._getDecryptionKey(
+ privKey, snmpEngineBoots, snmpEngineTime, salt)
+
+ return key, iv, univ.OctetString(salt).asOctets()
+
+ def _getDecryptionKey(self, privKey, snmpEngineBoots,
+ snmpEngineTime, salt):
- def __getDecryptionKey(self, privKey, snmpEngineBoots,
- snmpEngineTime, salt):
snmpEngineBoots, snmpEngineTime, salt = (
- int(snmpEngineBoots), int(snmpEngineTime), salt
- )
+ int(snmpEngineBoots), int(snmpEngineTime), salt)
+
+ iv = [
+ snmpEngineBoots >> 24 & 0xff,
+ snmpEngineBoots >> 16 & 0xff,
+ snmpEngineBoots >> 8 & 0xff,
+ snmpEngineBoots & 0xff,
+ snmpEngineTime >> 24 & 0xff,
+ snmpEngineTime >> 16 & 0xff,
+ snmpEngineTime >> 8 & 0xff,
+ snmpEngineTime & 0xff
+ ]
- iv = [snmpEngineBoots >> 24 & 0xff,
- snmpEngineBoots >> 16 & 0xff,
- snmpEngineBoots >> 8 & 0xff,
- snmpEngineBoots & 0xff,
- snmpEngineTime >> 24 & 0xff,
- snmpEngineTime >> 16 & 0xff,
- snmpEngineTime >> 8 & 0xff,
- snmpEngineTime & 0xff] + salt
+ iv += salt
- return privKey[:self.KEY_SIZE].asOctets(), univ.OctetString(iv).asOctets()
+ key = privKey[:self.KEY_SIZE].asOctets()
+ iv = univ.OctetString(iv).asOctets()
+
+ return key, iv
def hashPassphrase(self, authProtocol, privKey):
if authProtocol == hmacmd5.HmacMd5.SERVICE_ID:
hashAlgo = md5
+
elif authProtocol == hmacsha.HmacSha.SERVICE_ID:
hashAlgo = sha1
+
elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM:
hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol]
+
else:
raise error.ProtocolError(
- 'Unknown auth protocol %s' % (authProtocol,)
- )
+ 'Unknown auth protocol %s' % (authProtocol,))
+
return localkey.hashPassphrase(privKey, hashAlgo)
def localizeKey(self, authProtocol, privKey, snmpEngineID):
if authProtocol == hmacmd5.HmacMd5.SERVICE_ID:
hashAlgo = md5
+
elif authProtocol == hmacsha.HmacSha.SERVICE_ID:
hashAlgo = sha1
+
elif authProtocol in hmacsha2.HmacSha2.HASH_ALGORITHM:
hashAlgo = hmacsha2.HmacSha2.HASH_ALGORITHM[authProtocol]
+
else:
raise error.ProtocolError(
- 'Unknown auth protocol %s' % (authProtocol,)
- )
+ 'Unknown auth protocol %s' % (authProtocol,))
+
localPrivKey = localkey.localizeKey(privKey, snmpEngineID, hashAlgo)
+
return localPrivKey[:self.KEY_SIZE]
# 3.2.4.1
@@ -105,21 +124,20 @@ class Aes(base.AbstractEncryptionService):
snmpEngineBoots, snmpEngineTime, salt = privParameters
# 3.3.1.1
- aesKey, iv, salt = self.__getEncryptionKey(
- encryptKey, snmpEngineBoots, snmpEngineTime
- )
+ aesKey, iv, salt = self._getEncryptionKey(
+ encryptKey, snmpEngineBoots, snmpEngineTime)
# 3.3.1.3
# PyCrypto seems to require padding
- dataToEncrypt = dataToEncrypt + univ.OctetString((0,) * (16 - len(dataToEncrypt) % 16)).asOctets()
+ padding = univ.OctetString((0,) * (16 - len(dataToEncrypt) % 16))
+ dataToEncrypt += padding
try:
- ciphertext = aes.encrypt(dataToEncrypt, aesKey, iv)
+ ciphertext = aes.encrypt(dataToEncrypt.asOctets(), aesKey, iv)
except PysnmpCryptoError:
raise error.StatusInformation(
- errorIndication=errind.unsupportedPrivProtocol
- )
+ errorIndication=errind.unsupportedPrivProtocol)
# 3.3.1.4
return univ.OctetString(ciphertext), univ.OctetString(salt)
@@ -131,16 +149,15 @@ class Aes(base.AbstractEncryptionService):
# 3.3.2.1
if len(salt) != 8:
raise error.StatusInformation(
- errorIndication=errind.decryptionError
- )
+ errorIndication=errind.decryptionError)
# 3.3.2.3
- aesKey, iv = self.__getDecryptionKey(
- decryptKey, snmpEngineBoots, snmpEngineTime, salt
- )
+ aesKey, iv = self._getDecryptionKey(
+ decryptKey, snmpEngineBoots, snmpEngineTime, salt)
# PyCrypto seems to require padding
- encryptedData = encryptedData + univ.OctetString((0,) * (16 - len(encryptedData) % 16)).asOctets()
+ padding = univ.OctetString((0,) * (16 - len(encryptedData) % 16))
+ encryptedData += padding
try:
# 3.3.2.4-6
@@ -148,5 +165,4 @@ class Aes(base.AbstractEncryptionService):
except PysnmpCryptoError:
raise error.StatusInformation(
- errorIndication=errind.unsupportedPrivProtocol
- )
+ errorIndication=errind.unsupportedPrivProtocol)
diff --git a/pysnmp/proto/secmod/rfc7860/auth/hmacsha2.py b/pysnmp/proto/secmod/rfc7860/auth/hmacsha2.py
index 11dd79b3..267db682 100644
--- a/pysnmp/proto/secmod/rfc7860/auth/hmacsha2.py
+++ b/pysnmp/proto/secmod/rfc7860/auth/hmacsha2.py
@@ -19,19 +19,26 @@ except ImportError:
sha224 = sha256 = sha384 = sha512 = NotAvailable()
-from pyasn1.type import univ
from pysnmp.proto.secmod.rfc3414.auth import base
from pysnmp.proto.secmod.rfc3414 import localkey
from pysnmp.proto import errind, error
+from pyasn1.type import univ
# 7.2.4
class HmacSha2(base.AbstractAuthenticationService):
- SHA224_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 4) # usmHMAC128SHA224AuthProtocol
- SHA256_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 5) # usmHMAC192SHA256AuthProtocol
- SHA384_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 6) # usmHMAC256SHA384AuthProtocol
- SHA512_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 7) # usmHMAC384SHA512AuthProtocol
+ # usmHMAC128SHA224AuthProtocol
+ SHA224_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 4)
+
+ # usmHMAC192SHA256AuthProtocol
+ SHA256_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 5)
+
+ # usmHMAC256SHA384AuthProtocol
+ SHA384_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 6)
+
+ # usmHMAC384SHA512AuthProtocol
+ SHA512_SERVICE_ID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 7)
KEY_LENGTH = {
SHA224_SERVICE_ID: 28,
@@ -59,39 +66,43 @@ class HmacSha2(base.AbstractAuthenticationService):
def __init__(self, oid):
if oid not in self.HASH_ALGORITHM:
- raise error.ProtocolError('No SHA-2 authentication algorithm %s available' % (oid,))
- self.__hashAlgo = self.HASH_ALGORITHM[oid]
- self.__digestLength = self.DIGEST_LENGTH[oid]
- self.__placeHolder = univ.OctetString((0,) * self.__digestLength).asOctets()
+ raise error.ProtocolError(
+ 'No SHA-2 authentication algorithm %s available' % (oid,))
+
+ self._hashAlgo = self.HASH_ALGORITHM[oid]
+ self._digestLength = self.DIGEST_LENGTH[oid]
+ self._placeHolder = univ.OctetString(
+ (0,) * self._digestLength).asOctets()
def hashPassphrase(self, authKey):
- return localkey.hashPassphrase(authKey, self.__hashAlgo)
+ return localkey.hashPassphrase(authKey, self._hashAlgo)
def localizeKey(self, authKey, snmpEngineID):
- return localkey.localizeKey(authKey, snmpEngineID, self.__hashAlgo)
+ return localkey.localizeKey(authKey, snmpEngineID, self._hashAlgo)
@property
def digestLength(self):
- return self.__digestLength
+ return self._digestLength
# 7.3.1
def authenticateOutgoingMsg(self, authKey, wholeMsg):
# 7.3.1.1
- location = wholeMsg.find(self.__placeHolder)
+ location = wholeMsg.find(self._placeHolder)
if location == -1:
- raise error.ProtocolError('Can\'t locate digest placeholder')
+ raise error.ProtocolError('Cannot locate digest placeholder')
+
wholeHead = wholeMsg[:location]
- wholeTail = wholeMsg[location + self.__digestLength:]
+ wholeTail = wholeMsg[location + self._digestLength:]
# 7.3.1.2, 7.3.1.3
try:
- mac = hmac.new(authKey.asOctets(), wholeMsg, self.__hashAlgo)
+ mac = hmac.new(authKey.asOctets(), wholeMsg, self._hashAlgo)
except errind.ErrorIndication as exc:
raise error.StatusInformation(errorIndication=exc)
# 7.3.1.4
- mac = mac.digest()[:self.__digestLength]
+ mac = mac.digest()[:self._digestLength]
# 7.3.1.5 & 6
return wholeHead + mac + wholeTail
@@ -99,33 +110,32 @@ class HmacSha2(base.AbstractAuthenticationService):
# 7.3.2
def authenticateIncomingMsg(self, authKey, authParameters, wholeMsg):
# 7.3.2.1 & 2
- if len(authParameters) != self.__digestLength:
+ if len(authParameters) != self._digestLength:
raise error.StatusInformation(
- errorIndication=errind.authenticationError
- )
+ errorIndication=errind.authenticationError)
# 7.3.2.3
location = wholeMsg.find(authParameters.asOctets())
if location == -1:
- raise error.ProtocolError('Can\'t locate digest in wholeMsg')
+ raise error.ProtocolError('Cannot locate digest in wholeMsg')
+
wholeHead = wholeMsg[:location]
- wholeTail = wholeMsg[location + self.__digestLength:]
- authenticatedWholeMsg = wholeHead + self.__placeHolder + wholeTail
+ wholeTail = wholeMsg[location + self._digestLength:]
+ authenticatedWholeMsg = wholeHead + self._placeHolder + wholeTail
# 7.3.2.4
try:
- mac = hmac.new(authKey.asOctets(), authenticatedWholeMsg, self.__hashAlgo)
+ mac = hmac.new(authKey.asOctets(), authenticatedWholeMsg, self._hashAlgo)
except errind.ErrorIndication as exc:
raise error.StatusInformation(errorIndication=exc)
# 7.3.2.5
- mac = mac.digest()[:self.__digestLength]
+ mac = mac.digest()[:self._digestLength]
# 7.3.2.6
if mac != authParameters:
raise error.StatusInformation(
- errorIndication=errind.authenticationFailure
- )
+ errorIndication=errind.authenticationFailure)
return authenticatedWholeMsg