summaryrefslogtreecommitdiff
path: root/pysnmp/smi/rfc1902.py
diff options
context:
space:
mode:
Diffstat (limited to 'pysnmp/smi/rfc1902.py')
-rw-r--r--pysnmp/smi/rfc1902.py463
1 files changed, 463 insertions, 0 deletions
diff --git a/pysnmp/smi/rfc1902.py b/pysnmp/smi/rfc1902.py
new file mode 100644
index 0000000..75fa7b9
--- /dev/null
+++ b/pysnmp/smi/rfc1902.py
@@ -0,0 +1,463 @@
+import sys
+from pysnmp.proto import rfc1902, rfc1905
+from pysnmp.proto.api import v2c
+from pysnmp.smi.builder import ZipMibSource
+from pysnmp.smi.compiler import addMibCompiler, defaultDest
+from pysnmp.smi.error import SmiError
+from pyasn1.type.base import AbstractSimpleAsn1Item
+from pyasn1.error import PyAsn1Error
+from pysnmp import debug
+
+#
+# An OID-like object that embeds MIB resolution.
+#
+# Valid initializers include:
+# ObjectIdentity('1.3.6.1.2.1.1.1.0')
+# ObjectIdentity('iso.org.dod.internet.mgmt.mib-2.system.sysDescr.0')
+# ObjectIdentity('SNMPv2-MIB', 'system')
+# ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)
+# ObjectIdentity('IP-MIB', 'ipAdEntAddr', '127.0.0.1', 123)
+#
+class ObjectIdentity:
+ stDirty, stClean = 1, 2
+
+ def __init__(self, *args):
+ self.__args = args
+ self.__mibSourcesToAdd = self.__modNamesToLoad = None
+ self.__asn1SourcesToAdd = None
+ self.__state = self.stDirty
+
+ #
+ # public API
+ #
+ def getMibSymbol(self):
+ if self.__state & self.stClean:
+ return self.__modName, self.__symName, self.__indices
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def getOid(self):
+ if self.__state & self.stClean:
+ return self.__oid
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def getLabel(self):
+ if self.__state & self.stClean:
+ return self.__label
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def getMibNode(self):
+ if self.__state & self.stClean:
+ return self.__mibNode
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ #
+ # A gateway to MIBs manipulation routines
+ #
+
+ def addMibCompiler(self, *asn1Sources, **kwargs):
+ if self.__asn1SourcesToAdd is None:
+ self.__asn1SourcesToAdd = asn1Sources
+ else:
+ self.__asn1SourcesToAdd += asn1Sources
+ self.__mibDir = kwargs.get('destDir', defaultDest)
+ return self
+
+ def addMibSource(self, *mibSources):
+ if self.__mibSourcesToAdd is None:
+ self.__mibSourcesToAdd = mibSources
+ else:
+ self.__mibSourcesToAdd += mibSources
+ return self
+
+ # provides deferred MIBs load
+ def loadMibs(self, *modNames):
+ if self.__modNamesToLoad is None:
+ self.__modNamesToLoad = modNames
+ else:
+ self.__modNamesToLoad += modNames
+ return self
+
+ # this would eventually be called by an entity which posses a
+ # reference to MibViewController
+ def resolveWithMib(self, mibViewController):
+ if self.__mibSourcesToAdd is not None:
+ debug.logger & debug.flagMIB and debug.logger('adding MIB sources %s' % ', '.join(self.__mibSourcesToAdd))
+ mibViewController.mibBuilder.addMibSources(
+ *[ ZipMibSource(x) for x in self.__mibSourcesToAdd ]
+ )
+ self.__mibSourcesToAdd = None
+
+ if self.__asn1SourcesToAdd is not None:
+ debug.logger & debug.flagMIB and debug.logger('adding MIB compiler with source paths %s' % ', '.join(self.__asn1SourcesToAdd))
+ addMibCompiler(
+ mibViewController.mibBuilder,
+ sources=self.__asn1SourcesToAdd,
+ destination=self.__mibDir
+ )
+ self.__asn1SourcesToAdd = self.__mibDir = None
+
+ if self.__modNamesToLoad is not None:
+ debug.logger & debug.flagMIB and debug.logger('loading MIB modules %s' % ', '.join(self.__modNamesToLoad))
+ mibViewController.mibBuilder.loadModules(*self.__modNamesToLoad)
+ self.__modNamesToLoad = None
+
+ if self.__state & self.stClean:
+ return self
+
+ MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar', 'MibTableColumn')
+
+ self.__indices = ()
+
+ if len(self.__args) == 1: # OID or label
+ debug.logger & debug.flagMIB and debug.logger('resolving %s as OID or label' % self.__args)
+ try:
+ self.__oid = rfc1902.ObjectName(self.__args[0])
+ except PyAsn1Error:
+ try:
+ label = tuple(self.__args[0].split('.'))
+ except ValueError:
+ raise SmiError('Bad OID format %r' % (self.__args[0],))
+ prefix, label, suffix = mibViewController.getNodeNameByOid(
+ label
+ )
+
+ if suffix:
+ try:
+ suffix = tuple([ int(x) for x in suffix ])
+ except ValueError:
+ raise SmiError('Unknown object name component %r' % (suffix,))
+ self.__oid = rfc1902.ObjectName(prefix + suffix)
+ else:
+ prefix, label, suffix = mibViewController.getNodeNameByOid(
+ self.__oid
+ )
+
+ debug.logger & debug.flagMIB and debug.logger('resolved %r into prefix %r and suffix %r' % (self.__args, prefix, suffix))
+
+ modName, symName, _ = mibViewController.getNodeLocation(prefix)
+
+ self.__modName = modName
+ self.__symName = symName
+
+ self.__label = label
+
+ mibNode, = mibViewController.mibBuilder.importSymbols(
+ modName, symName
+ )
+
+ self.__mibNode = mibNode
+
+ debug.logger & debug.flagMIB and debug.logger('resolved prefix %r into MIB node %r' % (prefix, mibNode))
+
+ if isinstance(mibNode, MibTableColumn): # table column
+ if suffix:
+ rowModName, rowSymName, _ = mibViewController.getNodeLocation(
+ mibNode.name[:-1]
+ )
+ rowNode, = mibViewController.mibBuilder.importSymbols(
+ rowModName, rowSymName
+ )
+ self.__indices = rowNode.getIndicesFromInstId(suffix)
+ elif isinstance(mibNode, MibScalar): # scalar
+ if suffix:
+ self.__indices = ( rfc1902.ObjectName(suffix), )
+ else:
+ if suffix:
+ self.__indices = ( rfc1902.ObjectName(suffix), )
+ self.__state |= self.stClean
+
+ debug.logger & debug.flagMIB and debug.logger('resolved indices are %r' % (self.__indices,))
+
+ return self
+ elif len(self.__args) > 1: # MIB, symbol[, index, index ...]
+ self.__modName = self.__args[0]
+ if self.__args[1]:
+ self.__symName = self.__args[1]
+ else:
+ mibViewController.mibBuilder.loadModules(self.__modName)
+ oid, _, _ = mibViewController.getFirstNodeName(self.__modName)
+ _, self.__symName, _ = mibViewController.getNodeLocation(oid)
+
+ mibNode, = mibViewController.mibBuilder.importSymbols(
+ self.__modName, self.__symName
+ )
+
+ self.__mibNode = mibNode
+
+ self.__oid = rfc1902.ObjectName(mibNode.getName())
+
+ prefix, label, suffix = mibViewController.getNodeNameByOid(
+ self.__oid
+ )
+ self.__label = label
+
+ debug.logger & debug.flagMIB and debug.logger('resolved %r into prefix %r and suffix %r' % (self.__args, prefix, suffix))
+
+ if isinstance(mibNode, MibTableColumn): # table
+ rowModName, rowSymName, _ = mibViewController.getNodeLocation(
+ mibNode.name[:-1]
+ )
+ rowNode, = mibViewController.mibBuilder.importSymbols(
+ rowModName, rowSymName
+ )
+ if self.__args[2:]:
+ try:
+ instIds = rowNode.getInstIdFromIndices(*self.__args[2:])
+ self.__oid += instIds
+ self.__indices = rowNode.getIndicesFromInstId(instIds)
+ except PyAsn1Error:
+ raise SmiError('Instance index %r to OID convertion failure at object %r: %s' % (self.__args[2:], mibNode.getLabel(), sys.exc_info()[1]))
+ elif self.__args[2:]: # any other kind of MIB node with indices
+ if self.__args[2:]:
+ instId = rfc1902.ObjectName(
+ '.'.join([ str(x) for x in self.__args[2:] ])
+ )
+ self.__oid += instId
+ self.__indices = ( instId, )
+ self.__state |= self.stClean
+
+ debug.logger & debug.flagMIB and debug.logger('resolved indices are %r' % (self.__indices,))
+
+ return self
+ else:
+ raise SmiError('Non-OID, label or MIB symbol')
+
+ def prettyPrint(self):
+ if self.__state & self.stClean:
+ return '%s::%s%s%s' % (
+ self.__modName, self.__symName,
+ self.__indices and '.' or '',
+ '.'.join(['"%s"' % x.prettyPrint() for x in self.__indices ])
+ )
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def __repr__(self):
+ return '%s(%s)' % (self.__class__.__name__, ', '.join([ repr(x) for x in self.__args]))
+
+ # Redirect some attrs access to the OID object to behave alike
+
+ def __str__(self):
+ if self.__state & self.stClean:
+ return str(self.__oid)
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __eq__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid == other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __ne__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid != other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __lt__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid < other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __le__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid <= other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __gt__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid > other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __ge__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid > other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __nonzero__(self):
+ if self.__state & self.stClean:
+ return self.__oid != 0
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __bool__(self):
+ if self.__state & self.stClean:
+ return bool(self.__oid)
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __getitem__(self, i):
+ if self.__state & self.stClean:
+ return self.__oid[i]
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __len__(self):
+ if self.__state & self.stClean:
+ return len(self.__oid)
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __add__(self, other):
+ if self.__state & self.stClean:
+ return self.__oid + other
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __radd__(self, other):
+ if self.__state & self.stClean:
+ return other + self.__oid
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __hash__(self):
+ if self.__state & self.stClean:
+ return hash(self.__oid)
+ else:
+ raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+
+ def __getattr__(self, attr):
+ if self.__state & self.stClean:
+ if attr in ( 'asTuple', 'clone', 'subtype', 'isPrefixOf',
+ 'isSameTypeWith', 'isSuperTypeOf'):
+ return getattr(self.__oid, attr)
+ raise AttributeError
+ else:
+ raise SmiError('%s object not properly initialized for accessing %s' % (self.__class__.__name__, attr))
+
+# A two-element sequence of ObjectIdentity and SNMP data type object
+class ObjectType:
+ stDirty, stClean = 1, 2
+ def __init__(self, objectIdentity, objectSyntax=rfc1905.unSpecified):
+ if not isinstance(objectIdentity, ObjectIdentity):
+ raise SmiError('initializer should be ObjectIdentity instance, not %r' % (objectIdentity,))
+ self.__args = [ objectIdentity, objectSyntax ]
+ self.__state = self.stDirty
+
+ def __getitem__(self, i):
+ if self.__state & self.stClean:
+ return self.__args[i]
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def __repr__(self):
+ return '%s(%s)' % (self.__class__.__name__, ', '.join([ repr(x) for x in self.__args]))
+
+ def resolveWithMib(self, mibViewController):
+ if self.__state & self.stClean:
+ return self
+
+ self.__args[0].resolveWithMib(mibViewController)
+
+ MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar', 'MibTableColumn')
+
+ if not isinstance(self.__args[0].getMibNode(),
+ (MibScalar, MibTableColumn)):
+ if not isinstance(self.__args[1], AbstractSimpleAsn1Item):
+ raise SmiError('MIB object %r is not OBJECT-TYPE (MIB not loaded?)' % (self.__args[0],))
+ self.__state |= self.stClean
+ return self
+
+ if isinstance(self.__args[1], (rfc1905.UnSpecified,
+ rfc1905.NoSuchObject,
+ rfc1905.NoSuchInstance,
+ rfc1905.EndOfMibView)):
+ self.__state |= self.stClean
+ return self
+
+ try:
+ self.__args[1] = self.__args[0].getMibNode().getSyntax().clone(self.__args[1])
+ except PyAsn1Error:
+ raise SmiError('Value %r to type %r convertion failure: %s' % (self.__args[1], self.__args[0].getMibNode().getSyntax().__class__.__name__, sys.exc_info()[1]))
+
+ self.__state |= self.stClean
+
+ debug.logger & debug.flagMIB and debug.logger('resolved %r syntax is %r' % (self.__args[0], self.__args[1]))
+
+ return self
+
+ def prettyPrint(self):
+ if self.__state & self.stClean:
+ return '%s = %s' % (self.__args[0].prettyPrint(),
+ self.__args[1].prettyPrint())
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+# A sequence of ObjectType's
+class NotificationType:
+ stDirty, stClean = 1, 2
+ def __init__(self, objectIdentity, instanceIndex=(), objects={}):
+ if not isinstance(objectIdentity, ObjectIdentity):
+ raise SmiError('initializer should be ObjectIdentity instance, not %r' % (objectIdentity,))
+ self.__objectIdentity = objectIdentity
+ self.__instanceIndex = instanceIndex
+ self.__objects = objects
+ self.__varBinds = []
+ self.__additionalVarBinds = []
+ self.__state = self.stDirty
+
+ def __getitem__(self, i):
+ if self.__state & self.stClean:
+ return self.__varBinds[i]
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+
+ def __repr__(self):
+ return '%s(%r, %r, %r)' % (self.__class__.__name__, self.__objectIdentity, self.__instanceIndex, self.__objects)
+
+ def addVarBinds(self, *varBinds):
+ debug.logger & debug.flagMIB and debug.logger('additional var-binds: %r' % (varBinds,))
+ if self.__state & self.stClean:
+ self.__varBinds.extend(varBinds)
+ else:
+ self.__additionalVarBinds.extend(varBinds)
+ return self
+
+ def resolveWithMib(self, mibViewController):
+ if self.__state & self.stClean:
+ return self
+
+ self.__objectIdentity.resolveWithMib(mibViewController)
+
+ self.__varBinds.append(
+ ObjectType(ObjectIdentity(v2c.apiTrapPDU.snmpTrapOID),
+ self.__objectIdentity).resolveWithMib(mibViewController)
+ )
+
+ NotificationType, = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'NotificationType')
+
+ mibNode = self.__objectIdentity.getMibNode()
+
+ if isinstance(mibNode, NotificationType):
+ for notificationObject in mibNode.getObjects():
+ objectIdentity = ObjectIdentity(*notificationObject+self.__instanceIndex).resolveWithMib(mibViewController)
+ self.__varBinds.append(
+ ObjectType(objectIdentity, self.__objects.get(notificationObject, rfc1905.unSpecified)).resolveWithMib(mibViewController)
+ )
+ else:
+ debug.logger & debug.flagMIB and debug.logger('WARNING: MIB object %r is not NOTIFICATION-TYPE (MIB not loaded?)' % (self.__objectIdentity,))
+
+ if self.__additionalVarBinds:
+ self.__varBinds.extend(self.__additionalVarBinds)
+ self.__additionalVarBinds = []
+
+ self.__state |= self.stClean
+
+ debug.logger & debug.flagMIB and debug.logger('resolved %r into %r' % (self.__objectIdentity, self.__varBinds))
+
+ return self
+
+ def prettyPrint(self):
+ if self.__state & self.stClean:
+ return ' '.join([ '%s = %s' % (x[0].prettyPrint(), x[1].prettyPrint()) for x in self.__varBinds])
+ else:
+ raise SmiError('%s object not fully initialized' % self.__class__.__name__)