From bea11dfe16e6714737b6f5016697206a038f051b Mon Sep 17 00:00:00 2001 From: elie Date: Sun, 31 May 2015 19:35:10 +0000 Subject: Major overhaul related to PySMI integration and Standard SNMP Applications API redesign: * MibVariable becomes ObjectIdentity and moves to pysnmp.smi.rfc1902 * ObjectType and NotificationType classes resempling corresponding MIB MACROs implemented * SNMP Standard Applications and examples modified to support ObjectType and NotificationType parameters --- pysnmp/entity/config.py | 25 +- pysnmp/entity/engine.py | 19 ++ pysnmp/entity/rfc3413/config.py | 32 +-- pysnmp/entity/rfc3413/ntforg.py | 129 +++++---- pysnmp/entity/rfc3413/oneliner/cmdgen.py | 87 +++--- pysnmp/entity/rfc3413/oneliner/mibvar.py | 315 +-------------------- pysnmp/entity/rfc3413/oneliner/ntforg.py | 86 +++--- pysnmp/smi/compiler.py | 2 +- pysnmp/smi/rfc1902.py | 463 +++++++++++++++++++++++++++++++ 9 files changed, 676 insertions(+), 482 deletions(-) create mode 100644 pysnmp/smi/rfc1902.py (limited to 'pysnmp') diff --git a/pysnmp/entity/config.py b/pysnmp/entity/config.py index f9018e9..085eea5 100644 --- a/pysnmp/entity/config.py +++ b/pysnmp/entity/config.py @@ -325,12 +325,17 @@ def addTransport(snmpEngine, transportDomain, transport): transport.protoTransportDispatcher() ) # here we note that we have created transportDispatcher automatically - snmpEngine.cache['automaticTransportDispatcher'] = 0 + snmpEngine.setUserContext(automaticTransportDispatcher=0) snmpEngine.transportDispatcher.registerTransport( transportDomain, transport ) - if 'automaticTransportDispatcher' in snmpEngine.cache: - snmpEngine.cache['automaticTransportDispatcher'] += 1 + automaticTransportDispatcher = snmpEngine.getUserContext( + 'automaticTransportDispatcher' + ) + if automaticTransportDispatcher is not None: + snmpEngine.setUserContext( + automaticTransportDispatcher=automaticTransportDispatcher+1 + ) def getTransport(snmpEngine, transportDomain): if not snmpEngine.transportDispatcher: @@ -346,12 +351,18 @@ def delTransport(snmpEngine, transportDomain): transport = getTransport(snmpEngine, transportDomain) snmpEngine.transportDispatcher.unregisterTransport(transportDomain) # automatically shutdown automatically created transportDispatcher - if 'automaticTransportDispatcher' in snmpEngine.cache: - snmpEngine.cache['automaticTransportDispatcher'] -= 1 - if not snmpEngine.cache['automaticTransportDispatcher']: + automaticTransportDispatcher = snmpEngine.getUserContext( + 'automaticTransportDispatcher' + ) + if automaticTransportDispatcher is not None: + automaticTransportDispatcher -= 1 + snmpEngine.setUserContext( + automaticTransportDispatcher=automaticTransportDispatcher + ) + if not automaticTransportDispatcher: snmpEngine.transportDispatcher.closeDispatcher() snmpEngine.unregisterTransportDispatcher() - del snmpEngine.cache['automaticTransportDispatcher'] + snmpEngine.delUserContext(automaticTransportDispatcher) return transport addSocketTransport = addTransport diff --git a/pysnmp/entity/engine.py b/pysnmp/entity/engine.py index ecc0aa5..d663d91 100644 --- a/pysnmp/entity/engine.py +++ b/pysnmp/entity/engine.py @@ -140,3 +140,22 @@ class SnmpEngine: self.transportDispatcher.unregisterRecvCbFun(recvId) self.transportDispatcher.unregisterTimerCbFun() self.transportDispatcher = None + + def getMibBuilder(self): + return self.msgAndPduDsp.mibInstrumController.mibBuilder + + # User app may attach opaque objects to SNMP Engine + def setUserContext(self, **kwargs): + self.cache.update( + dict([('__%s' % k, kwargs[k]) for k in kwargs]) + ) + + def getUserContext(self, arg): + return self.cache.get('__%s' % arg) + + def delUserContext(self, arg): + try: + del self.cache['__%s' % arg] + except KeyError: + pass + diff --git a/pysnmp/entity/rfc3413/config.py b/pysnmp/entity/rfc3413/config.py index 1124600..68218eb 100644 --- a/pysnmp/entity/rfc3413/config.py +++ b/pysnmp/entity/rfc3413/config.py @@ -9,10 +9,10 @@ def getTargetAddr(snmpEngine, snmpTargetAddrName): 'SNMP-TARGET-MIB', 'snmpTargetAddrEntry' ) - if 'getTargetAddr' not in snmpEngine.cache: - snmpEngine.cache['getTargetAddr'] = { 'id': -1 } - - cache = snmpEngine.cache['getTargetAddr'] + cache = snmpEngine.getUserContext('getTargetAddr') + if cache is None: + cache = { 'id': -1 } + snmpEngine.setUserContext(getTargetAddr=cache) if cache['id'] != snmpTargetAddrEntry.branchVersionId: cache['nameToTargetMap'] = {} @@ -98,10 +98,10 @@ def getTargetParams(snmpEngine, paramsName): 'SNMP-TARGET-MIB', 'snmpTargetParamsEntry' ) - if 'getTargetParams' not in snmpEngine.cache: - snmpEngine.cache['getTargetParams'] = { 'id': -1 } - - cache = snmpEngine.cache['getTargetParams'] + cache = snmpEngine.getUserContext('getTargetParams') + if cache is None: + cache = { 'id': -1 } + snmpEngine.setUserContext(getTargetParams=cache) if cache['id'] != snmpTargetParamsEntry.branchVersionId: cache['nameToParamsMap'] = {} @@ -178,10 +178,10 @@ def getNotificationInfo(snmpEngine, notificationTarget): 'SNMP-NOTIFICATION-MIB', 'snmpNotifyEntry' ) - if 'getNotificationInfo' not in snmpEngine.cache: - snmpEngine.cache['getNotificationInfo'] = { 'id': -1 } - - cache = snmpEngine.cache['getNotificationInfo'] + cache = snmpEngine.getUserContext('getNotificationInfo') + if cache is None: + cache = { 'id': -1 } + snmpEngine.setUserContext(getNotificationInfo=cache) if cache['id'] != snmpNotifyEntry.branchVersionId: cache['targetToNotifyMap'] = {} @@ -225,10 +225,10 @@ def getTargetNames(snmpEngine, tag): 'SNMP-TARGET-MIB', 'snmpTargetAddrEntry' ) - if 'getTargetNames' not in snmpEngine.cache: - snmpEngine.cache['getTargetNames'] = { 'id': -1 } - - cache = snmpEngine.cache['getTargetNames'] + cache = snmpEngine.getUserContext('getTargetNames') + if cache is None: + cache = { 'id': -1 } + snmpEngine.setUserContext(getTargetNames=cache) if cache['id'] == snmpTargetAddrEntry.branchVersionId: tagToTargetsMap = cache['tagToTargetsMap'] diff --git a/pysnmp/entity/rfc3413/ntforg.py b/pysnmp/entity/rfc3413/ntforg.py index 5018049..a3d7934 100644 --- a/pysnmp/entity/rfc3413/ntforg.py +++ b/pysnmp/entity/rfc3413/ntforg.py @@ -5,6 +5,7 @@ from pysnmp.proto.proxy import rfc2576 from pysnmp.proto import rfc3411 from pysnmp.proto.api import v2c from pysnmp.proto import error +from pysnmp.smi import view, rfc1902 from pysnmp import nextid from pysnmp import debug @@ -254,14 +255,12 @@ class NotificationOriginator: def sendVarBinds(self, snmpEngine, notificationTarget, - snmpContext, + contextEngineId, contextName, - notificationName, - instanceIndex, - additionalVarBinds=(), + varBinds=(), cbFun=None, cbCtx=None): - debug.logger & debug.flagApp and debug.logger('sendVarBinds: notificationTarget %s, notificationName %s, additionalVarBinds %s, contextName "%s", instanceIndex %s' % (notificationTarget, notificationName, additionalVarBinds, contextName, instanceIndex)) + debug.logger & debug.flagApp and debug.logger('sendVarBinds: notificationTarget %s, contextEngineId %s, contextName "%s", varBinds %s' % (notificationTarget, contextEngineId or '', contextName, varBinds)) if contextName: __SnmpAdminString, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-FRAMEWORK-MIB', 'SnmpAdminString') @@ -277,10 +276,32 @@ class NotificationOriginator: debug.logger & debug.flagApp and debug.logger('sendVarBinds: sendRequestHandle %s, notifyTag %s, notifyType %s' % (sendRequestHandle, notifyTag, notifyType)) - contextMibInstrumCtl = snmpContext.getMibInstrum(contextName) - - additionalVarBinds = [ (v2c.ObjectIdentifier(x),y) for x,y in additionalVarBinds ] + varBinds = [ (v2c.ObjectIdentifier(x),y) for x,y in varBinds ] + # 3.3.2 & 3.3.3 + snmpTrapOID, sysUpTime = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpTrapOID', 'sysUpTime') + + for idx in range(len(varBinds)): + if idx and varBinds[idx][0] == sysUpTime.getName(): + if varBinds[0][0] == sysUpTime.getName(): + varBinds[0] = varBinds[idx] + else: + varBinds.insert(0, varBinds[idx]) + del varBinds[idx] + + if varBinds[0][0] != sysUpTime.getName(): + varBinds.insert(0, (v2c.ObjectIdentifier(sysUpTime.getName()), + sysUpTime.getSyntax().clone())) + + if len(varBinds) < 2 or varBinds[1][0] != snmpTrapOID.getName(): + varBinds.insert(1, (v2c.ObjectIdentifier(snmpTrapOID.getName()), + snmpTrapOID.getSyntax())) + + debug.logger & debug.flagApp and debug.logger('sendVarBinds: final varBinds %s' % (varBinds,)) + + cbCtx = cbFun, cbCtx + cbFun = self.processResponseVarBinds + for targetAddrName in config.getTargetNames(snmpEngine, notifyTag): ( transportDomain, transportAddress, @@ -292,8 +313,6 @@ class NotificationOriginator: securityName, securityLevel ) = config.getTargetParams(snmpEngine, params) - debug.logger & debug.flagApp and debug.logger('sendVarBinds: sendRequestHandle %s, notifyTag %s yields: transportDomain %s, transportAddress %r, securityModel %s, securityName %s, securityLevel %s' % (sendRequestHandle, notifyTag, transportDomain, transportAddress, securityModel, securityName, securityLevel)) - # 3.3.1 XXX # XXX filtering's yet to be implemented # filterProfileName = config.getNotifyFilterProfile(params) @@ -302,57 +321,22 @@ class NotificationOriginator: # filterMask, # filterType ) = config.getNotifyFilter(filterProfileName) - varBinds = [] - - # 3.3.2 & 3.3.3 - sysUpTime, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'sysUpTime') - - for varName, varVal in additionalVarBinds: - if varName == sysUpTime.name: - varBinds.append((varName, varVal)) - break - if not varBinds: - varBinds.append((sysUpTime.name, - sysUpTime.syntax.clone())) # for actual value - - snmpTrapOid, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpTrapOID') - if len(notificationName) == 2: # ('MIB', 'symbol') - notificationTypeObject, = contextMibInstrumCtl.mibBuilder.importSymbols(*notificationName) - varBinds.append((snmpTrapOid.name, v2c.ObjectIdentifier(notificationTypeObject.name))) - debug.logger & debug.flagApp and debug.logger('sendVarBinds: notification type object is %s' % notificationTypeObject) - for notificationObject in notificationTypeObject.getObjects(): - mibNode, = contextMibInstrumCtl.mibBuilder.importSymbols(*notificationObject) - if instanceIndex: - mibNode = mibNode.getNode(mibNode.name + instanceIndex) - else: - mibNode = mibNode.getNextNode(mibNode.name) - varBinds.extend( - contextMibInstrumCtl.readVars( - [ (mibNode.name, None) ] # XXX AC is missing - ) - ) - debug.logger & debug.flagApp and debug.logger('sendVarBinds: processed notification object %s, instance index %s, var-bind %s' % (notificationObject, instanceIndex is None and "" or instanceIndex, mibNode)) - elif notificationName: # numeric OID - varBinds.append( - (snmpTrapOid.name, - snmpTrapOid.syntax.clone(notificationName)) - ) - else: - varBinds.append((snmpTrapOid.name, snmpTrapOid.syntax)) + debug.logger & debug.flagApp and debug.logger('sendVarBinds: sendRequestHandle %s, notifyTag %s yields: transportDomain %s, transportAddress %r, securityModel %s, securityName %s, securityLevel %s' % (sendRequestHandle, notifyTag, transportDomain, transportAddress, securityModel, securityName, securityLevel)) - for varName, varVal in additionalVarBinds: - if varName in (sysUpTime.name, snmpTrapOid.name): + for varName, varVal in varBinds: + if varName in (sysUpTime.name, snmpTrapOID.name): continue try: snmpEngine.accessControlModel[self.acmID].isAccessAllowed( snmpEngine, securityModel, securityName, securityLevel, 'notify', contextName, varName ) + + debug.logger & debug.flagApp and debug.logger('sendVarBinds: ACL succeeded for OID %s securityName %s' % (varName, securityName)) + except error.StatusInformation: - debug.logger & debug.flagApp and debug.logger('sendVarBinds: OID %s not allowed for %s, droppping notification' % (varName, securityName)) + debug.logger & debug.flagApp and debug.logger('sendVarBinds: ACL denied access for OID %s securityName %s, droppping notification' % (varName, securityName)) return - else: - varBinds.append((varName, varVal)) # 3.3.4 if notifyType == 1: @@ -365,14 +349,11 @@ class NotificationOriginator: v2c.apiPDU.setDefaults(pdu) v2c.apiPDU.setVarBinds(pdu, varBinds) - cbCtx = cbFun, cbCtx - cbFun = self.processResponseVarBinds - # 3.3.5 try: sendPduHandle = self.sendPdu(snmpEngine, targetAddrName, - snmpContext.contextEngineId, + contextEngineId, contextName, pdu, cbFun, @@ -380,7 +361,7 @@ class NotificationOriginator: except error.StatusInformation: statusInformation = sys.exc_info()[1] - debug.logger & debug.flagApp and debug.logger('sendVarBinds: sendRequestHandle %s: sendVarBindsPdu() failed with %r' % (sendRequestHandle, statusInformation)) + debug.logger & debug.flagApp and debug.logger('sendVarBinds: sendRequestHandle %s: sendPdu() failed with %r' % (sendRequestHandle, statusInformation)) if sendRequestHandle not in self.__pendingNotifications or \ not self.__pendingNotifications[sendRequestHandle]: if sendRequestHandle in self.__pendingNotifications: @@ -440,13 +421,39 @@ def _sendNotification(self, cbCtx = cbFun, cbCtx cbFun = _sendNotificationCbFun + # + # Here we first expand trap OID into associated OBJECTS + # and then look them up at context-specific MIB + # + + mibViewController = snmpEngine.getUserContext('mibViewController') + if not mibViewController: + mibViewController = view.MibViewController(snmpEngine.getMibBuilder()) + snmpEngine.setUserContext(mibViewController=mibViewController) + + # Support the following syntax: + # '1.2.3.4' + # (1,2,3,4) + # ('MIB', 'symbol') + if isinstance(notificationName, (tuple, list)) and \ + notificationName and isinstance(notificationName[0], str): + notificationName = rfc1902.ObjectIdentity(*notificationName) + else: + notificationName = rfc1902.ObjectIdentity(notificationName) + + varBinds = rfc1902.NotificationType( + notificationName, instanceIndex=instanceIndex + ).resolveWithMib(mibViewController) + + mibInstrumController = self.snmpContext.getMibInstrum(contextName) + + varBinds = varBinds[:1] + mibInstrumController.readVars(varBinds[1:]) + return self.sendVarBinds(snmpEngine, notificationTarget, - self.snmpContext, + self.snmpContext.contextEngineId, contextName, - notificationName, - instanceIndex, - additionalVarBinds, + varBinds + list(additionalVarBinds), cbFun, cbCtx) diff --git a/pysnmp/entity/rfc3413/oneliner/cmdgen.py b/pysnmp/entity/rfc3413/oneliner/cmdgen.py index 00d8aa0..2e2a88c 100644 --- a/pysnmp/entity/rfc3413/oneliner/cmdgen.py +++ b/pysnmp/entity/rfc3413/oneliner/cmdgen.py @@ -1,6 +1,6 @@ from pysnmp.entity import engine, config from pysnmp.entity.rfc3413 import cmdgen -from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable +from pysnmp.smi.rfc1902 import ObjectIdentity, ObjectType from pysnmp.entity.rfc3413.oneliner.auth import CommunityData, UsmUserData from pysnmp.entity.rfc3413.oneliner.target import UdpTransportTarget, \ Udp6TransportTarget, UnixTransportTarget @@ -10,6 +10,8 @@ from pysnmp.smi import view from pysnmp import nextid, error from pyasn1.type import univ, base from pyasn1.compat.octets import null +# obsolete, compatibility symbols +from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable # Auth protocol usmHMACMD5AuthProtocol = config.usmHMACMD5AuthProtocol @@ -30,21 +32,22 @@ class AsyncCommandGenerator: _null = univ.Null('') def _getCache(self, snmpEngine): - if 'cmdgen' not in snmpEngine.cache: - snmpEngine.cache['cmdgen'] = { - 'auth': {}, - 'parm': {}, - 'tran': {}, - 'addr': {}, - } - return snmpEngine.cache['cmdgen'] + cache = snmpEngine.getUserContext('cmdgen_cache') + if cache is None: + cache = { + 'auth': {}, 'parm': {}, 'tran': {}, 'addr': {} + } + snmpEngine.setUserContext(cmdgen_cache=cache) + return cache def getMibViewController(self, snmpEngine): - if 'mibViewController' not in snmpEngine.cache: - snmpEngine.cache['mibViewController'] = view.MibViewController( - snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder + mibViewController = snmpEngine.getUserContext('mibViewController') + if not mibViewController: + mibViewController = view.MibViewController( + snmpEngine.getMibBuilder() ) - return snmpEngine.cache['mibViewController'] + snmpEngine.setUserContext(mibViewController=mibViewController) + return mibViewController def cfgCmdGen(self, snmpEngine, authData, transportTarget): cache = self._getCache(snmpEngine) @@ -203,59 +206,35 @@ class AsyncCommandGenerator: return addrNames, paramsNames - def makeVarBinds(self, snmpEngine, varBinds, oidOnly=False): + def makeVarBinds(self, snmpEngine, varBinds): mibViewController = self.getMibViewController(snmpEngine) __varBinds = [] - for varName, varVal in varBinds: - if isinstance(varName, MibVariable): - if oidOnly or isinstance(varVal, base.AbstractSimpleAsn1Item): - varName.resolveWithMib(mibViewController, oidOnly=True) - else: - varName.resolveWithMib(mibViewController) - varVal = varName.getMibNode().getSyntax().clone(varVal) - elif isinstance(varName[0], tuple): # legacy - varName = MibVariable(varName[0][0], varName[0][1], *varName[1:]).resolveWithMib(mibViewController) - if not oidOnly and \ - not isinstance(varVal, base.AbstractSimpleAsn1Item): - varVal = varName.getMibNode().getSyntax().clone(varVal) + for varBind in varBinds: + if isinstance(varBind, ObjectType): + pass + elif isinstance(varBind[0], ObjectIdentity): + varBind = ObjectType(*varBind) + elif isinstance(varBind[0][0], tuple): # legacy + varBind = ObjectType(ObjectIdentity(varBind[0][0][0], varBind[0][0][1], *varBind[0][1:]), varBind[1]) else: - if oidOnly or isinstance(varVal, base.AbstractSimpleAsn1Item): - varName = MibVariable(varName).resolveWithMib(mibViewController, oidOnly=True) - else: - varName = MibVariable(varName).resolveWithMib(mibViewController) - try: - varVal = varName.getMibNode().getSyntax().clone(varVal) - except: - raise error.PySnmpError('Unresolved SNMP value type for OID %s (MIB not loaded?)' % (varName,)) + varBind = ObjectType(ObjectIdentity(varBind[0]), varBind[1]) - __varBinds.append((varName, varVal)) + __varBinds.append(varBind.resolveWithMib(mibViewController)) return __varBinds def unmakeVarBinds(self, snmpEngine, varBinds, lookupNames, lookupValues): if lookupNames or lookupValues: mibViewController = self.getMibViewController(snmpEngine) - _varBinds = [] - for name, value in varBinds: - varName = MibVariable(name).resolveWithMib(mibViewController) - if lookupNames: - name = varName - if lookupValues: - if value.tagSet not in (rfc1905.NoSuchObject.tagSet, - rfc1905.NoSuchInstance.tagSet, - rfc1905.EndOfMibView.tagSet): - if varName.isFullyResolved(): - value = varName.getMibNode().getSyntax().clone(value) - _varBinds.append((name, value)) - return _varBinds - else: - return varBinds + varBinds = [ ObjectType(ObjectIdentity(x[0]), x[1]).resolveWithMib(mibViewController) for x in varBinds ] + + return varBinds def makeVarBindsHead(self, snmpEngine, varNames): return [ x[0] for x in self.makeVarBinds( snmpEngine, - [ (x, univ.Null('')) for x in varNames ], oidOnly=True + [ (x, univ.Null('')) for x in varNames ] ) ] @@ -415,12 +394,12 @@ class AsynCommandGenerator: # compatibility stub def makeReadVarBinds(self, varNames): return self.makeVarBinds( - [ (x, univ.Null('')) for x in varNames ], oidOnly=True + [ (x, univ.Null('')) for x in varNames ] ) - def makeVarBinds(self, varBinds, oidOnly=False): + def makeVarBinds(self, varBinds): return self.__asyncCmdGen.makeVarBinds( - self.snmpEngine, varBinds, oidOnly + self.snmpEngine, varBinds ) def unmakeVarBinds(self, varBinds, lookupNames, lookupValues): diff --git a/pysnmp/entity/rfc3413/oneliner/mibvar.py b/pysnmp/entity/rfc3413/oneliner/mibvar.py index cb7b163..5e069ab 100644 --- a/pysnmp/entity/rfc3413/oneliner/mibvar.py +++ b/pysnmp/entity/rfc3413/oneliner/mibvar.py @@ -1,313 +1,6 @@ -from pysnmp.proto import rfc1902 -from pysnmp.smi.builder import ZipMibSource -from pysnmp.smi.compiler import addMibCompiler -from pysnmp.error import PySnmpError -from pyasn1.error import PyAsn1Error +# Obsolete interfaces, use pysnmp.smi.rfc1902 instead. +from pysnmp.smi import rfc1902 -# -# An OID-like object that embeds MIB resolution. -# -# Valid initializers include: -# MibVariable('1.3.6.1.2.1.1.1.0'), -# MibVariable('iso.org.dod.internet.mgmt.mib-2.system.sysDescr.0') -# MibVariable('SNMPv2-MIB', 'system'), -# MibVariable('SNMPv2-MIB', 'sysDescr', 0), -# MibVariable('IP-MIB', 'ipAdEntAddr', '127.0.0.1', 123), -# - -class MibVariable: - stDirty, stOidOnly, stClean, stUnresolved = 1, 2, 4, 8 - - def __init__(self, *args): - self.__args = args - self.__mibSourcesToAdd = self.__modNamesToLoad = None - self.__asn1SourcesToAdd = None - self.__state = self.stDirty - - # - # public API - # - def getMibSymbol(self): - if self.__state & self.stClean: - return self.__modName, self.__symName, self.__indices - else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) - - def getOid(self): - if self.__state & (self.stOidOnly | self.stClean): - return self.__oid - else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) - - def getLabel(self): - if self.__state & self.stClean: - return self.__label - else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) - - def getMibNode(self): # XXX - if self.__state & self.stClean: - return self.__mibNode - else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) - - def isFullyResolved(self): - return not (self.__state & self.stUnresolved) - - # - # A gateway to MIBs manipulation routines - # - - def addAsn1Sources(self, *asn1Sources): - self.__asn1SourcesToAdd = asn1Sources - return self - - def addMibSource(self, *mibSources): - self.__mibSourcesToAdd = mibSources - return self - - # provides deferred MIBs load - def loadMibs(self, *modNames): - self.__modNamesToLoad = modNames - return self - - # this would eventually be called by an entity which posses a - # reference to MibViewController - def resolveWithMib(self, mibViewController, oidOnly=False): - if self.__mibSourcesToAdd is not None: - mibSources = tuple( - [ ZipMibSource(x) for x in self.__mibSourcesToAdd ] - ) + mibViewController.mibBuilder.getMibSources() - mibViewController.mibBuilder.setMibSources(*mibSources) - self.__mibSourcesToAdd = None - - if self.__asn1SourcesToAdd is not None: - addMibCompiler( - mibViewController.mibBuilder, - sources=self.__asn1SourcesToAdd - ) - self.__asn1SourcesToAdd = None - - if self.__modNamesToLoad is not None: - mibViewController.mibBuilder.loadModules(*self.__modNamesToLoad) - self.__modNamesToLoad = None - - if self.__state & (self.stOidOnly | self.stClean): - return self - - MibScalar, MibTableColumn, = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar', 'MibTableColumn') - - if len(self.__args) == 1: # OID or label - try: - self.__oid = rfc1902.ObjectName(self.__args[0]) - except PyAsn1Error: - try: - label = tuple(self.__args[0].split('.')) - except ValueError: - raise PySnmpError('Bad OID format %s' % (self.__args[0],)) - prefix, label, suffix = mibViewController.getNodeNameByOid( - label - ) - - if suffix: - try: - suffix = tuple([ int(x) for x in suffix ]) - except ValueError: - raise PySnmpError('Unknown object name component %s' % (suffix,)) - - self.__oid = rfc1902.ObjectName(prefix + suffix) - - self.__state |= self.stOidOnly - - if oidOnly: - return self - else: - self.__state |= self.stOidOnly - - if oidOnly: - return self - - prefix, label, suffix = mibViewController.getNodeNameByOid( - self.__oid - ) - - modName, symName, _ = mibViewController.getNodeLocation(prefix) - - self.__modName = modName - self.__symName = symName - - self.__label = label - - mibNode, = mibViewController.mibBuilder.importSymbols( - modName, symName - ) - - self.__mibNode = mibNode - - if isinstance(mibNode, MibTableColumn): # table column - rowModName, rowSymName, _ = mibViewController.getNodeLocation( - mibNode.name[:-1] - ) - rowNode, = mibViewController.mibBuilder.importSymbols( - rowModName, rowSymName - ) - self.__indices = rowNode.getIndicesFromInstId(suffix) - elif isinstance(mibNode, MibScalar): # scalar - self.__indices = ( rfc1902.ObjectName(suffix), ) - else: - self.__indices = ( rfc1902.ObjectName(suffix), ) - self.__state |= self.stUnresolved - self.__state |= self.stClean - return self - elif len(self.__args) > 1: # MIB, symbol[, index, index ...] - self.__modName = self.__args[0] - if self.__args[1]: - self.__symName = self.__args[1] - else: - mibViewController.mibBuilder.loadModules(self.__modName) - oid, _, _ = mibViewController.getFirstNodeName(self.__modName) - _, self.__symName, _ = mibViewController.getNodeLocation(oid) - - mibNode, = mibViewController.mibBuilder.importSymbols( - self.__modName, self.__symName - ) - - self.__mibNode = mibNode - - self.__indices = () - self.__oid = rfc1902.ObjectName(mibNode.getName()) - - prefix, label, suffix = mibViewController.getNodeNameByOid( - self.__oid - ) - self.__label = label - - if isinstance(mibNode, MibTableColumn): # table - rowModName, rowSymName, _ = mibViewController.getNodeLocation( - mibNode.name[:-1] - ) - rowNode, = mibViewController.mibBuilder.importSymbols( - rowModName, rowSymName - ) - if self.__args[2:]: - instIds = rowNode.getInstIdFromIndices(*self.__args[2:]) - self.__oid += instIds - self.__indices = rowNode.getIndicesFromInstId(instIds) - elif self.__args[2:]: # any other kind of MIB node with indices - instId = rfc1902.ObjectName( - '.'.join([ str(x) for x in self.__args[2:] ]) - ) - self.__oid += instId - self.__indices = ( instId, ) - self.__state |= (self.stClean | self.stOidOnly) - return self - else: - raise PySnmpError('Non-OID, label or MIB symbol') - - def prettyPrint(self): - if self.__state & self.stClean: - return '%s::%s.%s' % ( - self.__modName, self.__symName, - '.'.join(['"%s"' % x.prettyPrint() for x in self.__indices ]) - ) - else: - raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) - - def __repr__(self): - return '%s(%s)' % (self.__class__.__name__, ', '.join([ repr(x) for x in self.__args])) - - # Redirect some attrs access to the OID object to behave alike - - def __str__(self): - if self.__state & self.stOidOnly: - return str(self.__oid) - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __eq__(self, other): - if self.__state & self.stOidOnly: - return self.__oid == other - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __ne__(self, other): - if self.__state & self.stOidOnly: - return self.__oid != other - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __lt__(self, other): - if self.__state & self.stOidOnly: - return self.__oid < other - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __le__(self, other): - if self.__state & self.stOidOnly: - return self.__oid <= other - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __gt__(self, other): - if self.__state & self.stOidOnly: - return self.__oid > other - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __ge__(self, other): - if self.__state & self.stOidOnly: - return self.__oid > other - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __nonzero__(self): - if self.__state & self.stOidOnly: - return self.__oid != 0 - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __bool__(self): - if self.__state & self.stOidOnly: - return bool(self.__oid) - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __getitem__(self, i): - if self.__state & self.stOidOnly: - return self.__oid[i] - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __len__(self): - if self.__state & self.stOidOnly: - return len(self.__oid) - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __add__(self, other): - if self.__state & self.stOidOnly: - return self.__oid + other - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __radd__(self, other): - if self.__state & self.stOidOnly: - return other + self.__oid - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __hash__(self): - if self.__state & self.stOidOnly: - return hash(self.__oid) - else: - raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) - - def __getattr__(self, attr): - if self.__state & self.stOidOnly: - if attr in ( 'asTuple', 'clone', 'subtype', 'isPrefixOf', - 'isSameTypeWith', 'isSuperTypeOf'): - return getattr(self.__oid, attr) - raise AttributeError - else: - raise PySnmpError('%s object not properly initialized for %s access' % (self.__class__.__name__, attr)) +class MibVariable(rfc1902.ObjectIdentity): pass +class MibVariableBinding(rfc1902.ObjectType): pass diff --git a/pysnmp/entity/rfc3413/oneliner/ntforg.py b/pysnmp/entity/rfc3413/oneliner/ntforg.py index c81667d..8fc20d0 100644 --- a/pysnmp/entity/rfc3413/oneliner/ntforg.py +++ b/pysnmp/entity/rfc3413/oneliner/ntforg.py @@ -1,12 +1,14 @@ from pyasn1.compat.octets import null from pysnmp import nextid, error from pysnmp.entity import engine, config +from pysnmp.smi.rfc1902 import ObjectIdentity, ObjectType, NotificationType from pysnmp.entity.rfc3413 import ntforg, context -from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable from pysnmp.entity.rfc3413.oneliner.auth import CommunityData, UsmUserData from pysnmp.entity.rfc3413.oneliner.target import UdpTransportTarget, \ Udp6TransportTarget, UnixTransportTarget from pysnmp.entity.rfc3413.oneliner import cmdgen +# obsolete, compatibility symbols +from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable # Auth protocol usmHMACMD5AuthProtocol = config.usmHMACMD5AuthProtocol @@ -21,6 +23,8 @@ usmAesCfb192Protocol = config.usmAesCfb192Protocol usmAesCfb256Protocol = config.usmAesCfb256Protocol usmNoPrivProtocol = config.usmNoPrivProtocol +ContextData = cmdgen.ContextData + nextID = nextid.Integer(0xffffffff) class AsyncNotificationOriginator: @@ -28,21 +32,18 @@ class AsyncNotificationOriginator: self.__asyncCmdGen = cmdgen.AsyncCommandGenerator() def _getCache(self, snmpEngine): - if 'ntforg' not in snmpEngine.cache: - snmpEngine.cache['ntforg'] = { - 'auth': {}, - 'name': {} - } - return snmpEngine.cache['ntforg'] + cache = snmpEngine.getUserContext('ntforg') + if cache is None: + cache = { 'auth': {}, 'name': {} } + snmpEngine.setUserContext(ntforg=cache) + return cache def getMibViewController(self, snmpEngine): return self.__asyncCmdGen.getMibViewController(snmpEngine) def cfgNtfOrg(self, snmpEngine, authData, transportTarget, notifyType): cache = self._getCache(snmpEngine) - addrName, paramsName = self.__asyncCmdGen.cfgCmdGen( - snmpEngine, authData, transportTarget - ) + addrName, paramsName = self.__asyncCmdGen.cfgCmdGen( snmpEngine, authData, transportTarget ) tagList = transportTarget.tagList.split() if not tagList: tagList = [''] @@ -118,18 +119,30 @@ class AsyncNotificationOriginator: ) del cache['auth'][authDataKey] - def makeVarBinds(self, snmpEngine, varBinds, oidOnly=False): - return self.__asyncCmdGen.makeVarBinds(snmpEngine, varBinds, oidOnly) + def makeVarBinds(self, snmpEngine, varBinds): + mibViewController = self.getMibViewController(snmpEngine) + if isinstance(varBinds, NotificationType): + varBinds.resolveWithMib(mibViewController) + __varBinds = [] + for varBind in varBinds: + if isinstance(varBind, ObjectType): + pass + elif isinstance(varBind[0], ObjectIdentity): + varBind = ObjectType(*varBind) + else: + varBind = ObjectType(ObjectIdentity(varBind[0]), varBind[1]) + __varBinds.append(varBind.resolveWithMib(mibViewController)) + return __varBinds def unmakeVarBinds(self, snmpEngine, varBinds, lookupNames, lookupValues): - return self.__asyncCmdGen.unmakeVarBinds(snmpEngine, varBinds, - lookupNames, lookupValues) - + if lookupNames or lookupValues: + mibViewController = self.getMibViewController(snmpEngine) + varBinds = [ ObjectType(ObjectIdentity(x[0]), x[1]).resolveWithMib(mibViewController) for x in varBinds ] + return varBinds + def sendNotification(self, snmpEngine, - authData, transportTarget, - snmpContext, contextName, + authData, transportTarget, contextData, notifyType, - notificationType, instanceIndex, varBinds=(), cbInfo=(None, None), lookupNames=False, lookupValues=False): @@ -148,7 +161,7 @@ class AsyncNotificationOriginator: cbCtx ) - (cbFun, cbCtx) = cbInfo + cbFun, cbCtx = cbInfo # Create matching transport tags if not given by user if not transportTarget.tagList: @@ -161,12 +174,8 @@ class AsyncNotificationOriginator: notifyName = self.cfgNtfOrg( snmpEngine, authData, transportTarget, notifyType ) - if isinstance(notificationType, MibVariable): - notificationType = notificationType.resolveWithMib( - self.getMibViewController(snmpEngine), oidOnly=True - ) - return ntforg.NotificationOriginator().sendVarBinds(snmpEngine, notifyName, snmpContext, contextName, notificationType, instanceIndex, self.makeVarBinds(snmpEngine, varBinds), __cbFun, (lookupNames, lookupValues, cbFun, cbCtx)) + return ntforg.NotificationOriginator().sendVarBinds(snmpEngine, notifyName, contextData.contextEngineId, contextData.contextName, self.makeVarBinds(snmpEngine, varBinds), __cbFun, (lookupNames, lookupValues, cbFun, cbCtx)) # substitute sendNotification return object for backward compatibility class ErrorIndicationReturn: @@ -176,7 +185,9 @@ class ErrorIndicationReturn: def __bool__(self): return bool(self.__vars[0]) def __str__(self): return str(self.__vars[0]) -# compatibility implementation, never use this class for new applications +# +# Compatibility implementation, never use this class for new applications +# class AsynNotificationOriginator: def __init__(self, snmpEngine=None, snmpContext=None): if snmpEngine is None: @@ -208,9 +219,10 @@ class AsynNotificationOriginator: def sendNotification(self, authData, transportTarget, notifyType, notificationType, - varBinds=(), + varBinds=(), # legacy, use NotificationType instead cbInfo=(None, None), lookupNames=False, lookupValues=False, + contextEngineId=None, # XXX ordering incompatibility contextName=null): def __cbFun(snmpEngine, sendRequestHandle, errorIndication, @@ -237,16 +249,23 @@ class AsynNotificationOriginator: if contextName is null and authData.contextName: contextName = authData.contextName - # legacy - if not isinstance(notificationType, MibVariable) and \ - isinstance(notificationType[0], tuple): - notificationType = MibVariable(notificationType[0][0], notificationType[0][1], *notificationType[1:]).resolveWithMib(self.mibViewController) + if not isinstance(notificationType, + (ObjectIdentity, ObjectType, NotificationType)): + if isinstance(notificationType[0], tuple): + # legacy + notificationType = ObjectIdentity(notificationType[0][0], notificationType[0][1], *notificationType[1:]) + else: + notificationType = ObjectIdentity(notificationType) + + if not isinstance(notificationType, NotificationType): + notificationType = NotificationType(notificationType) return self.__asyncNtfOrg.sendNotification( self.snmpEngine, authData, transportTarget, - self.snmpContext, contextName, - notifyType, notificationType, None, varBinds, + ContextData(contextEngineId or self.snmpContext.contextEngineId, + contextName), + notifyType, notificationType.addVarBinds(*varBinds), (__cbFun, cbInfo), lookupNames, lookupValues ) @@ -262,6 +281,8 @@ class NotificationOriginator: else: self.__asynNtfOrg = asynNtfOrg + # the varBinds parameter is legacy, use NotificationType instead + def sendNotification(self, authData, transportTarget, notifyType, notificationType, *varBinds, **kwargs): def __cbFun(sendRequestHandle, errorIndication, @@ -276,6 +297,7 @@ class NotificationOriginator: varBinds, (__cbFun, appReturn), kwargs.get('lookupNames', False), kwargs.get('lookupValues', False), + kwargs.get('contextEngineId'), kwargs.get('contextName', null) ) self.__asynNtfOrg.snmpEngine.transportDispatcher.runDispatcher() diff --git a/pysnmp/smi/compiler.py b/pysnmp/smi/compiler.py index c5fc2d9..dde6cdc 100644 --- a/pysnmp/smi/compiler.py +++ b/pysnmp/smi/compiler.py @@ -56,7 +56,7 @@ else: ) compiler.addBorrowers( - *[ PyFileBorrower(x) for x in getReadersFromUrls(*borrowers, originalMatching=False, lowcaseMatching=False) ] + *[ PyFileBorrower(x) for x in getReadersFromUrls(*borrowers, **dict(originalMatching=False, lowcaseMatching=False)) ] ) mibBuilder.setMibCompiler(compiler, destination) diff --git a/pysnmp/smi/rfc1902.py b/pysnmp/smi/rfc1902.py new file mode 100644 index 0000000..75fa7b9 --- /dev/null +++ b/pysnmp/smi/rfc1902.py @@ -0,0 +1,463 @@ +import sys +from pysnmp.proto import rfc1902, rfc1905 +from pysnmp.proto.api import v2c +from pysnmp.smi.builder import ZipMibSource +from pysnmp.smi.compiler import addMibCompiler, defaultDest +from pysnmp.smi.error import SmiError +from pyasn1.type.base import AbstractSimpleAsn1Item +from pyasn1.error import PyAsn1Error +from pysnmp import debug + +# +# An OID-like object that embeds MIB resolution. +# +# Valid initializers include: +# ObjectIdentity('1.3.6.1.2.1.1.1.0') +# ObjectIdentity('iso.org.dod.internet.mgmt.mib-2.system.sysDescr.0') +# ObjectIdentity('SNMPv2-MIB', 'system') +# ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0) +# ObjectIdentity('IP-MIB', 'ipAdEntAddr', '127.0.0.1', 123) +# +class ObjectIdentity: + stDirty, stClean = 1, 2 + + def __init__(self, *args): + self.__args = args + self.__mibSourcesToAdd = self.__modNamesToLoad = None + self.__asn1SourcesToAdd = None + self.__state = self.stDirty + + # + # public API + # + def getMibSymbol(self): + if self.__state & self.stClean: + return self.__modName, self.__symName, self.__indices + else: + raise SmiError('%s object not fully initialized' % self.__class__.__name__) + + def getOid(self): + if self.__state & self.stClean: + return self.__oid + else: + raise SmiError('%s object not fully initialized' % self.__class__.__name__) + + def getLabel(self): + if self.__state & self.stClean: + return self.__label + else: + raise SmiError('%s object not fully initialized' % self.__class__.__name__) + + def getMibNode(self): + if self.__state & self.stClean: + return self.__mibNode + else: + raise SmiError('%s object not fully initialized' % self.__class__.__name__) + + # + # A gateway to MIBs manipulation routines + # + + def addMibCompiler(self, *asn1Sources, **kwargs): + if self.__asn1SourcesToAdd is None: + self.__asn1SourcesToAdd = asn1Sources + else: + self.__asn1SourcesToAdd += asn1Sources + self.__mibDir = kwargs.get('destDir', defaultDest) + return self + + def addMibSource(self, *mibSources): + if self.__mibSourcesToAdd is None: + self.__mibSourcesToAdd = mibSources + else: + self.__mibSourcesToAdd += mibSources + return self + + # provides deferred MIBs load + def loadMibs(self, *modNames): + if self.__modNamesToLoad is None: + self.__modNamesToLoad = modNames + else: + self.__modNamesToLoad += modNames + return self + + # this would eventually be called by an entity which posses a + # reference to MibViewController + def resolveWithMib(self, mibViewController): + if self.__mibSourcesToAdd is not None: + debug.logger & debug.flagMIB and debug.logger('adding MIB sources %s' % ', '.join(self.__mibSourcesToAdd)) + mibViewController.mibBuilder.addMibSources( + *[ ZipMibSource(x) for x in self.__mibSourcesToAdd ] + ) + self.__mibSourcesToAdd = None + + if self.__asn1SourcesToAdd is not None: + debug.logger & debug.flagMIB and debug.logger('adding MIB compiler with source paths %s' % ', '.join(self.__asn1SourcesToAdd)) + addMibCompiler( + mibViewController.mibBuilder, + sources=self.__asn1SourcesToAdd, + destination=self.__mibDir + ) + self.__asn1SourcesToAdd = self.__mibDir = None + + if self.__modNamesToLoad is not None: + debug.logger & debug.flagMIB and debug.logger('loading MIB modules %s' % ', '.join(self.__modNamesToLoad)) + mibViewController.mibBuilder.loadModules(*self.__modNamesToLoad) + self.__modNamesToLoad = None + + if self.__state & self.stClean: + return self + + MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar', 'MibTableColumn') + + self.__indices = () + + if len(self.__args) == 1: # OID or label + debug.logger & debug.flagMIB and debug.logger('resolving %s as OID or label' % self.__args) + try: + self.__oid = rfc1902.ObjectName(self.__args[0]) + except PyAsn1Error: + try: + label = tuple(self.__args[0].split('.')) + except ValueError: + raise SmiError('Bad OID format %r' % (self.__args[0],)) + prefix, label, suffix = mibViewController.getNodeNameByOid( + label + ) + + if suffix: + try: + suffix = tuple([ int(x) for x in suffix ]) + except ValueError: + raise SmiError('Unknown object name component %r' % (suffix,)) + self.__oid = rfc1902.ObjectName(prefix + suffix) + else: + prefix, label, suffix = mibViewController.getNodeNameByOid( + self.__oid + ) + + debug.logger & debug.flagMIB and debug.logger('resolved %r into prefix %r and suffix %r' % (self.__args, prefix, suffix)) + + modName, symName, _ = mibViewController.getNodeLocation(prefix) + + self.__modName = modName + self.__symName = symName + + self.__label = label + + mibNode, = mibViewController.mibBuilder.importSymbols( + modName, symName + ) + + self.__mibNode = mibNode + + debug.logger & debug.flagMIB and debug.logger('resolved prefix %r into MIB node %r' % (prefix, mibNode)) + + if isinstance(mibNode, MibTableColumn): # table column + if suffix: + rowModName, rowSymName, _ = mibViewController.getNodeLocation( + mibNode.name[:-1] + ) + rowNode, = mibViewController.mibBuilder.importSymbols( + rowModName, rowSymName + ) + self.__indices = rowNode.getIndicesFromInstId(suffix) + elif isinstance(mibNode, MibScalar): # scalar + if suffix: + self.__indices = ( rfc1902.ObjectName(suffix), ) + else: + if suffix: + self.__indices = ( rfc1902.ObjectName(suffix), ) + self.__state |= self.stClean + + debug.logger & debug.flagMIB and debug.logger('resolved indices are %r' % (self.__indices,)) + + return self + elif len(self.__args) > 1: # MIB, symbol[, index, index ...] + self.__modName = self.__args[0] + if self.__args[1]: + self.__symName = self.__args[1] + else: + mibViewController.mibBuilder.loadModules(self.__modName) + oid, _, _ = mibViewController.getFirstNodeName(self.__modName) + _, self.__symName, _ = mibViewController.getNodeLocation(oid) + + mibNode, = mibViewController.mibBuilder.importSymbols( + self.__modName, self.__symName + ) + + self.__mibNode = mibNode + + self.__oid = rfc1902.ObjectName(mibNode.getName()) + + prefix, label, suffix = mibViewController.getNodeNameByOid( + self.__oid + ) + self.__label = label + + debug.logger & debug.flagMIB and debug.logger('resolved %r into prefix %r and suffix %r' % (self.__args, prefix, suffix)) + + if isinstance(mibNode, MibTableColumn): # table + rowModName, rowSymName, _ = mibViewController.getNodeLocation( + mibNode.name[:-1] + ) + rowNode, = mibViewController.mibBuilder.importSymbols( + rowModName, rowSymName + ) + if self.__args[2:]: + try: + instIds = rowNode.getInstIdFromIndices(*self.__args[2:]) + self.__oid += instIds + self.__indices = rowNode.getIndicesFromInstId(instIds) + except PyAsn1Error: + raise SmiError('Instance index %r to OID convertion failure at object %r: %s' % (self.__args[2:], mibNode.getLabel(), sys.exc_info()[1])) + elif self.__args[2:]: # any other kind of MIB node with indices + if self.__args[2:]: + instId = rfc1902.ObjectName( + '.'.join([ str(x) for x in self.__args[2:] ]) + ) + self.__oid += instId + self.__indices = ( instId, ) + self.__state |= self.stClean + + debug.logger & debug.flagMIB and debug.logger('resolved indices are %r' % (self.__indices,)) + + return self + else: + raise SmiError('Non-OID, label or MIB symbol') + + def prettyPrint(self): + if self.__state & self.stClean: + return '%s::%s%s%s' % ( + self.__modName, self.__symName, + self.__indices and '.' or '', + '.'.join(['"%s"' % x.prettyPrint() for x in self.__indices ]) + ) + else: + raise SmiError('%s object not fully initialized' % self.__class__.__name__) + + def __repr__(self): + return '%s(%s)' % (self.__class__.__name__, ', '.join([ repr(x) for x in self.__args])) + + # Redirect some attrs access to the OID object to behave alike + + def __str__(self): + if self.__state & self.stClean: + return str(self.__oid) + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __eq__(self, other): + if self.__state & self.stClean: + return self.__oid == other + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __ne__(self, other): + if self.__state & self.stClean: + return self.__oid != other + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __lt__(self, other): + if self.__state & self.stClean: + return self.__oid < other + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __le__(self, other): + if self.__state & self.stClean: + return self.__oid <= other + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __gt__(self, other): + if self.__state & self.stClean: + return self.__oid > other + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __ge__(self, other): + if self.__state & self.stClean: + return self.__oid > other + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __nonzero__(self): + if self.__state & self.stClean: + return self.__oid != 0 + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __bool__(self): + if self.__state & self.stClean: + return bool(self.__oid) + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __getitem__(self, i): + if self.__state & self.stClean: + return self.__oid[i] + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __len__(self): + if self.__state & self.stClean: + return len(self.__oid) + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __add__(self, other): + if self.__state & self.stClean: + return self.__oid + other + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __radd__(self, other): + if self.__state & self.stClean: + return other + self.__oid + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __hash__(self): + if self.__state & self.stClean: + return hash(self.__oid) + else: + raise SmiError('%s object not properly initialized' % self.__class__.__name__) + + def __getattr__(self, attr): + if self.__state & self.stClean: + if attr in ( 'asTuple', 'clone', 'subtype', 'isPrefixOf', + 'isSameTypeWith', 'isSuperTypeOf'): + return getattr(self.__oid, attr) + raise AttributeError + else: + raise SmiError('%s object not properly initialized for accessing %s' % (self.__class__.__name__, attr)) + +# A two-element sequence of ObjectIdentity and SNMP data type object +class ObjectType: + stDirty, stClean = 1, 2 + def __init__(self, objectIdentity, objectSyntax=rfc1905.unSpecified): + if not isinstance(objectIdentity, ObjectIdentity): + raise SmiError('initializer should be ObjectIdentity instance, not %r' % (objectIdentity,)) + self.__args = [ objectIdentity, objectSyntax ] + self.__state = self.stDirty + + def __getitem__(self, i): + if self.__state & self.stClean: + return self.__args[i] + else: + raise SmiError('%s object not fully initialized' % self.__class__.__name__) + + def __repr__(self): + return '%s(%s)' % (self.__class__.__name__, ', '.join([ repr(x) for x in self.__args])) + + def resolveWithMib(self, mibViewController): + if self.__state & self.stClean: + return self + + self.__args[0].resolveWithMib(mibViewController) + + MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar', 'MibTableColumn') + + if not isinstance(self.__args[0].getMibNode(), + (MibScalar, MibTableColumn)): + if not isinstance(self.__args[1], AbstractSimpleAsn1Item): + raise SmiError('MIB object %r is not OBJECT-TYPE (MIB not loaded?)' % (self.__args[0],)) + self.__state |= self.stClean + return self + + if isinstance(self.__args[1], (rfc1905.UnSpecified, + rfc1905.NoSuchObject, + rfc1905.NoSuchInstance, + rfc1905.EndOfMibView)): + self.__state |= self.stClean + return self + + try: + self.__args[1] = self.__args[0].getMibNode().getSyntax().clone(self.__args[1]) + except PyAsn1Error: + raise SmiError('Value %r to type %r convertion failure: %s' % (self.__args[1], self.__args[0].getMibNode().getSyntax().__class__.__name__, sys.exc_info()[1])) + + self.__state |= self.stClean + + debug.logger & debug.flagMIB and debug.logger('resolved %r syntax is %r' % (self.__args[0], self.__args[1])) + + return self + + def prettyPrint(self): + if self.__state & self.stClean: + return '%s = %s' % (self.__args[0].prettyPrint(), + self.__args[1].prettyPrint()) + else: + raise SmiError('%s object not fully initialized' % self.__class__.__name__) + +# A sequence of ObjectType's +class NotificationType: + stDirty, stClean = 1, 2 + def __init__(self, objectIdentity, instanceIndex=(), objects={}): + if not isinstance(objectIdentity, ObjectIdentity): + raise SmiError('initializer should be ObjectIdentity instance, not %r' % (objectIdentity,)) + self.__objectIdentity = objectIdentity + self.__instanceIndex = instanceIndex + self.__objects = objects + self.__varBinds = [] + self.__additionalVarBinds = [] + self.__state = self.stDirty + + def __getitem__(self, i): + if self.__state & self.stClean: + return self.__varBinds[i] + else: + raise SmiError('%s object not fully initialized' % self.__class__.__name__) + + def __repr__(self): + return '%s(%r, %r, %r)' % (self.__class__.__name__, self.__objectIdentity, self.__instanceIndex, self.__objects) + + def addVarBinds(self, *varBinds): + debug.logger & debug.flagMIB and debug.logger('additional var-binds: %r' % (varBinds,)) + if self.__state & self.stClean: + self.__varBinds.extend(varBinds) + else: + self.__additionalVarBinds.extend(varBinds) + return self + + def resolveWithMib(self, mibViewController): + if self.__state & self.stClean: + return self + + self.__objectIdentity.resolveWithMib(mibViewController) + + self.__varBinds.append( + ObjectType(ObjectIdentity(v2c.apiTrapPDU.snmpTrapOID), + self.__objectIdentity).resolveWithMib(mibViewController) + ) + + NotificationType, = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'NotificationType') + + mibNode = self.__objectIdentity.getMibNode() + + if isinstance(mibNode, NotificationType): + for notificationObject in mibNode.getObjects(): + objectIdentity = ObjectIdentity(*notificationObject+self.__instanceIndex).resolveWithMib(mibViewController) + self.__varBinds.append( + ObjectType(objectIdentity, self.__objects.get(notificationObject, rfc1905.unSpecified)).resolveWithMib(mibViewController) + ) + else: + debug.logger & debug.flagMIB and debug.logger('WARNING: MIB object %r is not NOTIFICATION-TYPE (MIB not loaded?)' % (self.__objectIdentity,)) + + if self.__additionalVarBinds: + self.__varBinds.extend(self.__additionalVarBinds) + self.__additionalVarBinds = [] + + self.__state |= self.stClean + + debug.logger & debug.flagMIB and debug.logger('resolved %r into %r' % (self.__objectIdentity, self.__varBinds)) + + return self + + def prettyPrint(self): + if self.__state & self.stClean: + return ' '.join([ '%s = %s' % (x[0].prettyPrint(), x[1].prettyPrint()) for x in self.__varBinds]) + else: + raise SmiError('%s object not fully initialized' % self.__class__.__name__) -- cgit v1.2.1