diff options
author | elie <elie> | 2012-08-11 21:58:56 +0000 |
---|---|---|
committer | elie <elie> | 2012-08-11 21:58:56 +0000 |
commit | 30362e91d32ae26142cdac9d3836edee70c1a4af (patch) | |
tree | bc1b076ffdc94d61882f3ecf9f4ff366f5717475 | |
parent | c66a0317f8fb8128dacf8e2eebda5e093bd7fc66 (diff) | |
download | pysnmp-30362e91d32ae26142cdac9d3836edee70c1a4af.tar.gz |
converted to MibVariable-based parameters passing
-rw-r--r-- | CHANGES | 7 | ||||
-rw-r--r-- | examples/v3arch/oneliner/manager/bulkgen.py | 6 | ||||
-rw-r--r-- | examples/v3arch/oneliner/manager/getgen.py | 6 | ||||
-rw-r--r-- | examples/v3arch/oneliner/manager/nextgen.py | 10 | ||||
-rw-r--r-- | examples/v3arch/oneliner/manager/setgen.py | 7 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/oneliner/cmdgen.py | 80 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/oneliner/mibvar.py | 292 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/oneliner/ntforg.py | 43 |
8 files changed, 362 insertions, 89 deletions
@@ -6,7 +6,12 @@ Revision 4.2.3 configured for the user in question. That allowed unauthenticated/unciphered access to pysnmp-based Agent even if USM user is configured to provide one. - Oneliner CommandGenerator now supports optional keyword args lookupNames, - lookupValues that enable response OID / value lookup at MIB. + lookupValues that enable response OID / value looked up at MIB and reported + as a MibVariable container object carrying relevant MIB info. +- Oneliner CommandGenerator now supports symbolic MIB object names to be + passed within a MibVariable container object which would do a deferred + MIB lookup for name resolution. This is a new and preferred API which + obsoletes the tuple-based one (it is still suppored though). - Oneliner CommandGenerator's class attributes lexicographicMode, maxRows and ignoreNonIncreasingOid moved to optional keyword args of nextGen() and bulkGen() methods. diff --git a/examples/v3arch/oneliner/manager/bulkgen.py b/examples/v3arch/oneliner/manager/bulkgen.py index 88cd4c9..829203b 100644 --- a/examples/v3arch/oneliner/manager/bulkgen.py +++ b/examples/v3arch/oneliner/manager/bulkgen.py @@ -65,9 +65,7 @@ else: else: for varBindTableRow in varBindTable: for name, val in varBindTableRow: - (modName, symName), indices = name - indices = '.'.join(['"%s"' % x.prettyPrint() for x in indices ]) - print('%s::%s.%s = %s' % (modName, symName, indices, val.prettyPrint())) + print('%s = %s' % (name.prettyPrint(), val.prettyPrint())) # Send a series of SNMP GETBULK requests @@ -100,5 +98,5 @@ else: else: for varBindTableRow in varBindTable: for name, val in varBindTableRow: - print('%s = %s' % (name, val.prettyPrint())) + print('%s = %s' % (name.prettyPrint(), val.prettyPrint())) diff --git a/examples/v3arch/oneliner/manager/getgen.py b/examples/v3arch/oneliner/manager/getgen.py index 30437d9..10d3997 100644 --- a/examples/v3arch/oneliner/manager/getgen.py +++ b/examples/v3arch/oneliner/manager/getgen.py @@ -38,7 +38,7 @@ else: errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd( cmdgen.CommunityData('public', mpModel=0), cmdgen.UdpTransportTarget(('localhost', 161)), - ('iso', 'org', 'dod', 'internet', 'mgmt', 'mib-2', 'system', 'sysDescr', 0), + cmdgen.MibVariable('iso.org.dod.internet.mgmt.mib-2.system.sysDescr.0'), (('SNMPv2-MIB', 'sysDescr'), 0) ) @@ -137,9 +137,7 @@ else: ) else: for name, val in varBinds: - (modName, symName), indices = name - indices = '.'.join(['"%s"' % x.prettyPrint() for x in indices ]) - print('%s::%s.%s = %s' % (modName, symName, indices, val.prettyPrint())) + print('%s = %s' % (name.prettyPrint(), val.prettyPrint())) # Send SNMP GET request diff --git a/examples/v3arch/oneliner/manager/nextgen.py b/examples/v3arch/oneliner/manager/nextgen.py index c4eea8f..097b695 100644 --- a/examples/v3arch/oneliner/manager/nextgen.py +++ b/examples/v3arch/oneliner/manager/nextgen.py @@ -90,9 +90,7 @@ else: else: for varBindTableRow in varBindTable: for name, val in varBindTableRow: - (modName, symName), indices = name - indices = '.'.join(['"%s"' % x.prettyPrint() for x in indices ]) - print('%s::%s.%s = %s' % (modName, symName, indices, val.prettyPrint())) + print('%s = %s' % (name.prettyPrint(), val.prettyPrint())) # Send a series of SNMP GETNEXT requests @@ -123,7 +121,7 @@ else: else: for varBindTableRow in varBindTable: for name, val in varBindTableRow: - print('%s = %s' % (name, val.prettyPrint())) + print('%s = %s' % (name.prettyPrint(), val.prettyPrint())) # Send a series of SNMP GETNEXT requests @@ -141,7 +139,7 @@ errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd( privProtocol=cmdgen.usmAesCfb128Protocol), cmdgen.UdpTransportTarget(('localhost', 161)), (('IF-MIB', ''),), - lexicographicMode=True, maxRows=20, + lexicographicMode=True, #maxRows=20, ignoreNonIncreasingOid=True ) @@ -157,5 +155,5 @@ else: else: for varBindTableRow in varBindTable: for name, val in varBindTableRow: - print('%s = %s' % (name, val.prettyPrint())) + print('%s = %s' % (name.prettyPrint(), val.prettyPrint())) diff --git a/examples/v3arch/oneliner/manager/setgen.py b/examples/v3arch/oneliner/manager/setgen.py index cfedd1a..0f5f023 100644 --- a/examples/v3arch/oneliner/manager/setgen.py +++ b/examples/v3arch/oneliner/manager/setgen.py @@ -39,6 +39,7 @@ errorIndication, errorStatus, errorIndex, varBinds = cmdGen.setCmd( cmdgen.CommunityData('public'), cmdgen.UdpTransportTarget(('localhost', 161)), ('1.3.6.1.2.1.1.2.0', rfc1902.ObjectName('1.3.6.1.4.1.20408.1.1')), + ('1.3.6.1.2.1.1.2.0', '1.3.6.1.4.1.20408.1.1'), ('1.3.6.1.2.1.1.5.0', rfc1902.OctetString('new system name')) ) @@ -82,8 +83,4 @@ else: ) else: for name, val in varBinds: - (modName, symName), indices = name - indices = '.'.join([x.prettyPrint() for x in indices ]) - print('%s::%s.%s = %s' % (modName, symName, indices, val.prettyPrint())) - - + print('%s = %s' % (name.prettyPrint(), val.prettyPrint())) diff --git a/pysnmp/entity/rfc3413/oneliner/cmdgen.py b/pysnmp/entity/rfc3413/oneliner/cmdgen.py index dfff898..4398730 100644 --- a/pysnmp/entity/rfc3413/oneliner/cmdgen.py +++ b/pysnmp/entity/rfc3413/oneliner/cmdgen.py @@ -1,6 +1,7 @@ import socket, sys from pysnmp.entity import engine, config -from pysnmp.entity.rfc3413 import cmdgen, mibvar +from pysnmp.entity.rfc3413 import cmdgen +from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable from pysnmp.carrier.asynsock.dgram import udp, udp6, unix from pysnmp.proto import rfc1905, errind from pysnmp.smi import view @@ -298,39 +299,32 @@ class AsynCommandGenerator: ) self.__knownTransportAddrs.clear() - if sys.version_info[0] <= 2: - intTypes = (int, long) - else: - intTypes = (int,) - def makeReadVarBinds(self, varNames): varBinds = [] for varName in varNames: - if isinstance(varName[0], self.intTypes): - name = varName + if isinstance(varName, MibVariable): + varName.resolveWithMib( + self.mibViewController, oidOnly=True + ) + elif isinstance(varName[0], tuple): # legacy + varName = MibVariable(varName[0][0], varName[0][1], *varName[1:]).resolveWithMib(self.mibViewController) else: - name, oid = mibvar.mibNameToOid( - self.mibViewController, varName - ) - name = name + oid - varBinds.append((name, self._null)) + varName = MibVariable(varName).resolveWithMib(self.mibViewController, oidOnly=True) + varBinds.append((varName, self._null)) return varBinds def unmakeVarBinds(self, varBinds, lookupNames=True, lookupValues=True): _varBinds = [] for name, value in varBinds: - (symName, modName), indices = mibvar.oidToMibName( - self.mibViewController, name - ) + if lookupNames or lookupValues: + varName = MibVariable(name).resolveWithMib(self.mibViewController) if lookupNames: - name = (modName, symName), indices + name = varName if lookupValues: if value.tagSet not in (rfc1905.NoSuchObject.tagSet, rfc1905.NoSuchInstance.tagSet, rfc1905.EndOfMibView.tagSet): - value = mibvar.cloneFromMibValue( - self.mibViewController, modName, symName, value - ) + value = varName.getMibNode().getSyntax().clone(value) _varBinds.append((name, value)) return _varBinds @@ -356,22 +350,22 @@ class AsynCommandGenerator: ) __varBinds = [] for varName, varVal in varBinds: - name, oid = mibvar.mibNameToOid( - self.mibViewController, varName - ) - if not isinstance(varVal, base.AbstractSimpleAsn1Item): - ((symName, modName), suffix) = mibvar.oidToMibName( - self.mibViewController, name + oid - ) - syntax = mibvar.cloneFromMibValue( - self.mibViewController, modName, symName, varVal - ) - if syntax is None: - raise error.PySnmpError( - 'Value type MIB lookup failed for %r' % (varName,) - ) - varVal = syntax.clone(varVal) - __varBinds.append((name + oid, varVal)) + if isinstance(varName, MibVariable): + varName.resolveWithMib(self.mibViewController) + if not isinstance(varVal, base.AbstractSimpleAsn1Item): + varVal = varName.getMibNode().getSyntax().clone(varVal) + elif isinstance(varName[0], tuple): # legacy + varName = MibVariable(varName[0][0], varName[0][1], *varName[1:]).resolveWithMib(self.mibViewController) + if not isinstance(varVal, base.AbstractSimpleAsn1Item): + varVal = varName.getMibNode().getSyntax().clone(varVal) + else: + if isinstance(varVal, base.AbstractSimpleAsn1Item): + varName = MibVariable(varName).resolveWithMib(self.mibViewController, oidOnly=True) + else: + varName = MibVariable(varName).resolveWithMib(self.mibViewController) + varVal = varName.getMibNode().getSyntax().clone(varVal) + + __varBinds.append((varName, varVal)) return cmdgen.SetCommandGenerator().sendReq( self.snmpEngine, addrName, __varBinds, cbFun, cbCtx, authData.contextEngineId, authData.contextName @@ -541,12 +535,7 @@ class CommandGenerator: maxRows = kwargs.get('maxRows', 0) ignoreNonIncreasingOid = kwargs.get('ignoreNonIncreasingOid', False) - varBindHead = [] - for varName in varNames: - name, suffix = mibvar.mibNameToOid( - self.__asynCmdGen.mibViewController, varName - ) - varBindHead.append(univ.ObjectIdentifier(name + suffix)) + varBindHead = [ univ.ObjectIdentifier(x[0]) for x in self.__asynCmdGen.makeReadVarBinds(varNames) ] appReturn = {} self.__asynCmdGen.nextCmd( @@ -640,12 +629,7 @@ class CommandGenerator: maxRows = kwargs.get('maxRows', 0) ignoreNonIncreasingOid = kwargs.get('ignoreNonIncreasingOid', False) - varBindHead = [] - for varName in varNames: - name, suffix = mibvar.mibNameToOid( - self.__asynCmdGen.mibViewController, varName - ) - varBindHead.append(univ.ObjectIdentifier(name + suffix)) + varBindHead = [ univ.ObjectIdentifier(x[0]) for x in self.__asynCmdGen.makeReadVarBinds(varNames) ] appReturn = {} diff --git a/pysnmp/entity/rfc3413/oneliner/mibvar.py b/pysnmp/entity/rfc3413/oneliner/mibvar.py new file mode 100644 index 0000000..531f011 --- /dev/null +++ b/pysnmp/entity/rfc3413/oneliner/mibvar.py @@ -0,0 +1,292 @@ +import sys +from pysnmp.proto import rfc1902 +from pysnmp.error import PySnmpError +from pyasn1.error import PyAsn1Error + +# +# An OID-like object that embeds MIB resolution. +# +# Valid initializers include: +# MibVariable('1.3.6.1.2.1.1.1.0'), +# MibVariable('iso.org.dod.internet.mgmt.mib-2.system.sysDescr.0') +# MibVariable('SNMPv2-MIB', 'system'), +# MibVariable('SNMPv2-MIB', 'sysDescr', 0), +# MibVariable('IP-MIB', 'ipAdEntAddr', '127.0.0.1', 123), +# + +class MibVariable: + stDirty, stOidOnly, stClean, stUnresolved = 1, 2, 4, 8 + + def __init__(self, *args): + self.__args = args + self.__modNamesToLoad = None + self.__state = self.stDirty + + def getMibSymbol(self): + if self.__state & self.stClean: + return self.__modName, self.__symName, self.__indices + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def getOid(self): + if self.__state & (self.stOidOnly | self.stClean): + return self.__oid + else: + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) + + def getLabel(self): + if self.__state & self.stClean: + return self.__label + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def getMibNode(self): # XXX + if self.__state & self.stClean: + return self.__mibNode + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def isFullyResolved(self): + return not (self.__state & self.stUnresolved) + + def loadMibs(self, *modNames): + self.__modNamesToLoad = modNames + return self + + def resolveWithMib(self, mibViewController, oidOnly=False): + if self.__modNamesToLoad is not None: + mibViewController.mibBuilder.loadModules(*self.__modNamesToLoad) + self.__modNamesToLoad = None + + if self.__state & (self.stOidOnly | self.stClean): + return self + + MibScalar, MibTableColumn, = mibViewController.mibBuilder.importSymbols( + 'SNMPv2-SMI', 'MibScalar', 'MibTableColumn' + ) + + if len(self.__args) == 1: # OID or label + try: + self.__oid = rfc1902.ObjectName(self.__args[0]) + except PyAsn1Error: + try: + label = tuple(self.__args[0].split('.')) + except ValueError: + raise PySnmpError('Bad OID format %s' % (self.__args[0],)) + prefix, label, suffix = mibViewController.getNodeNameByOid( + label + ) + + if suffix: + try: + suffix = tuple([ int(x) for x in suffix ]) + except ValueError: + raise PySnmpError('Unknown object name component %s' % (suffix,)) + + self.__oid = rfc1902.ObjectName(prefix + suffix) + + self.__state |= self.stOidOnly + + if oidOnly: + return self + else: + self.__state |= self.stOidOnly + + if oidOnly: + return self + + prefix, label, suffix = mibViewController.getNodeNameByOid( + self.__oid + ) + + modName, symName, _ = mibViewController.getNodeLocation(prefix) + + self.__modName = modName + self.__symName = symName + + self.__label = label + + mibNode, = mibViewController.mibBuilder.importSymbols( + modName, symName + ) + + self.__mibNode = mibNode + + if isinstance(mibNode, MibTableColumn): # table column + rowModName, rowSymName, _ = mibViewController.getNodeLocation( + mibNode.name[:-1] + ) + rowNode, = mibViewController.mibBuilder.importSymbols( + rowModName, rowSymName + ) + self.__indices = rowNode.getIndicesFromInstId(suffix) + elif isinstance(mibNode, MibScalar): # scalar + self.__indices = ( rfc1902.ObjectName(suffix), ) + else: + self.__indices = ( rfc1902.ObjectName(suffix), ) + if suffix: + self.__state |= self.stUnresolved + self.__state |= self.stClean + return self + elif len(self.__args) > 1: # MIB, symbol[, index, index ...] + self.__modName = self.__args[0] + if self.__args[1]: + self.__symName = self.__args[1] + else: + oid, _, _ = mibViewController.getFirstNodeName(self.__args[0]) + _, self.__symName, _ = mibViewController.getNodeLocation(oid) + + mibNode, = mibViewController.mibBuilder.importSymbols( + self.__modName, self.__symName + ) + + self.__mibNode = mibNode + + self.__indices = () + self.__oid = rfc1902.ObjectName(mibNode.getName()) + + prefix, label, suffix = mibViewController.getNodeNameByOid( + self.__oid + ) + self.__label = label + + if isinstance(mibNode, MibTableColumn): # table + rowModName, rowSymName, _ = mibViewController.getNodeLocation( + mibNode.name[:-1] + ) + rowNode, = mibViewController.mibBuilder.importSymbols( + rowModName, rowSymName + ) + if self.__args[2:]: + instIds = rowNode.getInstIdFromIndices(*self.__args[2:]) + self.__oid += instIds + self.__indices = rowNode.getIndicesFromInstId(instIds) + elif self.__args[2:]: # any other kind of MIB node with indices + instId = rfc1902.ObjectName( + '.'.join([ str(x) for x in self.__args[2:] ]) + ) + self.__oid += instId + self.__indices = ( instId, ) + self.__state |= (self.stClean | self.stOidOnly) + return self + else: + raise PySnmpError('Non-OID, label or MIB symbol') + + def prettyPrint(self): + if self.__state & self.stClean: + return '%s::%s.%s' % ( + self.__modName, self.__symName, + '.'.join(['"%s"' % x.prettyPrint() for x in self.__indices ]) + ) + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def __repr__(self): + return '%s(%s)' % (self.__class__.__name__, ', '.join([ repr(x) for x in self.__args])) + + # Redirect some attrs access to the OID object to behave alike + + def __str__(self): + if self.__state & self.stOidOnly: + return str(self.__oid) + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def __getitem__(self, i): + if self.__state & self.stOidOnly: + return self.__oid[i] + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def __eq__(self, other): + if self.__state & self.stOidOnly: + return self.__oid == other + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def __ne__(self, other): + if self.__state & self.stOidOnly: + return self.__oid != other + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def __lt__(self, other): + if self.__state & self.stOidOnly: + return self.__oid < other + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def __le__(self, other): + if self.__state & self.stOidOnly: + return self.__oid <= other + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def __gt__(self, other): + if self.__state & self.stOidOnly: + return self.__oid > other + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def __ge__(self, other): + if self.__state & self.stOidOnly: + return self.__oid >= other + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + if sys.version_info[0] <= 2: + def __nonzero__(self): + if self.__state & self.stOidOnly: + return bool(self.__oid) + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + else: + def __bool__(self): + if self.__state & self.stOidOnly: + return bool(self.__oid) + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def __hash__(self): + if self.__state & self.stOidOnly: + return hash(self.__oid) + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def __len__(self): + if self.__state & self.stOidOnly: + return len(self.__oid) + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def __index__(self, i): + if self.__state & self.stOidOnly: + return self.__oid[i] + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def asTuple(self): + if self.__state & self.stOidOnly: + return self.__oid.asTuple() + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def clone(self, *args): + if self.__state & self.stOidOnly: + return self.__oid.clone(*args) + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def subtype(self, *args): + if self.__state & self.stOidOnly: + return self.__oid.subtype(*args) + else: + raise PySnmpError('%s object not fully initialized' % self.__class__.__name__) + + def isPrefixOf(self, *args): + if self.__state & self.stOidOnly: + return self.__oid.isPrefixOf(*args) + else: + raise PySnmpError('%s object not properly initialized' % self.__class__.__name__) + + diff --git a/pysnmp/entity/rfc3413/oneliner/ntforg.py b/pysnmp/entity/rfc3413/oneliner/ntforg.py index f9878cf..3572ac1 100644 --- a/pysnmp/entity/rfc3413/oneliner/ntforg.py +++ b/pysnmp/entity/rfc3413/oneliner/ntforg.py @@ -1,7 +1,8 @@ from pyasn1.type import base from pysnmp import nextid from pysnmp.entity import config -from pysnmp.entity.rfc3413 import ntforg, context, mibvar +from pysnmp.entity.rfc3413 import ntforg, context +from pysnmp.entity.rfc3413.oneliner.mibvar import MibVariable from pysnmp.entity.rfc3413.oneliner import cmdgen # Auth protocol @@ -99,29 +100,29 @@ class AsynNotificationOriginator(cmdgen.AsynCommandGenerator): (cbFun, cbCtx) = cbInfo notifyName = self.cfgNtfOrg(authData, transportTarget, notifyType) if notificationType: - name, oid = mibvar.mibNameToOid( - self.mibViewController, notificationType - ) - notificationType = name + oid + if isinstance(notificationType, MibVariable): + notificationType = notificationType.resolveWithMib(self.mibViewController, oidOnly=True) + elif isinstance(notificationType[0], tuple): # legacy + notificationType = MibVariable(notificationType[0][0], notificationType[0][1], *notificationType[1:]).resolveWithMib(self.mibViewController) if varBinds: __varBinds = [] for varName, varVal in varBinds: - name, oid = mibvar.mibNameToOid( - self.mibViewController, varName - ) - if not isinstance(varVal, base.Asn1ItemBase): - ((symName, modName), suffix) = mibvar.oidToMibName( - self.mibViewController, name + oid - ) - syntax = mibvar.cloneFromMibValue( - self.mibViewController, modName, symName, varVal - ) - if syntax is None: - raise error.PySnmpError( - 'Value type MIB lookup failed for %r' % (varName,) - ) - varVal = syntax.clone(varVal) - __varBinds.append((name + oid, varVal)) + if isinstance(varName, MibVariable): + varName.resolveWithMib(self.mibViewController) + if not isinstance(varVal, base.AbstractSimpleAsn1Item): + varVal = varName.getMibNode().getSyntax().clone(varVal) + elif isinstance(varName[0], tuple): # legacy + varName = MibVariable(varName[0][0], varName[0][1], *varName[1:]).resolveWithMib(self.mibViewController) + if not isinstance(varVal, base.AbstractSimpleAsn1Item): + varVal = varName.getMibNode().getSyntax().clone(varVal) + else: + if isinstance(varVal, base.AbstractSimpleAsn1Item): + varName = MibVariable(varName).resolveWithMib(self.mibViewController, oidOnly=True) + else: + varName = MibVariable(varName).resolveWithMib(self.mibViewController) + varVal = varName.getMibNode().getSyntax().clone(varVal) + + __varBinds.append((varName, varVal)) else: __varBinds = None |