summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlya Etingof <etingof@gmail.com>2016-04-03 11:29:27 +0200
committerIlya Etingof <etingof@gmail.com>2016-04-03 11:29:27 +0200
commit6cd2de39a8452c29eeeed99afc0b425291143ebd (patch)
tree50cc72c3067644da5bb1b5164fe3a57cef681cd3
parent90bbf397ad3dd49db7f83d541afff51f17e63054 (diff)
downloadpysnmp-git-6cd2de39a8452c29eeeed99afc0b425291143ebd.tar.gz
pep8 reformatted
-rw-r--r--pysnmp/proto/cache.py3
-rw-r--r--pysnmp/proto/errind.py75
-rw-r--r--pysnmp/proto/error.py10
-rw-r--r--pysnmp/proto/rfc1155.py23
-rw-r--r--pysnmp/proto/rfc1157.py25
-rw-r--r--pysnmp/proto/rfc1901.py1
-rw-r--r--pysnmp/proto/rfc1902.py73
-rw-r--r--pysnmp/proto/rfc1905.py34
-rw-r--r--pysnmp/proto/rfc3412.py96
-rw-r--r--pysnmp/proto/secmod/base.py2
-rw-r--r--pysnmp/proto/secmod/cache.py2
-rw-r--r--pysnmp/proto/secmod/rfc2576.py115
-rw-r--r--pysnmp/proto/secmod/rfc3414/__init__.py6
-rw-r--r--pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py34
-rw-r--r--pysnmp/proto/secmod/rfc3414/auth/hmacsha.py30
-rw-r--r--pysnmp/proto/secmod/rfc3414/auth/noauth.py1
-rw-r--r--pysnmp/proto/secmod/rfc3414/localkey.py30
-rw-r--r--pysnmp/proto/secmod/rfc3414/priv/base.py1
-rw-r--r--pysnmp/proto/secmod/rfc3414/priv/des.py30
-rw-r--r--pysnmp/proto/secmod/rfc3414/priv/nopriv.py4
-rw-r--r--pysnmp/proto/secmod/rfc3414/service.py278
-rw-r--r--pysnmp/proto/secmod/rfc3826/priv/aes.py43
22 files changed, 635 insertions, 281 deletions
diff --git a/pysnmp/proto/cache.py b/pysnmp/proto/cache.py
index eb3e3b69..a86b991c 100644
--- a/pysnmp/proto/cache.py
+++ b/pysnmp/proto/cache.py
@@ -6,6 +6,7 @@
#
from pysnmp.proto import error
+
class Cache:
def __init__(self):
self.__cacheRepository = {}
@@ -26,7 +27,7 @@ class Cache:
if index not in self.__cacheRepository:
raise error.ProtocolError(
'Cache miss on update for %s' % kwargs
- )
+ )
self.__cacheRepository[index].update(kwargs)
def expire(self, cbFun, cbCtx):
diff --git a/pysnmp/proto/errind.py b/pysnmp/proto/errind.py
index f11c1fa1..12402ea1 100644
--- a/pysnmp/proto/errind.py
+++ b/pysnmp/proto/errind.py
@@ -4,8 +4,11 @@
# Copyright (c) 2005-2016, Ilya Etingof <ilya@glas.net>
# License: http://pysnmp.sf.net/license.html
#
+
+
class ErrorIndication:
"""SNMPv3 error-indication values"""
+
def __init__(self, descr=None):
self.__value = self.__descr = self.__class__.__name__[0].lower() + self.__class__.__name__[1:]
if descr:
@@ -32,190 +35,262 @@ class ErrorIndication:
def __str__(self):
return self.__descr
+
# SNMP message processing errors
class SerializationError(ErrorIndication):
pass
+
serializationError = SerializationError('SNMP message serialization error')
+
class DeserializationError(ErrorIndication):
pass
+
deserializationError = DeserializationError('SNMP message deserialization error')
+
class ParseError(DeserializationError):
pass
+
parseError = ParseError('SNMP message deserialization error')
+
class UnsupportedMsgProcessingModel(ErrorIndication):
pass
+
unsupportedMsgProcessingModel = UnsupportedMsgProcessingModel('Unknown SNMP message processing model ID encountered')
+
class UnknownPDUHandler(ErrorIndication):
pass
+
unknownPDUHandler = UnknownPDUHandler('Unhandled PDU type encountered')
+
class UnsupportedPDUtype(ErrorIndication):
pass
+
unsupportedPDUtype = UnsupportedPDUtype('Unsupported SNMP PDU type encountered')
+
class RequestTimedOut(ErrorIndication):
pass
+
requestTimedOut = RequestTimedOut('No SNMP response received before timeout')
+
class EmptyResponse(ErrorIndication):
pass
+
emptyResponse = EmptyResponse('Empty SNMP response message')
+
class NonReportable(ErrorIndication):
pass
+
nonReportable = NonReportable('Report PDU generation not attempted')
+
class DataMismatch(ErrorIndication):
pass
+
dataMismatch = DataMismatch('SNMP request/response parameters mismatched')
+
class EngineIDMismatch(ErrorIndication):
pass
+
engineIDMismatch = EngineIDMismatch('SNMP engine ID mismatch encountered')
+
class UnknownEngineID(ErrorIndication):
pass
+
unknownEngineID = UnknownEngineID('Unknown SNMP engine ID encountered')
+
class TooBig(ErrorIndication):
pass
+
tooBig = TooBig('SNMP message will be too big')
+
class LoopTerminated(ErrorIndication):
pass
+
loopTerminated = LoopTerminated('Infinite SNMP entities talk terminated')
+
class InvalidMsg(ErrorIndication):
pass
+
invalidMsg = InvalidMsg('Invalid SNMP message header parameters encountered')
+
# SNMP security modules errors
class UnknownCommunityName(ErrorIndication):
pass
+
unknownCommunityName = UnknownCommunityName('Unknown SNMP community name encountered')
+
class NoEncryption(ErrorIndication):
pass
+
noEncryption = NoEncryption('No encryption services configured')
+
class EncryptionError(ErrorIndication):
pass
+
encryptionError = EncryptionError('Ciphering services not available')
+
class DecryptionError(ErrorIndication):
pass
+
decryptionError = DecryptionError('Ciphering services not available or ciphertext is broken')
+
class NoAuthentication(ErrorIndication):
pass
+
noAuthentication = NoAuthentication('No authentication services configured')
+
class AuthenticationError(ErrorIndication):
pass
+
authenticationError = AuthenticationError('Ciphering services not available or bad parameters')
+
class AuthenticationFailure(ErrorIndication):
pass
+
authenticationFailure = AuthenticationFailure('Authenticator mismatched')
+
class UnsupportedAuthProtocol(ErrorIndication):
pass
+
unsupportedAuthProtocol = UnsupportedAuthProtocol('Authentication protocol is not supprted')
+
class UnsupportedPrivProtocol(ErrorIndication):
pass
+
unsupportedPrivProtocol = UnsupportedPrivProtocol('Privacy protocol is not supprted')
+
class UnknownSecurityName(ErrorIndication):
pass
+
unknownSecurityName = UnknownSecurityName('Unknown SNMP security name encountered')
+
class UnsupportedSecurityModel(ErrorIndication):
pass
+
unsupportedSecurityModel = UnsupportedSecurityModel('Unsupported SNMP security model')
+
class UnsupportedSecurityLevel(ErrorIndication):
pass
+
unsupportedSecurityLevel = UnsupportedSecurityLevel('Unsupported SNMP security level')
+
class NotInTimeWindow(ErrorIndication):
pass
+
notInTimeWindow = NotInTimeWindow('SNMP message timing parameters not in windows of trust')
+
# SNMP access-control errors
class NoSuchView(ErrorIndication):
pass
+
noSuchView = NoSuchView('No such MIB view currently exists')
+
class NoAccessEntry(ErrorIndication):
pass
+
noAccessEntry = NoAccessEntry('Access to MIB node denined')
+
class NoGroupName(ErrorIndication):
pass
+
noGroupName = NoGroupName('No such VACM group configured')
+
class NoSuchContext(ErrorIndication):
pass
+
noSuchContext = NoSuchContext('SNMP context now found')
+
class NotInView(ErrorIndication):
pass
+
notInView = NotInView('Requested OID is out of MIB view')
+
class AccessAllowed(ErrorIndication):
pass
+
accessAllowed = AccessAllowed()
+
class OtherError(ErrorIndication):
pass
+
otherError = OtherError('Unspecified SNMP engine error occurred')
+
# SNMP Apps errors
class OidNotIncreasing(ErrorIndication):
pass
+
oidNotIncreasing = OidNotIncreasing('OIDs are not increasing')
diff --git a/pysnmp/proto/error.py b/pysnmp/proto/error.py
index b83def84..5abcdabd 100644
--- a/pysnmp/proto/error.py
+++ b/pysnmp/proto/error.py
@@ -8,19 +8,23 @@ from pyasn1.error import PyAsn1Error
from pysnmp.error import PySnmpError
from pysnmp import debug
+
class ProtocolError(PySnmpError, PyAsn1Error):
pass
+
# SNMP v3 exceptions
class SnmpV3Error(ProtocolError):
pass
+
class StatusInformation(SnmpV3Error):
def __init__(self, **kwargs):
SnmpV3Error.__init__(self)
self.__errorIndication = kwargs
- debug.logger & (debug.flagDsp|debug.flagMP|debug.flagSM|debug.flagACL) and debug.logger('StatusInformation: %s' % kwargs)
+ debug.logger & (debug.flagDsp | debug.flagMP | debug.flagSM | debug.flagACL) and debug.logger(
+ 'StatusInformation: %s' % kwargs)
def __str__(self):
return str(self.__errorIndication)
@@ -34,14 +38,18 @@ class StatusInformation(SnmpV3Error):
def get(self, key, defVal=None):
return self.__errorIndication.get(key, defVal)
+
class CacheExpiredError(SnmpV3Error):
pass
+
class InternalError(SnmpV3Error):
pass
+
class MessageProcessingError(SnmpV3Error):
pass
+
class RequestTimeout(SnmpV3Error):
pass
diff --git a/pysnmp/proto/rfc1155.py b/pysnmp/proto/rfc1155.py
index 88939307..fa7ab552 100644
--- a/pysnmp/proto/rfc1155.py
+++ b/pysnmp/proto/rfc1155.py
@@ -11,11 +11,12 @@ from pysnmp.proto import error
__all__ = ['Opaque', 'NetworkAddress', 'ObjectName', 'TimeTicks',
'Counter', 'Gauge', 'IpAddress']
+
class IpAddress(univ.OctetString):
tagSet = univ.OctetString.tagSet.tagImplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x00)
)
- subtypeSpec = univ.OctetString.subtypeSpec+constraint.ValueSizeConstraint(
+ subtypeSpec = univ.OctetString.subtypeSpec + constraint.ValueSizeConstraint(
4, 4
)
@@ -24,7 +25,7 @@ class IpAddress(univ.OctetString):
try:
value = [int(x) for x in value.split('.')]
except:
- raise error.ProtocolError('Bad IP address syntax %s' % value)
+ raise error.ProtocolError('Bad IP address syntax %s' % value)
if len(value) != 4:
raise error.ProtocolError('Bad IP address syntax')
return univ.OctetString.prettyIn(self, value)
@@ -35,44 +36,51 @@ class IpAddress(univ.OctetString):
else:
return ''
+
class Counter(univ.Integer):
tagSet = univ.Integer.tagSet.tagImplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x01)
)
- subtypeSpec = univ.Integer.subtypeSpec+constraint.ValueRangeConstraint(
+ subtypeSpec = univ.Integer.subtypeSpec + constraint.ValueRangeConstraint(
0, 4294967295
)
+
class NetworkAddress(univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('internet', IpAddress())
)
+
class Gauge(univ.Integer):
tagSet = univ.Integer.tagSet.tagImplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x02)
)
- subtypeSpec = univ.Integer.subtypeSpec+constraint.ValueRangeConstraint(
+ subtypeSpec = univ.Integer.subtypeSpec + constraint.ValueRangeConstraint(
0, 4294967295
)
+
class TimeTicks(univ.Integer):
tagSet = univ.Integer.tagSet.tagImplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x03)
)
- subtypeSpec = univ.Integer.subtypeSpec+constraint.ValueRangeConstraint(
+ subtypeSpec = univ.Integer.subtypeSpec + constraint.ValueRangeConstraint(
0, 4294967295
)
+
class Opaque(univ.OctetString):
tagSet = univ.OctetString.tagSet.tagImplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x04)
)
+
class ObjectName(univ.ObjectIdentifier):
pass
-class TypeCoercionHackMixIn: # XXX
+
+class TypeCoercionHackMixIn: # XXX
# Reduce ASN1 type check to simple tag check as SMIv2 objects may
# not be constraints-compatible with those used in SNMP PDU.
def _verifyComponent(self, idx, value, **kwargs):
@@ -84,6 +92,7 @@ class TypeCoercionHackMixIn: # XXX
if not t.getTagSet().isSuperTagSetOf(value.getTagSet()):
raise PyAsn1Error('Component type error %r vs %r' % (t, value))
+
class SimpleSyntax(TypeCoercionHackMixIn, univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('number', univ.Integer()),
@@ -92,6 +101,7 @@ class SimpleSyntax(TypeCoercionHackMixIn, univ.Choice):
namedtype.NamedType('empty', univ.Null())
)
+
class ApplicationSyntax(TypeCoercionHackMixIn, univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('address', NetworkAddress()),
@@ -101,6 +111,7 @@ class ApplicationSyntax(TypeCoercionHackMixIn, univ.Choice):
namedtype.NamedType('arbitrary', Opaque())
)
+
class ObjectSyntax(univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('simple', SimpleSyntax()),
diff --git a/pysnmp/proto/rfc1157.py b/pysnmp/proto/rfc1157.py
index 97b4215a..0b04747c 100644
--- a/pysnmp/proto/rfc1157.py
+++ b/pysnmp/proto/rfc1157.py
@@ -10,15 +10,22 @@ from pysnmp.proto import rfc1155
__all__ = ['GetNextRequestPDU', 'GetResponsePDU', 'SetRequestPDU',
'TrapPDU', 'GetRequestPDU']
+
class VarBind(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('name', rfc1155.ObjectName()),
namedtype.NamedType('value', rfc1155.ObjectSyntax())
)
+
+
class VarBindList(univ.SequenceOf):
componentType = VarBind()
-errorStatus = univ.Integer(namedValues=namedval.NamedValues(('noError', 0), ('tooBig', 1), ('noSuchName', 2), ('badValue', 3), ('readOnly', 4), ('genErr', 5)))
+
+errorStatus = univ.Integer(
+ namedValues=namedval.NamedValues(('noError', 0), ('tooBig', 1), ('noSuchName', 2),
+ ('badValue', 3), ('readOnly', 4), ('genErr', 5)))
+
class _RequestBase(univ.Sequence):
componentType = namedtype.NamedTypes(
@@ -28,24 +35,35 @@ class _RequestBase(univ.Sequence):
namedtype.NamedType('variable-bindings', VarBindList())
)
+
class GetRequestPDU(_RequestBase):
tagSet = _RequestBase.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
+
+
class GetNextRequestPDU(_RequestBase):
tagSet = _RequestBase.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
+
+
class GetResponsePDU(_RequestBase):
tagSet = _RequestBase.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
+
+
class SetRequestPDU(_RequestBase):
tagSet = _RequestBase.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)
)
-genericTrap = univ.Integer().clone(namedValues=namedval.NamedValues(('coldStart', 0), ('warmStart', 1), ('linkDown', 2), ('linkUp', 3), ('authenticationFailure', 4), ('egpNeighborLoss', 5), ('enterpriseSpecific', 6)))
+
+genericTrap = univ.Integer().clone(
+ namedValues=namedval.NamedValues(('coldStart', 0), ('warmStart', 1), ('linkDown', 2), ('linkUp', 3),
+ ('authenticationFailure', 4), ('egpNeighborLoss', 5), ('enterpriseSpecific', 6)))
+
class TrapPDU(univ.Sequence):
tagSet = univ.Sequence.tagSet.tagImplicitly(
@@ -60,6 +78,7 @@ class TrapPDU(univ.Sequence):
namedtype.NamedType('variable-bindings', VarBindList())
)
+
class PDUs(univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('get-request', GetRequestPDU()),
@@ -69,8 +88,10 @@ class PDUs(univ.Choice):
namedtype.NamedType('trap', TrapPDU())
)
+
version = univ.Integer(namedValues=namedval.NamedValues(('version-1', 0)))
+
class Message(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('version', version),
diff --git a/pysnmp/proto/rfc1901.py b/pysnmp/proto/rfc1901.py
index ca72301b..01272a91 100644
--- a/pysnmp/proto/rfc1901.py
+++ b/pysnmp/proto/rfc1901.py
@@ -9,6 +9,7 @@ from pysnmp.proto import rfc1905
version = univ.Integer(namedValues=namedval.NamedValues(('version-2c', 1)))
+
class Message(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('version', version),
diff --git a/pysnmp/proto/rfc1902.py b/pysnmp/proto/rfc1902.py
index 3b69218b..5f28f986 100644
--- a/pysnmp/proto/rfc1902.py
+++ b/pysnmp/proto/rfc1902.py
@@ -13,6 +13,7 @@ __all__ = ['Opaque', 'TimeTicks', 'Bits', 'Integer', 'OctetString',
'IpAddress', 'Counter64', 'Unsigned32', 'Gauge32', 'Integer32',
'ObjectIdentifier', 'Counter32']
+
class Integer32(univ.Integer):
"""Creates an instance of SNMP Integer32 class.
@@ -55,7 +56,7 @@ class Integer32(univ.Integer):
>>>
"""
- subtypeSpec = univ.Integer.subtypeSpec+constraint.ValueRangeConstraint(
+ subtypeSpec = univ.Integer.subtypeSpec + constraint.ValueRangeConstraint(
-2147483648, 2147483647
)
@@ -63,8 +64,10 @@ class Integer32(univ.Integer):
def withValues(cls, *values):
"""Creates a subclass with discreet values constraint.
"""
+
class X(cls):
subtypeSpec = cls.subtypeSpec + constraint.SingleValueConstraint(*values)
+
X.__name__ = cls.__name__
return X
@@ -72,11 +75,14 @@ class Integer32(univ.Integer):
def withRange(cls, minimum, maximum):
"""Creates a subclass with value range constraint.
"""
+
class X(cls):
subtypeSpec = cls.subtypeSpec + constraint.ValueRangeConstraint(minimum, maximum)
+
X.__name__ = cls.__name__
return X
+
class Integer(Integer32):
"""Creates an instance of SNMP INTEGER class.
@@ -120,16 +126,20 @@ class Integer(Integer32):
>>>
"""
+
@classmethod
def withNamedValues(cls, **values):
"""Creates a subclass with discreet named values constraint.
"""
+
class X(cls):
namedValues = cls.namedValues + namedval.NamedValues(*values.items())
subtypeSpec = cls.subtypeSpec + constraint.SingleValueConstraint(*values.values())
+
X.__name__ = cls.__name__
return X
+
class OctetString(univ.OctetString):
"""Creates an instance of SNMP OCTET STRING class.
@@ -169,7 +179,7 @@ class OctetString(univ.OctetString):
>>>
"""
- subtypeSpec = univ.OctetString.subtypeSpec+constraint.ValueSizeConstraint(
+ subtypeSpec = univ.OctetString.subtypeSpec + constraint.ValueSizeConstraint(
0, 65535
)
@@ -205,8 +215,10 @@ class OctetString(univ.OctetString):
def withSize(cls, minimum, maximum):
"""Creates a subclass with value size constraint.
"""
+
class X(cls):
subtypeSpec = cls.subtypeSpec + constraint.ValueSizeConstraint(minimum, maximum)
+
X.__name__ = cls.__name__
return X
@@ -222,6 +234,7 @@ class OctetString(univ.OctetString):
else:
return octets.octs2str(value)
+
class ObjectIdentifier(univ.ObjectIdentifier):
"""Creates an instance of SNMP OBJECT IDENTIFIER class.
@@ -256,6 +269,7 @@ class ObjectIdentifier(univ.ObjectIdentifier):
"""
+
class IpAddress(OctetString):
"""Creates an instance of SNMP IpAddress class.
@@ -289,7 +303,7 @@ class IpAddress(OctetString):
tagSet = OctetString.tagSet.tagImplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x00)
)
- subtypeSpec = OctetString.subtypeSpec+constraint.ValueSizeConstraint(
+ subtypeSpec = OctetString.subtypeSpec + constraint.ValueSizeConstraint(
4, 4
)
fixedLength = 4
@@ -299,7 +313,7 @@ class IpAddress(OctetString):
try:
value = [int(x) for x in value.split('.')]
except:
- raise error.ProtocolError('Bad IP address syntax %s' % value)
+ raise error.ProtocolError('Bad IP address syntax %s' % value)
value = OctetString.prettyIn(self, value)
if len(value) != 4:
raise error.ProtocolError('Bad IP address syntax')
@@ -313,6 +327,7 @@ class IpAddress(OctetString):
else:
return ''
+
class Counter32(univ.Integer):
"""Creates an instance of SNMP Counter32 class.
@@ -347,10 +362,11 @@ class Counter32(univ.Integer):
tagSet = univ.Integer.tagSet.tagImplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x01)
)
- subtypeSpec = univ.Integer.subtypeSpec+constraint.ValueRangeConstraint(
+ subtypeSpec = univ.Integer.subtypeSpec + constraint.ValueRangeConstraint(
0, 4294967295
)
+
class Gauge32(univ.Integer):
"""Creates an instance of SNMP Gauge32 class.
@@ -385,10 +401,11 @@ class Gauge32(univ.Integer):
tagSet = univ.Integer.tagSet.tagImplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x02)
)
- subtypeSpec = univ.Integer.subtypeSpec+constraint.ValueRangeConstraint(
+ subtypeSpec = univ.Integer.subtypeSpec + constraint.ValueRangeConstraint(
0, 4294967295
)
+
class Unsigned32(univ.Integer):
"""Creates an instance of SNMP Unsigned32 class.
@@ -422,10 +439,11 @@ class Unsigned32(univ.Integer):
tagSet = univ.Integer.tagSet.tagImplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x02)
)
- subtypeSpec = univ.Integer.subtypeSpec+constraint.ValueRangeConstraint(
+ subtypeSpec = univ.Integer.subtypeSpec + constraint.ValueRangeConstraint(
0, 4294967295
)
+
class TimeTicks(univ.Integer):
"""Creates an instance of SNMP TimeTicks class.
@@ -459,10 +477,11 @@ class TimeTicks(univ.Integer):
tagSet = univ.Integer.tagSet.tagImplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x03)
)
- subtypeSpec = univ.Integer.subtypeSpec+constraint.ValueRangeConstraint(
+ subtypeSpec = univ.Integer.subtypeSpec + constraint.ValueRangeConstraint(
0, 4294967295
)
+
class Opaque(univ.OctetString):
"""Creates an instance of SNMP Opaque class.
@@ -507,6 +526,7 @@ class Opaque(univ.OctetString):
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x04)
)
+
class Counter64(univ.Integer):
"""Creates an instance of SNMP Counter64 class.
@@ -541,10 +561,11 @@ class Counter64(univ.Integer):
tagSet = univ.Integer.tagSet.tagImplicitly(
tag.Tag(tag.tagClassApplication, tag.tagFormatSimple, 0x06)
)
- subtypeSpec = univ.Integer.subtypeSpec+constraint.ValueRangeConstraint(
+ subtypeSpec = univ.Integer.subtypeSpec + constraint.ValueRangeConstraint(
0, 18446744073709551615
)
+
class Bits(OctetString):
"""Creates an instance of SNMP BITS class.
@@ -595,6 +616,7 @@ class Bits(OctetString):
"""
namedValues = namedval.NamedValues()
+
def __init__(self, value=None, tagSet=None, subtypeSpec=None,
encoding=None, binValue=None, hexValue=None,
namedValues=None):
@@ -608,19 +630,19 @@ class Bits(OctetString):
def prettyIn(self, bits):
if not isinstance(bits, (tuple, list)):
- return OctetString.prettyIn(self, bits) # raw bitstring
- octets = []
- for bit in bits: # tuple of named bits
+ return OctetString.prettyIn(self, bits) # raw bitstring
+ _octets = []
+ for bit in bits: # tuple of named bits
v = self.__namedValues.getValue(bit)
if v is None:
raise error.ProtocolError(
'Unknown named bit %s' % bit
- )
+ )
d, m = divmod(v, 8)
- if d >= len(octets):
- octets.extend([0] * (d - len(octets) + 1))
- octets[d] = octets[d] | 0x01 << (7-m)
- return OctetString.prettyIn(self, octets)
+ if d >= len(_octets):
+ _octets.extend([0] * (d - len(_octets) + 1))
+ _octets[d] |= 0x01 << (7 - m)
+ return OctetString.prettyIn(self, _octets)
def prettyOut(self, value):
names = []
@@ -630,10 +652,10 @@ class Bits(OctetString):
v = ints[i]
j = 7
while j >= 0:
- if v & (0x01<<j):
- name = self.__namedValues.getName(i*8+7-j)
+ if v & (0x01 << j):
+ name = self.__namedValues.getName(i * 8 + 7 - j)
if name is None:
- name = 'UnknownBit-%s' % (i*8+7-j,)
+ name = 'UnknownBit-%s' % (i * 8 + 7 - j,)
names.append(name)
j -= 1
i += 1
@@ -643,7 +665,7 @@ class Bits(OctetString):
encoding=None, binValue=None, hexValue=None,
namedValues=None):
if value is None and tagSet is None and subtypeSpec is None \
- and namedValues is None:
+ and namedValues is None:
return self
if value is None:
value = self._value
@@ -681,8 +703,10 @@ class Bits(OctetString):
def withNamedBits(cls, **values):
"""Creates a subclass with discreet named bits constraint.
"""
+
class X(cls):
namedValues = cls.namedValues + namedval.NamedValues(*values.items())
+
X.__name__ = cls.__name__
return X
@@ -690,6 +714,7 @@ class Bits(OctetString):
class ObjectName(univ.ObjectIdentifier):
pass
+
class SimpleSyntax(rfc1155.TypeCoercionHackMixIn, univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('integer-value', Integer()),
@@ -697,6 +722,7 @@ class SimpleSyntax(rfc1155.TypeCoercionHackMixIn, univ.Choice):
namedtype.NamedType('objectID-value', univ.ObjectIdentifier())
)
+
class ApplicationSyntax(rfc1155.TypeCoercionHackMixIn, univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('ipAddress-value', IpAddress()),
@@ -705,9 +731,10 @@ class ApplicationSyntax(rfc1155.TypeCoercionHackMixIn, univ.Choice):
namedtype.NamedType('arbitrary-value', Opaque()),
namedtype.NamedType('big-counter-value', Counter64()),
# This conflicts with Counter32
- #namedtype.NamedType('unsigned-integer-value', Unsigned32()),
+ # namedtype.NamedType('unsigned-integer-value', Unsigned32()),
namedtype.NamedType('gauge32-value', Gauge32())
- ) # BITS misplaced?
+ ) # BITS misplaced?
+
class ObjectSyntax(univ.Choice):
componentType = namedtype.NamedTypes(
diff --git a/pysnmp/proto/rfc1905.py b/pysnmp/proto/rfc1905.py
index 0b7136a8..d346e468 100644
--- a/pysnmp/proto/rfc1905.py
+++ b/pysnmp/proto/rfc1905.py
@@ -21,6 +21,7 @@ max_bindings = rfc1902.Integer(2147483647)
UnSpecified = univ.Null
unSpecified = UnSpecified()
+
class NoSuchObject(univ.Null):
tagSet = univ.Null.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0x00)
@@ -29,8 +30,10 @@ class NoSuchObject(univ.Null):
def prettyPrint(self, scope=0):
return 'No Such Object currently exists at this OID'
+
noSuchObject = NoSuchObject()
+
class NoSuchInstance(univ.Null):
tagSet = univ.Null.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0x01)
@@ -38,8 +41,11 @@ class NoSuchInstance(univ.Null):
def prettyPrint(self, scope=0):
return 'No Such Instance currently exists at this OID'
+
+
noSuchInstance = NoSuchInstance()
+
class EndOfMibView(univ.Null):
tagSet = univ.Null.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0x02)
@@ -48,8 +54,10 @@ class EndOfMibView(univ.Null):
def prettyPrint(self, scope=0):
return 'No more variables left in this MIB View'
+
endOfMibView = EndOfMibView()
+
# Made a separate class for better readability
class _BindValue(univ.Choice):
componentType = namedtype.NamedTypes(
@@ -60,32 +68,45 @@ class _BindValue(univ.Choice):
namedtype.NamedType('endOfMibView', endOfMibView)
)
+
class VarBind(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('name', rfc1902.ObjectName()),
namedtype.NamedType('', _BindValue())
)
+
class VarBindList(univ.SequenceOf):
componentType = VarBind()
subtypeSpec = univ.SequenceOf.subtypeSpec + constraint.ValueSizeConstraint(
0, max_bindings
)
-errorStatus = univ.Integer(namedValues=namedval.NamedValues(('noError', 0), ('tooBig', 1), ('noSuchName', 2), ('badValue', 3), ('readOnly', 4), ('genErr', 5), ('noAccess', 6), ('wrongType', 7), ('wrongLength', 8), ('wrongEncoding', 9), ('wrongValue', 10), ('noCreation', 11), ('inconsistentValue', 12), ('resourceUnavailable', 13), ('commitFailed', 14), ('undoFailed', 15), ('authorizationError', 16), ('notWritable', 17), ('inconsistentName', 18)))
+
+errorStatus = univ.Integer(
+ namedValues=namedval.NamedValues(('noError', 0), ('tooBig', 1), ('noSuchName', 2), ('badValue', 3), ('readOnly', 4),
+ ('genErr', 5), ('noAccess', 6), ('wrongType', 7), ('wrongLength', 8),
+ ('wrongEncoding', 9), ('wrongValue', 10), ('noCreation', 11),
+ ('inconsistentValue', 12), ('resourceUnavailable', 13), ('commitFailed', 14),
+ ('undoFailed', 15), ('authorizationError', 16), ('notWritable', 17),
+ ('inconsistentName', 18)))
+
# Base class for a non-bulk PDU
class PDU(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('request-id', rfc1902.Integer32()),
namedtype.NamedType('error-status', errorStatus),
- namedtype.NamedType('error-index', univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, max_bindings))),
+ namedtype.NamedType('error-index',
+ univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, max_bindings))),
namedtype.NamedType('variable-bindings', VarBindList())
)
+
nonRepeaters = univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, max_bindings))
maxRepetitions = univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, max_bindings))
+
# Base class for bulk PDU
class BulkPDU(univ.Sequence):
componentType = namedtype.NamedTypes(
@@ -95,46 +116,55 @@ class BulkPDU(univ.Sequence):
namedtype.NamedType('variable-bindings', VarBindList())
)
+
class GetRequestPDU(PDU):
tagSet = PDU.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
)
+
class GetNextRequestPDU(PDU):
tagSet = PDU.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)
)
+
class ResponsePDU(PDU):
tagSet = PDU.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2)
)
+
class SetRequestPDU(PDU):
tagSet = PDU.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 3)
)
+
class GetBulkRequestPDU(BulkPDU):
tagSet = PDU.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 5)
)
+
class InformRequestPDU(PDU):
tagSet = PDU.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 6)
)
+
class SNMPv2TrapPDU(PDU):
tagSet = PDU.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 7)
)
+
class ReportPDU(PDU):
tagSet = PDU.tagSet.tagImplicitly(
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 8)
)
+
class PDUs(univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('get-request', GetRequestPDU()),
diff --git a/pysnmp/proto/rfc3412.py b/pysnmp/proto/rfc3412.py
index cfe01288..9feab6e3 100644
--- a/pysnmp/proto/rfc3412.py
+++ b/pysnmp/proto/rfc3412.py
@@ -8,14 +8,16 @@ import sys
from pyasn1.compat.octets import null
from pysnmp.smi import builder, instrum
from pysnmp.proto import errind, error, cache
-from pysnmp.proto.api import verdec # XXX
+from pysnmp.proto.api import verdec # XXX
from pysnmp.error import PySnmpError
from pysnmp import nextid, debug
+
class MsgAndPduDispatcher:
"""SNMP engine PDU & message dispatcher. Exchanges SNMP PDU's with
applications and serialized messages with transport level.
"""
+
def __init__(self, mibInstrumController=None):
if mibInstrumController is None:
self.mibInstrumController = instrum.MibInstrumController(
@@ -68,7 +70,8 @@ class MsgAndPduDispatcher:
# 4.3.4
self.__appsRegistration[k] = processPdu
- debug.logger & debug.flagDsp and debug.logger('registerContextEngineId: contextEngineId %r pduTypes %s' % (contextEngineId, pduTypes))
+ debug.logger & debug.flagDsp and debug.logger(
+ 'registerContextEngineId: contextEngineId %r pduTypes %s' % (contextEngineId, pduTypes))
# 4.4.1
def unregisterContextEngineId(self, contextEngineId, pduTypes):
@@ -76,14 +79,16 @@ class MsgAndPduDispatcher:
# 4.3.4
if contextEngineId is None:
# Default to local snmpEngineId
- contextEngineId, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')
+ contextEngineId, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB',
+ 'snmpEngineID')
for pduType in pduTypes:
k = (contextEngineId, pduType)
if k in self.__appsRegistration:
del self.__appsRegistration[k]
- debug.logger & debug.flagDsp and debug.logger('unregisterContextEngineId: contextEngineId %r pduTypes %s' % (contextEngineId, pduTypes))
+ debug.logger & debug.flagDsp and debug.logger(
+ 'unregisterContextEngineId: contextEngineId %r pduTypes %s' % (contextEngineId, pduTypes))
def getRegisteredApp(self, contextEngineId, pduType):
k = (contextEngineId, pduType)
@@ -91,7 +96,7 @@ class MsgAndPduDispatcher:
return self.__appsRegistration[k]
k = (null, pduType)
if k in self.__appsRegistration:
- return self.__appsRegistration[k] # wildcard
+ return self.__appsRegistration[k] # wildcard
# Dispatcher <-> application API
@@ -112,7 +117,8 @@ class MsgAndPduDispatcher:
errorIndication=errind.unsupportedMsgProcessingModel
)
- debug.logger & debug.flagDsp and debug.logger('sendPdu: securityName %s, PDU\n%s' % (securityName, PDU.prettyPrint()))
+ debug.logger & debug.flagDsp and debug.logger(
+ 'sendPdu: securityName %s, PDU\n%s' % (securityName, PDU.prettyPrint()))
# 4.1.1.3
sendPduHandle = self.__sendPduHandle()
@@ -121,14 +127,16 @@ class MsgAndPduDispatcher:
sendPduHandle,
messageProcessingModel=messageProcessingModel,
sendPduHandle=sendPduHandle,
- timeout=timeout+snmpEngine.transportDispatcher.getTimerTicks(),
+ timeout=timeout + snmpEngine.transportDispatcher.getTimerTicks(),
cbFun=cbFun,
cbCtx=cbCtx
)
- debug.logger & debug.flagDsp and debug.logger('sendPdu: current time %d ticks, one tick is %s seconds' % (snmpEngine.transportDispatcher.getTimerTicks(), snmpEngine.transportDispatcher.getTimerResolution()))
+ debug.logger & debug.flagDsp and debug.logger('sendPdu: current time %d ticks, one tick is %s seconds' % (
+ snmpEngine.transportDispatcher.getTimerTicks(), snmpEngine.transportDispatcher.getTimerResolution()))
- debug.logger & debug.flagDsp and debug.logger('sendPdu: new sendPduHandle %s, timeout %s ticks, cbFun %s' % (sendPduHandle, timeout, cbFun))
+ debug.logger & debug.flagDsp and debug.logger(
+ 'sendPdu: new sendPduHandle %s, timeout %s ticks, cbFun %s' % (sendPduHandle, timeout, cbFun))
origTransportDomain = transportDomain
origTransportAddress = transportAddress
@@ -138,11 +146,11 @@ class MsgAndPduDispatcher:
(transportDomain,
transportAddress,
outgoingMessage) = mpHandler.prepareOutgoingMessage(
- snmpEngine, origTransportDomain, origTransportAddress,
- messageProcessingModel, securityModel, securityName,
- securityLevel, contextEngineId, contextName,
- pduVersion, PDU, expectResponse, sendPduHandle
- )
+ snmpEngine, origTransportDomain, origTransportAddress,
+ messageProcessingModel, securityModel, securityName,
+ securityLevel, contextEngineId, contextName,
+ pduVersion, PDU, expectResponse, sendPduHandle
+ )
debug.logger & debug.flagDsp and debug.logger('sendPdu: MP succeeded')
except PySnmpError:
@@ -213,18 +221,19 @@ class MsgAndPduDispatcher:
errorIndication=errind.unsupportedMsgProcessingModel
)
- debug.logger & debug.flagDsp and debug.logger('returnResponsePdu: PDU %s' % (PDU and PDU.prettyPrint() or "<empty>",))
+ debug.logger & debug.flagDsp and debug.logger(
+ 'returnResponsePdu: PDU %s' % (PDU and PDU.prettyPrint() or "<empty>",))
# 4.1.2.2
try:
(transportDomain,
transportAddress,
outgoingMessage) = mpHandler.prepareResponseMessage(
- snmpEngine, messageProcessingModel, securityModel,
- securityName, securityLevel, contextEngineId, contextName,
- pduVersion, PDU, maxSizeResponseScopedPDU, stateReference,
- statusInformation
- )
+ snmpEngine, messageProcessingModel, securityModel,
+ securityName, securityLevel, contextEngineId, contextName,
+ pduVersion, PDU, maxSizeResponseScopedPDU, stateReference,
+ statusInformation
+ )
debug.logger & debug.flagDsp and debug.logger('returnResponsePdu: MP suceeded')
@@ -233,9 +242,10 @@ class MsgAndPduDispatcher:
raise
# Handle oversized messages XXX transport constrains?
- snmpEngineMaxMessageSize, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineMaxMessageSize')
+ snmpEngineMaxMessageSize, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB',
+ 'snmpEngineMaxMessageSize')
if snmpEngineMaxMessageSize.syntax and \
- len(outgoingMessage) > snmpEngineMaxMessageSize.syntax:
+ len(outgoingMessage) > snmpEngineMaxMessageSize.syntax:
snmpSilentDrops, = self.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpSilentDrops')
snmpSilentDrops.syntax += 1
raise error.StatusInformation(errorIndication=errind.tooBig)
@@ -271,15 +281,16 @@ class MsgAndPduDispatcher:
# 4.2.1.1
snmpInPkts, = self.mibInstrumController.mibBuilder.importSymbols(
'__SNMPv2-MIB', 'snmpInPkts'
- )
+ )
snmpInPkts.syntax += 1
# 4.2.1.2
try:
- restOfWholeMsg = null # XXX fix decoder non-recursive return
+ restOfWholeMsg = null # XXX fix decoder non-recursive return
msgVersion = verdec.decodeMessageVersion(wholeMsg)
except error.ProtocolError:
- snmpInASNParseErrs, = self.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInASNParseErrs')
+ snmpInASNParseErrs, = self.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB',
+ 'snmpInASNParseErrs')
snmpInASNParseErrs.syntax += 1
return null # n.b the whole buffer gets dropped
@@ -304,8 +315,8 @@ class MsgAndPduDispatcher:
pduVersion, PDU, pduType, sendPduHandle,
maxSizeResponseScopedPDU, statusInformation,
stateReference) = mpHandler.prepareDataElements(
- snmpEngine, transportDomain, transportAddress, wholeMsg
- )
+ snmpEngine, transportDomain, transportAddress, wholeMsg
+ )
debug.logger & debug.flagDsp and debug.logger('receiveMessage: MP succeded')
@@ -314,7 +325,8 @@ class MsgAndPduDispatcher:
if 'sendPduHandle' in statusInformation:
# Dropped REPORT -- re-run pending reqs queue as some
# of them may be waiting for this REPORT
- debug.logger & debug.flagDsp and debug.logger('receiveMessage: MP failed, statusInformation %s, forcing a retry' % statusInformation)
+ debug.logger & debug.flagDsp and debug.logger(
+ 'receiveMessage: MP failed, statusInformation %s, forcing a retry' % statusInformation)
self.__expireRequest(
statusInformation['sendPduHandle'],
self.__cache.pop(statusInformation['sendPduHandle']),
@@ -336,7 +348,8 @@ class MsgAndPduDispatcher:
# 4.2.2.1.2
if processPdu is None:
# 4.2.2.1.2.a
- snmpUnknownPDUHandlers, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-MPD-MIB', 'snmpUnknownPDUHandlers')
+ snmpUnknownPDUHandlers, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-MPD-MIB',
+ 'snmpUnknownPDUHandlers')
snmpUnknownPDUHandlers.syntax += 1
# 4.2.2.1.2.b
@@ -354,15 +367,16 @@ class MsgAndPduDispatcher:
(destTransportDomain,
destTransportAddress,
outgoingMessage) = mpHandler.prepareResponseMessage(
- snmpEngine, messageProcessingModel,
- securityModel, securityName, securityLevel,
- contextEngineId, contextName, pduVersion,
- PDU, maxSizeResponseScopedPDU, stateReference,
- statusInformation
- )
+ snmpEngine, messageProcessingModel,
+ securityModel, securityName, securityLevel,
+ contextEngineId, contextName, pduVersion,
+ PDU, maxSizeResponseScopedPDU, stateReference,
+ statusInformation
+ )
except error.StatusInformation:
- debug.logger & debug.flagDsp and debug.logger('receiveMessage: report failed, statusInformation %s' % sys.exc_info()[1])
+ debug.logger & debug.flagDsp and debug.logger(
+ 'receiveMessage: report failed, statusInformation %s' % sys.exc_info()[1])
return restOfWholeMsg
# 4.2.2.1.2.c
@@ -372,7 +386,7 @@ class MsgAndPduDispatcher:
destTransportAddress
)
- except PySnmpError: # XXX
+ except PySnmpError: # XXX
pass
debug.logger & debug.flagDsp and debug.logger('receiveMessage: reporting succeeded')
@@ -424,11 +438,13 @@ class MsgAndPduDispatcher:
# 4.2.2.2.2
if cachedParams is None:
- snmpUnknownPDUHandlers, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-MPD-MIB', 'snmpUnknownPDUHandlers')
+ snmpUnknownPDUHandlers, = self.mibInstrumController.mibBuilder.importSymbols('__SNMP-MPD-MIB',
+ 'snmpUnknownPDUHandlers')
snmpUnknownPDUHandlers.syntax += 1
return restOfWholeMsg
- debug.logger & debug.flagDsp and debug.logger('receiveMessage: cache read by sendPduHandle %s' % sendPduHandle)
+ debug.logger & debug.flagDsp and debug.logger(
+ 'receiveMessage: cache read by sendPduHandle %s' % sendPduHandle)
# 4.2.2.2.3
# no-op ? XXX
@@ -476,6 +492,7 @@ class MsgAndPduDispatcher:
# Cache expiration stuff
+ # noinspection PyUnusedLocal
def __expireRequest(self, cacheKey, cachedParams, snmpEngine,
statusInformation=None):
timeNow = snmpEngine.transportDispatcher.getTimerTicks()
@@ -504,5 +521,6 @@ class MsgAndPduDispatcher:
cachedParams['cbCtx'])
return True
+ # noinspection PyUnusedLocal
def receiveTimerTick(self, snmpEngine, timeNow):
self.__cache.expire(self.__expireRequest, snmpEngine)
diff --git a/pysnmp/proto/secmod/base.py b/pysnmp/proto/secmod/base.py
index 2e4d73ed..af5ddb23 100644
--- a/pysnmp/proto/secmod/base.py
+++ b/pysnmp/proto/secmod/base.py
@@ -7,8 +7,10 @@
from pysnmp.proto.secmod import cache
from pysnmp.proto import error
+
class AbstractSecurityModel:
securityModelID = None
+
def __init__(self):
self._cache = cache.Cache()
diff --git a/pysnmp/proto/secmod/cache.py b/pysnmp/proto/secmod/cache.py
index b9e11821..7e6fa6df 100644
--- a/pysnmp/proto/secmod/cache.py
+++ b/pysnmp/proto/secmod/cache.py
@@ -7,8 +7,10 @@
from pysnmp import nextid
from pysnmp.proto import error
+
class Cache:
__stateReference = nextid.Integer(0xffffff)
+
def __init__(self):
self.__cacheEntries = {}
diff --git a/pysnmp/proto/secmod/rfc2576.py b/pysnmp/proto/secmod/rfc2576.py
index 906852fc..da5a68a8 100644
--- a/pysnmp/proto/secmod/rfc2576.py
+++ b/pysnmp/proto/secmod/rfc2576.py
@@ -13,8 +13,10 @@ from pysnmp.smi.error import NoSuchInstanceError
from pysnmp.proto import errind, error
from pysnmp import debug
+
class SnmpV1SecurityModel(base.AbstractSecurityModel):
securityModelID = 1
+
# According to rfc2576, community name <-> contextEngineId/contextName
# mapping is up to MP module for notifications but belongs to secmod
# responsibility for other PDU types. Since I do not yet understand
@@ -26,9 +28,11 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
base.AbstractSecurityModel.__init__(self)
def _sec2com(self, snmpEngine, securityName, contextEngineId, contextName):
- snmpTargetParamsSecurityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-TARGET-MIB', 'snmpTargetParamsSecurityName')
+ snmpTargetParamsSecurityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ 'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityName')
if self.__paramsBranchId != snmpTargetParamsSecurityName.branchVersionId:
- snmpTargetParamsSecurityModel, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-TARGET-MIB', 'snmpTargetParamsSecurityModel')
+ snmpTargetParamsSecurityModel, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ 'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityModel')
self.__nameToModelMap = {}
@@ -55,14 +59,15 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
# invalidate next map as it include this one
self.__securityBranchId = -1
- snmpCommunityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-COMMUNITY-MIB', 'snmpCommunityName')
+ snmpCommunityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-COMMUNITY-MIB',
+ 'snmpCommunityName')
if self.__securityBranchId != snmpCommunityName.branchVersionId:
(snmpCommunitySecurityName,
snmpCommunityContextEngineId,
snmpCommunityContextName) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
- 'SNMP-COMMUNITY-MIB', 'snmpCommunitySecurityName',
- 'snmpCommunityContextEngineID', 'snmpCommunityContextName'
- )
+ 'SNMP-COMMUNITY-MIB', 'snmpCommunitySecurityName',
+ 'snmpCommunityContextEngineID', 'snmpCommunityContextName'
+ )
self.__securityMap = {}
@@ -78,7 +83,8 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
_securityName = snmpCommunitySecurityName.getNode(snmpCommunitySecurityName.name + instId).syntax
- _contextEngineId = snmpCommunityContextEngineId.getNode(snmpCommunityContextEngineId.name + instId).syntax
+ _contextEngineId = snmpCommunityContextEngineId.getNode(
+ snmpCommunityContextEngineId.name + instId).syntax
_contextName = snmpCommunityContextName.getNode(snmpCommunityContextName.name + instId).syntax
@@ -88,7 +94,9 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
self.__securityBranchId = snmpCommunityName.branchVersionId
- debug.logger & debug.flagSM and debug.logger('_sec2com: built securityName to communityName map, version %s: %s' % (self.__securityBranchId, self.__securityMap))
+ debug.logger & debug.flagSM and debug.logger(
+ '_sec2com: built securityName to communityName map, version %s: %s' % (
+ self.__securityBranchId, self.__securityMap))
try:
return self.__securityMap[(securityName,
@@ -101,13 +109,14 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
)
def _com2sec(self, snmpEngine, communityName, transportInformation):
- snmpTargetAddrTAddress, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-TARGET-MIB', 'snmpTargetAddrTAddress')
+ snmpTargetAddrTAddress, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ 'SNMP-TARGET-MIB', 'snmpTargetAddrTAddress')
if self.__transportBranchId != snmpTargetAddrTAddress.branchVersionId:
(SnmpTagValue, snmpTargetAddrTDomain,
snmpTargetAddrTagList) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
- 'SNMP-TARGET-MIB', 'SnmpTagValue', 'snmpTargetAddrTDomain',
- 'snmpTargetAddrTagList'
- )
+ 'SNMP-TARGET-MIB', 'SnmpTagValue', 'snmpTargetAddrTDomain',
+ 'snmpTargetAddrTagList'
+ )
self.__emptyTag = SnmpTagValue('')
@@ -126,10 +135,12 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
targetAddrTDomain = tuple(targetAddrTDomain)
if targetAddrTDomain[:len(udp.snmpUDPDomain)] == udp.snmpUDPDomain:
- SnmpUDPAddress, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMPv2-TM', 'SnmpUDPAddress')
+ SnmpUDPAddress, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMPv2-TM',
+ 'SnmpUDPAddress')
targetAddrTAddress = tuple(SnmpUDPAddress(targetAddrTAddress))
elif targetAddrTDomain[:len(udp6.snmpUDP6Domain)] == udp6.snmpUDP6Domain:
- TransportAddressIPv6, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('TRANSPORT-ADDRESS-MIB', 'TransportAddressIPv6')
+ TransportAddressIPv6, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ 'TRANSPORT-ADDRESS-MIB', 'TransportAddressIPv6')
targetAddrTAddress = tuple(TransportAddressIPv6(targetAddrTAddress))
elif targetAddrTDomain[:len(unix.snmpLocalDomain)] == unix.snmpLocalDomain:
targetAddrTAddress = str(targetAddrTAddress)
@@ -147,11 +158,14 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
self.__transportBranchId = snmpTargetAddrTAddress.branchVersionId
- debug.logger & debug.flagSM and debug.logger('_com2sec: built transport-to-tag map version %s: %s' % (self.__transportBranchId, self.__transportToTagMap))
+ debug.logger & debug.flagSM and debug.logger('_com2sec: built transport-to-tag map version %s: %s' % (
+ self.__transportBranchId, self.__transportToTagMap))
- snmpTargetParamsSecurityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-TARGET-MIB', 'snmpTargetParamsSecurityName')
+ snmpTargetParamsSecurityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ 'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityName')
if self.__paramsBranchId != snmpTargetParamsSecurityName.branchVersionId:
- snmpTargetParamsSecurityModel, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-TARGET-MIB', 'snmpTargetParamsSecurityModel')
+ snmpTargetParamsSecurityModel, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ 'SNMP-TARGET-MIB', 'snmpTargetParamsSecurityModel')
self.__nameToModelMap = {}
@@ -178,17 +192,20 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
# invalidate next map as it include this one
self.__communityBranchId = -1
- debug.logger & debug.flagSM and debug.logger('_com2sec: built securityName to securityModel map, version %s: %s' % (self.__paramsBranchId, self.__nameToModelMap))
+ debug.logger & debug.flagSM and debug.logger(
+ '_com2sec: built securityName to securityModel map, version %s: %s' % (
+ self.__paramsBranchId, self.__nameToModelMap))
- snmpCommunityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-COMMUNITY-MIB', 'snmpCommunityName')
+ snmpCommunityName, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-COMMUNITY-MIB',
+ 'snmpCommunityName')
if self.__communityBranchId != snmpCommunityName.branchVersionId:
(snmpCommunitySecurityName, snmpCommunityContextEngineId,
snmpCommunityContextName,
snmpCommunityTransportTag) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
- 'SNMP-COMMUNITY-MIB', 'snmpCommunitySecurityName',
- 'snmpCommunityContextEngineID', 'snmpCommunityContextName',
- 'snmpCommunityTransportTag'
- )
+ 'SNMP-COMMUNITY-MIB', 'snmpCommunitySecurityName',
+ 'snmpCommunityContextEngineID', 'snmpCommunityContextName',
+ 'snmpCommunityTransportTag'
+ )
self.__communityToTagMap = {}
self.__tagAndCommunityToSecurityMap = {}
@@ -205,7 +222,8 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
securityName = snmpCommunitySecurityName.getNode(snmpCommunitySecurityName.name + instId).syntax
- contextEngineId = snmpCommunityContextEngineId.getNode(snmpCommunityContextEngineId.name + instId).syntax
+ contextEngineId = snmpCommunityContextEngineId.getNode(
+ snmpCommunityContextEngineId.name + instId).syntax
contextName = snmpCommunityContextName.getNode(snmpCommunityContextName.name + instId).syntax
@@ -227,13 +245,18 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
self.__communityBranchId = snmpCommunityName.branchVersionId
- debug.logger & debug.flagSM and debug.logger('_com2sec: built communityName to tag map (securityModel %s), version %s: %s' % (self.securityModelID, self.__communityBranchId, self.__communityToTagMap))
+ debug.logger & debug.flagSM and debug.logger(
+ '_com2sec: built communityName to tag map (securityModel %s), version %s: %s' % (
+ self.securityModelID, self.__communityBranchId, self.__communityToTagMap))
- debug.logger & debug.flagSM and debug.logger('_com2sec: built tag & community to securityName map (securityModel %s), version %s: %s' % (self.securityModelID, self.__communityBranchId, self.__tagAndCommunityToSecurityMap))
+ debug.logger & debug.flagSM and debug.logger(
+ '_com2sec: built tag & community to securityName map (securityModel %s), version %s: %s' % (
+ self.securityModelID, self.__communityBranchId, self.__tagAndCommunityToSecurityMap))
if communityName in self.__communityToTagMap:
if transportInformation in self.__transportToTagMap:
- tags = self.__transportToTagMap[transportInformation].intersection(self.__communityToTagMap[communityName])
+ tags = self.__transportToTagMap[transportInformation].intersection(
+ self.__communityToTagMap[communityName])
elif self.__emptyTag in self.__communityToTagMap[communityName]:
tags = [self.__emptyTag]
else:
@@ -247,9 +270,14 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
# 5.2.1 (row selection in snmpCommunityTable)
# Picks first match but favors entries already in targets table
if candidateSecurityNames:
- candidateSecurityNames.sort(key=lambda x, m=self.__nameToModelMap, v=self.securityModelID: (not int(x[0] in m and v in m[x[0]]), str(x[0])))
+ candidateSecurityNames.sort(
+ key=lambda x, m=self.__nameToModelMap, v=self.securityModelID: (
+ not int(x[0] in m and v in m[x[0]]), str(x[0]))
+ )
chosenSecurityName = candidateSecurityNames[0] # min()
- debug.logger & debug.flagSM and debug.logger('_com2sec: securityName candidates for communityName \'%s\' are %s; choosing securityName \'%s\'' % (communityName, candidateSecurityNames, chosenSecurityName[0]))
+ debug.logger & debug.flagSM and debug.logger(
+ '_com2sec: securityName candidates for communityName \'%s\' are %s; choosing securityName \'%s\'' % (
+ communityName, candidateSecurityNames, chosenSecurityName[0]))
return chosenSecurityName
raise error.StatusInformation(errorIndication=errind.unknownCommunityName)
@@ -265,7 +293,9 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
communityName = self._sec2com(snmpEngine, securityName,
contextEngineId, contextName)
- debug.logger & debug.flagSM and debug.logger('generateRequestMsg: using community %r for securityModel %r, securityName %r, contextEngineId %r contextName %r' % (communityName, securityModel, securityName, contextEngineId, contextName))
+ debug.logger & debug.flagSM and debug.logger(
+ 'generateRequestMsg: using community %r for securityModel %r, securityName %r, contextEngineId %r contextName %r' % (
+ communityName, securityModel, securityName, contextEngineId, contextName))
securityParameters = communityName
@@ -281,7 +311,8 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
return securityParameters, encoder.encode(msg)
except PyAsn1Error:
- debug.logger & debug.flagMP and debug.logger('generateRequestMsg: serialization failure: %s' % sys.exc_info()[1])
+ debug.logger & debug.flagMP and debug.logger(
+ 'generateRequestMsg: serialization failure: %s' % sys.exc_info()[1])
raise error.StatusInformation(errorIndication=errind.serializationError)
def generateResponseMsg(self, snmpEngine, messageProcessingModel,
@@ -294,7 +325,9 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
cachedSecurityData = self._cache.pop(securityStateReference)
communityName = cachedSecurityData['communityName']
- debug.logger & debug.flagSM and debug.logger('generateResponseMsg: recovered community %r by securityStateReference %s' % (communityName, securityStateReference))
+ debug.logger & debug.flagSM and debug.logger(
+ 'generateResponseMsg: recovered community %r by securityStateReference %s' % (
+ communityName, securityStateReference))
msg.setComponentByPosition(1, communityName)
msg.setComponentByPosition(2)
@@ -308,7 +341,8 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
return communityName, encoder.encode(msg)
except PyAsn1Error:
- debug.logger & debug.flagMP and debug.logger('generateResponseMsg: serialization failure: %s' % sys.exc_info()[1])
+ debug.logger & debug.flagMP and debug.logger(
+ 'generateResponseMsg: serialization failure: %s' % sys.exc_info()[1])
raise error.StatusInformation(errorIndication=errind.serializationError)
def processIncomingMsg(self, snmpEngine, messageProcessingModel,
@@ -334,11 +368,13 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
)
except error.StatusInformation:
- snmpInBadCommunityNames, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInBadCommunityNames')
+ snmpInBadCommunityNames, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols(
+ '__SNMPv2-MIB', 'snmpInBadCommunityNames')
snmpInBadCommunityNames.syntax += 1
raise error.StatusInformation(errorIndication=errind.unknownCommunityName)
- snmpEngineID, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')
+ snmpEngineID, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB',
+ 'snmpEngineID')
securityEngineID = snmpEngineID.syntax
@@ -355,7 +391,9 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
snmpEngine, 'rfc2576.processIncomingMsg'
)
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: looked up securityName %r securityModel %r contextEngineId %r contextName %r by communityName %r AND transportInformation %r' % (securityName, self.securityModelID, contextEngineId, contextName, communityName, transportInformation))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: looked up securityName %r securityModel %r contextEngineId %r contextName %r by communityName %r AND transportInformation %r' % (
+ securityName, self.securityModelID, contextEngineId, contextName, communityName, transportInformation))
stateReference = self._cache.push(communityName=communityName)
@@ -364,11 +402,14 @@ class SnmpV1SecurityModel(base.AbstractSecurityModel):
maxSizeResponseScopedPDU = maxMessageSize - 128
securityStateReference = stateReference
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: generated maxSizeResponseScopedPDU %s securityStateReference %s' % (maxSizeResponseScopedPDU, securityStateReference))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: generated maxSizeResponseScopedPDU %s securityStateReference %s' % (
+ maxSizeResponseScopedPDU, securityStateReference))
return (securityEngineID, securityName, scopedPDU,
maxSizeResponseScopedPDU, securityStateReference)
+
class SnmpV2cSecurityModel(SnmpV1SecurityModel):
securityModelID = 2
diff --git a/pysnmp/proto/secmod/rfc3414/__init__.py b/pysnmp/proto/secmod/rfc3414/__init__.py
index 59df8fb2..75d3de1c 100644
--- a/pysnmp/proto/secmod/rfc3414/__init__.py
+++ b/pysnmp/proto/secmod/rfc3414/__init__.py
@@ -1,3 +1,9 @@
+#
+# This file is part of pysnmp software.
+#
+# Copyright (c) 2005-2016, Ilya Etingof <ilya@glas.net>
+# License: http://pysnmp.sf.net/license.html
+#
from pysnmp.proto.secmod.rfc3414 import service
SnmpUSMSecurityModel = service.SnmpUSMSecurityModel
diff --git a/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py b/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py
index edd6d916..38f34483 100644
--- a/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py
+++ b/pysnmp/proto/secmod/rfc3414/auth/hmacmd5.py
@@ -8,21 +8,23 @@ try:
from hashlib import md5
except ImportError:
import md5
+
md5 = md5.new
from pyasn1.type import univ
from pysnmp.proto.secmod.rfc3414.auth import base
from pysnmp.proto.secmod.rfc3414 import localkey
from pysnmp.proto import errind, error
-_twelveZeros = univ.OctetString((0,)*12).asOctets()
-_fortyEightZeros = (0,)*48
+_twelveZeros = univ.OctetString((0,) * 12).asOctets()
+_fortyEightZeros = (0,) * 48
+
# rfc3414: 6.2.4
class HmacMd5(base.AbstractAuthenticationService):
serviceID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 2) # usmHMACMD5AuthProtocol
- __ipad = [0x36]*64
- __opad = [0x5C]*64
+ __ipad = [0x36] * 64
+ __opad = [0x5C] * 64
def hashPassphrase(self, authKey):
return localkey.hashPassphraseMD5(authKey)
@@ -40,7 +42,7 @@ class HmacMd5(base.AbstractAuthenticationService):
if l == -1:
raise error.ProtocolError('Cant locate digest placeholder')
wholeHead = wholeMsg[:l]
- wholeTail = wholeMsg[l+12:]
+ wholeTail = wholeMsg[l + 12:]
# 6.3.1.1
@@ -51,21 +53,23 @@ class HmacMd5(base.AbstractAuthenticationService):
# 6.3.1.2c
k1 = univ.OctetString(
- map(lambda x, y: x^y, extendedAuthKey, self.__ipad)
+ map(lambda x, y: x ^ y, extendedAuthKey, self.__ipad)
)
# 6.3.1.2d --> noop
# 6.3.1.2e
k2 = univ.OctetString(
- map(lambda x, y: x^y, extendedAuthKey, self.__opad)
+ map(lambda x, y: x ^ y, extendedAuthKey, self.__opad)
)
# 6.3.1.3
- d1 = md5(k1.asOctets()+wholeMsg).digest()
+ # noinspection PyDeprecation,PyCallingNonCallable
+ d1 = md5(k1.asOctets() + wholeMsg).digest()
# 6.3.1.4
- d2 = md5(k2.asOctets()+d1).digest()
+ # noinspection PyDeprecation,PyCallingNonCallable
+ d2 = md5(k2.asOctets() + d1).digest()
mac = d2[:12]
# 6.3.1.5 & 6
@@ -84,7 +88,7 @@ class HmacMd5(base.AbstractAuthenticationService):
if l == -1:
raise error.ProtocolError('Cant locate digest in wholeMsg')
wholeHead = wholeMsg[:l]
- wholeTail = wholeMsg[l+12:]
+ wholeTail = wholeMsg[l + 12:]
authenticatedWholeMsg = wholeHead + _twelveZeros + wholeTail
# 6.3.2.4a
@@ -94,21 +98,23 @@ class HmacMd5(base.AbstractAuthenticationService):
# 6.3.2.4c
k1 = univ.OctetString(
- map(lambda x, y: x^y, extendedAuthKey, self.__ipad)
+ map(lambda x, y: x ^ y, extendedAuthKey, self.__ipad)
)
# 6.3.2.4d --> noop
# 6.3.2.4e
k2 = univ.OctetString(
- map(lambda x, y: x^y, extendedAuthKey, self.__opad)
+ map(lambda x, y: x ^ y, extendedAuthKey, self.__opad)
)
# 6.3.2.5a
- d1 = md5(k1.asOctets()+authenticatedWholeMsg).digest()
+ # noinspection PyDeprecation,PyCallingNonCallable
+ d1 = md5(k1.asOctets() + authenticatedWholeMsg).digest()
# 6.3.2.5b
- d2 = md5(k2.asOctets()+d1).digest()
+ # noinspection PyDeprecation,PyCallingNonCallable
+ d2 = md5(k2.asOctets() + d1).digest()
# 6.3.2.5c
mac = d2[:12]
diff --git a/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py b/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py
index 95c7af8d..ef365f1f 100644
--- a/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py
+++ b/pysnmp/proto/secmod/rfc3414/auth/hmacsha.py
@@ -8,21 +8,23 @@ try:
from hashlib import sha1
except ImportError:
import sha
+
sha1 = sha.new
from pyasn1.type import univ
from pysnmp.proto.secmod.rfc3414.auth import base
from pysnmp.proto.secmod.rfc3414 import localkey
from pysnmp.proto import errind, error
-_twelveZeros = univ.OctetString((0,)*12).asOctets()
-_fortyFourZeros = (0,)*44
+_twelveZeros = univ.OctetString((0,) * 12).asOctets()
+_fortyFourZeros = (0,) * 44
+
# 7.2.4
class HmacSha(base.AbstractAuthenticationService):
serviceID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 3) # usmHMACSHAAuthProtocol
- __ipad = [0x36]*64
- __opad = [0x5C]*64
+ __ipad = [0x36] * 64
+ __opad = [0x5C] * 64
def hashPassphrase(self, authKey):
return localkey.hashPassphraseSHA(authKey)
@@ -41,7 +43,7 @@ class HmacSha(base.AbstractAuthenticationService):
if l == -1:
raise error.ProtocolError('Cant locate digest placeholder')
wholeHead = wholeMsg[:l]
- wholeTail = wholeMsg[l+12:]
+ wholeTail = wholeMsg[l + 12:]
# 7.3.1.2a
extendedAuthKey = authKey.asNumbers() + _fortyFourZeros
@@ -50,21 +52,21 @@ class HmacSha(base.AbstractAuthenticationService):
# 7.3.1.2c
k1 = univ.OctetString(
- map(lambda x, y: x^y, extendedAuthKey, self.__ipad)
+ map(lambda x, y: x ^ y, extendedAuthKey, self.__ipad)
)
# 7.3.1.2d -- noop
# 7.3.1.2e
k2 = univ.OctetString(
- map(lambda x, y: x^y, extendedAuthKey, self.__opad)
+ map(lambda x, y: x ^ y, extendedAuthKey, self.__opad)
)
# 7.3.1.3
- d1 = sha1(k1.asOctets()+wholeMsg).digest()
+ d1 = sha1(k1.asOctets() + wholeMsg).digest()
# 7.3.1.4
- d2 = sha1(k2.asOctets()+d1).digest()
+ d2 = sha1(k2.asOctets() + d1).digest()
mac = d2[:12]
# 7.3.1.5 & 6
@@ -83,7 +85,7 @@ class HmacSha(base.AbstractAuthenticationService):
if l == -1:
raise error.ProtocolError('Cant locate digest in wholeMsg')
wholeHead = wholeMsg[:l]
- wholeTail = wholeMsg[l+12:]
+ wholeTail = wholeMsg[l + 12:]
authenticatedWholeMsg = wholeHead + _twelveZeros + wholeTail
# 7.3.2.4a
@@ -93,21 +95,21 @@ class HmacSha(base.AbstractAuthenticationService):
# 7.3.2.4c
k1 = univ.OctetString(
- map(lambda x, y: x^y, extendedAuthKey, self.__ipad)
+ map(lambda x, y: x ^ y, extendedAuthKey, self.__ipad)
)
# 7.3.2.4d --> noop
# 7.3.2.4e
k2 = univ.OctetString(
- map(lambda x, y: x^y, extendedAuthKey, self.__opad)
+ map(lambda x, y: x ^ y, extendedAuthKey, self.__opad)
)
# 7.3.2.5a
- d1 = sha1(k1.asOctets()+authenticatedWholeMsg).digest()
+ d1 = sha1(k1.asOctets() + authenticatedWholeMsg).digest()
# 7.3.2.5b
- d2 = sha1(k2.asOctets()+d1).digest()
+ d2 = sha1(k2.asOctets() + d1).digest()
# 7.3.2.5c
mac = d2[:12]
diff --git a/pysnmp/proto/secmod/rfc3414/auth/noauth.py b/pysnmp/proto/secmod/rfc3414/auth/noauth.py
index 721a2456..0a89391b 100644
--- a/pysnmp/proto/secmod/rfc3414/auth/noauth.py
+++ b/pysnmp/proto/secmod/rfc3414/auth/noauth.py
@@ -7,6 +7,7 @@
from pysnmp.proto.secmod.rfc3414.auth import base
from pysnmp.proto import errind, error
+
class NoAuth(base.AbstractAuthenticationService):
serviceID = (1, 3, 6, 1, 6, 3, 10, 1, 1, 1) # usmNoAuthProtocol
diff --git a/pysnmp/proto/secmod/rfc3414/localkey.py b/pysnmp/proto/secmod/rfc3414/localkey.py
index bb05d47c..e9b642c3 100644
--- a/pysnmp/proto/secmod/rfc3414/localkey.py
+++ b/pysnmp/proto/secmod/rfc3414/localkey.py
@@ -7,16 +7,21 @@
try:
from hashlib import md5, sha1
except ImportError:
- import md5, sha
+ import md5
+ import sha
+
md5 = md5.new
sha1 = sha.new
from pyasn1.type import univ
+
# RFC3414: A.2.1
def hashPassphraseMD5(passphrase):
passphrase = univ.OctetString(passphrase).asOctets()
+ # noinspection PyDeprecation,PyCallingNonCallable
md = md5()
- ringBuffer = passphrase * (passphrase and (64//len(passphrase)+1) or 1)
+ ringBuffer = passphrase * (passphrase and (64 // len(passphrase) + 1) or 1)
+ # noinspection PyTypeChecker
ringBufferLen = len(ringBuffer)
count = 0
mark = 0
@@ -27,24 +32,29 @@ def hashPassphraseMD5(passphrase):
mark = e
else:
md.update(
- ringBuffer[mark:ringBufferLen] + ringBuffer[0:e-ringBufferLen]
+ ringBuffer[mark:ringBufferLen] + ringBuffer[0:e - ringBufferLen]
)
- mark = e-ringBufferLen
+ mark = e - ringBufferLen
count += 1
return md.digest()
+
def localizeKeyMD5(passKey, snmpEngineId):
passKey = univ.OctetString(passKey).asOctets()
- return md5(passKey + snmpEngineId.asOctets()+ passKey).digest()
+ # noinspection PyDeprecation,PyCallingNonCallable
+ return md5(passKey + snmpEngineId.asOctets() + passKey).digest()
+
def passwordToKeyMD5(passphrase, snmpEngineId):
return localizeKeyMD5(hashPassphraseMD5(passphrase), snmpEngineId)
+
# RFC3414: A.2.2
def hashPassphraseSHA(passphrase):
passphrase = univ.OctetString(passphrase).asOctets()
md = sha1()
- ringBuffer = passphrase * (64//len(passphrase)+1)
+ ringBuffer = passphrase * (64 // len(passphrase) + 1)
+ # noinspection PyTypeChecker
ringBufferLen = len(ringBuffer)
count = 0
mark = 0
@@ -55,15 +65,17 @@ def hashPassphraseSHA(passphrase):
mark = e
else:
md.update(
- ringBuffer[mark:ringBufferLen] + ringBuffer[0:e-ringBufferLen]
+ ringBuffer[mark:ringBufferLen] + ringBuffer[0:e - ringBufferLen]
)
- mark = e-ringBufferLen
+ mark = e - ringBufferLen
count += 1
return md.digest()
+
def localizeKeySHA(passKey, snmpEngineId):
passKey = univ.OctetString(passKey).asOctets()
- return sha1(passKey + snmpEngineId.asOctets()+ passKey).digest()
+ return sha1(passKey + snmpEngineId.asOctets() + passKey).digest()
+
def passwordToKeySHA(passphrase, snmpEngineId):
return localizeKeySHA(hashPassphraseSHA(passphrase), snmpEngineId)
diff --git a/pysnmp/proto/secmod/rfc3414/priv/base.py b/pysnmp/proto/secmod/rfc3414/priv/base.py
index a9685d6d..224e19aa 100644
--- a/pysnmp/proto/secmod/rfc3414/priv/base.py
+++ b/pysnmp/proto/secmod/rfc3414/priv/base.py
@@ -6,6 +6,7 @@
#
from pysnmp.proto import error
+
class AbstractEncryptionService:
serviceID = None
diff --git a/pysnmp/proto/secmod/rfc3414/priv/des.py b/pysnmp/proto/secmod/rfc3414/priv/des.py
index e011fd66..16b882bd 100644
--- a/pysnmp/proto/secmod/rfc3414/priv/des.py
+++ b/pysnmp/proto/secmod/rfc3414/priv/des.py
@@ -19,12 +19,13 @@ except ImportError:
random.seed()
+
# 8.2.4
class Des(base.AbstractEncryptionService):
- serviceID = (1, 3, 6, 1, 6, 3, 10, 1, 2, 2) # usmDESPrivProtocol
+ serviceID = (1, 3, 6, 1, 6, 3, 10, 1, 2, 2) # usmDESPrivProtocol
if version_info < (2, 3):
- _localInt = int(random.random()*0xffffffff)
+ _localInt = int(random.random() * 0xffffffff)
else:
_localInt = random.randrange(0, 0xffffffff)
@@ -47,7 +48,7 @@ class Des(base.AbstractEncryptionService):
raise error.ProtocolError(
'Unknown auth protocol %s' % (authProtocol,)
)
- return localPrivKey[:32] # key+IV
+ return localPrivKey[:32] # key+IV
# 8.1.1.1
def __getEncryptionKey(self, privKey, snmpEngineBoots):
@@ -56,14 +57,14 @@ class Des(base.AbstractEncryptionService):
securityEngineBoots = int(snmpEngineBoots)
- salt = [securityEngineBoots>>24&0xff,
- securityEngineBoots>>16&0xff,
- securityEngineBoots>>8&0xff,
- securityEngineBoots&0xff,
- self._localInt>>24&0xff,
- self._localInt>>16&0xff,
- self._localInt>>8&0xff,
- self._localInt&0xff]
+ salt = [securityEngineBoots >> 24 & 0xff,
+ securityEngineBoots >> 16 & 0xff,
+ securityEngineBoots >> 8 & 0xff,
+ securityEngineBoots & 0xff,
+ self._localInt >> 24 & 0xff,
+ self._localInt >> 16 & 0xff,
+ self._localInt >> 8 & 0xff,
+ self._localInt & 0xff]
if self._localInt == 0xffffffff:
self._localInt = 0
else:
@@ -71,11 +72,12 @@ class Des(base.AbstractEncryptionService):
return (desKey.asOctets(),
univ.OctetString(salt).asOctets(),
- univ.OctetString(map(lambda x, y: x^y, salt, preIV.asNumbers())).asOctets())
+ univ.OctetString(map(lambda x, y: x ^ y, salt, preIV.asNumbers())).asOctets())
- def __getDecryptionKey(self, privKey, salt):
+ @staticmethod
+ def __getDecryptionKey(privKey, salt):
return (privKey[:8].asOctets(),
- univ.OctetString(map(lambda x, y: x^y, salt.asNumbers(), privKey[8:16].asNumbers())).asOctets())
+ univ.OctetString(map(lambda x, y: x ^ y, salt.asNumbers(), privKey[8:16].asNumbers())).asOctets())
# 8.2.4.1
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
diff --git a/pysnmp/proto/secmod/rfc3414/priv/nopriv.py b/pysnmp/proto/secmod/rfc3414/priv/nopriv.py
index a1491aef..1d4499b3 100644
--- a/pysnmp/proto/secmod/rfc3414/priv/nopriv.py
+++ b/pysnmp/proto/secmod/rfc3414/priv/nopriv.py
@@ -7,8 +7,10 @@
from pysnmp.proto.secmod.rfc3414.priv import base
from pysnmp.proto import errind, error
+
class NoPriv(base.AbstractEncryptionService):
- serviceID = (1, 3, 6, 1, 6, 3, 10, 1, 2, 1) # usmNoPrivProtocol
+ serviceID = (1, 3, 6, 1, 6, 3, 10, 1, 2, 1) # usmNoPrivProtocol
+
def hashPassphrase(self, authProtocol, privKey):
return
diff --git a/pysnmp/proto/secmod/rfc3414/service.py b/pysnmp/proto/secmod/rfc3414/service.py
index 9976a06a..7bcf551c 100644
--- a/pysnmp/proto/secmod/rfc3414/service.py
+++ b/pysnmp/proto/secmod/rfc3414/service.py
@@ -4,7 +4,8 @@
# Copyright (c) 2005-2016, Ilya Etingof <ilya@glas.net>
# License: http://pysnmp.sf.net/license.html
#
-import time, sys
+import time
+import sys
from pysnmp.proto.secmod.base import AbstractSecurityModel
from pysnmp.proto.secmod.rfc3414.auth import hmacmd5, hmacsha, noauth
from pysnmp.proto.secmod.rfc3414.priv import des, nopriv
@@ -18,18 +19,23 @@ from pyasn1.codec.ber import encoder, decoder, eoo
from pyasn1.error import PyAsn1Error
from pyasn1.compat.octets import null
+
# USM security params
class UsmSecurityParameters(rfc1155.TypeCoercionHackMixIn, univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('msgAuthoritativeEngineId', univ.OctetString()),
- namedtype.NamedType('msgAuthoritativeEngineBoots', univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))),
- namedtype.NamedType('msgAuthoritativeEngineTime', univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))),
- namedtype.NamedType('msgUserName', univ.OctetString().subtype(subtypeSpec=constraint.ValueSizeConstraint(0, 32))),
+ namedtype.NamedType('msgAuthoritativeEngineBoots',
+ univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))),
+ namedtype.NamedType('msgAuthoritativeEngineTime',
+ univ.Integer().subtype(subtypeSpec=constraint.ValueRangeConstraint(0, 2147483647))),
+ namedtype.NamedType('msgUserName',
+ univ.OctetString().subtype(subtypeSpec=constraint.ValueSizeConstraint(0, 32))),
namedtype.NamedType('msgAuthenticationParameters', univ.OctetString()),
namedtype.NamedType('msgPrivacyParameters', univ.OctetString())
)
+
class SnmpUSMSecurityModel(AbstractSecurityModel):
securityModelID = 3
authServices = {hmacmd5.HmacMd5.serviceID: hmacmd5.HmacMd5(),
@@ -41,6 +47,7 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
aes192.Aes192.serviceID: aes192.Aes192(),
aes256.Aes256.serviceID: aes256.Aes256(),
nopriv.NoPriv.serviceID: nopriv.NoPriv()}
+
def __init__(self):
AbstractSecurityModel.__init__(self)
self.__securityParametersSpec = UsmSecurityParameters()
@@ -50,9 +57,12 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
self.__paramsBranchId = -1
def __sec2usr(self, snmpEngine, securityName, securityEngineID=None):
- usmUserEngineID, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-USER-BASED-SM-MIB', 'usmUserEngineID')
+ mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder
+ usmUserEngineID, = mibBuilder.importSymbols('SNMP-USER-BASED-SM-MIB',
+ 'usmUserEngineID')
if self.__paramsBranchId != usmUserEngineID.branchVersionId:
- usmUserName, usmUserSecurityName = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-USER-BASED-SM-MIB', 'usmUserName', 'usmUserSecurityName')
+ usmUserName, usmUserSecurityName = mibBuilder.importSymbols(
+ 'SNMP-USER-BASED-SM-MIB', 'usmUserName', 'usmUserSecurityName')
self.__securityToUserMap = {}
@@ -64,7 +74,9 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
except NoSuchInstanceError:
self.__paramsBranchId = usmUserEngineID.branchVersionId
- debug.logger & debug.flagSM and debug.logger('_sec2usr: built snmpEngineId + securityName to userName map, version %s: %r' % (self.__paramsBranchId, self.__securityToUserMap))
+ debug.logger & debug.flagSM and debug.logger(
+ '_sec2usr: built snmpEngineId + securityName to userName map, version %s: %r' % (
+ self.__paramsBranchId, self.__securityToUserMap))
break
instId = nextMibNode.name[len(usmUserSecurityName.name):]
@@ -80,20 +92,24 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
self.__securityToUserMap[k] = __userName
if securityEngineID is None:
- snmpEngineID, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')
+ snmpEngineID, = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')
securityEngineID = snmpEngineID.syntax
try:
userName = self.__securityToUserMap[(securityEngineID, securityName)]
except KeyError:
- debug.logger & debug.flagSM and debug.logger('_sec2usr: no entry exists for snmpEngineId %r, securityName %r' % (securityEngineID, securityName))
+ debug.logger & debug.flagSM and debug.logger(
+ '_sec2usr: no entry exists for snmpEngineId %r, securityName %r' % (securityEngineID, securityName))
raise NoSuchInstanceError() # emulate MIB lookup
- debug.logger & debug.flagSM and debug.logger('_sec2usr: using userName %r for snmpEngineId %r, securityName %r' % (userName, securityEngineID, securityName))
+ debug.logger & debug.flagSM and debug.logger(
+ '_sec2usr: using userName %r for snmpEngineId %r, securityName %r' % (
+ userName, securityEngineID, securityName))
return userName
- def __getUserInfo(self, mibInstrumController, securityEngineID, userName):
+ @staticmethod
+ def __getUserInfo(mibInstrumController, securityEngineID, userName):
usmUserEntry, = mibInstrumController.mibBuilder.importSymbols(
'SNMP-USER-BASED-SM-MIB', 'usmUserEntry'
)
@@ -198,7 +214,9 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
securityModel, securityEngineID,
securityName, securityLevel,
scopedPDU, securityStateReference):
- snmpEngineID = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
+ mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder
+ snmpEngineID = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
+
# 3.1.1
if securityStateReference is not None:
# 3.1.1a
@@ -232,14 +250,14 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
(usmUserName, usmUserSecurityName, usmUserAuthProtocol,
usmUserAuthKeyLocalized, usmUserPrivProtocol,
usmUserPrivKeyLocalized) = self.__getUserInfo(
- snmpEngine.msgAndPduDsp.mibInstrumController,
- securityEngineID,
- self.__sec2usr(snmpEngine, securityName, securityEngineID)
- )
+ snmpEngine.msgAndPduDsp.mibInstrumController,
+ securityEngineID,
+ self.__sec2usr(snmpEngine, securityName, securityEngineID)
+ )
debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: read user info')
except NoSuchInstanceError:
- pysnmpUsmDiscovery, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__PYSNMP-USM-MIB', 'pysnmpUsmDiscovery')
+ pysnmpUsmDiscovery, = mibBuilder.importSymbols('__PYSNMP-USM-MIB', 'pysnmpUsmDiscovery')
reportUnknownName = not pysnmpUsmDiscovery.syntax
if not reportUnknownName:
try:
@@ -247,10 +265,10 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
usmUserAuthProtocol, usmUserAuthKeyLocalized,
usmUserPrivProtocol,
usmUserPrivKeyLocalized) = self.__cloneUserInfo(
- snmpEngine.msgAndPduDsp.mibInstrumController,
- securityEngineID,
- self.__sec2usr(snmpEngine, securityName)
- )
+ snmpEngine.msgAndPduDsp.mibInstrumController,
+ securityEngineID,
+ self.__sec2usr(snmpEngine, securityName)
+ )
except NoSuchInstanceError:
reportUnknownName = True
@@ -263,8 +281,9 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: clone user info')
except PyAsn1Error:
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: %s' % (sys.exc_info()[1],))
- snmpInGenErrs, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInGenErrs')
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: %s' % (sys.exc_info()[1],))
+ snmpInGenErrs, = mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInGenErrs')
snmpInGenErrs.syntax += 1
raise error.StatusInformation(
errorIndication=errind.invalidMsg
@@ -277,14 +296,17 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
usmUserAuthKeyLocalized = usmUserPrivKeyLocalized = None
debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: use empty USM data')
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: local usmUserName %r usmUserSecurityName %r usmUserAuthProtocol %s usmUserPrivProtocol %s securityEngineID %r securityName %r' % (usmUserName, usmUserSecurityName, usmUserAuthProtocol, usmUserPrivProtocol, securityEngineID, securityName))
+ # noinspection PyUnboundLocalVariable
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: local usmUserName %r usmUserSecurityName %r usmUserAuthProtocol %s usmUserPrivProtocol %s securityEngineID %r securityName %r' % (
+ usmUserName, usmUserSecurityName, usmUserAuthProtocol, usmUserPrivProtocol, securityEngineID, securityName))
msg = globalData
# 3.1.2
if securityLevel == 3:
if usmUserAuthProtocol == noauth.NoAuth.serviceID or \
- usmUserPrivProtocol == nopriv.NoPriv.serviceID:
+ usmUserPrivProtocol == nopriv.NoPriv.serviceID:
raise error.StatusInformation(
errorIndication=errind.unsupportedSecurityLevel
)
@@ -308,24 +330,30 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
if securityEngineID in self.__timeline:
(snmpEngineBoots, snmpEngineTime, latestReceivedEngineTime,
latestUpdateTimestamp) = self.__timeline[securityEngineID]
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: read snmpEngineBoots, snmpEngineTime from timeline')
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: read snmpEngineBoots, snmpEngineTime from timeline')
else:
# 2.3 XXX is this correct?
snmpEngineBoots = snmpEngineTime = 0
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: no timeline for securityEngineID %r' % (securityEngineID,))
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: no timeline for securityEngineID %r' % (securityEngineID,))
# 3.1.6.b
elif securityStateReference is not None: # XXX Report?
(snmpEngineBoots,
- snmpEngineTime) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots', 'snmpEngineTime')
+ snmpEngineTime) = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots', 'snmpEngineTime')
snmpEngineBoots = snmpEngineBoots.syntax
snmpEngineTime = snmpEngineTime.syntax.clone()
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: read snmpEngineBoots, snmpEngineTime from LCD')
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: read snmpEngineBoots, snmpEngineTime from LCD')
# 3.1.6.c
else:
snmpEngineBoots = snmpEngineTime = 0
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: assuming zero snmpEngineBoots, snmpEngineTime')
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: assuming zero snmpEngineBoots, snmpEngineTime')
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: use snmpEngineBoots %s snmpEngineTime %s for securityEngineID %r' % (snmpEngineBoots, snmpEngineTime, securityEngineID))
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: use snmpEngineBoots %s snmpEngineTime %s for securityEngineID %r' % (
+ snmpEngineBoots, snmpEngineTime, securityEngineID))
# 3.1.4a
if securityLevel == 3:
@@ -336,24 +364,28 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
errorIndication=errind.encryptionError
)
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: scopedPDU %s' % scopedPDU.prettyPrint())
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: scopedPDU %s' % scopedPDU.prettyPrint())
try:
dataToEncrypt = encoder.encode(scopedPDU)
except PyAsn1Error:
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: scopedPDU serialization error: %s' % sys.exc_info()[1])
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: scopedPDU serialization error: %s' % sys.exc_info()[1])
raise error.StatusInformation(
errorIndication=errind.serializationError
)
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: scopedPDU encoded into %s' % debug.hexdump(dataToEncrypt))
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: scopedPDU encoded into %s' % debug.hexdump(dataToEncrypt))
+ # noinspection PyUnboundLocalVariable
(encryptedData,
privParameters) = privHandler.encryptData(
- usmUserPrivKeyLocalized,
- (snmpEngineBoots, snmpEngineTime, None), dataToEncrypt
- )
+ usmUserPrivKeyLocalized,
+ (snmpEngineBoots, snmpEngineTime, None), dataToEncrypt
+ )
securityParameters.setComponentByPosition(
5, privParameters, verifyConstraints=False
@@ -362,7 +394,8 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
1, encryptedData, verifyConstraints=False
)
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: scopedPDU ciphered into %s' % debug.hexdump(encryptedData))
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: scopedPDU ciphered into %s' % debug.hexdump(encryptedData))
# 3.1.4b
elif securityLevel == 1 or securityLevel == 2:
@@ -400,28 +433,33 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
4, '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
)
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: %s' % (securityParameters.prettyPrint(),))
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: %s' % (securityParameters.prettyPrint(),))
try:
msg.setComponentByPosition(2, encoder.encode(securityParameters), verifyConstraints=False)
except PyAsn1Error:
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: securityParameters serialization error: %s' % sys.exc_info()[1])
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: securityParameters serialization error: %s' % sys.exc_info()[1])
raise error.StatusInformation(
errorIndication=errind.serializationError
)
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: auth outgoing msg: %s' % msg.prettyPrint())
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: auth outgoing msg: %s' % msg.prettyPrint())
try:
wholeMsg = encoder.encode(msg)
except PyAsn1Error:
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: msg serialization error: %s' % sys.exc_info()[1])
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: msg serialization error: %s' % sys.exc_info()[1])
raise error.StatusInformation(
errorIndication=errind.serializationError
)
+ # noinspection PyUnboundLocalVariable
authenticatedWholeMsg = authHandler.authenticateOutgoingMsg(
usmUserAuthKeyLocalized, wholeMsg
)
@@ -432,31 +470,36 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
4, '', verifyConstraints=False
)
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: %s' % (securityParameters.prettyPrint(),))
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: %s' % (securityParameters.prettyPrint(),))
try:
msg.setComponentByPosition(2, encoder.encode(securityParameters), verifyConstraints=False)
except PyAsn1Error:
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: secutiryParameters serialization error: %s' % sys.exc_info()[1])
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: secutiryParameters serialization error: %s' % sys.exc_info()[1])
raise error.StatusInformation(
errorIndication=errind.serializationError
)
try:
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: plain outgoing msg: %s' % msg.prettyPrint())
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: plain outgoing msg: %s' % msg.prettyPrint())
authenticatedWholeMsg = encoder.encode(msg)
except PyAsn1Error:
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: msg serialization error: %s' % sys.exc_info()[1])
+ debug.logger & debug.flagSM and debug.logger(
+ '__generateRequestOrResponseMsg: msg serialization error: %s' % sys.exc_info()[1])
raise error.StatusInformation(
errorIndication=errind.serializationError
)
- debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: %s outgoing msg: %s' % (securityLevel > 1 and "authenticated" or "plain", debug.hexdump(authenticatedWholeMsg)))
+ debug.logger & debug.flagSM and debug.logger('__generateRequestOrResponseMsg: %s outgoing msg: %s' % (
+ securityLevel > 1 and "authenticated" or "plain", debug.hexdump(authenticatedWholeMsg)))
# 3.1.9
- return (msg.getComponentByPosition(2), authenticatedWholeMsg)
+ return msg.getComponentByPosition(2), authenticatedWholeMsg
def generateRequestMsg(self, snmpEngine, messageProcessingModel,
globalData, maxMessageSize, securityModel,
@@ -487,12 +530,15 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
def processIncomingMsg(self, snmpEngine, messageProcessingModel,
maxMessageSize, securityParameters,
securityModel, securityLevel, wholeMsg, msg):
+ mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder
+
# 3.2.9 -- moved up here to be able to report
# maxSizeResponseScopedPDU on error
# (48 - maximum SNMPv3 header length)
maxSizeResponseScopedPDU = int(maxMessageSize) - len(securityParameters) - 48
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: securityParameters %s' % debug.hexdump(securityParameters))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: securityParameters %s' % debug.hexdump(securityParameters))
# 3.2.1
try:
@@ -502,7 +548,7 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
except PyAsn1Error:
debug.logger & debug.flagSM and debug.logger('processIncomingMsg: %s' % (sys.exc_info()[1],))
- snmpInASNParseErrs, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInASNParseErrs')
+ snmpInASNParseErrs, = mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInASNParseErrs')
snmpInASNParseErrs.syntax += 1
raise error.StatusInformation(errorIndication=errind.parseError)
@@ -517,15 +563,17 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
msgUserName=securityParameters.getComponentByPosition(3)
)
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: cache write securityStateReference %s by msgUserName %s' % (securityStateReference, securityParameters.getComponentByPosition(3)))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: cache write securityStateReference %s by msgUserName %s' % (
+ securityStateReference, securityParameters.getComponentByPosition(3)))
scopedPduData = msg.getComponentByPosition(3)
# Used for error reporting
- contextEngineId = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
+ contextEngineId = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
contextName = null
- snmpEngineID = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
+ snmpEngineID = mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')[0].syntax
# 3.2.3
if msgAuthoritativeEngineId != snmpEngineID and \
@@ -533,20 +581,27 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
if msgAuthoritativeEngineId and \
4 < len(msgAuthoritativeEngineId) < 33:
# 3.2.3a - cloned user when request was sent
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: unsynchronized securityEngineID %r' % (msgAuthoritativeEngineId,))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: unsynchronized securityEngineID %r' % (msgAuthoritativeEngineId,))
else:
# 3.2.3b
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: peer requested snmpEngineID discovery')
- usmStatsUnknownEngineIDs, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownEngineIDs')
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: peer requested snmpEngineID discovery')
+ usmStatsUnknownEngineIDs, = mibBuilder.importSymbols(
+ '__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownEngineIDs')
usmStatsUnknownEngineIDs.syntax += 1
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: null or malformed msgAuthoritativeEngineId')
- pysnmpUsmDiscoverable, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__PYSNMP-USM-MIB', 'pysnmpUsmDiscoverable')
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: null or malformed msgAuthoritativeEngineId')
+ pysnmpUsmDiscoverable, = mibBuilder.importSymbols(
+ '__PYSNMP-USM-MIB', 'pysnmpUsmDiscoverable')
if pysnmpUsmDiscoverable.syntax:
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: starting snmpEngineID discovery procedure')
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: starting snmpEngineID discovery procedure')
# Report original contextName
if scopedPduData.getName() != 'plaintext':
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: scopedPduData not plaintext %s' % scopedPduData.prettyPrint())
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: scopedPduData not plaintext %s' % scopedPduData.prettyPrint())
raise error.StatusInformation(
errorIndication=errind.unknownEngineID
)
@@ -576,7 +631,9 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
msgUserName = securityParameters.getComponentByPosition(3)
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: read from securityParams msgAuthoritativeEngineId %r msgUserName %r' % (msgAuthoritativeEngineId, msgUserName))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: read from securityParams msgAuthoritativeEngineId %r msgUserName %r' % (
+ msgAuthoritativeEngineId, msgUserName))
if msgUserName:
# 3.2.4
@@ -584,14 +641,17 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
(usmUserName, usmUserSecurityName, usmUserAuthProtocol,
usmUserAuthKeyLocalized, usmUserPrivProtocol,
usmUserPrivKeyLocalized) = self.__getUserInfo(
- snmpEngine.msgAndPduDsp.mibInstrumController,
- msgAuthoritativeEngineId, msgUserName
- )
+ snmpEngine.msgAndPduDsp.mibInstrumController,
+ msgAuthoritativeEngineId, msgUserName
+ )
debug.logger & debug.flagSM and debug.logger('processIncomingMsg: read user info from LCD')
except NoSuchInstanceError:
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: unknown securityEngineID %r msgUserName %r' % (msgAuthoritativeEngineId, msgUserName))
- usmStatsUnknownUserNames, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownUserNames')
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: unknown securityEngineID %r msgUserName %r' % (
+ msgAuthoritativeEngineId, msgUserName))
+ usmStatsUnknownUserNames, = mibBuilder.importSymbols(
+ '__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownUserNames')
usmStatsUnknownUserNames.syntax += 1
raise error.StatusInformation(
errorIndication=errind.unknownSecurityName,
@@ -606,7 +666,7 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
except PyAsn1Error:
debug.logger & debug.flagSM and debug.logger('processIncomingMsg: %s' % (sys.exc_info()[1],))
- snmpInGenErrs, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInGenErrs')
+ snmpInGenErrs, = mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInGenErrs')
snmpInGenErrs.syntax += 1
raise error.StatusInformation(errorIndication=errind.invalidMsg)
else:
@@ -616,7 +676,9 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
usmUserPrivProtocol = nopriv.NoPriv.serviceID
usmUserAuthKeyLocalized = usmUserPrivKeyLocalized = None
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: now have usmUserName %r usmUserSecurityName %r usmUserAuthProtocol %r usmUserPrivProtocol %r for msgUserName %r' % (usmUserName, usmUserSecurityName, usmUserAuthProtocol, usmUserPrivProtocol, msgUserName))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: now have usmUserName %r usmUserSecurityName %r usmUserAuthProtocol %r usmUserPrivProtocol %r for msgUserName %r' % (
+ usmUserName, usmUserSecurityName, usmUserAuthProtocol, usmUserPrivProtocol, msgUserName))
# 3.2.11 (moved up here to let Reports be authenticated & encrypted)
self._cache.pop(securityStateReference)
@@ -647,7 +709,7 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
snmpEngine.observer.clearExecutionContext(
snmpEngine, 'rfc3414.processIncomingMsg'
)
-
+
# 3.2.5
if msgAuthoritativeEngineId == snmpEngineID:
# Authoritative SNMP engine: make sure securityLevel is sufficient
@@ -671,9 +733,12 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
if usmUserPrivProtocol != nopriv.NoPriv.serviceID:
badSecIndication = 'noAuthNoPriv wanted while priv expected'
if badSecIndication:
- usmStatsUnsupportedSecLevels, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-USER-BASED-SM-MIB', 'usmStatsUnsupportedSecLevels')
+ usmStatsUnsupportedSecLevels, = mibBuilder.importSymbols(
+ '__SNMP-USER-BASED-SM-MIB', 'usmStatsUnsupportedSecLevels')
usmStatsUnsupportedSecLevels.syntax += 1
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: reporting inappropriate security level for user %s: %s' % (msgUserName, badSecIndication))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: reporting inappropriate security level for user %s: %s' % (
+ msgUserName, badSecIndication))
raise error.StatusInformation(
errorIndication=errind.unsupportedSecurityLevel,
oid=usmStatsUnsupportedSecLevels.name,
@@ -695,14 +760,15 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
)
try:
- authenticatedWholeMsg = authHandler.authenticateIncomingMsg(
+ authHandler.authenticateIncomingMsg(
usmUserAuthKeyLocalized,
securityParameters.getComponentByPosition(4),
wholeMsg
)
except error.StatusInformation:
- usmStatsWrongDigests, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-USER-BASED-SM-MIB', 'usmStatsWrongDigests')
+ usmStatsWrongDigests, = mibBuilder.importSymbols(
+ '__SNMP-USER-BASED-SM-MIB', 'usmStatsWrongDigests')
usmStatsWrongDigests.syntax += 1
raise error.StatusInformation(
errorIndication=errind.authenticationFailure,
@@ -730,38 +796,45 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
self.__timelineExpQueue[expireAt] = []
self.__timelineExpQueue[expireAt].append(msgAuthoritativeEngineId)
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: store timeline for securityEngineID %r' % (msgAuthoritativeEngineId,))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: store timeline for securityEngineID %r' % (msgAuthoritativeEngineId,))
# 3.2.7
if securityLevel == 3 or securityLevel == 2:
if msgAuthoritativeEngineId == snmpEngineID:
# Authoritative SNMP engine: use local notion (SF bug #1649032)
- (snmpEngineBoots, snmpEngineTime) = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots', 'snmpEngineTime')
+ (snmpEngineBoots,
+ snmpEngineTime) = mibBuilder.importSymbols(
+ '__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots', 'snmpEngineTime')
snmpEngineBoots = snmpEngineBoots.syntax
snmpEngineTime = snmpEngineTime.syntax.clone()
idleTime = 0
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: read snmpEngineBoots (%s), snmpEngineTime (%s) from LCD' % (snmpEngineBoots, snmpEngineTime))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: read snmpEngineBoots (%s), snmpEngineTime (%s) from LCD' % (
+ snmpEngineBoots, snmpEngineTime))
else:
# Non-authoritative SNMP engine: use cached estimates
if msgAuthoritativeEngineId in self.__timeline:
(snmpEngineBoots, snmpEngineTime,
latestReceivedEngineTime,
latestUpdateTimestamp) = self.__timeline[
- msgAuthoritativeEngineId
- ]
+ msgAuthoritativeEngineId
+ ]
# time passed since last talk with this SNMP engine
- idleTime = int(time.time())-latestUpdateTimestamp
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: read timeline snmpEngineBoots %s snmpEngineTime %s for msgAuthoritativeEngineId %r, idle time %s secs' % (snmpEngineBoots, snmpEngineTime, msgAuthoritativeEngineId, idleTime))
+ idleTime = int(time.time()) - latestUpdateTimestamp
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: read timeline snmpEngineBoots %s snmpEngineTime %s for msgAuthoritativeEngineId %r, idle time %s secs' % (
+ snmpEngineBoots, snmpEngineTime, msgAuthoritativeEngineId, idleTime))
else:
raise error.ProtocolError('Peer SNMP engine info missing')
# 3.2.7a
if msgAuthoritativeEngineId == snmpEngineID:
if snmpEngineBoots == 2147483647 or \
- snmpEngineBoots != msgAuthoritativeEngineBoots or \
- abs(idleTime + int(snmpEngineTime) - \
- int(msgAuthoritativeEngineTime)) > 150:
- usmStatsNotInTimeWindows, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-USER-BASED-SM-MIB', 'usmStatsNotInTimeWindows')
+ snmpEngineBoots != msgAuthoritativeEngineBoots or \
+ abs(idleTime + int(snmpEngineTime) - int(msgAuthoritativeEngineTime)) > 150:
+ usmStatsNotInTimeWindows, = mibBuilder.importSymbols(
+ '__SNMP-USER-BASED-SM-MIB', 'usmStatsNotInTimeWindows')
usmStatsNotInTimeWindows.syntax += 1
raise error.StatusInformation(
errorIndication=errind.notInTimeWindow,
@@ -776,9 +849,10 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
# 3.2.7b
else:
# 3.2.7b.1
+ # noinspection PyUnboundLocalVariable
if msgAuthoritativeEngineBoots > snmpEngineBoots or \
- msgAuthoritativeEngineBoots == snmpEngineBoots and \
- msgAuthoritativeEngineTime > latestReceivedEngineTime:
+ msgAuthoritativeEngineBoots == snmpEngineBoots and \
+ msgAuthoritativeEngineTime > latestReceivedEngineTime:
self.__timeline[msgAuthoritativeEngineId] = (
msgAuthoritativeEngineBoots,
msgAuthoritativeEngineTime,
@@ -790,14 +864,15 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
self.__timelineExpQueue[expireAt] = []
self.__timelineExpQueue[expireAt].append(msgAuthoritativeEngineId)
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: stored timeline msgAuthoritativeEngineBoots %s msgAuthoritativeEngineTime %s for msgAuthoritativeEngineId %r' % (msgAuthoritativeEngineBoots, msgAuthoritativeEngineTime, msgAuthoritativeEngineId))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: stored timeline msgAuthoritativeEngineBoots %s msgAuthoritativeEngineTime %s for msgAuthoritativeEngineId %r' % (
+ msgAuthoritativeEngineBoots, msgAuthoritativeEngineTime, msgAuthoritativeEngineId))
# 3.2.7b.2
if snmpEngineBoots == 2147483647 or \
- msgAuthoritativeEngineBoots < snmpEngineBoots or \
- msgAuthoritativeEngineBoots == snmpEngineBoots and \
- abs(idleTime + int(snmpEngineTime) - \
- int(msgAuthoritativeEngineTime)) > 150:
+ msgAuthoritativeEngineBoots < snmpEngineBoots or \
+ msgAuthoritativeEngineBoots == snmpEngineBoots and \
+ abs(idleTime + int(snmpEngineTime) - int(msgAuthoritativeEngineTime)) > 150:
raise error.StatusInformation(
errorIndication=errind.notInTimeWindow
)
@@ -811,7 +886,7 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
errorIndication=errind.decryptionError
)
encryptedPDU = scopedPduData.getComponentByPosition(1)
- if encryptedPDU is None: # no ciphertext
+ if encryptedPDU is None: # no ciphertext
raise error.StatusInformation(
errorIndication=errind.decryptionError
)
@@ -824,10 +899,12 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
securityParameters.getComponentByPosition(5)),
encryptedPDU
)
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: PDU deciphered into %s' % debug.hexdump(decryptedData))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: PDU deciphered into %s' % debug.hexdump(decryptedData))
except error.StatusInformation:
- usmStatsDecryptionErrors, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-USER-BASED-SM-MIB', 'usmStatsDecryptionErrors')
+ usmStatsDecryptionErrors, = mibBuilder.importSymbols(
+ '__SNMP-USER-BASED-SM-MIB', 'usmStatsDecryptionErrors')
usmStatsDecryptionErrors.syntax += 1
raise error.StatusInformation(
errorIndication=errind.decryptionError,
@@ -845,7 +922,8 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
asn1Spec=scopedPduSpec)
except PyAsn1Error:
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: scopedPDU decoder failed %s' % sys.exc_info()[0])
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: scopedPDU decoder failed %s' % sys.exc_info()[0])
raise error.StatusInformation(
errorIndication=errind.decryptionError
)
@@ -862,16 +940,20 @@ class SnmpUSMSecurityModel(AbstractSecurityModel):
errorIndication=errind.decryptionError
)
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: scopedPDU decoded %s' % scopedPDU.prettyPrint())
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: scopedPDU decoded %s' % scopedPDU.prettyPrint())
# 3.2.10
securityName = usmUserSecurityName
- debug.logger & debug.flagSM and debug.logger('processIncomingMsg: cached msgUserName %s info by securityStateReference %s' % (msgUserName, securityStateReference))
+ debug.logger & debug.flagSM and debug.logger(
+ 'processIncomingMsg: cached msgUserName %s info by securityStateReference %s' % (
+ msgUserName, securityStateReference))
# Delayed to include details
if not msgUserName and not msgAuthoritativeEngineId:
- usmStatsUnknownUserNames, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownUserNames')
+ usmStatsUnknownUserNames, = mibBuilder.importSymbols(
+ '__SNMP-USER-BASED-SM-MIB', 'usmStatsUnknownUserNames')
usmStatsUnknownUserNames.syntax += 1
raise error.StatusInformation(
errorIndication=errind.unknownSecurityName,
diff --git a/pysnmp/proto/secmod/rfc3826/priv/aes.py b/pysnmp/proto/secmod/rfc3826/priv/aes.py
index ef22f470..e6541881 100644
--- a/pysnmp/proto/secmod/rfc3826/priv/aes.py
+++ b/pysnmp/proto/secmod/rfc3826/priv/aes.py
@@ -18,31 +18,34 @@ except ImportError:
random.seed()
+
# RFC3826
#
class Aes(base.AbstractEncryptionService):
- serviceID = (1, 3, 6, 1, 6, 3, 10, 1, 2, 4) # usmAesCfb128Protocol
+ serviceID = (1, 3, 6, 1, 6, 3, 10, 1, 2, 4) # usmAesCfb128Protocol
keySize = 16
_localInt = random.randrange(0, 0xffffffffffffffff)
+
# 3.1.2.1
def __getEncryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime):
- salt = [self._localInt>>56&0xff,
- self._localInt>>48&0xff,
- self._localInt>>40&0xff,
- self._localInt>>32&0xff,
- self._localInt>>24&0xff,
- self._localInt>>16&0xff,
- self._localInt>>8&0xff,
- self._localInt&0xff]
+ salt = [self._localInt >> 56 & 0xff,
+ self._localInt >> 48 & 0xff,
+ self._localInt >> 40 & 0xff,
+ self._localInt >> 32 & 0xff,
+ self._localInt >> 24 & 0xff,
+ self._localInt >> 16 & 0xff,
+ self._localInt >> 8 & 0xff,
+ self._localInt & 0xff]
if self._localInt == 0xffffffffffffffff:
self._localInt = 0
else:
self._localInt += 1
- return self.__getDecryptionKey(privKey, snmpEngineBoots, snmpEngineTime, salt) + (univ.OctetString(salt).asOctets(),)
+ return self.__getDecryptionKey(privKey, snmpEngineBoots, snmpEngineTime, salt) + (
+ univ.OctetString(salt).asOctets(),)
def __getDecryptionKey(self, privKey, snmpEngineBoots,
snmpEngineTime, salt):
@@ -50,14 +53,14 @@ class Aes(base.AbstractEncryptionService):
int(snmpEngineBoots), int(snmpEngineTime), salt
)
- iv = [snmpEngineBoots>>24&0xff,
- snmpEngineBoots>>16&0xff,
- snmpEngineBoots>>8&0xff,
- snmpEngineBoots&0xff,
- snmpEngineTime>>24&0xff,
- snmpEngineTime>>16&0xff,
- snmpEngineTime>>8&0xff,
- snmpEngineTime&0xff] + salt
+ iv = [snmpEngineBoots >> 24 & 0xff,
+ snmpEngineBoots >> 16 & 0xff,
+ snmpEngineBoots >> 8 & 0xff,
+ snmpEngineBoots & 0xff,
+ snmpEngineTime >> 24 & 0xff,
+ snmpEngineTime >> 16 & 0xff,
+ snmpEngineTime >> 8 & 0xff,
+ snmpEngineTime & 0xff] + salt
return privKey[:self.keySize].asOctets(), univ.OctetString(iv).asOctets()
@@ -100,7 +103,7 @@ class Aes(base.AbstractEncryptionService):
aesObj = AES.new(aesKey, AES.MODE_CFB, iv, segment_size=128)
# PyCrypto seems to require padding
- dataToEncrypt = dataToEncrypt + univ.OctetString((0,) * (16-len(dataToEncrypt)%16)).asOctets()
+ dataToEncrypt = dataToEncrypt + univ.OctetString((0,) * (16 - len(dataToEncrypt) % 16)).asOctets()
ciphertext = aesObj.encrypt(dataToEncrypt)
@@ -130,7 +133,7 @@ class Aes(base.AbstractEncryptionService):
aesObj = AES.new(aesKey, AES.MODE_CFB, iv, segment_size=128)
# PyCrypto seems to require padding
- encryptedData = encryptedData + univ.OctetString((0,) * (16-len(encryptedData)%16)).asOctets()
+ encryptedData = encryptedData + univ.OctetString((0,) * (16 - len(encryptedData) % 16)).asOctets()
# 3.3.2.4-6
return aesObj.decrypt(encryptedData.asOctets())