diff options
author | elie <elie> | 2012-08-12 06:52:29 +0000 |
---|---|---|
committer | elie <elie> | 2012-08-12 06:52:29 +0000 |
commit | 7ccc0a0962d881352f308472d04993b0b9cfb3a9 (patch) | |
tree | 03d2f1e694ec8e1716494dcacb798a0d8f664825 | |
parent | 30362e91d32ae26142cdac9d3836edee70c1a4af (diff) | |
download | pysnmp-7ccc0a0962d881352f308472d04993b0b9cfb3a9.tar.gz |
auth & target configuration containers moved to separate modules
-rw-r--r-- | CHANGES | 2 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/oneliner/auth.py | 104 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/oneliner/cmdgen.py | 166 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/oneliner/mibvar.py | 34 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/oneliner/ntforg.py | 30 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/oneliner/target.py | 65 |
6 files changed, 203 insertions, 198 deletions
@@ -67,6 +67,8 @@ Revision 4.2.3 impementations. The former just process a single interaction and complete while the latter run as many interactions as user callback function indicates to. +- Auth & target configuration container classes moved to their separate + modules at oneliner API. - Built-in debugger now supports negating debugging categories. - Fix to CommandGenerator's SNMPv3 engine autodiscovery algorithm when retryCount is administratively set to 0. diff --git a/pysnmp/entity/rfc3413/oneliner/auth.py b/pysnmp/entity/rfc3413/oneliner/auth.py new file mode 100644 index 0000000..ff2c0af --- /dev/null +++ b/pysnmp/entity/rfc3413/oneliner/auth.py @@ -0,0 +1,104 @@ +from pysnmp.entity import config +from pyasn1.compat.octets import null + +class CommunityData: + mpModel = 1 # Default is SMIv2 + securityModel = mpModel + 1 + securityLevel = 'noAuthNoPriv' + contextName = null + tag = null + def __init__(self, securityName, communityName=None, mpModel=None, + contextEngineId=None, contextName=None, tag=None): + self.securityName = securityName + self.communityName = communityName + if mpModel is not None: + self.mpModel = mpModel + self.securityModel = mpModel + 1 + self.contextEngineId = contextEngineId + if contextName is not None: + self.contextName = contextName + if tag is not None: + self.tag = tag + # Autogenerate securityName if not specified + if communityName is None: + self.communityName = securityName + self.securityName = 's%s' % hash( + ( securityName, self.mpModel, + self.contextEngineId, self.contextName, self.tag ) + ) + + def __repr__(self): + return '%s("%s", <COMMUNITY>, %r, %r, %r, %r)' % ( + self.__class__.__name__, + self.securityName, + self.mpModel, + self.contextEngineId, + self.contextName, + self.tag + ) + + def __hash__(self): return hash(self.securityName) + + def __eq__(self, other): return self.securityName == other + def __ne__(self, other): return self.securityName != other + def __lt__(self, other): return self.securityName < other + def __le__(self, other): return self.securityName <= other + def __gt__(self, other): return self.securityName > other + def __ge__(self, other): return self.securityName >= other + +class UsmUserData: + authKey = privKey = None + authProtocol = config.usmNoAuthProtocol + privProtocol = config.usmNoPrivProtocol + securityLevel = 'noAuthNoPriv' + securityModel = 3 + mpModel = 3 + contextName = null + def __init__(self, securityName, + authKey=None, privKey=None, + authProtocol=None, privProtocol=None, + contextEngineId=None, contextName=None): + self.securityName = securityName + + if authKey is not None: + self.authKey = authKey + if authProtocol is None: + self.authProtocol = config.usmHMACMD5AuthProtocol + else: + self.authProtocol = authProtocol + if self.securityLevel != 'authPriv': + self.securityLevel = 'authNoPriv' + + if privKey is not None: + self.privKey = privKey + if self.authProtocol == config.usmNoAuthProtocol: + raise error.PySnmpError('Privacy implies authenticity') + self.securityLevel = 'authPriv' + if privProtocol is None: + self.privProtocol = config.usmDESPrivProtocol + else: + self.privProtocol = privProtocol + + self.contextEngineId = contextEngineId + if contextName is not None: + self.contextName = contextName + + def __repr__(self): + return '%s("%s", <AUTHKEY>, <PRIVKEY>, %r, %r, %r, %r)' % ( + self.__class__.__name__, + self.securityName, + self.authProtocol, + self.privProtocol, + self.contextEngineId, + self.contextName + ) + + def __hash__(self): return hash(self.securityName) + + def __eq__(self, other): return self.securityName == other + def __ne__(self, other): return self.securityName != other + def __lt__(self, other): return self.securityName < other + def __le__(self, other): return self.securityName <= other + def __gt__(self, other): return self.securityName > other + def __ge__(self, other): return self.securityName >= other + diff --git a/pysnmp/entity/rfc3413/oneliner/cmdgen.py b/pysnmp/entity/rfc3413/oneliner/cmdgen.py index 4398730..1be2d03 100644 --- a/pysnmp/entity/rfc3413/oneliner/cmdgen.py +++ b/pysnmp/entity/rfc3413/oneliner/cmdgen.py @@ -1,8 +1,9 @@ -import socket, sys from pysnmp.entity import engine, config from pysnmp.entity.rfc3413 import cmdgen from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable -from pysnmp.carrier.asynsock.dgram import udp, udp6, unix +from pysnmp.entity.rfc3413.oneliner.auth import CommunityData, UsmUserData +from pysnmp.entity.rfc3413.oneliner.target import UdpTransportTarget, \ + Udp6TransportTarget, UnixTransportTarget from pysnmp.proto import rfc1905, errind from pysnmp.smi import view from pysnmp import nextid, error @@ -24,167 +25,6 @@ usmNoPrivProtocol = config.usmNoPrivProtocol nextID = nextid.Integer(0xffffffff) -class CommunityData: - mpModel = 1 # Default is SMIv2 - securityModel = mpModel + 1 - securityLevel = 'noAuthNoPriv' - contextName = null - tag = null - def __init__(self, securityName, communityName=None, mpModel=None, - contextEngineId=None, contextName=None, tag=None): - self.securityName = securityName - self.communityName = communityName - if mpModel is not None: - self.mpModel = mpModel - self.securityModel = mpModel + 1 - self.contextEngineId = contextEngineId - if contextName is not None: - self.contextName = contextName - if tag is not None: - self.tag = tag - # Autogenerate securityName if not specified - if communityName is None: - self.communityName = securityName - self.securityName = 's%s' % hash( - ( securityName, self.mpModel, - self.contextEngineId, self.contextName, self.tag ) - ) - - def __repr__(self): - return '%s("%s", <COMMUNITY>, %r, %r, %r, %r)' % ( - self.__class__.__name__, - self.securityName, - self.mpModel, - self.contextEngineId, - self.contextName, - self.tag - ) - - def __hash__(self): return hash(self.securityName) - - def __eq__(self, other): return self.securityName == other - def __ne__(self, other): return self.securityName != other - def __lt__(self, other): return self.securityName < other - def __le__(self, other): return self.securityName <= other - def __gt__(self, other): return self.securityName > other - def __ge__(self, other): return self.securityName >= other - -class UsmUserData: - authKey = privKey = None - authProtocol = usmNoAuthProtocol - privProtocol = usmNoPrivProtocol - securityLevel = 'noAuthNoPriv' - securityModel = 3 - mpModel = 3 - contextName = null - def __init__(self, securityName, - authKey=None, privKey=None, - authProtocol=None, privProtocol=None, - contextEngineId=None, contextName=None): - self.securityName = securityName - - if authKey is not None: - self.authKey = authKey - if authProtocol is None: - self.authProtocol = usmHMACMD5AuthProtocol - else: - self.authProtocol = authProtocol - if self.securityLevel != 'authPriv': - self.securityLevel = 'authNoPriv' - - if privKey is not None: - self.privKey = privKey - if self.authProtocol == usmNoAuthProtocol: - raise error.PySnmpError('Privacy implies authenticity') - self.securityLevel = 'authPriv' - if privProtocol is None: - self.privProtocol = usmDESPrivProtocol - else: - self.privProtocol = privProtocol - - self.contextEngineId = contextEngineId - if contextName is not None: - self.contextName = contextName - - def __repr__(self): - return '%s("%s", <AUTHKEY>, <PRIVKEY>, %r, %r, %r, %r)' % ( - self.__class__.__name__, - self.securityName, - self.authProtocol, - self.privProtocol, - self.contextEngineId, - self.contextName - ) - - def __hash__(self): return hash(self.securityName) - - def __eq__(self, other): return self.securityName == other - def __ne__(self, other): return self.securityName != other - def __lt__(self, other): return self.securityName < other - def __le__(self, other): return self.securityName <= other - def __gt__(self, other): return self.securityName > other - def __ge__(self, other): return self.securityName >= other - -class _AbstractTransportTarget: - transportDomain = protoTransport = None - def __init__(self, transportAddr, timeout=1, retries=5, tagList=null): - self.transportAddr = transportAddr - self.timeout = timeout - self.retries = retries - self.tagList = tagList - - def __repr__(self): return '%s(%r, %r, %r, %r)' % ( - self.__class__.__name__, self.transportAddr, - self.timeout, self.retries, self.tagList - ) - - def __hash__(self): return hash(self.transportAddr) - - def __eq__(self, other): return self.transportAddr == other - def __ne__(self, other): return self.transportAddr != other - def __lt__(self, other): return self.transportAddr < other - def __le__(self, other): return self.transportAddr <= other - def __gt__(self, other): return self.transportAddr > other - def __ge__(self, other): return self.transportAddr >= other - - def openClientMode(self): - self.transport = self.protoTransport().openClientMode() - return self.transport - -class UdpTransportTarget(_AbstractTransportTarget): - transportDomain = udp.domainName - protoTransport = udp.UdpSocketTransport - def __init__(self, transportAddr, timeout=1, retries=5, tagList=null): - _AbstractTransportTarget.__init__(self, transportAddr, timeout, - retries, tagList) - try: - self.transportAddr = socket.getaddrinfo(transportAddr[0], - transportAddr[1], - socket.AF_INET, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP)[0][4][:2] - except socket.gaierror: - raise error.PySnmpError('Bad IPv4/UDP transport address %s: %s' % ('@'.join([ str(x) for x in transportAddr ]), sys.exc_info()[1])) - -class Udp6TransportTarget(_AbstractTransportTarget): - transportDomain = udp6.domainName - protoTransport = udp6.Udp6SocketTransport - def __init__(self, transportAddr, timeout=1, retries=5, tagList=null): - _AbstractTransportTarget.__init__(self, transportAddr, timeout, - retries, tagList) - try: - self.transportAddr = socket.getaddrinfo(transportAddr[0], - transportAddr[1], - socket.AF_INET6, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP)[0][4][:2] - except socket.gaierror: - raise error.PySnmpError('Bad IPv6/UDP transport address %s: %s' % ('@'.join([ str(x) for x in transportAddr ]), sys.exc_info()[1])) - -class UnixTransportTarget(_AbstractTransportTarget): - transportDomain = unix.domainName - protoTransport = unix.UnixSocketTransport - class AsynCommandGenerator: _null = univ.Null('') def __init__(self, snmpEngine=None): diff --git a/pysnmp/entity/rfc3413/oneliner/mibvar.py b/pysnmp/entity/rfc3413/oneliner/mibvar.py index 531f011..c813267 100644 --- a/pysnmp/entity/rfc3413/oneliner/mibvar.py +++ b/pysnmp/entity/rfc3413/oneliner/mibvar.py @@ -32,7 +32,7 @@ class MibVariable: if self.__state & (self.stOidOnly | self.stClean): return self.__oid else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) def getLabel(self): if self.__state & self.stClean: @@ -190,98 +190,98 @@ class MibVariable: if self.__state & self.stOidOnly: return str(self.__oid) else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def __getitem__(self, i): if self.__state & self.stOidOnly: return self.__oid[i] else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def __eq__(self, other): if self.__state & self.stOidOnly: return self.__oid == other else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def __ne__(self, other): if self.__state & self.stOidOnly: return self.__oid != other else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def __lt__(self, other): if self.__state & self.stOidOnly: return self.__oid < other else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def __le__(self, other): if self.__state & self.stOidOnly: return self.__oid <= other else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def __gt__(self, other): if self.__state & self.stOidOnly: return self.__oid > other else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def __ge__(self, other): if self.__state & self.stOidOnly: return self.__oid >= other else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) if sys.version_info[0] <= 2: def __nonzero__(self): if self.__state & self.stOidOnly: return bool(self.__oid) else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) else: def __bool__(self): if self.__state & self.stOidOnly: return bool(self.__oid) else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def __hash__(self): if self.__state & self.stOidOnly: return hash(self.__oid) else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def __len__(self): if self.__state & self.stOidOnly: return len(self.__oid) else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def __index__(self, i): if self.__state & self.stOidOnly: return self.__oid[i] else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def asTuple(self): if self.__state & self.stOidOnly: return self.__oid.asTuple() else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def clone(self, *args): if self.__state & self.stOidOnly: return self.__oid.clone(*args) else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def subtype(self, *args): if self.__state & self.stOidOnly: return self.__oid.subtype(*args) else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) def isPrefixOf(self, *args): if self.__state & self.stOidOnly: diff --git a/pysnmp/entity/rfc3413/oneliner/ntforg.py b/pysnmp/entity/rfc3413/oneliner/ntforg.py index 3572ac1..fe5f39b 100644 --- a/pysnmp/entity/rfc3413/oneliner/ntforg.py +++ b/pysnmp/entity/rfc3413/oneliner/ntforg.py @@ -3,29 +3,23 @@ from pysnmp import nextid from pysnmp.entity import config from pysnmp.entity.rfc3413 import ntforg, context from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable +from pysnmp.entity.rfc3413.oneliner.auth import CommunityData, UsmUserData +from pysnmp.entity.rfc3413.oneliner.target import UdpTransportTarget, \ + Udp6TransportTarget, UnixTransportTarget from pysnmp.entity.rfc3413.oneliner import cmdgen # Auth protocol -usmHMACMD5AuthProtocol = cmdgen.usmHMACMD5AuthProtocol -usmHMACSHAAuthProtocol = cmdgen.usmHMACSHAAuthProtocol -usmNoAuthProtocol = cmdgen.usmNoAuthProtocol +usmHMACMD5AuthProtocol = config.usmHMACMD5AuthProtocol +usmHMACSHAAuthProtocol = config.usmHMACSHAAuthProtocol +usmNoAuthProtocol = config.usmNoAuthProtocol # Privacy protocol -usmDESPrivProtocol = cmdgen.usmDESPrivProtocol -usm3DESEDEPrivProtocol = cmdgen.usm3DESEDEPrivProtocol -usmAesCfb128Protocol = cmdgen.usmAesCfb128Protocol -usmAesCfb192Protocol = cmdgen.usmAesCfb192Protocol -usmAesCfb256Protocol = cmdgen.usmAesCfb256Protocol -usmNoPrivProtocol = cmdgen.usmNoPrivProtocol - -# Credentials -CommunityData = cmdgen.CommunityData -UsmUserData = cmdgen.UsmUserData - -# Transports -UdpTransportTarget = cmdgen.UdpTransportTarget -Udp6TransportTarget = cmdgen.Udp6TransportTarget -UnixTransportTarget = cmdgen.UnixTransportTarget +usmDESPrivProtocol = config.usmDESPrivProtocol +usm3DESEDEPrivProtocol = config.usm3DESEDEPrivProtocol +usmAesCfb128Protocol = config.usmAesCfb128Protocol +usmAesCfb192Protocol = config.usmAesCfb192Protocol +usmAesCfb256Protocol = config.usmAesCfb256Protocol +usmNoPrivProtocol = config.usmNoPrivProtocol nextID = nextid.Integer(0xffffffff) diff --git a/pysnmp/entity/rfc3413/oneliner/target.py b/pysnmp/entity/rfc3413/oneliner/target.py new file mode 100644 index 0000000..a8249a9 --- /dev/null +++ b/pysnmp/entity/rfc3413/oneliner/target.py @@ -0,0 +1,65 @@ +import socket, sys +from pysnmp.carrier.asynsock.dgram import udp, udp6, unix +from pyasn1.compat.octets import null + +class _AbstractTransportTarget: + transportDomain = protoTransport = None + def __init__(self, transportAddr, timeout=1, retries=5, tagList=null): + self.transportAddr = transportAddr + self.timeout = timeout + self.retries = retries + self.tagList = tagList + + def __repr__(self): return '%s(%r, %r, %r, %r)' % ( + self.__class__.__name__, self.transportAddr, + self.timeout, self.retries, self.tagList + ) + + def __hash__(self): return hash(self.transportAddr) + + def __eq__(self, other): return self.transportAddr == other + def __ne__(self, other): return self.transportAddr != other + def __lt__(self, other): return self.transportAddr < other + def __le__(self, other): return self.transportAddr <= other + def __gt__(self, other): return self.transportAddr > other + def __ge__(self, other): return self.transportAddr >= other + + def openClientMode(self): + self.transport = self.protoTransport().openClientMode() + return self.transport + +class UdpTransportTarget(_AbstractTransportTarget): + transportDomain = udp.domainName + protoTransport = udp.UdpSocketTransport + def __init__(self, transportAddr, timeout=1, retries=5, tagList=null): + _AbstractTransportTarget.__init__(self, transportAddr, timeout, + retries, tagList) + try: + self.transportAddr = socket.getaddrinfo(transportAddr[0], + transportAddr[1], + socket.AF_INET, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP)[0][4][:2] + except socket.gaierror: + raise error.PySnmpError('Bad IPv4/UDP transport address %s: %s' % ('@'.join([ str(x) for x in transportAddr ]), sys.exc_info()[1])) + +class Udp6TransportTarget(_AbstractTransportTarget): + transportDomain = udp6.domainName + protoTransport = udp6.Udp6SocketTransport + def __init__(self, transportAddr, timeout=1, retries=5, tagList=null): + _AbstractTransportTarget.__init__(self, transportAddr, timeout, + retries, tagList) + try: + self.transportAddr = socket.getaddrinfo(transportAddr[0], + transportAddr[1], + socket.AF_INET6, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP)[0][4][:2] + except socket.gaierror: + raise error.PySnmpError('Bad IPv6/UDP transport address %s: %s' % ('@'.join([ str(x) for x in transportAddr ]), sys.exc_info()[1])) + +class UnixTransportTarget(_AbstractTransportTarget): + transportDomain = unix.domainName + protoTransport = unix.UnixSocketTransport + + |