summaryrefslogtreecommitdiff
path: root/pysnmp
diff options
context:
space:
mode:
Diffstat (limited to 'pysnmp')
-rw-r--r--pysnmp/entity/config.py25
-rw-r--r--pysnmp/entity/engine.py19
-rw-r--r--pysnmp/entity/rfc3413/config.py32
-rw-r--r--pysnmp/entity/rfc3413/ntforg.py129
-rw-r--r--pysnmp/entity/rfc3413/oneliner/cmdgen.py87
-rw-r--r--pysnmp/entity/rfc3413/oneliner/mibvar.py315
-rw-r--r--pysnmp/entity/rfc3413/oneliner/ntforg.py86
-rw-r--r--pysnmp/smi/compiler.py2
-rw-r--r--pysnmp/smi/rfc1902.py463
9 files changed, 676 insertions, 482 deletions
diff --git a/pysnmp/entity/config.py b/pysnmp/entity/config.py
index f9018e9..085eea5 100644
--- a/pysnmp/entity/config.py
+++ b/pysnmp/entity/config.py
@@ -325,12 +325,17 @@ def addTransport(snmpEngine, transportDomain, transport):
transport.protoTransportDispatcher()
)
# here we note that we have created transportDispatcher automatically
- snmpEngine.cache['automaticTransportDispatcher'] = 0
+ snmpEngine.setUserContext(automaticTransportDispatcher=0)
snmpEngine.transportDispatcher.registerTransport(
transportDomain, transport
)
- if 'automaticTransportDispatcher' in snmpEngine.cache:
- snmpEngine.cache['automaticTransportDispatcher'] += 1
+ automaticTransportDispatcher = snmpEngine.getUserContext(
+ 'automaticTransportDispatcher'
+ )
+ if automaticTransportDispatcher is not None:
+ snmpEngine.setUserContext(
+ automaticTransportDispatcher=automaticTransportDispatcher+1
+ )
def getTransport(snmpEngine, transportDomain):
if not snmpEngine.transportDispatcher:
@@ -346,12 +351,18 @@ def delTransport(snmpEngine, transportDomain):
transport = getTransport(snmpEngine, transportDomain)
snmpEngine.transportDispatcher.unregisterTransport(transportDomain)
# automatically shutdown automatically created transportDispatcher
- if 'automaticTransportDispatcher' in snmpEngine.cache:
- snmpEngine.cache['automaticTransportDispatcher'] -= 1
- if not snmpEngine.cache['automaticTransportDispatcher']:
+ automaticTransportDispatcher = snmpEngine.getUserContext(
+ 'automaticTransportDispatcher'
+ )
+ if automaticTransportDispatcher is not None:
+ automaticTransportDispatcher -= 1
+ snmpEngine.setUserContext(
+ automaticTransportDispatcher=automaticTransportDispatcher
+ )
+ if not automaticTransportDispatcher:
snmpEngine.transportDispatcher.closeDispatcher()
snmpEngine.unregisterTransportDispatcher()
- del snmpEngine.cache['automaticTransportDispatcher']
+ snmpEngine.delUserContext(automaticTransportDispatcher)
return transport
addSocketTransport = addTransport
diff --git a/pysnmp/entity/engine.py b/pysnmp/entity/engine.py
index ecc0aa5..d663d91 100644
--- a/pysnmp/entity/engine.py
+++ b/pysnmp/entity/engine.py
@@ -140,3 +140,22 @@ class SnmpEngine:
self.transportDispatcher.unregisterRecvCbFun(recvId)
self.transportDispatcher.unregisterTimerCbFun()
self.transportDispatcher = None
+
+ def getMibBuilder(self):
+ return self.msgAndPduDsp.mibInstrumController.mibBuilder
+
+ # User app may attach opaque objects to SNMP Engine
+ def setUserContext(self, **kwargs):
+ self.cache.update(
+ dict([('__%s' % k, kwargs[k]) for k in kwargs])
+ )
+
+ def getUserContext(self, arg):
+ return self.cache.get('__%s' % arg)
+
+ def delUserContext(self, arg):
+ try:
+ del self.cache['__%s' % arg]
+ except KeyError:
+ pass
+
diff --git a/pysnmp/entity/rfc3413/config.py b/pysnmp/entity/rfc3413/config.py
index 1124600..68218eb 100644
--- a/pysnmp/entity/rfc3413/config.py
+++ b/pysnmp/entity/rfc3413/config.py
@@ -9,10 +9,10 @@ def getTargetAddr(snmpEngine, snmpTargetAddrName):
'SNMP-TARGET-MIB', 'snmpTargetAddrEntry'
)
- if 'getTargetAddr' not in snmpEngine.cache:
- snmpEngine.cache['getTargetAddr'] = { 'id': -1 }
-
- cache = snmpEngine.cache['getTargetAddr']
+ cache = snmpEngine.getUserContext('getTargetAddr')
+ if cache is None:
+ cache = { 'id': -1 }
+ snmpEngine.setUserContext(getTargetAddr=cache)
if cache['id'] != snmpTargetAddrEntry.branchVersionId:
cache['nameToTargetMap'] = {}
@@ -98,10 +98,10 @@ def getTargetParams(snmpEngine, paramsName):
'SNMP-TARGET-MIB', 'snmpTargetParamsEntry'
)
- if 'getTargetParams' not in snmpEngine.cache:
- snmpEngine.cache['getTargetParams'] = { 'id': -1 }
-
- cache = snmpEngine.cache['getTargetParams']
+ cache = snmpEngine.getUserContext('getTargetParams')
+ if cache is None:
+ cache = { 'id': -1 }
+ snmpEngine.setUserContext(getTargetParams=cache)
if cache['id'] != snmpTargetParamsEntry.branchVersionId:
cache['nameToParamsMap'] = {}
@@ -178,10 +178,10 @@ def getNotificationInfo(snmpEngine, notificationTarget):
'SNMP-NOTIFICATION-MIB', 'snmpNotifyEntry'
)
- if 'getNotificationInfo' not in snmpEngine.cache:
- snmpEngine.cache['getNotificationInfo'] = { 'id': -1 }
-
- cache = snmpEngine.cache['getNotificationInfo']
+ cache = snmpEngine.getUserContext('getNotificationInfo')
+ if cache is None:
+ cache = { 'id': -1 }
+ snmpEngine.setUserContext(getNotificationInfo=cache)
if cache['id'] != snmpNotifyEntry.branchVersionId:
cache['targetToNotifyMap'] = {}
@@ -225,10 +225,10 @@ def getTargetNames(snmpEngine, tag):
'SNMP-TARGET-MIB', 'snmpTargetAddrEntry'
)
- if 'getTargetNames' not in snmpEngine.cache:
- snmpEngine.cache['getTargetNames'] = { 'id': -1 }
-
- cache = snmpEngine.cache['getTargetNames']
+ cache = snmpEngine.getUserContext('getTargetNames')
+ if cache is None:
+ cache = { 'id': -1 }
+ snmpEngine.setUserContext(getTargetNames=cache)
if cache['id'] == snmpTargetAddrEntry.branchVersionId:
tagToTargetsMap = cache['tagToTargetsMap']
diff --git a/pysnmp/entity/rfc3413/ntforg.py b/pysnmp/entity/rfc3413/ntforg.py
index 5018049..a3d7934 100644
--- a/pysnmp/entity/rfc3413/ntforg.py
+++ b/pysnmp/entity/rfc3413/ntforg.py
@@ -5,6 +5,7 @@ from pysnmp.proto.proxy import rfc2576
from pysnmp.proto import rfc3411
from pysnmp.proto.api import v2c
from pysnmp.proto import error
+from pysnmp.smi import view, rfc1902
from pysnmp import nextid
from pysnmp import debug
@@ -254,14 +255,12 @@ class NotificationOriginator:
def sendVarBinds(self,
snmpEngine,
notificationTarget,
- snmpContext,
+ contextEngineId,
contextName,
- notificationName,
- instanceIndex,
- additionalVarBinds=(),
+ varBinds=(),
cbFun=None,
cbCtx=None):
- debug.logger & debug.flagApp and debug.logger('sendVarBinds: notificationTarget %s, notificationName %s, additionalVarBinds %s, contextName "%s", instanceIndex %s' % (notificationTarget, notificationName, additionalVarBinds, contextName, instanceIndex))
+ debug.logger & debug.flagApp and debug.logger('sendVarBinds: notificationTarget %s, contextEngineId %s, contextName "%s", varBinds %s' % (notificationTarget, contextEngineId or '<default>', contextName, varBinds))
if contextName:
__SnmpAdminString, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('SNMP-FRAMEWORK-MIB', 'SnmpAdminString')
@@ -277,10 +276,32 @@ class NotificationOriginator:
debug.logger & debug.flagApp and debug.logger('sendVarBinds: sendRequestHandle %s, notifyTag %s, notifyType %s' % (sendRequestHandle, notifyTag, notifyType))
- contextMibInstrumCtl = snmpContext.getMibInstrum(contextName)
-
- additionalVarBinds = [ (v2c.ObjectIdentifier(x),y) for x,y in additionalVarBinds ]
+ varBinds = [ (v2c.ObjectIdentifier(x),y) for x,y in varBinds ]
+ # 3.3.2 & 3.3.3
+ snmpTrapOID, sysUpTime = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpTrapOID', 'sysUpTime')
+
+ for idx in range(len(varBinds)):
+ if idx and varBinds[idx][0] == sysUpTime.getName():
+ if varBinds[0][0] == sysUpTime.getName():
+ varBinds[0] = varBinds[idx]
+ else:
+ varBinds.insert(0, varBinds[idx])
+ del varBinds[idx]
+
+ if varBinds[0][0] != sysUpTime.getName():
+ varBinds.insert(0, (v2c.ObjectIdentifier(sysUpTime.getName()),
+ sysUpTime.getSyntax().clone()))
+
+ if len(varBinds) < 2 or varBinds[1][0] != snmpTrapOID.getName():
+ varBinds.insert(1, (v2c.ObjectIdentifier(snmpTrapOID.getName()),
+ snmpTrapOID.getSyntax()))
+
+ debug.logger & debug.flagApp and debug.logger('sendVarBinds: final varBinds %s' % (varBinds,))
+
+ cbCtx = cbFun, cbCtx
+ cbFun = self.processResponseVarBinds
+
for targetAddrName in config.getTargetNames(snmpEngine, notifyTag):
( transportDomain,
transportAddress,
@@ -292,8 +313,6 @@ class NotificationOriginator:
securityName,
securityLevel ) = config.getTargetParams(snmpEngine, params)
- debug.logger & debug.flagApp and debug.logger('sendVarBinds: sendRequestHandle %s, notifyTag %s yields: transportDomain %s, transportAddress %r, securityModel %s, securityName %s, securityLevel %s' % (sendRequestHandle, notifyTag, transportDomain, transportAddress, securityModel, securityName, securityLevel))
-
# 3.3.1 XXX
# XXX filtering's yet to be implemented
# filterProfileName = config.getNotifyFilterProfile(params)
@@ -302,57 +321,22 @@ class NotificationOriginator:
# filterMask,
# filterType ) = config.getNotifyFilter(filterProfileName)
- varBinds = []
-
- # 3.3.2 & 3.3.3
- sysUpTime, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'sysUpTime')
-
- for varName, varVal in additionalVarBinds:
- if varName == sysUpTime.name:
- varBinds.append((varName, varVal))
- break
- if not varBinds:
- varBinds.append((sysUpTime.name,
- sysUpTime.syntax.clone())) # for actual value
-
- snmpTrapOid, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpTrapOID')
- if len(notificationName) == 2: # ('MIB', 'symbol')
- notificationTypeObject, = contextMibInstrumCtl.mibBuilder.importSymbols(*notificationName)
- varBinds.append((snmpTrapOid.name, v2c.ObjectIdentifier(notificationTypeObject.name)))
- debug.logger & debug.flagApp and debug.logger('sendVarBinds: notification type object is %s' % notificationTypeObject)
- for notificationObject in notificationTypeObject.getObjects():
- mibNode, = contextMibInstrumCtl.mibBuilder.importSymbols(*notificationObject)
- if instanceIndex:
- mibNode = mibNode.getNode(mibNode.name + instanceIndex)
- else:
- mibNode = mibNode.getNextNode(mibNode.name)
- varBinds.extend(
- contextMibInstrumCtl.readVars(
- [ (mibNode.name, None) ] # XXX AC is missing
- )
- )
- debug.logger & debug.flagApp and debug.logger('sendVarBinds: processed notification object %s, instance index %s, var-bind %s' % (notificationObject, instanceIndex is None and "<first>" or instanceIndex, mibNode))
- elif notificationName: # numeric OID
- varBinds.append(
- (snmpTrapOid.name,
- snmpTrapOid.syntax.clone(notificationName))
- )
- else:
- varBinds.append((snmpTrapOid.name, snmpTrapOid.syntax))
+ debug.logger & debug.flagApp and debug.logger('sendVarBinds: sendRequestHandle %s, notifyTag %s yields: transportDomain %s, transportAddress %r, securityModel %s, securityName %s, securityLevel %s' % (sendRequestHandle, notifyTag, transportDomain, transportAddress, securityModel, securityName, securityLevel))
- for varName, varVal in additionalVarBinds:
- if varName in (sysUpTime.name, snmpTrapOid.name):
+ for varName, varVal in varBinds:
+ if varName in (sysUpTime.name, snmpTrapOID.name):
continue
try:
snmpEngine.accessControlModel[self.acmID].isAccessAllowed(
snmpEngine, securityModel, securityName,
securityLevel, 'notify', contextName, varName
)
+
+ debug.logger & debug.flagApp and debug.logger('sendVarBinds: ACL succeeded for OID %s securityName %s' % (varName, securityName))
+
except error.StatusInformation:
- debug.logger & debug.flagApp and debug.logger('sendVarBinds: OID %s not allowed for %s, droppping notification' % (varName, securityName))
+ debug.logger & debug.flagApp and debug.logger('sendVarBinds: ACL denied access for OID %s securityName %s, droppping notification' % (varName, securityName))
return
- else:
- varBinds.append((varName, varVal))
# 3.3.4
if notifyType == 1:
@@ -365,14 +349,11 @@ class NotificationOriginator:
v2c.apiPDU.setDefaults(pdu)
v2c.apiPDU.setVarBinds(pdu, varBinds)
- cbCtx = cbFun, cbCtx
- cbFun = self.processResponseVarBinds
-
# 3.3.5
try:
sendPduHandle = self.sendPdu(snmpEngine,
targetAddrName,
- snmpContext.contextEngineId,
+ contextEngineId,
contextName,
pdu,
cbFun,
@@ -380,7 +361,7 @@ class NotificationOriginator:
except error.StatusInformation:
statusInformation = sys.exc_info()[1]
- debug.logger & debug.flagApp and debug.logger('sendVarBinds: sendRequestHandle %s: sendVarBindsPdu() failed with %r' % (sendRequestHandle, statusInformation))
+ debug.logger & debug.flagApp and debug.logger('sendVarBinds: sendRequestHandle %s: sendPdu() failed with %r' % (sendRequestHandle, statusInformation))
if sendRequestHandle not in self.__pendingNotifications or \
not self.__pendingNotifications[sendRequestHandle]:
if sendRequestHandle in self.__pendingNotifications:
@@ -440,13 +421,39 @@ def _sendNotification(self,
cbCtx = cbFun, cbCtx
cbFun = _sendNotificationCbFun
+ #
+ # Here we first expand trap OID into associated OBJECTS
+ # and then look them up at context-specific MIB
+ #
+
+ mibViewController = snmpEngine.getUserContext('mibViewController')
+ if not mibViewController:
+ mibViewController = view.MibViewController(snmpEngine.getMibBuilder())
+ snmpEngine.setUserContext(mibViewController=mibViewController)
+
+ # Support the following syntax:
+ # '1.2.3.4'
+ # (1,2,3,4)
+ # ('MIB', 'symbol')
+ if isinstance(notificationName, (tuple, list)) and \
+ notificationName and isinstance(notificationName[0], str):
+ notificationName = rfc1902.ObjectIdentity(*notificationName)
+ else:
+ notificationName = rfc1902.ObjectIdentity(notificationName)
+
+ varBinds = rfc1902.NotificationType(
+ notificationName, instanceIndex=instanceIndex
+ ).resolveWithMib(mibViewController)
+
+ mibInstrumController = self.snmpContext.getMibInstrum(contextName)
+
+ varBinds = varBinds[:1] + mibInstrumController.readVars(varBinds[1:])
+
return self.sendVarBinds(snmpEngine,
notificationTarget,
- self.snmpContext,
+ self.snmpContext.contextEngineId,
contextName,
- notificationName,
- instanceIndex,
- additionalVarBinds,
+ varBinds + list(additionalVarBinds),
cbFun,
cbCtx)
diff --git a/pysnmp/entity/rfc3413/oneliner/cmdgen.py b/pysnmp/entity/rfc3413/oneliner/cmdgen.py
index 00d8aa0..2e2a88c 100644
--- a/pysnmp/entity/rfc3413/oneliner/cmdgen.py
+++ b/pysnmp/entity/rfc3413/oneliner/cmdgen.py
@@ -1,6 +1,6 @@
from pysnmp.entity import engine, config
from pysnmp.entity.rfc3413 import cmdgen
-from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable
+from pysnmp.smi.rfc1902 import ObjectIdentity, ObjectType
from pysnmp.entity.rfc3413.oneliner.auth import CommunityData, UsmUserData
from pysnmp.entity.rfc3413.oneliner.target import UdpTransportTarget, \
Udp6TransportTarget, UnixTransportTarget
@@ -10,6 +10,8 @@ from pysnmp.smi import view
from pysnmp import nextid, error
from pyasn1.type import univ, base
from pyasn1.compat.octets import null
+# obsolete, compatibility symbols
+from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable
# Auth protocol
usmHMACMD5AuthProtocol = config.usmHMACMD5AuthProtocol
@@ -30,21 +32,22 @@ class AsyncCommandGenerator:
_null = univ.Null('')
def _getCache(self, snmpEngine):
- if 'cmdgen' not in snmpEngine.cache:
- snmpEngine.cache['cmdgen'] = {
- 'auth': {},
- 'parm': {},
- 'tran': {},
- 'addr': {},
- }
- return snmpEngine.cache['cmdgen']
+ cache = snmpEngine.getUserContext('cmdgen_cache')
+ if cache is None:
+ cache = {
+ 'auth': {}, 'parm': {}, 'tran': {}, 'addr': {}
+ }
+ snmpEngine.setUserContext(cmdgen_cache=cache)
+ return cache
def getMibViewController(self, snmpEngine):
- if 'mibViewController' not in snmpEngine.cache:
- snmpEngine.cache['mibViewController'] = view.MibViewController(
- snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder
+ mibViewController = snmpEngine.getUserContext('mibViewController')
+ if not mibViewController:
+ mibViewController = view.MibViewController(
+ snmpEngine.getMibBuilder()
)
- return snmpEngine.cache['mibViewController']
+ snmpEngine.setUserContext(mibViewController=mibViewController)
+ return mibViewController
def cfgCmdGen(self, snmpEngine, authData, transportTarget):
cache = self._getCache(snmpEngine)
@@ -203,59 +206,35 @@ class AsyncCommandGenerator:
return addrNames, paramsNames
- def makeVarBinds(self, snmpEngine, varBinds, oidOnly=False):
+ def makeVarBinds(self, snmpEngine, varBinds):
mibViewController = self.getMibViewController(snmpEngine)
__varBinds = []
- for varName, varVal in varBinds:
- if isinstance(varName, MibVariable):
- if oidOnly or isinstance(varVal, base.AbstractSimpleAsn1Item):
- varName.resolveWithMib(mibViewController, oidOnly=True)
- else:
- varName.resolveWithMib(mibViewController)
- varVal = varName.getMibNode().getSyntax().clone(varVal)
- elif isinstance(varName[0], tuple): # legacy
- varName = MibVariable(varName[0][0], varName[0][1], *varName[1:]).resolveWithMib(mibViewController)
- if not oidOnly and \
- not isinstance(varVal, base.AbstractSimpleAsn1Item):
- varVal = varName.getMibNode().getSyntax().clone(varVal)
+ for varBind in varBinds:
+ if isinstance(varBind, ObjectType):
+ pass
+ elif isinstance(varBind[0], ObjectIdentity):
+ varBind = ObjectType(*varBind)
+ elif isinstance(varBind[0][0], tuple): # legacy
+ varBind = ObjectType(ObjectIdentity(varBind[0][0][0], varBind[0][0][1], *varBind[0][1:]), varBind[1])
else:
- if oidOnly or isinstance(varVal, base.AbstractSimpleAsn1Item):
- varName = MibVariable(varName).resolveWithMib(mibViewController, oidOnly=True)
- else:
- varName = MibVariable(varName).resolveWithMib(mibViewController)
- try:
- varVal = varName.getMibNode().getSyntax().clone(varVal)
- except:
- raise error.PySnmpError('Unresolved SNMP value type for OID %s (MIB not loaded?)' % (varName,))
+ varBind = ObjectType(ObjectIdentity(varBind[0]), varBind[1])
- __varBinds.append((varName, varVal))
+ __varBinds.append(varBind.resolveWithMib(mibViewController))
return __varBinds
def unmakeVarBinds(self, snmpEngine, varBinds, lookupNames, lookupValues):
if lookupNames or lookupValues:
mibViewController = self.getMibViewController(snmpEngine)
- _varBinds = []
- for name, value in varBinds:
- varName = MibVariable(name).resolveWithMib(mibViewController)
- if lookupNames:
- name = varName
- if lookupValues:
- if value.tagSet not in (rfc1905.NoSuchObject.tagSet,
- rfc1905.NoSuchInstance.tagSet,
- rfc1905.EndOfMibView.tagSet):
- if varName.isFullyResolved():
- value = varName.getMibNode().getSyntax().clone(value)
- _varBinds.append((name, value))
- return _varBinds
- else:
- return varBinds
+ varBinds = [ ObjectType(ObjectIdentity(x[0]), x[1]).resolveWithMib(mibViewController) for x in varBinds ]
+
+ return varBinds
def makeVarBindsHead(self, snmpEngine, varNames):
return [
x[0] for x in self.makeVarBinds(
snmpEngine,
- [ (x, univ.Null('')) for x in varNames ], oidOnly=True
+ [ (x, univ.Null('')) for x in varNames ]
)
]
@@ -415,12 +394,12 @@ class AsynCommandGenerator:
# compatibility stub
def makeReadVarBinds(self, varNames):
return self.makeVarBinds(
- [ (x, univ.Null('')) for x in varNames ], oidOnly=True
+ [ (x, univ.Null('')) for x in varNames ]
)
- def makeVarBinds(self, varBinds, oidOnly=False):
+ def makeVarBinds(self, varBinds):
return self.__asyncCmdGen.makeVarBinds(
- self.snmpEngine, varBinds, oidOnly
+ self.snmpEngine, varBinds
)
def unmakeVarBinds(self, varBinds, lookupNames, lookupValues):
diff --git a/pysnmp/entity/rfc3413/oneliner/mibvar.py b/pysnmp/entity/rfc3413/oneliner/mibvar.py
index cb7b163..5e069ab 100644
--- a/pysnmp/entity/rfc3413/oneliner/mibvar.py
+++ b/pysnmp/entity/rfc3413/oneliner/mibvar.py
@@ -1,313 +1,6 @@
-from pysnmp.proto import rfc1902
-from pysnmp.smi.builder import ZipMibSource
-from pysnmp.smi.compiler import addMibCompiler
-from pysnmp.error import PySnmpError
-from pyasn1.error import PyAsn1Error
+# Obsolete interfaces, use pysnmp.smi.rfc1902 instead.
+from pysnmp.smi import rfc1902
-#
-# An OID-like object that embeds MIB resolution.
-#
-# Valid initializers include:
-# MibVariable('1.3.6.1.2.1.1.1.0'),
-# MibVariable('iso.org.dod.internet.mgmt.mib-2.system.sysDescr.0')
-# MibVariable('SNMPv2-MIB', 'system'),
-# MibVariable('SNMPv2-MIB', 'sysDescr', 0),
-# MibVariable('IP-MIB', 'ipAdEntAddr', '127.0.0.1', 123),
-#
-
-class MibVariable:
- stDirty, stOidOnly, stClean, stUnresolved = 1, 2, 4, 8
-
- def __init__(self, *args):
- self.__args = args
- self.__mibSourcesToAdd = self.__modNamesToLoad = None
- self.__asn1SourcesToAdd = None
- self.__state = self.stDirty
-
- #
- # public API
- #
- def getMibSymbol(self):
- if self.__state & self.stClean:
- return self.__modName, self.__symName, self.__indices
- else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
-
- def getOid(self):
- if self.__state & (self.stOidOnly | self.stClean):
- return self.__oid
- else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
-
- def getLabel(self):
- if self.__state & self.stClean:
- return self.__label
- else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
-
- def getMibNode(self): # XXX
- if self.__state & self.stClean:
- return self.__mibNode
- else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
-
- def isFullyResolved(self):
- return not (self.__state & self.stUnresolved)
-
- #
- # A gateway to MIBs manipulation routines
- #
-
- def addAsn1Sources(self, *asn1Sources):
- self.__asn1SourcesToAdd = asn1Sources
- return self
-
- def addMibSource(self, *mibSources):
- self.__mibSourcesToAdd = mibSources
- return self
-
- # provides deferred MIBs load
- def loadMibs(self, *modNames):
- self.__modNamesToLoad = modNames
- return self
-
- # this would eventually be called by an entity which posses a
- # reference to MibViewController
- def resolveWithMib(self, mibViewController, oidOnly=False):
- if self.__mibSourcesToAdd is not None:
- mibSources = tuple(
- [ ZipMibSource(x) for x in self.__mibSourcesToAdd ]
- ) + mibViewController.mibBuilder.getMibSources()
- mibViewController.mibBuilder.setMibSources(*mibSources)
- self.__mibSourcesToAdd = None
-
- if self.__asn1SourcesToAdd is not None:
- addMibCompiler(
- mibViewController.mibBuilder,
- sources=self.__asn1SourcesToAdd
- )
- self.__asn1SourcesToAdd = None
-
- if self.__modNamesToLoad is not None:
- mibViewController.mibBuilder.loadModules(*self.__modNamesToLoad)
- self.__modNamesToLoad = None
-
- if self.__state & (self.stOidOnly | self.stClean):
- return self
-
- MibScalar, MibTableColumn, = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar', 'MibTableColumn')
-
- if len(self.__args) == 1: # OID or label
- try:
- self.__oid = rfc1902.ObjectName(self.__args[0])
- except PyAsn1Error:
- try:
- label = tuple(self.__args[0].split('.'))
- except ValueError:
- raise PySnmpError('Bad OID format %s' % (self.__args[0],))
- prefix, label, suffix = mibViewController.getNodeNameByOid(
- label
- )
-
- if suffix:
- try:
- suffix = tuple([ int(x) for x in suffix ])
- except ValueError:
- raise PySnmpError('Unknown object name component %s' % (suffix,))
-
- self.__oid = rfc1902.ObjectName(prefix + suffix)
-
- self.__state |= self.stOidOnly
-
- if oidOnly:
- return self
- else:
- self.__state |= self.stOidOnly
-
- if oidOnly:
- return self
-
- prefix, label, suffix = mibViewController.getNodeNameByOid(
- self.__oid
- )
-
- modName, symName, _ = mibViewController.getNodeLocation(prefix)
-
- self.__modName = modName
- self.__symName = symName
-
- self.__label = label
-
- mibNode, = mibViewController.mibBuilder.importSymbols(
- modName, symName
- )
-
- self.__mibNode = mibNode
-
- if isinstance(mibNode, MibTableColumn): # table column
- rowModName, rowSymName, _ = mibViewController.getNodeLocation(
- mibNode.name[:-1]
- )
- rowNode, = mibViewController.mibBuilder.importSymbols(
- rowModName, rowSymName
- )
- self.__indices = rowNode.getIndicesFromInstId(suffix)
- elif isinstance(mibNode, MibScalar): # scalar
- self.__indices = ( rfc1902.ObjectName(suffix), )
- else:
- self.__indices = ( rfc1902.ObjectName(suffix), )
- self.__state |= self.stUnresolved
- self.__state |= self.stClean
- return self
- elif len(self.__args) > 1: # MIB, symbol[, index, index ...]
- self.__modName = self.__args[0]
- if self.__args[1]:
- self.__symName = self.__args[1]
- else:
- mibViewController.mibBuilder.loadModules(self.__modName)
- oid, _, _ = mibViewController.getFirstNodeName(self.__modName)
- _, self.__symName, _ = mibViewController.getNodeLocation(oid)
-
- mibNode, = mibViewController.mibBuilder.importSymbols(
- self.__modName, self.__symName
- )
-
- self.__mibNode = mibNode
-
- self.__indices = ()
- self.__oid = rfc1902.ObjectName(mibNode.getName())
-
- prefix, label, suffix = mibViewController.getNodeNameByOid(
- self.__oid
- )
- self.__label = label
-
- if isinstance(mibNode, MibTableColumn): # table
- rowModName, rowSymName, _ = mibViewController.getNodeLocation(
- mibNode.name[:-1]
- )
- rowNode, = mibViewController.mibBuilder.importSymbols(
- rowModName, rowSymName
- )
- if self.__args[2:]:
- instIds = rowNode.getInstIdFromIndices(*self.__args[2:])
- self.__oid += instIds
- self.__indices = rowNode.getIndicesFromInstId(instIds)
- elif self.__args[2:]: # any other kind of MIB node with indices
- instId = rfc1902.ObjectName(
- '.'.join([ str(x) for x in self.__args[2:] ])
- )
- self.__oid += instId
- self.__indices = ( instId, )
- self.__state |= (self.stClean | self.stOidOnly)
- return self
- else:
- raise PySnmpError('Non-OID, label or MIB symbol')
-
- def prettyPrint(self):
- if self.__state & self.stClean:
- return '%s::%s.%s' % (
- self.__modName, self.__symName,
- '.'.join(['"%s"' % x.prettyPrint() for x in self.__indices ])
- )
- else:
- raise PySnmpError('%s object not fully initialized' % self.__class__.__name__)
-
- def __repr__(self):
- return '%s(%s)' % (self.__class__.__name__, ', '.join([ repr(x) for x in self.__args]))
-
- # Redirect some attrs access to the OID object to behave alike
-
- def __str__(self):
- if self.__state & self.stOidOnly:
- return str(self.__oid)
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __eq__(self, other):
- if self.__state & self.stOidOnly:
- return self.__oid == other
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __ne__(self, other):
- if self.__state & self.stOidOnly:
- return self.__oid != other
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __lt__(self, other):
- if self.__state & self.stOidOnly:
- return self.__oid < other
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __le__(self, other):
- if self.__state & self.stOidOnly:
- return self.__oid <= other
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __gt__(self, other):
- if self.__state & self.stOidOnly:
- return self.__oid > other
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __ge__(self, other):
- if self.__state & self.stOidOnly:
- return self.__oid > other
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __nonzero__(self):
- if self.__state & self.stOidOnly:
- return self.__oid != 0
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __bool__(self):
- if self.__state & self.stOidOnly:
- return bool(self.__oid)
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __getitem__(self, i):
- if self.__state & self.stOidOnly:
- return self.__oid[i]
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __len__(self):
- if self.__state & self.stOidOnly:
- return len(self.__oid)
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __add__(self, other):
- if self.__state & self.stOidOnly:
- return self.__oid + other
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __radd__(self, other):
- if self.__state & self.stOidOnly:
- return other + self.__oid
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __hash__(self):
- if self.__state & self.stOidOnly:
- return hash(self.__oid)
- else:
- raise PySnmpError('%s object not properly initialized' % self.__class__.__name__)
-
- def __getattr__(self, attr):
- if self.__state & self.stOidOnly:
- if attr in ( 'asTuple', 'clone', 'subtype', 'isPrefixOf',
- 'isSameTypeWith', 'isSuperTypeOf'):
- return getattr(self.__oid, attr)
- raise AttributeError
- else:
- raise PySnmpError('%s object not properly initialized for %s access' % (self.__class__.__name__, attr))
+class MibVariable(rfc1902.ObjectIdentity): pass
+class MibVariableBinding(rfc1902.ObjectType): pass
diff --git a/pysnmp/entity/rfc3413/oneliner/ntforg.py b/pysnmp/entity/rfc3413/oneliner/ntforg.py
index c81667d..8fc20d0 100644
--- a/pysnmp/entity/rfc3413/oneliner/ntforg.py
+++ b/pysnmp/entity/rfc3413/oneliner/ntforg.py
@@ -1,12 +1,14 @@
from pyasn1.compat.octets import null
from pysnmp import nextid, error
from pysnmp.entity import engine, config
+from pysnmp.smi.rfc1902 import ObjectIdentity, ObjectType, NotificationType
from pysnmp.entity.rfc3413 import ntforg, context
-from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable
from pysnmp.entity.rfc3413.oneliner.auth import CommunityData, UsmUserData
from pysnmp.entity.rfc3413.oneliner.target import UdpTransportTarget, \
Udp6TransportTarget, UnixTransportTarget
from pysnmp.entity.rfc3413.oneliner import cmdgen
+# obsolete, compatibility symbols
+from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable
# Auth protocol
usmHMACMD5AuthProtocol = config.usmHMACMD5AuthProtocol
@@ -21,6 +23,8 @@ usmAesCfb192Protocol = config.usmAesCfb192Protocol
usmAesCfb256Protocol = config.usmAesCfb256Protocol
usmNoPrivProtocol = config.usmNoPrivProtocol
+ContextData = cmdgen.ContextData
+
nextID = nextid.Integer(0xffffffff)
class AsyncNotificationOriginator:
@@ -28,21 +32,18 @@ class AsyncNotificationOriginator:
self.__asyncCmdGen = cmdgen.AsyncCommandGenerator()
def _getCache(self, snmpEngine):
- if 'ntforg' not in snmpEngine.cache:
- snmpEngine.cache['ntforg'] = {
- 'auth': {},
- 'name': {}
- }
- return snmpEngine.cache['ntforg']
+ cache = snmpEngine.getUserContext('ntforg')
+ if cache is None:
+ cache = { 'auth': {}, 'name': {} }
+ snmpEngine.setUserContext(ntforg=cache)
+ return cache
def getMibViewController(self, snmpEngine):
return self.__asyncCmdGen.getMibViewController(snmpEngine)
def cfgNtfOrg(self, snmpEngine, authData, transportTarget, notifyType):
cache = self._getCache(snmpEngine)
- addrName, paramsName = self.__asyncCmdGen.cfgCmdGen(
- snmpEngine, authData, transportTarget
- )
+ addrName, paramsName = self.__asyncCmdGen.cfgCmdGen( snmpEngine, authData, transportTarget )
tagList = transportTarget.tagList.split()
if not tagList:
tagList = ['']
@@ -118,18 +119,30 @@ class AsyncNotificationOriginator:
)
del cache['auth'][authDataKey]
- def makeVarBinds(self, snmpEngine, varBinds, oidOnly=False):
- return self.__asyncCmdGen.makeVarBinds(snmpEngine, varBinds, oidOnly)
+ def makeVarBinds(self, snmpEngine, varBinds):
+ mibViewController = self.getMibViewController(snmpEngine)
+ if isinstance(varBinds, NotificationType):
+ varBinds.resolveWithMib(mibViewController)
+ __varBinds = []
+ for varBind in varBinds:
+ if isinstance(varBind, ObjectType):
+ pass
+ elif isinstance(varBind[0], ObjectIdentity):
+ varBind = ObjectType(*varBind)
+ else:
+ varBind = ObjectType(ObjectIdentity(varBind[0]), varBind[1])
+ __varBinds.append(varBind.resolveWithMib(mibViewController))
+ return __varBinds
def unmakeVarBinds(self, snmpEngine, varBinds, lookupNames, lookupValues):
- return self.__asyncCmdGen.unmakeVarBinds(snmpEngine, varBinds,
- lookupNames, lookupValues)
-
+ if lookupNames or lookupValues:
+ mibViewController = self.getMibViewController(snmpEngine)
+ varBinds = [ ObjectType(ObjectIdentity(x[0]), x[1]).resolveWithMib(mibViewController) for x in varBinds ]
+ return varBinds
+
def sendNotification(self, snmpEngine,
- authData, transportTarget,
- snmpContext, contextName,
+ authData, transportTarget, contextData,
notifyType,
- notificationType, instanceIndex,
varBinds=(),
cbInfo=(None, None),
lookupNames=False, lookupValues=False):
@@ -148,7 +161,7 @@ class AsyncNotificationOriginator:
cbCtx
)
- (cbFun, cbCtx) = cbInfo
+ cbFun, cbCtx = cbInfo
# Create matching transport tags if not given by user
if not transportTarget.tagList:
@@ -161,12 +174,8 @@ class AsyncNotificationOriginator:
notifyName = self.cfgNtfOrg(
snmpEngine, authData, transportTarget, notifyType
)
- if isinstance(notificationType, MibVariable):
- notificationType = notificationType.resolveWithMib(
- self.getMibViewController(snmpEngine), oidOnly=True
- )
- return ntforg.NotificationOriginator().sendVarBinds(snmpEngine, notifyName, snmpContext, contextName, notificationType, instanceIndex, self.makeVarBinds(snmpEngine, varBinds), __cbFun, (lookupNames, lookupValues, cbFun, cbCtx))
+ return ntforg.NotificationOriginator().sendVarBinds(snmpEngine, notifyName, contextData.contextEngineId, contextData.contextName, self.makeVarBinds(snmpEngine, varBinds), __cbFun, (lookupNames, lookupValues, cbFun, cbCtx))
# substitute sendNotification return object for backward compatibility
class ErrorIndicationReturn:
@@ -176,7 +185,9 @@ class ErrorIndicationReturn:
def __bool__(self): return bool(self.__vars[0])
def __str__(self): return str(self.__vars[0])
-# compatibility implementation, never use this class for new applications
+#
+# Compatibility implementation, never use this class for new applications
+#
class AsynNotificationOriginator:
def __init__(self, snmpEngine=None, snmpContext=None):
if snmpEngine is None:
@@ -208,9 +219,10 @@ class AsynNotificationOriginator:
def sendNotification(self, authData, transportTarget,
notifyType, notificationType,
- varBinds=(),
+ varBinds=(), # legacy, use NotificationType instead
cbInfo=(None, None),
lookupNames=False, lookupValues=False,
+ contextEngineId=None, # XXX ordering incompatibility
contextName=null):
def __cbFun(snmpEngine, sendRequestHandle, errorIndication,
@@ -237,16 +249,23 @@ class AsynNotificationOriginator:
if contextName is null and authData.contextName:
contextName = authData.contextName
- # legacy
- if not isinstance(notificationType, MibVariable) and \
- isinstance(notificationType[0], tuple):
- notificationType = MibVariable(notificationType[0][0], notificationType[0][1], *notificationType[1:]).resolveWithMib(self.mibViewController)
+ if not isinstance(notificationType,
+ (ObjectIdentity, ObjectType, NotificationType)):
+ if isinstance(notificationType[0], tuple):
+ # legacy
+ notificationType = ObjectIdentity(notificationType[0][0], notificationType[0][1], *notificationType[1:])
+ else:
+ notificationType = ObjectIdentity(notificationType)
+
+ if not isinstance(notificationType, NotificationType):
+ notificationType = NotificationType(notificationType)
return self.__asyncNtfOrg.sendNotification(
self.snmpEngine,
authData, transportTarget,
- self.snmpContext, contextName,
- notifyType, notificationType, None, varBinds,
+ ContextData(contextEngineId or self.snmpContext.contextEngineId,
+ contextName),
+ notifyType, notificationType.addVarBinds(*varBinds),
(__cbFun, cbInfo),
lookupNames, lookupValues
)
@@ -262,6 +281,8 @@ class NotificationOriginator:
else:
self.__asynNtfOrg = asynNtfOrg
+ # the varBinds parameter is legacy, use NotificationType instead
+
def sendNotification(self, authData, transportTarget, notifyType,
notificationType, *varBinds, **kwargs):
def __cbFun(sendRequestHandle, errorIndication,
@@ -276,6 +297,7 @@ class NotificationOriginator:
varBinds, (__cbFun, appReturn),
kwargs.get('lookupNames', False),
kwargs.get('lookupValues', False),
+ kwargs.get('contextEngineId'),
kwargs.get('contextName', null)
)
self.__asynNtfOrg.snmpEngine.transportDispatcher.runDispatcher()
diff --git a/pysnmp/smi/compiler.py b/pysnmp/smi/compiler.py
index c5fc2d9..dde6cdc 100644
--- a/pysnmp/smi/compiler.py
+++ b/pysnmp/smi/compiler.py
@@ -56,7 +56,7 @@ else:
)
compiler.addBorrowers(
- *[ PyFileBorrower(x) for x in getReadersFromUrls(*borrowers, originalMatching=False, lowcaseMatching=False) ]
+ *[ PyFileBorrower(x) for x in getReadersFromUrls(*borrowers, **dict(originalMatching=False, lowcaseMatching=False)) ]
)
mibBuilder.setMibCompiler(compiler, destination)
diff --git a/pysnmp/smi/rfc1902.py b/pysnmp/smi/rfc1902.py
new file mode 100644
index 0000000..75fa7b9
--- /dev/null
+++ b/pysnmp/smi/rfc1902.py
@@ -0,0 +1,463 @@
+import sys
+from pysnmp.proto import rfc1902, rfc1905
+from pysnmp.proto.api import v2c
+from pysnmp.smi.builder import ZipMibSource
+from pysnmp.smi.compiler import addMibCompiler, defaultDest
+from pysnmp.smi.error import SmiError
+from pyasn1.type.base import AbstractSimpleAsn1Item
+from pyasn1.error import PyAsn1Error
+from pysnmp import debug
+
+#
+# An OID-like object that embeds MIB resolution.
+#
+# Valid initializers include:
+# ObjectIdentity('1.3.6.1.2.1.1.1.0')
+# ObjectIdentity('iso.org.dod.internet.mgmt.mib-2.system.sysDescr.0')
+# ObjectIdentity('SNMPv2-MIB', 'system')
+# ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)
+# ObjectIdentity('IP-MIB', 'ipAdEntAddr', '127.0.0.1', 123)
+#
+class ObjectIdentity:
+ stDirty, stClean = 1, 2
+
+ def __init__(self, *args):
+ self.__args = args
+ self.__mibSourcesToAdd = self.__modNamesToLoad = None
+ self.__asn1SourcesToAdd = None
+ self.__state = self.stDirty
+
+ #
+ # public API
+ #
+ def getMibSymbol(self):
+ if self.__state & self.stClean:
+ return self.__modName, self.__symName, self.__indices
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def getOid(self):
+ if self.__state & self.stClean:
+ return self.__oid
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def getLabel(self):
+ if self.__state & self.stClean:
+ return self.__label
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def getMibNode(self):
+ if self.__state & self.stClean:
+ return self.__mibNode
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ #
+ # A gateway to MIBs manipulation routines
+ #
+
+ def addMibCompiler(self, *asn1Sources, **kwargs):
+ if self.__asn1SourcesToAdd is None:
+ self.__asn1SourcesToAdd = asn1Sources
+ else:
+ self.__asn1SourcesToAdd += asn1Sources
+ self.__mibDir = kwargs.get('destDir', defaultDest)
+ return self
+
+ def addMibSource(self, *mibSources):
+ if self.__mibSourcesToAdd is None:
+ self.__mibSourcesToAdd = mibSources
+ else:
+ self.__mibSourcesToAdd += mibSources
+ return self
+
+ # provides deferred MIBs load
+ def loadMibs(self, *modNames):
+ if self.__modNamesToLoad is None:
+ self.__modNamesToLoad = modNames
+ else:
+ self.__modNamesToLoad += modNames
+ return self
+
+ # this would eventually be called by an entity which posses a
+ # reference to MibViewController
+ def resolveWithMib(self, mibViewController):
+ if self.__mibSourcesToAdd is not None:
+ debug.logger & debug.flagMIB and debug.logger('adding MIB sources %s' % ', '.join(self.__mibSourcesToAdd))
+ mibViewController.mibBuilder.addMibSources(
+ *[ ZipMibSource(x) for x in self.__mibSourcesToAdd ]
+ )
+ self.__mibSourcesToAdd = None
+
+ if self.__asn1SourcesToAdd is not None:
+ debug.logger & debug.flagMIB and debug.logger('adding MIB compiler with source paths %s' % ', '.join(self.__asn1SourcesToAdd))
+ addMibCompiler(
+ mibViewController.mibBuilder,
+ sources=self.__asn1SourcesToAdd,
+ destination=self.__mibDir
+ )
+ self.__asn1SourcesToAdd = self.__mibDir = None
+
+ if self.__modNamesToLoad is not None:
+ debug.logger & debug.flagMIB and debug.logger('loading MIB modules %s' % ', '.join(self.__modNamesToLoad))
+ mibViewController.mibBuilder.loadModules(*self.__modNamesToLoad)
+ self.__modNamesToLoad = None
+
+ if self.__state & self.stClean:
+ return self
+
+ MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar', 'MibTableColumn')
+
+ self.__indices = ()
+
+ if len(self.__args) == 1: # OID or label
+ debug.logger & debug.flagMIB and debug.logger('resolving %s as OID or label' % self.__args)
+ try:
+ self.__oid = rfc1902.ObjectName(self.__args[0])
+ except PyAsn1Error:
+ try:
+ label = tuple(self.__args[0].split('.'))
+ except ValueError:
+ raise SmiError('Bad OID format %r' % (self.__args[0],))
+ prefix, label, suffix = mibViewController.getNodeNameByOid(
+ label
+ )
+
+ if suffix:
+ try:
+ suffix = tuple([ int(x) for x in suffix ])
+ except ValueError:
+ raise SmiError('Unknown object name component %r' % (suffix,))
+ self.__oid = rfc1902.ObjectName(prefix + suffix)
+ else:
+ prefix, label, suffix = mibViewController.getNodeNameByOid(
+ self.__oid
+ )
+
+ debug.logger & debug.flagMIB and debug.logger('resolved %r into prefix %r and suffix %r' % (self.__args, prefix, suffix))
+
+ modName, symName, _ = mibViewController.getNodeLocation(prefix)
+
+ self.__modName = modName
+ self.__symName = symName
+
+ self.__label = label
+
+ mibNode, = mibViewController.mibBuilder.importSymbols(
+ modName, symName
+ )
+
+ self.__mibNode = mibNode
+
+ debug.logger & debug.flagMIB and debug.logger('resolved prefix %r into MIB node %r' % (prefix, mibNode))
+
+ if isinstance(mibNode, MibTableColumn): # table column
+ if suffix:
+ rowModName, rowSymName, _ = mibViewController.getNodeLocation(
+ mibNode.name[:-1]
+ )
+ rowNode, = mibViewController.mibBuilder.importSymbols(
+ rowModName, rowSymName
+ )
+ self.__indices = rowNode.getIndicesFromInstId(suffix)
+ elif isinstance(mibNode, MibScalar): # scalar
+ if suffix:
+ self.__indices = ( rfc1902.ObjectName(suffix), )
+ else:
+ if suffix:
+ self.__indices = ( rfc1902.ObjectName(suffix), )
+ self.__state |= self.stClean
+
+ debug.logger & debug.flagMIB and debug.logger('resolved indices are %r' % (self.__indices,))
+
+ return self
+ elif len(self.__args) > 1: # MIB, symbol[, index, index ...]
+ self.__modName = self.__args[0]
+ if self.__args[1]:
+ self.__symName = self.__args[1]
+ else:
+ mibViewController.mibBuilder.loadModules(self.__modName)
+ oid, _, _ = mibViewController.getFirstNodeName(self.__modName)
+ _, self.__symName, _ = mibViewController.getNodeLocation(oid)
+
+ mibNode, = mibViewController.mibBuilder.importSymbols(
+ self.__modName, self.__symName
+ )
+
+ self.__mibNode = mibNode
+
+ self.__oid = rfc1902.ObjectName(mibNode.getName())
+
+ prefix, label, suffix = mibViewController.getNodeNameByOid(
+ self.__oid
+ )
+ self.__label = label
+
+ debug.logger & debug.flagMIB and debug.logger('resolved %r into prefix %r and suffix %r' % (self.__args, prefix, suffix))
+
+ if isinstance(mibNode, MibTableColumn): # table
+ rowModName, rowSymName, _ = mibViewController.getNodeLocation(
+ mibNode.name[:-1]
+ )
+ rowNode, = mibViewController.mibBuilder.importSymbols(
+ rowModName, rowSymName
+ )
+ if self.__args[2:]:
+ try:
+ instIds = rowNode.getInstIdFromIndices(*self.__args[2:])
+ self.__oid += instIds
+ self.__indices = rowNode.getIndicesFromInstId(instIds)
+ except PyAsn1Error:
+ raise SmiError('Instance index %r to OID convertion failure at object %r: %s' % (self.__args[2:], mibNode.getLabel(), sys.exc_info()[1]))
+ elif self.__args[2:]: # any other kind of MIB node with indices
+ if self.__args[2:]:
+ instId = rfc1902.ObjectName(
+ '.'.join([ str(x) for x in self.__args[2:] ])
+ )
+ self.__oid += instId
+ self.__indices = ( instId, )
+ self.__state |= self.stClean
+
+ debug.logger & debug.flagMIB and debug.logger('resolved indices are %r' % (self.__indices,))
+
+ return self
+ else:
+ raise SmiError('Non-OID, label or MIB symbol')
+
+ def prettyPrint(self):
+ if self.__state & self.stClean:
+ return '%s::%s%s%s' % (
+ self.__modName, self.__symName,
+ self.__indices and '.' or '',
+ '.'.join(['"%s"' % x.prettyPrint() for x in self.__indices ])
+ )
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def __repr__(self):
+ return '%s(%s)' % (self.__class__.__name__, ', '.join([ repr(x) for x in self.__args]))
+
+ # Redirect some attrs access to the OID object to behave alike
+
+ def __str__(self):
+ if self.__state & self.stClean:
+ return str(self.__oid)
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __eq__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid == other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __ne__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid != other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __lt__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid < other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __le__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid <= other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __gt__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid > other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __ge__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid > other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __nonzero__(self):
+ if self.__state & self.stClean:
+ return self.__oid != 0
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __bool__(self):
+ if self.__state & self.stClean:
+ return bool(self.__oid)
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __getitem__(self, i):
+ if self.__state & self.stClean:
+ return self.__oid[i]
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __len__(self):
+ if self.__state & self.stClean:
+ return len(self.__oid)
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __add__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid + other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __radd__(self, other):
+ if self.__state & self.stClean:
+ return other + self.__oid
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __hash__(self):
+ if self.__state & self.stClean:
+ return hash(self.__oid)
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __getattr__(self, attr):
+ if self.__state & self.stClean:
+ if attr in ( 'asTuple', 'clone', 'subtype', 'isPrefixOf',
+ 'isSameTypeWith', 'isSuperTypeOf'):
+ return getattr(self.__oid, attr)
+ raise AttributeError
+ else:
+ raise SmiError('%s object not properly initialized for accessing %s' % (self.__class__.__name__, attr))
+
+# A two-element sequence of ObjectIdentity and SNMP data type object
+class ObjectType:
+ stDirty, stClean = 1, 2
+ def __init__(self, objectIdentity, objectSyntax=rfc1905.unSpecified):
+ if not isinstance(objectIdentity, ObjectIdentity):
+ raise SmiError('initializer should be ObjectIdentity instance, not %r' % (objectIdentity,))
+ self.__args = [ objectIdentity, objectSyntax ]
+ self.__state = self.stDirty
+
+ def __getitem__(self, i):
+ if self.__state & self.stClean:
+ return self.__args[i]
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def __repr__(self):
+ return '%s(%s)' % (self.__class__.__name__, ', '.join([ repr(x) for x in self.__args]))
+
+ def resolveWithMib(self, mibViewController):
+ if self.__state & self.stClean:
+ return self
+
+ self.__args[0].resolveWithMib(mibViewController)
+
+ MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar', 'MibTableColumn')
+
+ if not isinstance(self.__args[0].getMibNode(),
+ (MibScalar, MibTableColumn)):
+ if not isinstance(self.__args[1], AbstractSimpleAsn1Item):
+ raise SmiError('MIB object %r is not OBJECT-TYPE (MIB not loaded?)' % (self.__args[0],))
+ self.__state |= self.stClean
+ return self
+
+ if isinstance(self.__args[1], (rfc1905.UnSpecified,
+ rfc1905.NoSuchObject,
+ rfc1905.NoSuchInstance,
+ rfc1905.EndOfMibView)):
+ self.__state |= self.stClean
+ return self
+
+ try:
+ self.__args[1] = self.__args[0].getMibNode().getSyntax().clone(self.__args[1])
+ except PyAsn1Error:
+ raise SmiError('Value %r to type %r convertion failure: %s' % (self.__args[1], self.__args[0].getMibNode().getSyntax().__class__.__name__, sys.exc_info()[1]))
+
+ self.__state |= self.stClean
+
+ debug.logger & debug.flagMIB and debug.logger('resolved %r syntax is %r' % (self.__args[0], self.__args[1]))
+
+ return self
+
+ def prettyPrint(self):
+ if self.__state & self.stClean:
+ return '%s = %s' % (self.__args[0].prettyPrint(),
+ self.__args[1].prettyPrint())
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+# A sequence of ObjectType's
+class NotificationType:
+ stDirty, stClean = 1, 2
+ def __init__(self, objectIdentity, instanceIndex=(), objects={}):
+ if not isinstance(objectIdentity, ObjectIdentity):
+ raise SmiError('initializer should be ObjectIdentity instance, not %r' % (objectIdentity,))
+ self.__objectIdentity = objectIdentity
+ self.__instanceIndex = instanceIndex
+ self.__objects = objects
+ self.__varBinds = []
+ self.__additionalVarBinds = []
+ self.__state = self.stDirty
+
+ def __getitem__(self, i):
+ if self.__state & self.stClean:
+ return self.__varBinds[i]
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def __repr__(self):
+ return '%s(%r, %r, %r)' % (self.__class__.__name__, self.__objectIdentity, self.__instanceIndex, self.__objects)
+
+ def addVarBinds(self, *varBinds):
+ debug.logger & debug.flagMIB and debug.logger('additional var-binds: %r' % (varBinds,))
+ if self.__state & self.stClean:
+ self.__varBinds.extend(varBinds)
+ else:
+ self.__additionalVarBinds.extend(varBinds)
+ return self
+
+ def resolveWithMib(self, mibViewController):
+ if self.__state & self.stClean:
+ return self
+
+ self.__objectIdentity.resolveWithMib(mibViewController)
+
+ self.__varBinds.append(
+ ObjectType(ObjectIdentity(v2c.apiTrapPDU.snmpTrapOID),
+ self.__objectIdentity).resolveWithMib(mibViewController)
+ )
+
+ NotificationType, = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'NotificationType')
+
+ mibNode = self.__objectIdentity.getMibNode()
+
+ if isinstance(mibNode, NotificationType):
+ for notificationObject in mibNode.getObjects():
+ objectIdentity = ObjectIdentity(*notificationObject+self.__instanceIndex).resolveWithMib(mibViewController)
+ self.__varBinds.append(
+ ObjectType(objectIdentity, self.__objects.get(notificationObject, rfc1905.unSpecified)).resolveWithMib(mibViewController)
+ )
+ else:
+ debug.logger & debug.flagMIB and debug.logger('WARNING: MIB object %r is not NOTIFICATION-TYPE (MIB not loaded?)' % (self.__objectIdentity,))
+
+ if self.__additionalVarBinds:
+ self.__varBinds.extend(self.__additionalVarBinds)
+ self.__additionalVarBinds = []
+
+ self.__state |= self.stClean
+
+ debug.logger & debug.flagMIB and debug.logger('resolved %r into %r' % (self.__objectIdentity, self.__varBinds))
+
+ return self
+
+ def prettyPrint(self):
+ if self.__state & self.stClean:
+ return ' '.join([ '%s = %s' % (x[0].prettyPrint(), x[1].prettyPrint()) for x in self.__varBinds])
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)