diff options
Diffstat (limited to 'pysnmp/smi')
-rw-r--r-- | pysnmp/smi/builder.py | 321 | ||||
-rw-r--r-- | pysnmp/smi/compiler.py | 33 | ||||
-rw-r--r-- | pysnmp/smi/error.py | 14 | ||||
-rw-r--r-- | pysnmp/smi/indices.py | 86 | ||||
-rw-r--r-- | pysnmp/smi/instrum.py | 84 | ||||
-rw-r--r-- | pysnmp/smi/rfc1902.py | 695 | ||||
-rw-r--r-- | pysnmp/smi/view.py | 255 |
7 files changed, 908 insertions, 580 deletions
diff --git a/pysnmp/smi/builder.py b/pysnmp/smi/builder.py index 9d75732b..513bc19a 100644 --- a/pysnmp/smi/builder.py +++ b/pysnmp/smi/builder.py @@ -33,25 +33,31 @@ from pysnmp import debug class __AbstractMibSource(object): def __init__(self, srcName): self._srcName = srcName - self.__magic = imp.get_magic() - self.__sfx = {} - self.__inited = None + self._magic = imp.get_magic() + self._sfx = {} + self._inited = None + for sfx, mode, typ in imp.get_suffixes(): - if typ not in self.__sfx: - self.__sfx[typ] = [] - self.__sfx[typ].append((sfx, len(sfx), mode)) - debug.logger & debug.FLAG_BLD and debug.logger('trying %s' % self) + if typ not in self._sfx: + self._sfx[typ] = [] + + self._sfx[typ].append((sfx, len(sfx), mode)) + + debug.logger & debug.FLAG_BLD and debug.logger( + 'trying %s' % self) def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self._srcName) def _uniqNames(self, files): u = set() + for f in files: if f.startswith('__init__.'): continue + for typ in (imp.PY_SOURCE, imp.PY_COMPILED): - for sfx, sfxLen, mode in self.__sfx[typ]: + for sfx, sfxLen, mode in self._sfx[typ]: if f[-sfxLen:] == sfx: u.add(f[:-sfxLen]) return tuple(u) @@ -59,18 +65,23 @@ class __AbstractMibSource(object): # MibSource API follows def fullPath(self, f='', sfx=''): - return self._srcName + (f and (os.sep + f + sfx) or '') + if f: + return os.path.join(self._srcName, f) + sfx + + return self._srcName def init(self): - if self.__inited is None: - self.__inited = self._init() - if self.__inited is self: - self.__inited = True - if self.__inited is True: + if self._inited is None: + self._inited = self._init() + + if self._inited is self: + self._inited = True + + if self._inited is True: return self else: - return self.__inited + return self._inited def listdir(self): return self._listdir() @@ -78,54 +89,62 @@ class __AbstractMibSource(object): def read(self, f): pycTime = pyTime = -1 - for pycSfx, pycSfxLen, pycMode in self.__sfx[imp.PY_COMPILED]: + for pycSfx, pycSfxLen, pycMode in self._sfx[imp.PY_COMPILED]: + + pycFile = f + pycSfx + try: - pycData, pycPath = self._getData(f + pycSfx, pycMode) + pycData, pycPath = self._getData(pycFile, pycMode) except IOError as exc: if ENOENT == -1 or exc.errno == ENOENT: debug.logger & debug.FLAG_BLD and debug.logger( - 'file %s access error: %s' % (f + pycSfx, exc) - ) + 'file %s access error: %s' % (pycFile, exc)) else: - raise error.MibLoadError('MIB file %s access error: %s' % (f + pycSfx, exc)) + raise error.MibLoadError( + 'MIB file %s access error: %s' % (pycFile, exc)) else: - if self.__magic == pycData[:4]: + if self._magic == pycData[:4]: pycData = pycData[4:] pycTime = struct.unpack('<L', pycData[:4])[0] pycData = pycData[4:] + debug.logger & debug.FLAG_BLD and debug.logger( - 'file %s mtime %d' % (pycPath, pycTime) - ) + 'file %s mtime %d' % (pycPath, pycTime)) + break else: - debug.logger & debug.FLAG_BLD and debug.logger('bad magic in %s' % pycPath) + debug.logger & debug.FLAG_BLD and debug.logger( + 'bad magic in %s' % pycPath) + + for pySfx, pySfxLen, pyMode in self._sfx[imp.PY_SOURCE]: + pyFile = f + pySfx - for pySfx, pySfxLen, pyMode in self.__sfx[imp.PY_SOURCE]: try: - pyTime = self._getTimestamp(f + pySfx) + pyTime = self._getTimestamp(pyFile) except IOError as exc: if ENOENT == -1 or exc.errno == ENOENT: debug.logger & debug.FLAG_BLD and debug.logger( - 'file %s access error: %s' % (f + pySfx, exc) - ) + 'file %s access error: %s' % (pyFile, exc)) else: - raise error.MibLoadError('MIB file %s access error: %s' % (f + pySfx, exc)) + raise error.MibLoadError( + 'MIB file %s access error: %s' % (pyFile, exc)) else: - debug.logger & debug.FLAG_BLD and debug.logger('file %s mtime %d' % (f + pySfx, pyTime)) + debug.logger & debug.FLAG_BLD and debug.logger( + 'file %s mtime %d' % (pyFile, pyTime)) break if pycTime != -1 and pycTime >= pyTime: return marshal.loads(pycData), pycSfx if pyTime != -1: - modData, pyPath = self._getData(f + pySfx, pyMode) + modData, pyPath = self._getData(pyFile, pyMode) return compile(modData, pyPath, 'exec'), pyPath raise IOError(ENOENT, 'No suitable module found', f) @@ -147,16 +166,21 @@ class __AbstractMibSource(object): class ZipMibSource(__AbstractMibSource): def _init(self): try: - p = __import__(self._srcName, globals(), locals(), ['__init__']) - if hasattr(p, '__loader__') and hasattr(p.__loader__, '_files'): - self.__loader = p.__loader__ + mod = __import__( + self._srcName, globals(), locals(), ['__init__']) + + if (hasattr(mod, '__loader__') and + hasattr(mod.__loader__, '_files')): + self.__loader = mod.__loader__ self._srcName = self._srcName.replace('.', os.sep) return self - elif hasattr(p, '__file__'): + + elif hasattr(mod, '__file__'): # Dir relative to PYTHONPATH - return DirMibSource(os.path.split(p.__file__)[0]).init() + return DirMibSource(os.path.split(mod.__file__)[0]).init() + else: - raise error.MibLoadError('%s access error' % (p,)) + raise error.MibLoadError('%s access error' % (mod,)) except ImportError: # Dir relative to CWD @@ -176,33 +200,40 @@ class ZipMibSource(__AbstractMibSource): return time.mktime(t) def _listdir(self): - l = [] + dirs = [] + # noinspection PyProtectedMember - for f in self.__loader._files.keys(): - d, f = os.path.split(f) - if d == self._srcName: - l.append(f) - return tuple(self._uniqNames(l)) + for path in self.__loader._files: + dr, fl = os.path.split(path) + if dr == self._srcName: + dirs.append(fl) + + return tuple(self._uniqNames(dirs)) def _getTimestamp(self, f): - p = os.path.join(self._srcName, f) + path = os.path.join(self._srcName, f) + # noinspection PyProtectedMember - if p in self.__loader._files: + if path in self.__loader._files: # noinspection PyProtectedMember return self._parseDosTime( - self.__loader._files[p][6], self.__loader._files[p][5] + self.__loader._files[path][6], self.__loader._files[path][5] ) + else: - raise IOError(ENOENT, 'No such file in ZIP archive', p) + raise IOError(ENOENT, 'No such file in ZIP archive', path) def _getData(self, f, mode=None): - p = os.path.join(self._srcName, f) + path = os.path.join(self._srcName, f) + try: - return self.__loader.get_data(p), p + return self.__loader.get_data(path), path # ZIP code seems to return all kinds of errors except Exception as exc: - raise IOError(ENOENT, 'File or ZIP archive %s access error: %s' % (p, exc)) + raise IOError( + ENOENT, 'File or ZIP archive %s access ' + 'error: %s' % (path, exc)) class DirMibSource(__AbstractMibSource): @@ -213,33 +244,35 @@ class DirMibSource(__AbstractMibSource): def _listdir(self): try: return self._uniqNames(os.listdir(self._srcName)) + except OSError as exc: debug.logger & debug.FLAG_BLD and debug.logger( 'listdir() failed for %s: %s' % (self._srcName, exc)) return () def _getTimestamp(self, f): - p = os.path.join(self._srcName, f) + path = os.path.join(self._srcName, f) try: - return os.stat(p)[8] + return os.stat(path)[8] except OSError as exc: - raise IOError(ENOENT, 'No such file: %s' % exc, p) + raise IOError(ENOENT, 'No such file: %s' % exc, path) + + def _getData(self, fl, mode): + path = os.path.join(self._srcName, '*') - def _getData(self, f, mode): - p = os.path.join(self._srcName, '*') try: - if f in os.listdir(self._srcName): # make FS case-sensitive - p = os.path.join(self._srcName, f) - fp = open(p, mode) + if fl in os.listdir(self._srcName): # make FS case-sensitive + path = os.path.join(self._srcName, fl) + fp = open(path, mode) data = fp.read() fp.close() - return data, p + return data, path except (IOError, OSError) as exc: - msg = 'File or directory %s access error: %s' % (p, exc) + msg = 'File or directory %s access error: %s' % (path, exc) else: - msg = 'No such file or directory: %s' % p + msg = 'No such file or directory: %s' % path raise IOError(ENOENT, msg) @@ -260,91 +293,108 @@ class MibBuilder(object): def __init__(self): self.lastBuildId = self._autoName = 0 + sources = [] + for ev in 'PYSNMP_MIB_PKGS', 'PYSNMP_MIB_DIRS', 'PYSNMP_MIB_DIR': if ev in os.environ: for m in os.environ[ev].split(os.pathsep): sources.append(ZipMibSource(m)) + if not sources and self.DEFAULT_MISC_MIBS: for m in self.DEFAULT_MISC_MIBS.split(os.pathsep): sources.append(ZipMibSource(m)) + for m in self.DEFAULT_CORE_MIBS.split(os.pathsep): sources.insert(0, ZipMibSource(m)) + self.mibSymbols = {} - self.__mibSources = [] - self.__modSeen = {} - self.__modPathsSeen = set() - self.__mibCompiler = None + self._mibSources = [] + self._modSeen = {} + self._modPathsSeen = set() + self._mibCompiler = None + self.setMibSources(*sources) # MIB compiler management def getMibCompiler(self): - return self.__mibCompiler + return self._mibCompiler def setMibCompiler(self, mibCompiler, destDir): self.addMibSources(DirMibSource(destDir)) - self.__mibCompiler = mibCompiler + self._mibCompiler = mibCompiler return self # MIB modules management def addMibSources(self, *mibSources): - self.__mibSources.extend([s.init() for s in mibSources]) - debug.logger & debug.FLAG_BLD and debug.logger('addMibSources: new MIB sources %s' % (self.__mibSources,)) + self._mibSources.extend([s.init() for s in mibSources]) + + debug.logger & debug.FLAG_BLD and debug.logger( + 'addMibSources: new MIB sources %s' % (self._mibSources,)) def setMibSources(self, *mibSources): - self.__mibSources = [s.init() for s in mibSources] - debug.logger & debug.FLAG_BLD and debug.logger('setMibSources: new MIB sources %s' % (self.__mibSources,)) + self._mibSources = [s.init() for s in mibSources] + + debug.logger & debug.FLAG_BLD and debug.logger( + 'setMibSources: new MIB sources %s' % (self._mibSources,)) def getMibSources(self): - return tuple(self.__mibSources) + return tuple(self._mibSources) def loadModule(self, modName, **userCtx): """Load and execute MIB modules as Python code""" - for mibSource in self.__mibSources: - debug.logger & debug.FLAG_BLD and debug.logger('loadModule: trying %s at %s' % (modName, mibSource)) + for mibSource in self._mibSources: + debug.logger & debug.FLAG_BLD and debug.logger( + 'loadModule: trying %s at %s' % (modName, mibSource)) + try: codeObj, sfx = mibSource.read(modName) except IOError as exc: debug.logger & debug.FLAG_BLD and debug.logger( - 'loadModule: read %s from %s failed: %s' % (modName, mibSource, exc)) + 'loadModule: read %s from %s failed: ' + '%s' % (modName, mibSource, exc)) continue modPath = mibSource.fullPath(modName, sfx) - if modPath in self.__modPathsSeen: - debug.logger & debug.FLAG_BLD and debug.logger('loadModule: seen %s' % modPath) + if modPath in self._modPathsSeen: + debug.logger & debug.FLAG_BLD and debug.logger( + 'loadModule: seen %s' % modPath) break else: - self.__modPathsSeen.add(modPath) + self._modPathsSeen.add(modPath) - debug.logger & debug.FLAG_BLD and debug.logger('loadModule: evaluating %s' % modPath) + debug.logger & debug.FLAG_BLD and debug.logger( + 'loadModule: evaluating %s' % modPath) - g = {'mibBuilder': self, 'userCtx': userCtx} + g = {'mibBuilder': self, + 'userCtx': userCtx} try: exec(codeObj, g) except Exception: - self.__modPathsSeen.remove(modPath) + self._modPathsSeen.remove(modPath) raise error.MibLoadError( - 'MIB module \'%s\' load error: %s' % (modPath, traceback.format_exception(*sys.exc_info())) - ) + 'MIB module "%s" load error: ' + '%s' % (modPath, traceback.format_exception(*sys.exc_info()))) - self.__modSeen[modName] = modPath + self._modSeen[modName] = modPath - debug.logger & debug.FLAG_BLD and debug.logger('loadModule: loaded %s' % modPath) + debug.logger & debug.FLAG_BLD and debug.logger( + 'loadModule: loaded %s' % modPath) break - if modName not in self.__modSeen: + if modName not in self._modSeen: raise error.MibNotFoundError( - 'MIB file \"%s\" not found in search path (%s)' % ( - modName and modName + ".py[co]", ', '.join([str(x) for x in self.__mibSources])) - ) + 'MIB file "%s" not found in search path ' + '(%s)' % (modName and modName + ".py[co]", ', '.join( + [str(x) for x in self._mibSources]))) return self @@ -353,28 +403,36 @@ class MibBuilder(object): # Build a list of available modules if not modNames: modNames = {} - for mibSource in self.__mibSources: + + for mibSource in self._mibSources: for modName in mibSource.listdir(): modNames[modName] = None + modNames = list(modNames) if not modNames: raise error.MibNotFoundError( - 'No MIB module to load at %s' % (self,) - ) + 'No MIB module to load at %s' % (self,)) for modName in modNames: try: self.loadModule(modName, **userCtx) except error.MibNotFoundError: - if self.__mibCompiler: - debug.logger & debug.FLAG_BLD and debug.logger('loadModules: calling MIB compiler for %s' % modName) - status = self.__mibCompiler.compile(modName, genTexts=self.loadTexts) - errs = '; '.join([hasattr(x, 'error') and str(x.error) or x for x in status.values() if - x in ('failed', 'missing')]) + if self._mibCompiler: + debug.logger & debug.FLAG_BLD and debug.logger( + 'loadModules: calling MIB compiler for %s' % modName) + + status = self._mibCompiler.compile(modName, genTexts=self.loadTexts) + + errs = '; '.join( + hasattr(x, 'error') and str(x.error) or x + for x in status.values() + if x in ('failed', 'missing')) + if errs: - raise error.MibNotFoundError('%s compilation error(s): %s' % (modName, errs)) + raise error.MibNotFoundError( + '%s compilation error(s): %s' % (modName, errs)) # compilation succeeded, MIB might load now self.loadModule(modName, **userCtx) @@ -383,84 +441,103 @@ class MibBuilder(object): def unloadModules(self, *modNames): if not modNames: - modNames = list(self.mibSymbols.keys()) + modNames = list(self.mibSymbols) + for modName in modNames: if modName not in self.mibSymbols: raise error.MibNotFoundError( - 'No module %s at %s' % (modName, self) - ) + 'No module %s at %s' % (modName, self)) + self.unexportSymbols(modName) - self.__modPathsSeen.remove(self.__modSeen[modName]) - del self.__modSeen[modName] - debug.logger & debug.FLAG_BLD and debug.logger('unloadModules: %s' % modName) + self._modPathsSeen.remove(self._modSeen[modName]) + + del self._modSeen[modName] + + debug.logger & debug.FLAG_BLD and debug.logger( + 'unloadModules: %s' % modName) return self def importSymbols(self, modName, *symNames, **userCtx): if not modName: raise error.SmiError( - 'importSymbols: empty MIB module name' - ) - r = () + 'importSymbols: empty MIB module name') + + symbols = [] + for symName in symNames: if modName not in self.mibSymbols: self.loadModules(modName, **userCtx) + if modName not in self.mibSymbols: raise error.MibNotFoundError( - 'No module %s loaded at %s' % (modName, self) - ) + 'No module %s loaded at %s' % (modName, self)) + if symName not in self.mibSymbols[modName]: raise error.SmiError( - 'No symbol %s::%s at %s' % (modName, symName, self) - ) - r = r + (self.mibSymbols[modName][symName],) - return r + 'No symbol %s::%s at %s' % (modName, symName, self)) + + symbols.append(self.mibSymbols[modName][symName]) + + return symbols def exportSymbols(self, modName, *anonymousSyms, **namedSyms): if modName not in self.mibSymbols: self.mibSymbols[modName] = {} + mibSymbols = self.mibSymbols[modName] for symObj in anonymousSyms: debug.logger & debug.FLAG_BLD and debug.logger( - 'exportSymbols: anonymous symbol %s::__pysnmp_%ld' % (modName, self._autoName)) + 'exportSymbols: anonymous symbol %s::' + '__pysnmp_%ld' % (modName, self._autoName)) + mibSymbols['__pysnmp_%ld' % self._autoName] = symObj + self._autoName += 1 + for symName, symObj in namedSyms.items(): if symName in mibSymbols: raise error.SmiError( - 'Symbol %s already exported at %s' % (symName, modName) - ) + 'Symbol %s already exported at %s' % (symName, modName)) + + if (symName != self.moduleID and + not isinstance(symObj, classTypes)): - if symName != self.moduleID and \ - not isinstance(symObj, classTypes): label = symObj.getLabel() + if label: symName = label + else: symObj.setLabel(symName) mibSymbols[symName] = symObj - debug.logger & debug.FLAG_BLD and debug.logger('exportSymbols: symbol %s::%s' % (modName, symName)) + debug.logger & debug.FLAG_BLD and debug.logger( + 'exportSymbols: symbol %s::%s' % (modName, symName)) self.lastBuildId += 1 def unexportSymbols(self, modName, *symNames): if modName not in self.mibSymbols: raise error.SmiError('No module %s at %s' % (modName, self)) + mibSymbols = self.mibSymbols[modName] + if not symNames: symNames = list(mibSymbols.keys()) + for symName in symNames: if symName not in mibSymbols: raise error.SmiError( - 'No symbol %s::%s at %s' % (modName, symName, self) - ) + 'No symbol %s::%s at %s' % (modName, symName, self)) + del mibSymbols[symName] - debug.logger & debug.FLAG_BLD and debug.logger('unexportSymbols: symbol %s::%s' % (modName, symName)) + debug.logger & debug.FLAG_BLD and debug.logger( + 'unexportSymbols: symbol %s::%s' % (modName, symName)) if not self.mibSymbols[modName]: del self.mibSymbols[modName] diff --git a/pysnmp/smi/compiler.py b/pysnmp/smi/compiler.py index 231685f8..6c4d3d11 100644 --- a/pysnmp/smi/compiler.py +++ b/pysnmp/smi/compiler.py @@ -33,9 +33,11 @@ except ImportError as exc: def addMibCompilerDecorator(errorMsg): + def addMibCompiler(mibBuilder, **kwargs): if not kwargs.get('ifAvailable'): - raise error.SmiError('MIB compiler not available: %s' % errorMsg) + raise error.SmiError( + 'MIB compiler not available: %s' % errorMsg) return addMibCompiler @@ -48,18 +50,25 @@ else: if kwargs.get('ifNotAdded') and mibBuilder.getMibCompiler(): return - compiler = MibCompiler(parserFactory(**smiV1Relaxed)(), - PySnmpCodeGen(), - PyFileWriter(kwargs.get('destination') or DEFAULT_DEST)) + compiler = MibCompiler( + parserFactory(**smiV1Relaxed)(), + PySnmpCodeGen(), + PyFileWriter(kwargs.get('destination') or DEFAULT_DEST)) + + compiler.addSources( + *getReadersFromUrls(*kwargs.get('sources') or DEFAULT_SOURCES)) + + compiler.addSearchers( + StubSearcher(*baseMibs)) - compiler.addSources(*getReadersFromUrls(*kwargs.get('sources') or DEFAULT_SOURCES)) + compiler.addSearchers( + *[PyPackageSearcher(x.fullPath()) for x in mibBuilder.getMibSources()]) - compiler.addSearchers(StubSearcher(*baseMibs)) - compiler.addSearchers(*[PyPackageSearcher(x.fullPath()) for x in mibBuilder.getMibSources()]) - compiler.addBorrowers(*[PyFileBorrower(x, genTexts=mibBuilder.loadTexts) for x in - getReadersFromUrls(*kwargs.get('borrowers') or DEFAULT_BORROWERS, - lowcaseMatching=False)]) + compiler.addBorrowers( + *[PyFileBorrower(x, genTexts=mibBuilder.loadTexts) + for x in getReadersFromUrls( + *kwargs.get('borrowers') or DEFAULT_BORROWERS, + lowcaseMatching=False)]) mibBuilder.setMibCompiler( - compiler, kwargs.get('destination') or DEFAULT_DEST - ) + compiler, kwargs.get('destination') or DEFAULT_DEST) diff --git a/pysnmp/smi/error.py b/pysnmp/smi/error.py index 87909b1e..1322a2c5 100644 --- a/pysnmp/smi/error.py +++ b/pysnmp/smi/error.py @@ -23,25 +23,25 @@ class MibNotFoundError(MibLoadError): class MibOperationError(SmiError): def __init__(self, **kwargs): - self.__outArgs = kwargs + self._outArgs = kwargs def __str__(self): - return '%s(%s)' % (self.__class__.__name__, self.__outArgs) + return '%s(%s)' % (self.__class__.__name__, self._outArgs) def __getitem__(self, key): - return self.__outArgs[key] + return self._outArgs[key] def __contains__(self, key): - return key in self.__outArgs + return key in self._outArgs def get(self, key, defVal=None): - return self.__outArgs.get(key, defVal) + return self._outArgs.get(key, defVal) def keys(self): - return self.__outArgs.keys() + return self._outArgs.keys() def update(self, d): - self.__outArgs.update(d) + self._outArgs.update(d) # Aligned with SNMPv2 PDU error-status values diff --git a/pysnmp/smi/indices.py b/pysnmp/smi/indices.py index 677ac97c..f55f7e6a 100644 --- a/pysnmp/smi/indices.py +++ b/pysnmp/smi/indices.py @@ -11,45 +11,50 @@ class OrderedDict(dict): """Ordered dictionary used for indices""" def __init__(self, *args, **kwargs): - self.__keys = [] - self.__dirty = True super(OrderedDict, self).__init__() + + self._keys = [] + self._dirty = True + self._keysLens = [] + if args: self.update(*args) + if kwargs: self.update(**kwargs) def __setitem__(self, key, value): super(OrderedDict, self).__setitem__(key, value) - if key not in self.__keys: - self.__keys.append(key) - self.__dirty = True + if key not in self._keys: + self._keys.append(key) + self._dirty = True def __delitem__(self, key): super(OrderedDict, self).__delitem__(key) - if key in self.__keys: - self.__keys.remove(key) - self.__dirty = True + if key in self._keys: + self._keys.remove(key) + self._dirty = True def clear(self): super(OrderedDict, self).clear() - self.__keys = [] - self.__dirty = True + self._keys = [] + self._dirty = True def keys(self): - if self.__dirty: - self.__order() - return list(self.__keys) + if self._dirty: + self._order() + return list(self._keys) def values(self): - if self.__dirty: - self.__order() - return [self[k] for k in self.__keys] + if self._dirty: + self._order() + return [self[k] for k in self._keys] def items(self): - if self.__dirty: - self.__order() - return [(k, self[k]) for k in self.__keys] + if self._dirty: + self._order() + + return [(k, self[k]) for k in self._keys] def update(self, *args, **kwargs): if args: @@ -57,6 +62,7 @@ class OrderedDict(dict): if hasattr(iterable, 'keys'): for k in iterable: self[k] = iterable[k] + else: for k, v in iterable: self[k] = v @@ -68,16 +74,19 @@ class OrderedDict(dict): def sortingFun(self, keys): keys.sort() - def __order(self): - self.sortingFun(self.__keys) - self.__keysLens = sorted(set(len(k) for k in self.__keys), reverse=True) - self.__dirty = False + def _order(self): + self.sortingFun(self._keys) + + self._keysLens = sorted( + set(len(k) for k in self._keys), reverse=True) + + self._dirty = False def nextKey(self, key): - if self.__dirty: - self.__order() + if self._dirty: + self._order() - keys = self.__keys + keys = self._keys if key in keys: nextIdx = keys.index(key) + 1 @@ -92,30 +101,35 @@ class OrderedDict(dict): raise KeyError(key) def getKeysLens(self): - if self.__dirty: - self.__order() - return self.__keysLens + if self._dirty: + self._order() + + return self._keysLens class OidOrderedDict(OrderedDict): """OID-ordered dictionary used for indices""" def __init__(self, *args, **kwargs): - self.__keysCache = {} OrderedDict.__init__(self, *args, **kwargs) + self._keysCache = {} + def __setitem__(self, key, value): OrderedDict.__setitem__(self, key, value) - if key not in self.__keysCache: + + if key not in self._keysCache: if isinstance(key, tuple): - self.__keysCache[key] = key + self._keysCache[key] = key + else: - self.__keysCache[key] = [int(x) for x in key.split('.') if x] + self._keysCache[key] = [int(x) for x in key.split('.') if x] def __delitem__(self, key): OrderedDict.__delitem__(self, key) - if key in self.__keysCache: - del self.__keysCache[key] + + if key in self._keysCache: + del self._keysCache[key] def sortingFun(self, keys): - keys.sort(key=lambda k, d=self.__keysCache: d[k]) + keys.sort(key=lambda k, d=self._keysCache: d[k]) diff --git a/pysnmp/smi/instrum.py b/pysnmp/smi/instrum.py index aec25adc..342eeb6b 100644 --- a/pysnmp/smi/instrum.py +++ b/pysnmp/smi/instrum.py @@ -38,21 +38,21 @@ class MibInstrumController(AbstractMibInstrumController): STATE_WRITE_CLEANUP = 'writeCleanup' STATE_WRITE_UNDO = 'writeUndo' - fsmReadVar = { + FSM_READ_VAR = { # (state, status) -> newState (STATE_START, STATUS_OK): STATE_READ_TEST, (STATE_READ_TEST, STATUS_OK): STATE_READ_GET, (STATE_READ_GET, STATUS_OK): STATE_STOP, (STATE_ANY, STATUS_ERROR): STATE_STOP } - fsmReadNextVar = { + FSM_READ_NEXT_VAR = { # (state, status) -> newState (STATE_START, STATUS_OK): STATE_READ_TEST_NEXT, (STATE_READ_TEST_NEXT, STATUS_OK): STATE_READ_GET_NEXT, (STATE_READ_GET_NEXT, STATUS_OK): STATE_STOP, (STATE_ANY, STATUS_ERROR): STATE_STOP } - fsmWriteVar = { + FSM_WRITE_VAR = { # (state, status) -> newState (STATE_START, STATUS_OK): STATE_WRITE_TEST, (STATE_WRITE_TEST, STATUS_OK): STATE_WRITE_COMMIT, @@ -79,7 +79,7 @@ class MibInstrumController(AbstractMibInstrumController): def getMibBuilder(self): return self.mibBuilder - def __indexMib(self): + def _indexMib(self): """Rebuild a tree from MIB objects found at currently loaded modules. If currently existing tree is out of date, walk over all Managed Objects @@ -108,7 +108,8 @@ class MibInstrumController(AbstractMibInstrumController): if self.lastBuildId == self.mibBuilder.lastBuildId: return - (MibScalarInstance, MibScalar, MibTableColumn, MibTableRow, + (MibScalarInstance, MibScalar, + MibTableColumn, MibTableRow, MibTable) = self.mibBuilder.importSymbols( 'SNMPv2-SMI', 'MibScalarInstance', 'MibScalar', 'MibTableColumn', 'MibTableRow', 'MibTable' @@ -128,26 +129,36 @@ class MibInstrumController(AbstractMibInstrumController): mibSymbols.sort(key=lambda x: x[0], reverse=True) for modName, mibMod in mibSymbols: + for symObj in mibMod.values(): + if isinstance(symObj, MibTable): tables[symObj.name] = symObj + elif isinstance(symObj, MibTableRow): rows[symObj.name] = symObj + elif isinstance(symObj, MibTableColumn): cols[symObj.name] = symObj + elif isinstance(symObj, MibScalarInstance): instances[symObj.name] = symObj + elif isinstance(symObj, MibScalar): scalars[symObj.name] = symObj # Detach items from each other for symName, parentName in self.lastBuildSyms.items(): + if parentName in scalars: scalars[parentName].unregisterSubtrees(symName) + elif parentName in cols: cols[parentName].unregisterSubtrees(symName) + elif parentName in rows: rows[parentName].unregisterSubtrees(symName) + else: mibTree.unregisterSubtrees(symName) @@ -157,23 +168,29 @@ class MibInstrumController(AbstractMibInstrumController): for inst in instances.values(): if inst.typeName in scalars: scalars[inst.typeName].registerSubtrees(inst) + elif inst.typeName in cols: cols[inst.typeName].registerSubtrees(inst) + else: raise error.SmiError( - 'Orphan MIB scalar instance %r at %r' % (inst, self) - ) + 'Orphan MIB scalar instance %r at ' + '%r' % (inst, self)) + lastBuildSyms[inst.name] = inst.typeName # Attach Table Columns to Table Rows for col in cols.values(): rowName = col.name[:-1] # XXX + if rowName in rows: rows[rowName].registerSubtrees(col) + else: raise error.SmiError( - 'Orphan MIB table column %r at %r' % (col, self) - ) + 'Orphan MIB table column %r at ' + '%r' % (col, self)) + lastBuildSyms[col.name] = rowName # Attach Table Rows to MIB tree @@ -195,7 +212,7 @@ class MibInstrumController(AbstractMibInstrumController): self.lastBuildId = self.mibBuilder.lastBuildId - debug.logger & debug.FLAG_INS and debug.logger('__indexMib: rebuilt') + debug.logger & debug.FLAG_INS and debug.logger('_indexMib: rebuilt') def flipFlopFsm(self, fsmTable, *varBinds, **context): """Read, modify, create or remove Managed Objects Instances. @@ -250,12 +267,12 @@ class MibInstrumController(AbstractMibInstrumController): if err: # Move other errors into the errors sequence errors = context['errors'] + errors.append( {'error': err, 'idx': idx, 'varbind': varBind, - 'state': context['state']} - ) + 'state': context['state']}) context['status'] = self.STATUS_ERROR @@ -271,8 +288,8 @@ class MibInstrumController(AbstractMibInstrumController): count[0] += 1 debug.logger & debug.FLAG_INS and debug.logger( - '_cbFun: var-bind %d, processed %d, expected %d' % ( - idx, count[0], len(varBinds))) + '_cbFun: var-bind %d, processed %d, expected ' + '%d' % (idx, count[0], len(varBinds))) if count[0] < len(varBinds): return @@ -282,7 +299,8 @@ class MibInstrumController(AbstractMibInstrumController): self.flipFlopFsm(fsmTable, *varBinds, **dict(context, cbFun=cbFun)) - debug.logger & debug.FLAG_INS and debug.logger('flipFlopFsm: input var-binds %r' % (varBinds,)) + debug.logger & debug.FLAG_INS and debug.logger( + 'flipFlopFsm: input var-binds %r' % (varBinds,)) mibTree, = self.mibBuilder.importSymbols('SNMPv2-SMI', 'iso') @@ -299,7 +317,7 @@ class MibInstrumController(AbstractMibInstrumController): errors = [] _varBinds = list(varBinds) - self.__indexMib() + self._indexMib() debug.logger & debug.FLAG_INS and debug.logger( 'flipFlopFsm: current state %s, status %s' % (state, status)) @@ -312,10 +330,12 @@ class MibInstrumController(AbstractMibInstrumController): newState = fsmTable[(self.STATE_ANY, status)] except KeyError: - raise error.SmiError('Unresolved FSM state %s, %s' % (state, status)) + raise error.SmiError( + 'Unresolved FSM state %s, %s' % (state, status)) debug.logger & debug.FLAG_INS and debug.logger( - 'flipFlopFsm: state %s status %s -> transitioned into state %s' % (state, status, newState)) + 'flipFlopFsm: state %s status %s -> transitioned into state ' + '%s' % (state, status, newState)) state = newState @@ -324,8 +344,10 @@ class MibInstrumController(AbstractMibInstrumController): context.pop('status', None) context.pop('instances', None) context.pop('varBinds', None) + if cbFun: cbFun(_varBinds, **context) + return # the case of no var-binds @@ -336,16 +358,15 @@ class MibInstrumController(AbstractMibInstrumController): actionFun = getattr(mibTree, state, None) if not actionFun: raise error.SmiError( - 'Unsupported state handler %s at %s' % (state, self) - ) + 'Unsupported state handler %s at ' + '%s' % (state, self)) for idx, varBind in enumerate(varBinds): - actionFun(varBind, - **dict(context, cbFun=_cbFun, - state=state, status=status, - idx=idx, total=len(varBinds), - instances=instances, errors=errors, - varBinds=_varBinds, nextName=None)) + actionFun( + varBind, + **dict(context, cbFun=_cbFun, state=state, status=status, + idx=idx, total=len(varBinds), instances=instances, + errors=errors, varBinds=_varBinds, nextName=None)) debug.logger & debug.FLAG_INS and debug.logger( 'flipFlopFsm: func %s initiated for %r' % (actionFun, varBind)) @@ -354,9 +375,10 @@ class MibInstrumController(AbstractMibInstrumController): def _defaultErrorHandler(varBinds, **context): """Raise exception on any error if user callback is missing""" errors = context.get('errors') + if errors: - error = errors[-1] - raise error['error'] + err = errors[-1] + raise err['error'] def readMibObjects(self, *varBinds, **context): """Read Managed Objects Instances. @@ -410,7 +432,7 @@ class MibInstrumController(AbstractMibInstrumController): if 'cbFun' not in context: context['cbFun'] = self._defaultErrorHandler - self.flipFlopFsm(self.fsmReadVar, *varBinds, **context) + self.flipFlopFsm(self.FSM_READ_VAR, *varBinds, **context) def readNextMibObjects(self, *varBinds, **context): """Read Managed Objects Instances next to the given ones. @@ -470,7 +492,7 @@ class MibInstrumController(AbstractMibInstrumController): if 'cbFun' not in context: context['cbFun'] = self._defaultErrorHandler - self.flipFlopFsm(self.fsmReadNextVar, *varBinds, **context) + self.flipFlopFsm(self.FSM_READ_NEXT_VAR, *varBinds, **context) def writeMibObjects(self, *varBinds, **context): """Create, destroy or modify Managed Objects Instances. @@ -539,4 +561,4 @@ class MibInstrumController(AbstractMibInstrumController): if 'cbFun' not in context: context['cbFun'] = self._defaultErrorHandler - self.flipFlopFsm(self.fsmWriteVar, *varBinds, **context) + self.flipFlopFsm(self.FSM_WRITE_VAR, *varBinds, **context) diff --git a/pysnmp/smi/rfc1902.py b/pysnmp/smi/rfc1902.py index 9a3d4507..40249a1e 100644 --- a/pysnmp/smi/rfc1902.py +++ b/pysnmp/smi/rfc1902.py @@ -4,9 +4,6 @@ # Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> # License: http://snmplabs.com/pysnmp/license.html # -from pyasn1.error import PyAsn1Error -from pyasn1.type.base import AbstractSimpleAsn1Item - from pysnmp import debug from pysnmp.proto import rfc1902 from pysnmp.proto import rfc1905 @@ -15,6 +12,9 @@ from pysnmp.smi.builder import ZipMibSource from pysnmp.smi.compiler import addMibCompiler from pysnmp.smi.error import SmiError +from pyasn1.error import PyAsn1Error +from pyasn1.type.base import AbstractSimpleAsn1Item + __all__ = ['ObjectIdentity', 'ObjectType', 'NotificationType'] @@ -78,19 +78,22 @@ class ObjectIdentity(object): ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0) >>> ObjectIdentity('IP-MIB', 'ipAdEntAddr', '127.0.0.1', 123) ObjectIdentity('IP-MIB', 'ipAdEntAddr', '127.0.0.1', 123) - """ ST_DIRTY, ST_CLEAN = 1, 2 def __init__(self, *args, **kwargs): - self.__args = args - self.__kwargs = kwargs - self.__mibSourcesToAdd = self.__modNamesToLoad = None - self.__asn1SourcesToAdd = self.__asn1SourcesOptions = None - self.__state = self.ST_DIRTY - self.__indices = self.__oid = self.__label = () - self.__modName = self.__symName = '' - self.__mibNode = None + self._args = args + self._kwargs = kwargs + self._mibSourcesToAdd = None + self._modNamesToLoad = None + self._asn1SourcesToAdd = None + self._asn1SourcesOptions = None + self._state = self.ST_DIRTY + self._indices = () + self._oid = () + self._label = () + self._modName = self._symName = '' + self._mibNode = None def getMibSymbol(self): """Returns MIB variable symbolic identification. @@ -116,12 +119,13 @@ class ObjectIdentity(object): >>> objectIdentity.getMibSymbol() ('SNMPv2-MIB', 'sysDescr', (0,)) >>> - """ - if self.__state & self.ST_CLEAN: - return self.__modName, self.__symName, self.__indices + if self._state & self.ST_CLEAN: + return self._modName, self._symName, self._indices + else: - raise SmiError('%s object not fully initialized' % self.__class__.__name__) + raise SmiError( + '%s object not fully initialized' % self.__class__.__name__) def getOid(self): """Returns OID identifying MIB variable. @@ -143,12 +147,13 @@ class ObjectIdentity(object): >>> objectIdentity.getOid() ObjectName('1.3.6.1.2.1.1.1.0') >>> - """ - if self.__state & self.ST_CLEAN: - return self.__oid + if self._state & self.ST_CLEAN: + return self._oid + else: - raise SmiError('%s object not fully initialized' % self.__class__.__name__) + raise SmiError( + '%s object not fully initialized' % self.__class__.__name__) def getLabel(self): """Returns symbolic path to this MIB variable. @@ -179,21 +184,24 @@ class ObjectIdentity(object): >>> objectIdentity.getOid() ('iso', 'org', 'dod', 'internet', 'mgmt', 'mib-2', 'system', 'sysDescr') >>> - """ - if self.__state & self.ST_CLEAN: - return self.__label + if self._state & self.ST_CLEAN: + return self._label + else: - raise SmiError('%s object not fully initialized' % self.__class__.__name__) + raise SmiError( + '%s object not fully initialized' % self.__class__.__name__) def getMibNode(self): - if self.__state & self.ST_CLEAN: - return self.__mibNode + if self._state & self.ST_CLEAN: + return self._mibNode + else: - raise SmiError('%s object not fully initialized' % self.__class__.__name__) + raise SmiError( + '%s object not fully initialized' % self.__class__.__name__) def isFullyResolved(self): - return self.__state & self.ST_CLEAN + return self._state & self.ST_CLEAN # # A gateway to MIBs manipulation routines @@ -227,16 +235,19 @@ class ObjectIdentity(object): >>> ObjectIdentity('SNMPv2-MIB', 'sysDescr').addAsn1Source('http://mibs.snmplabs.com/asn1/@mib@') ObjectIdentity('SNMPv2-MIB', 'sysDescr') >>> - """ - if self.__asn1SourcesToAdd is None: - self.__asn1SourcesToAdd = asn1Sources + if self._asn1SourcesToAdd is None: + self._asn1SourcesToAdd = asn1Sources + else: - self.__asn1SourcesToAdd += asn1Sources - if self.__asn1SourcesOptions: - self.__asn1SourcesOptions.update(kwargs) + self._asn1SourcesToAdd += asn1Sources + + if self._asn1SourcesOptions: + self._asn1SourcesOptions.update(kwargs) + else: - self.__asn1SourcesOptions = kwargs + self._asn1SourcesOptions = kwargs + return self def addMibSource(self, *mibSources): @@ -266,12 +277,13 @@ class ObjectIdentity(object): >>> ObjectIdentity('SNMPv2-MIB', 'sysDescr').addMibSource('/opt/pysnmp/mibs', 'pysnmp_mibs') ObjectIdentity('SNMPv2-MIB', 'sysDescr') >>> - """ - if self.__mibSourcesToAdd is None: - self.__mibSourcesToAdd = mibSources + if self._mibSourcesToAdd is None: + self._mibSourcesToAdd = mibSources + else: - self.__mibSourcesToAdd += mibSources + self._mibSourcesToAdd += mibSources + return self # provides deferred MIBs load @@ -294,12 +306,13 @@ class ObjectIdentity(object): >>> ObjectIdentity('SNMPv2-MIB', 'sysDescr').loadMibs('IF-MIB', 'TCP-MIB') ObjectIdentity('SNMPv2-MIB', 'sysDescr') >>> - """ - if self.__modNamesToLoad is None: - self.__modNamesToLoad = modNames + if self._modNamesToLoad is None: + self._modNamesToLoad = modNames + else: - self.__modNamesToLoad += modNames + self._modNamesToLoad += modNames + return self # this would eventually be called by an entity which posses a @@ -344,298 +357,382 @@ class ObjectIdentity(object): >>> objectIdentity.resolveWithMib(mibViewController) ObjectIdentity('SNMPv2-MIB', 'sysDescr') >>> - """ - if self.__mibSourcesToAdd is not None: - debug.logger & debug.FLAG_MIB and debug.logger('adding MIB sources %s' % ', '.join(self.__mibSourcesToAdd)) + if self._mibSourcesToAdd is not None: + debug.logger & debug.FLAG_MIB and debug.logger( + 'adding MIB sources %s' % ', '.join(self._mibSourcesToAdd)) + mibViewController.mibBuilder.addMibSources( - *[ZipMibSource(x) for x in self.__mibSourcesToAdd] - ) - self.__mibSourcesToAdd = None + *[ZipMibSource(x) for x in self._mibSourcesToAdd]) + + self._mibSourcesToAdd = None + + if self._asn1SourcesToAdd is None: + addMibCompiler( + mibViewController.mibBuilder, + ifAvailable=True, ifNotAdded=True) - if self.__asn1SourcesToAdd is None: - addMibCompiler(mibViewController.mibBuilder, - ifAvailable=True, ifNotAdded=True) else: debug.logger & debug.FLAG_MIB and debug.logger( - 'adding MIB compiler with source paths %s' % ', '.join(self.__asn1SourcesToAdd)) + 'adding MIB compiler with source paths ' + '%s' % ', '.join(self._asn1SourcesToAdd)) + addMibCompiler( mibViewController.mibBuilder, - sources=self.__asn1SourcesToAdd, - searchers=self.__asn1SourcesOptions.get('searchers'), - borrowers=self.__asn1SourcesOptions.get('borrowers'), - destination=self.__asn1SourcesOptions.get('destination'), - ifAvailable=self.__asn1SourcesOptions.get('ifAvailable'), - ifNotAdded=self.__asn1SourcesOptions.get('ifNotAdded') + sources=self._asn1SourcesToAdd, + searchers=self._asn1SourcesOptions.get('searchers'), + borrowers=self._asn1SourcesOptions.get('borrowers'), + destination=self._asn1SourcesOptions.get('destination'), + ifAvailable=self._asn1SourcesOptions.get('ifAvailable'), + ifNotAdded=self._asn1SourcesOptions.get('ifNotAdded') ) - self.__asn1SourcesToAdd = self.__asn1SourcesOptions = None - if self.__modNamesToLoad is not None: - debug.logger & debug.FLAG_MIB and debug.logger('loading MIB modules %s' % ', '.join(self.__modNamesToLoad)) - mibViewController.mibBuilder.loadModules(*self.__modNamesToLoad) - self.__modNamesToLoad = None + self._asn1SourcesToAdd = self._asn1SourcesOptions = None + + if self._modNamesToLoad is not None: + debug.logger & debug.FLAG_MIB and debug.logger( + 'loading MIB modules %s' % ', '.join(self._modNamesToLoad)) + + mibViewController.mibBuilder.loadModules(*self._modNamesToLoad) + + self._modNamesToLoad = None - if self.__state & self.ST_CLEAN: + if self._state & self.ST_CLEAN: return self - MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar', - 'MibTableColumn') + MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols( + 'SNMPv2-SMI', 'MibScalar', 'MibTableColumn') - self.__indices = () + self._indices = () - if isinstance(self.__args[0], ObjectIdentity): - self.__args[0].resolveWithMib(mibViewController) + if isinstance(self._args[0], ObjectIdentity): + self._args[0].resolveWithMib(mibViewController) + + if len(self._args) == 1: # OID or label or MIB module + debug.logger & debug.FLAG_MIB and debug.logger( + 'resolving %s as OID or label' % self._args) - if len(self.__args) == 1: # OID or label or MIB module - debug.logger & debug.FLAG_MIB and debug.logger('resolving %s as OID or label' % self.__args) try: # pyasn1 ObjectIdentifier or sequence of ints or string OID - self.__oid = rfc1902.ObjectName(self.__args[0]) # OID + self._oid = rfc1902.ObjectName(self._args[0]) # OID + except PyAsn1Error: # sequence of sub-OIDs and labels - if isinstance(self.__args[0], (list, tuple)): + if isinstance(self._args[0], (list, tuple)): prefix, label, suffix = mibViewController.getNodeName( - self.__args[0] - ) + self._args[0]) + # string label - elif '.' in self.__args[0]: + elif '.' in self._args[0]: prefix, label, suffix = mibViewController.getNodeNameByOid( - tuple(self.__args[0].split('.')) - ) + tuple(self._args[0].split('.'))) + # MIB module name else: - modName = self.__args[0] + modName = self._args[0] + mibViewController.mibBuilder.loadModules(modName) - if self.__kwargs.get('last'): - prefix, label, suffix = mibViewController.getLastNodeName(modName) + + if self._kwargs.get('last'): + (prefix, label, + suffix) = mibViewController.getLastNodeName(modName) + else: - prefix, label, suffix = mibViewController.getFirstNodeName(modName) + (prefix, label, + suffix) = mibViewController.getFirstNodeName(modName) if suffix: try: - suffix = tuple([int(x) for x in suffix]) + suffix = tuple(int(x) for x in suffix) + except ValueError: - raise SmiError('Unknown object name component %r' % (suffix,)) - self.__oid = rfc1902.ObjectName(prefix + suffix) + raise SmiError( + 'Unknown object name component %r' % (suffix,)) + + self._oid = rfc1902.ObjectName(prefix + suffix) + else: prefix, label, suffix = mibViewController.getNodeNameByOid( - self.__oid - ) + self._oid) debug.logger & debug.FLAG_MIB and debug.logger( - 'resolved %r into prefix %r and suffix %r' % (self.__args, prefix, suffix)) + 'resolved %r into prefix %r and suffix ' + '%r' % (self._args, prefix, suffix)) modName, symName, _ = mibViewController.getNodeLocation(prefix) - self.__modName = modName - self.__symName = symName + self._modName = modName + self._symName = symName - self.__label = label + self._label = label mibNode, = mibViewController.mibBuilder.importSymbols( - modName, symName - ) + modName, symName) - self.__mibNode = mibNode + self._mibNode = mibNode - debug.logger & debug.FLAG_MIB and debug.logger('resolved prefix %r into MIB node %r' % (prefix, mibNode)) + debug.logger & debug.FLAG_MIB and debug.logger( + 'resolved prefix %r into MIB node %r' % (prefix, mibNode)) if isinstance(mibNode, MibTableColumn): # table column if suffix: rowModName, rowSymName, _ = mibViewController.getNodeLocation( mibNode.name[:-1] ) + rowNode, = mibViewController.mibBuilder.importSymbols( rowModName, rowSymName ) - self.__indices = rowNode.getIndicesFromInstId(suffix) + + self._indices = rowNode.getIndicesFromInstId(suffix) + elif isinstance(mibNode, MibScalar): # scalar if suffix: - self.__indices = (rfc1902.ObjectName(suffix),) + self._indices = (rfc1902.ObjectName(suffix),) + else: if suffix: - self.__indices = (rfc1902.ObjectName(suffix),) - self.__state |= self.ST_CLEAN + self._indices = (rfc1902.ObjectName(suffix),) + + self._state |= self.ST_CLEAN - debug.logger & debug.FLAG_MIB and debug.logger('resolved indices are %r' % (self.__indices,)) + debug.logger & debug.FLAG_MIB and debug.logger( + 'resolved indices are %r' % (self._indices,)) return self - elif len(self.__args) > 1: # MIB, symbol[, index, index ...] + + elif len(self._args) > 1: # MIB, symbol[, index, index ...] # MIB, symbol, index, index - if self.__args[0] and self.__args[1]: - self.__modName = self.__args[0] - self.__symName = self.__args[1] + if self._args[0] and self._args[1]: + self._modName = self._args[0] + self._symName = self._args[1] + # MIB, '' - elif self.__args[0]: - mibViewController.mibBuilder.loadModules(self.__args[0]) - if self.__kwargs.get('last'): - prefix, label, suffix = mibViewController.getLastNodeName(self.__args[0]) + elif self._args[0]: + mibViewController.mibBuilder.loadModules(self._args[0]) + + if self._kwargs.get('last'): + (prefix, label, + suffix) = mibViewController.getLastNodeName(self._args[0]) + else: - prefix, label, suffix = mibViewController.getFirstNodeName(self.__args[0]) - self.__modName, self.__symName, _ = mibViewController.getNodeLocation(prefix) + (prefix, label, + suffix) = mibViewController.getFirstNodeName(self._args[0]) + + (self._modName, + self._symName, _) = mibViewController.getNodeLocation(prefix) + # '', symbol, index, index else: - prefix, label, suffix = mibViewController.getNodeName(self.__args[1:]) - self.__modName, self.__symName, _ = mibViewController.getNodeLocation(prefix) + (prefix, label, + suffix) = mibViewController.getNodeName(self._args[1:]) + + (self._modName, + self._symName, _) = mibViewController.getNodeLocation(prefix) mibNode, = mibViewController.mibBuilder.importSymbols( - self.__modName, self.__symName - ) + self._modName, self._symName) - self.__mibNode = mibNode + self._mibNode = mibNode - self.__oid = rfc1902.ObjectName(mibNode.getName()) + self._oid = rfc1902.ObjectName(mibNode.getName()) - prefix, label, suffix = mibViewController.getNodeNameByOid( - self.__oid - ) - self.__label = label + (prefix, label, + suffix) = mibViewController.getNodeNameByOid( + self._oid) + + self._label = label debug.logger & debug.FLAG_MIB and debug.logger( - 'resolved %r into prefix %r and suffix %r' % (self.__args, prefix, suffix)) + 'resolved %r into prefix %r and suffix ' + '%r' % (self._args, prefix, suffix)) if isinstance(mibNode, MibTableColumn): # table rowModName, rowSymName, _ = mibViewController.getNodeLocation( - mibNode.name[:-1] - ) + mibNode.name[:-1]) + rowNode, = mibViewController.mibBuilder.importSymbols( - rowModName, rowSymName - ) - if self.__args[2:]: + rowModName, rowSymName) + + if self._args[2:]: try: - instIds = rowNode.getInstIdFromIndices(*self.__args[2:]) - self.__oid += instIds - self.__indices = rowNode.getIndicesFromInstId(instIds) + instIds = rowNode.getInstIdFromIndices(*self._args[2:]) + self._oid += instIds + self._indices = rowNode.getIndicesFromInstId(instIds) + except PyAsn1Error as exc: - raise SmiError('Instance index %r to OID conversion failure at object %r: %s' % ( - self.__args[2:], mibNode.getLabel(), exc)) - elif self.__args[2:]: # any other kind of MIB node with indices - if self.__args[2:]: + raise SmiError( + 'Instance index %r to OID conversion failure ' + 'at object %r: ' + '%s' % (self._args[2:], mibNode.getLabel(), exc)) + + elif self._args[2:]: # any other kind of MIB node with indices + if self._args[2:]: instId = rfc1902.ObjectName( - '.'.join([str(x) for x in self.__args[2:]]) - ) - self.__oid += instId - self.__indices = (instId,) - self.__state |= self.ST_CLEAN + '.'.join(str(x) for x in self._args[2:])) + + self._oid += instId + + self._indices = (instId,) - debug.logger & debug.FLAG_MIB and debug.logger('resolved indices are %r' % (self.__indices,)) + self._state |= self.ST_CLEAN + + debug.logger & debug.FLAG_MIB and debug.logger( + 'resolved indices are %r' % (self._indices,)) return self + else: raise SmiError('Non-OID, label or MIB symbol') def prettyPrint(self): - if self.__state & self.ST_CLEAN: + if self._state & self.ST_CLEAN: s = rfc1902.OctetString() + return '%s::%s%s%s' % ( - self.__modName, self.__symName, - self.__indices and '.' or '', - '.'.join([x.isSuperTypeOf(s, matchConstraints=False) and '"%s"' % x.prettyPrint() or x.prettyPrint() for x in self.__indices]) - ) + self._modName, self._symName, + self._indices and '.' or '', + '.'.join((x.isSuperTypeOf(s, matchConstraints=False) and + '"%s"' % x.prettyPrint() or + x.prettyPrint()) for x in self._indices)) + else: - raise SmiError('%s object not fully initialized' % self.__class__.__name__) + raise SmiError( + '%s object not fully initialized' % self.__class__.__name__) def __repr__(self): - return '%s(%s)' % (self.__class__.__name__, ', '.join([repr(x) for x in self.__args])) + return '%s(%s)' % (self.__class__.__name__, + ', '.join(repr(x) for x in self._args)) # Redirect some attrs access to the OID object to behave alike def __str__(self): - if self.__state & self.ST_CLEAN: - return str(self.__oid) + if self._state & self.ST_CLEAN: + return str(self._oid) + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __eq__(self, other): - if self.__state & self.ST_CLEAN: - return self.__oid == other + if self._state & self.ST_CLEAN: + return self._oid == other + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __ne__(self, other): - if self.__state & self.ST_CLEAN: - return self.__oid != other + if self._state & self.ST_CLEAN: + return self._oid != other + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __lt__(self, other): - if self.__state & self.ST_CLEAN: - return self.__oid < other + if self._state & self.ST_CLEAN: + return self._oid < other + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __le__(self, other): - if self.__state & self.ST_CLEAN: - return self.__oid <= other + if self._state & self.ST_CLEAN: + return self._oid <= other + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __gt__(self, other): - if self.__state & self.ST_CLEAN: - return self.__oid > other + if self._state & self.ST_CLEAN: + return self._oid > other + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __ge__(self, other): - if self.__state & self.ST_CLEAN: - return self.__oid > other + if self._state & self.ST_CLEAN: + return self._oid > other + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __nonzero__(self): - if self.__state & self.ST_CLEAN: - return self.__oid != 0 + if self._state & self.ST_CLEAN: + return self._oid != 0 + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __bool__(self): - if self.__state & self.ST_CLEAN: - return bool(self.__oid) + if self._state & self.ST_CLEAN: + return bool(self._oid) + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __getitem__(self, i): - if self.__state & self.ST_CLEAN: - return self.__oid[i] + if self._state & self.ST_CLEAN: + return self._oid[i] + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __len__(self): - if self.__state & self.ST_CLEAN: - return len(self.__oid) + if self._state & self.ST_CLEAN: + return len(self._oid) + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __add__(self, other): - if self.__state & self.ST_CLEAN: - return self.__oid + other + if self._state & self.ST_CLEAN: + return self._oid + other + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __radd__(self, other): - if self.__state & self.ST_CLEAN: - return other + self.__oid + if self._state & self.ST_CLEAN: + return other + self._oid + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __hash__(self): - if self.__state & self.ST_CLEAN: - return hash(self.__oid) + if self._state & self.ST_CLEAN: + return hash(self._oid) + else: - raise SmiError('%s object not properly initialized' % self.__class__.__name__) + raise SmiError('%s object not properly ' + 'initialized' % self.__class__.__name__) def __getattr__(self, attr): - if self.__state & self.ST_CLEAN: + if self._state & self.ST_CLEAN: if attr in ('asTuple', 'clone', 'subtype', 'isPrefixOf', 'isSameTypeWith', 'isSuperTypeOf', 'getTagSet', 'getEffectiveTagSet', 'getTagMap', 'tagSet', 'index'): - return getattr(self.__oid, attr) + return getattr(self._oid, attr) + raise AttributeError(attr) + else: - raise SmiError('%s object not properly initialized for accessing %s' % (self.__class__.__name__, attr)) + raise SmiError( + '%s object not properly initialized for accessing ' + '%s' % (self.__class__.__name__, attr)) -# A two-element sequence of ObjectIdentity and SNMP data type object class ObjectType(object): """Create an object representing MIB variable. @@ -692,30 +789,35 @@ class ObjectType(object): ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'), Null('')) >>> ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0), 'Linux i386') ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0), 'Linux i386') - """ ST_DIRTY, ST_CLEAM = 1, 2 def __init__(self, objectIdentity, objectSyntax=rfc1905.unSpecified): if not isinstance(objectIdentity, ObjectIdentity): - raise SmiError('initializer should be ObjectIdentity instance, not %r' % (objectIdentity,)) - self.__args = [objectIdentity, objectSyntax] - self.__state = self.ST_DIRTY + raise SmiError( + 'initializer should be ObjectIdentity instance, ' + 'not %r' % (objectIdentity,)) + + self._args = [objectIdentity, objectSyntax] + self._state = self.ST_DIRTY def __getitem__(self, i): - if self.__state & self.ST_CLEAM: - return self.__args[i] + if self._state & self.ST_CLEAM: + return self._args[i] + else: - raise SmiError('%s object not fully initialized' % self.__class__.__name__) + raise SmiError( + '%s object not fully initialized' % self.__class__.__name__) def __str__(self): return self.prettyPrint() def __repr__(self): - return '%s(%s)' % (self.__class__.__name__, ', '.join([repr(x) for x in self.__args])) + return '%s(%s)' % (self.__class__.__name__, + ', '.join([repr(x) for x in self._args])) def isFullyResolved(self): - return self.__state & self.ST_CLEAM + return self._state & self.ST_CLEAM def addAsn1MibSource(self, *asn1Sources, **kwargs): """Adds path to a repository to search ASN.1 MIB files. @@ -747,7 +849,7 @@ class ObjectType(object): >>> """ - self.__args[0].addAsn1MibSource(*asn1Sources, **kwargs) + self._args[0].addAsn1MibSource(*asn1Sources, **kwargs) return self def addMibSource(self, *mibSources): @@ -779,7 +881,7 @@ class ObjectType(object): >>> """ - self.__args[0].addMibSource(*mibSources) + self._args[0].addMibSource(*mibSources) return self def loadMibs(self, *modNames): @@ -803,7 +905,7 @@ class ObjectType(object): >>> """ - self.__args[0].loadMibs(*modNames) + self._args[0].loadMibs(*modNames) return self def resolveWithMib(self, mibViewController): @@ -842,50 +944,61 @@ class ObjectType(object): >>> """ - if self.__state & self.ST_CLEAM: + if self._state & self.ST_CLEAM: return self - self.__args[0].resolveWithMib(mibViewController) + self._args[0].resolveWithMib(mibViewController) - MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar', - 'MibTableColumn') + MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols( + 'SNMPv2-SMI', 'MibScalar', 'MibTableColumn') - if not isinstance(self.__args[0].getMibNode(), + if not isinstance(self._args[0].getMibNode(), (MibScalar, MibTableColumn)): - if not isinstance(self.__args[1], AbstractSimpleAsn1Item): - raise SmiError('MIB object %r is not OBJECT-TYPE (MIB not loaded?)' % (self.__args[0],)) - self.__state |= self.ST_CLEAM + if not isinstance(self._args[1], AbstractSimpleAsn1Item): + raise SmiError('MIB object %r is not OBJECT-TYPE ' + '(MIB not loaded?)' % (self._args[0],)) + + self._state |= self.ST_CLEAM + return self - if isinstance(self.__args[1], (rfc1905.UnSpecified, - rfc1905.NoSuchObject, - rfc1905.NoSuchInstance, - rfc1905.EndOfMibView)): - self.__state |= self.ST_CLEAM + if isinstance(self._args[1], (rfc1905.UnSpecified, + rfc1905.NoSuchObject, + rfc1905.NoSuchInstance, + rfc1905.EndOfMibView)): + self._state |= self.ST_CLEAM return self + syntax = self._args[0].getMibNode().getSyntax() + try: - self.__args[1] = self.__args[0].getMibNode().getSyntax().clone(self.__args[1]) + self._args[1] = syntax.clone(self._args[1]) + except PyAsn1Error as exc: - raise SmiError('MIB object %r having type %r failed to cast value %r: %s' % ( - self.__args[0].prettyPrint(), self.__args[0].getMibNode().getSyntax().__class__.__name__, self.__args[1], - exc)) + raise SmiError( + 'MIB object %r having type %r failed to cast value ' + '%r: %s' % (self._args[0].prettyPrint(), + syntax.__class__.__name__, self._args[1], exc)) - if rfc1902.ObjectIdentifier().isSuperTypeOf(self.__args[1], matchConstraints=False): - self.__args[1] = ObjectIdentity(self.__args[1]).resolveWithMib(mibViewController) + if rfc1902.ObjectIdentifier().isSuperTypeOf( + self._args[1], matchConstraints=False): + self._args[1] = ObjectIdentity( + self._args[1]).resolveWithMib(mibViewController) - self.__state |= self.ST_CLEAM + self._state |= self.ST_CLEAM - debug.logger & debug.FLAG_MIB and debug.logger('resolved %r syntax is %r' % (self.__args[0], self.__args[1])) + debug.logger & debug.FLAG_MIB and debug.logger( + 'resolved %r syntax is %r' % (self._args[0], self._args[1])) return self def prettyPrint(self): - if self.__state & self.ST_CLEAM: - return '%s = %s' % (self.__args[0].prettyPrint(), - self.__args[1].prettyPrint()) + if self._state & self.ST_CLEAM: + return '%s = %s' % (self._args[0].prettyPrint(), + self._args[1].prettyPrint()) else: - raise SmiError('%s object not fully initialized' % self.__class__.__name__) + raise SmiError( + '%s object not fully initialized' % self.__class__.__name__) class NotificationType(object): @@ -956,22 +1069,29 @@ class NotificationType(object): def __init__(self, objectIdentity, instanceIndex=(), objects={}): if not isinstance(objectIdentity, ObjectIdentity): - raise SmiError('initializer should be ObjectIdentity instance, not %r' % (objectIdentity,)) - self.__objectIdentity = objectIdentity - self.__instanceIndex = instanceIndex - self.__objects = objects - self.__varBinds = [] - self.__additionalVarBinds = [] - self.__state = self.ST_DIRTY + raise SmiError( + 'initializer should be ObjectIdentity instance, not ' + '%r' % (objectIdentity,)) + + self._objectIdentity = objectIdentity + self._instanceIndex = instanceIndex + self._objects = objects + self._varBinds = [] + self._additionalVarBinds = [] + self._state = self.ST_DIRTY def __getitem__(self, i): - if self.__state & self.ST_CLEAN: - return self.__varBinds[i] + if self._state & self.ST_CLEAN: + return self._varBinds[i] + else: - raise SmiError('%s object not fully initialized' % self.__class__.__name__) + raise SmiError( + '%s object not fully initialized' % self.__class__.__name__) def __repr__(self): - return '%s(%r, %r, %r)' % (self.__class__.__name__, self.__objectIdentity, self.__instanceIndex, self.__objects) + return '%s(%r, %r, %r)' % ( + self.__class__.__name__, self._objectIdentity, + self._instanceIndex, self._objects) def addVarBinds(self, *varBinds): """Appends variable-binding to notification. @@ -999,13 +1119,17 @@ class NotificationType(object): >>> nt.addVarBinds(ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0))) NotificationType(ObjectIdentity('IP-MIB', 'linkDown'), (), {}) >>> - """ - debug.logger & debug.FLAG_MIB and debug.logger('additional var-binds: %r' % (varBinds,)) - if self.__state & self.ST_CLEAN: - raise SmiError('%s object is already sealed' % self.__class__.__name__) + debug.logger & debug.FLAG_MIB and debug.logger( + 'additional var-binds: %r' % (varBinds,)) + + if self._state & self.ST_CLEAN: + raise SmiError( + '%s object is already sealed' % self.__class__.__name__) + else: - self.__additionalVarBinds.extend(varBinds) + self._additionalVarBinds.extend(varBinds) + return self def addAsn1MibSource(self, *asn1Sources, **kwargs): @@ -1036,9 +1160,8 @@ class NotificationType(object): >>> NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {}).addAsn1Source('http://mibs.snmplabs.com/asn1/@mib@') NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {}) >>> - """ - self.__objectIdentity.addAsn1MibSource(*asn1Sources, **kwargs) + self._objectIdentity.addAsn1MibSource(*asn1Sources, **kwargs) return self def addMibSource(self, *mibSources): @@ -1068,9 +1191,8 @@ class NotificationType(object): >>> NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {}).addMibSource('/opt/pysnmp/mibs', 'pysnmp_mibs') NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {}) >>> - """ - self.__objectIdentity.addMibSource(*mibSources) + self._objectIdentity.addMibSource(*mibSources) return self def loadMibs(self, *modNames): @@ -1092,13 +1214,12 @@ class NotificationType(object): >>> NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {}).loadMibs('IF-MIB', 'TCP-MIB') NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {}) >>> - """ - self.__objectIdentity.loadMibs(*modNames) + self._objectIdentity.loadMibs(*modNames) return self def isFullyResolved(self): - return self.__state & self.ST_CLEAN + return self._state & self.ST_CLEAN def resolveWithMib(self, mibViewController): """Perform MIB variable ID conversion and notification objects expansion. @@ -1136,57 +1257,71 @@ class NotificationType(object): >>> notificationType.resolveWithMib(mibViewController) NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {}) >>> - """ - if self.__state & self.ST_CLEAN: + if self._state & self.ST_CLEAN: return self - self.__objectIdentity.resolveWithMib(mibViewController) + self._objectIdentity.resolveWithMib(mibViewController) - self.__varBinds.append( + self._varBinds.append( ObjectType(ObjectIdentity(v2c.apiTrapPDU.snmpTrapOID), - self.__objectIdentity).resolveWithMib(mibViewController) - ) + self._objectIdentity).resolveWithMib(mibViewController)) - SmiNotificationType, = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'NotificationType') + SmiNotificationType, = mibViewController.mibBuilder.importSymbols( + 'SNMPv2-SMI', 'NotificationType') - mibNode = self.__objectIdentity.getMibNode() + mibNode = self._objectIdentity.getMibNode() varBindsLocation = {} if isinstance(mibNode, SmiNotificationType): + for notificationObject in mibNode.getObjects(): - objectIdentity = ObjectIdentity(*notificationObject + self.__instanceIndex).resolveWithMib( - mibViewController) - self.__varBinds.append( - ObjectType(objectIdentity, - self.__objects.get(notificationObject, rfc1905.unSpecified)).resolveWithMib( - mibViewController) - ) - varBindsLocation[objectIdentity] = len(self.__varBinds) - 1 + objectIdentity = ObjectIdentity( + *notificationObject + self._instanceIndex) + + objectIdentity.resolveWithMib(mibViewController) + + objectType = ObjectType( + objectIdentity, self._objects.get( + notificationObject, rfc1905.unSpecified)) + + objectType.resolveWithMib(mibViewController) + + self._varBinds.append(objectType) + + varBindsLocation[objectIdentity] = len(self._varBinds) - 1 + else: debug.logger & debug.FLAG_MIB and debug.logger( - 'WARNING: MIB object %r is not NOTIFICATION-TYPE (MIB not loaded?)' % (self.__objectIdentity,)) + 'WARNING: MIB object %r is not NOTIFICATION-TYPE (MIB not ' + 'loaded?)' % (self._objectIdentity,)) - for varBinds in self.__additionalVarBinds: + for varBinds in self._additionalVarBinds: if not isinstance(varBinds, ObjectType): varBinds = ObjectType(ObjectIdentity(varBinds[0]), varBinds[1]) + varBinds.resolveWithMib(mibViewController) + if varBinds[0] in varBindsLocation: - self.__varBinds[varBindsLocation[varBinds[0]]] = varBinds + self._varBinds[varBindsLocation[varBinds[0]]] = varBinds + else: - self.__varBinds.append(varBinds) + self._varBinds.append(varBinds) - self.__additionalVarBinds = [] + self._additionalVarBinds = [] - self.__state |= self.ST_CLEAN + self._state |= self.ST_CLEAN - debug.logger & debug.FLAG_MIB and debug.logger('resolved %r into %r' % (self.__objectIdentity, self.__varBinds)) + debug.logger & debug.FLAG_MIB and debug.logger( + 'resolved %r into %r' % (self._objectIdentity, self._varBinds)) return self def prettyPrint(self): - if self.__state & self.ST_CLEAN: - return ' '.join(['%s = %s' % (x[0].prettyPrint(), x[1].prettyPrint()) for x in self.__varBinds]) - else: - raise SmiError('%s object not fully initialized' % self.__class__.__name__) + if self._state & self.ST_CLEAN: + return ' '.join('%s = %s' % (x[0].prettyPrint(), x[1].prettyPrint()) + for x in self._varBinds) + + raise SmiError( + '%s object not fully initialized' % self.__class__.__name__) diff --git a/pysnmp/smi/view.py b/pysnmp/smi/view.py index e9fd405e..46ce5991 100644 --- a/pysnmp/smi/view.py +++ b/pysnmp/smi/view.py @@ -6,13 +6,6 @@ # import sys -from pysnmp import debug -from pysnmp.smi import error -from pysnmp.smi.indices import OidOrderedDict -from pysnmp.smi.indices import OrderedDict - -__all__ = ['MibViewController'] - if sys.version_info[0] <= 2: import types @@ -22,52 +15,69 @@ else: classTypes = (type,) instanceTypes = (object,) +from pysnmp import debug +from pysnmp.smi import error +from pysnmp.smi.indices import OidOrderedDict +from pysnmp.smi.indices import OrderedDict + +__all__ = ['MibViewController'] + class MibViewController(object): def __init__(self, mibBuilder): self.mibBuilder = mibBuilder self.lastBuildId = -1 - self.__mibSymbolsIdx = OrderedDict() + self._mibSymbolsIdx = OrderedDict() # Indexing part + def _sortFun(self, name): + # This is potentially ambiguous mapping. Sort modules in + # ascending age for resolution + mb = self.mibBuilder + + if mb.moduleID in mb.mibSymbols[name]: + mib = mb.mibSymbols[name][mb.moduleID] + revs = mib.getRevisions() + if revs: + return revs[0] + + return '1970-01-01 00:00' + def indexMib(self): if self.lastBuildId == self.mibBuilder.lastBuildId: return - debug.logger & debug.FLAG_MIB and debug.logger('indexMib: re-indexing MIB view') + debug.logger & debug.FLAG_MIB and debug.logger( + 'indexMib: re-indexing MIB view') MibScalarInstance, = self.mibBuilder.importSymbols( - 'SNMPv2-SMI', 'MibScalarInstance' - ) + 'SNMPv2-SMI', 'MibScalarInstance') # # Create indices # # Module name -> module-scope indices - self.__mibSymbolsIdx.clear() + self._mibSymbolsIdx.clear() - # Oid <-> label indices + globMibMod = { + 'oidToLabelIdx': OidOrderedDict(), + 'labelToOidIdx': {}, + 'varToNameIdx': {}, + 'typeToModIdx': OrderedDict(), + 'oidToModIdx': {} + } - # This is potentially ambiguous mapping. Sort modules in - # ascending age for resolution - def __sortFun(x, b=self.mibBuilder): - if b.moduleID in b.mibSymbols[x]: - m = b.mibSymbols[x][b.moduleID] - r = m.getRevisions() - if r: - return r[0] + self._mibSymbolsIdx[''] = globMibMod - return "1970-01-01 00:00" + # Oid <-> label indices - modNames = list(self.mibBuilder.mibSymbols.keys()) - modNames.sort(key=__sortFun) + modNames = sorted(self.mibBuilder.mibSymbols, key=self._sortFun) # Index modules names - for modName in [''] + modNames: - # Modules index - self.__mibSymbolsIdx[modName] = mibMod = { + for modName in modNames: + mibMod = { 'oidToLabelIdx': OidOrderedDict(), 'labelToOidIdx': {}, 'varToNameIdx': {}, @@ -75,76 +85,93 @@ class MibViewController(object): 'oidToModIdx': {} } - if not modName: - globMibMod = mibMod - continue + self._mibSymbolsIdx[modName] = mibMod # Types & MIB vars indices for n, v in self.mibBuilder.mibSymbols[modName].items(): if n == self.mibBuilder.moduleID: # do not index this continue # special symbol + if isinstance(v, classTypes): if n in mibMod['typeToModIdx']: raise error.SmiError( - 'Duplicate SMI type %s::%s, has %s' % (modName, n, mibMod['typeToModIdx'][n]) - ) + 'Duplicate SMI type %s::%s, has ' + '%s' % (modName, n, mibMod['typeToModIdx'][n])) + globMibMod['typeToModIdx'][n] = modName mibMod['typeToModIdx'][n] = modName + elif isinstance(v, instanceTypes): if isinstance(v, MibScalarInstance): continue + if n in mibMod['varToNameIdx']: raise error.SmiError( - 'Duplicate MIB variable %s::%s has %s' % (modName, n, mibMod['varToNameIdx'][n]) - ) + 'Duplicate MIB variable %s::%s has ' + '%s' % (modName, n, mibMod['varToNameIdx'][n])) + globMibMod['varToNameIdx'][n] = v.name mibMod['varToNameIdx'][n] = v.name - # Potentionally ambiguous mapping ahead + + # Potentially ambiguous mapping ahead globMibMod['oidToModIdx'][v.name] = modName mibMod['oidToModIdx'][v.name] = modName globMibMod['oidToLabelIdx'][v.name] = (n,) mibMod['oidToLabelIdx'][v.name] = (n,) + else: raise error.SmiError( - 'Unexpected object %s::%s' % (modName, n) - ) + 'Unexpected object %s::%s' % (modName, n)) # Build oid->long-label index - oidToLabelIdx = self.__mibSymbolsIdx['']['oidToLabelIdx'] - labelToOidIdx = self.__mibSymbolsIdx['']['labelToOidIdx'] + oidToLabelIdx = self._mibSymbolsIdx['']['oidToLabelIdx'] + labelToOidIdx = self._mibSymbolsIdx['']['labelToOidIdx'] + prevOid = () baseLabel = () - for key in oidToLabelIdx.keys(): + + for key in oidToLabelIdx: keydiff = len(key) - len(prevOid) + if keydiff > 0: if prevOid: if keydiff == 1: baseLabel = oidToLabelIdx[prevOid] + else: baseLabel += key[-keydiff:-1] else: baseLabel = () + elif keydiff < 0: baseLabel = () keyLen = len(key) + i = keyLen - 1 + while i: k = key[:i] + if k in oidToLabelIdx: baseLabel = oidToLabelIdx[k] + if i != keyLen - 1: baseLabel += key[i:-1] + break + i -= 1 + # Build oid->long-label index oidToLabelIdx[key] = baseLabel + oidToLabelIdx[key] + # Build label->oid index labelToOidIdx[oidToLabelIdx[key]] = key prevOid = key # Build module-scope oid->long-label index - for mibMod in self.__mibSymbolsIdx.values(): - for oid in mibMod['oidToLabelIdx'].keys(): + for mibMod in self._mibSymbolsIdx.values(): + for oid in mibMod['oidToLabelIdx']: mibMod['oidToLabelIdx'][oid] = oidToLabelIdx[oid] mibMod['labelToOidIdx'][oidToLabelIdx[oid]] = oid @@ -154,9 +181,11 @@ class MibViewController(object): def getOrderedModuleName(self, index): self.indexMib() - modNames = self.__mibSymbolsIdx.keys() + + modNames = self._mibSymbolsIdx if modNames: return modNames[index] + raise error.SmiError('No modules loaded at %s' % self) def getFirstModuleName(self): @@ -167,69 +196,89 @@ class MibViewController(object): def getNextModuleName(self, modName): self.indexMib() + try: - return self.__mibSymbolsIdx.nextKey(modName) + return self._mibSymbolsIdx.nextKey(modName) + except KeyError: raise error.SmiError( - 'No module next to %s at %s' % (modName, self) - ) + 'No module next to %s at %s' % (modName, self)) # MIB tree node management - def __getOidLabel(self, nodeName, oidToLabelIdx, labelToOidIdx): + def _getOidLabel(self, nodeName, oidToLabelIdx, labelToOidIdx): """getOidLabel(nodeName) -> (oid, label, suffix)""" if not nodeName: return nodeName, nodeName, () + if nodeName in labelToOidIdx: return labelToOidIdx[nodeName], nodeName, () + if nodeName in oidToLabelIdx: return nodeName, oidToLabelIdx[nodeName], () + if len(nodeName) < 2: return nodeName, nodeName, () - oid, label, suffix = self.__getOidLabel( - nodeName[:-1], oidToLabelIdx, labelToOidIdx - ) + + oid, label, suffix = self._getOidLabel( + nodeName[:-1], oidToLabelIdx, labelToOidIdx) + suffix = suffix + nodeName[-1:] + resLabel = label + tuple([str(x) for x in suffix]) if resLabel in labelToOidIdx: return labelToOidIdx[resLabel], resLabel, () + resOid = oid + suffix if resOid in oidToLabelIdx: return resOid, oidToLabelIdx[resOid], () + return oid, label, suffix def getNodeNameByOid(self, nodeName, modName=''): self.indexMib() - if modName in self.__mibSymbolsIdx: - mibMod = self.__mibSymbolsIdx[modName] + + if modName in self._mibSymbolsIdx: + mibMod = self._mibSymbolsIdx[modName] + else: raise error.SmiError('No module %s at %s' % (modName, self)) - oid, label, suffix = self.__getOidLabel( - nodeName, mibMod['oidToLabelIdx'], mibMod['labelToOidIdx'] - ) + + oid, label, suffix = self._getOidLabel( + nodeName, mibMod['oidToLabelIdx'], mibMod['labelToOidIdx']) + if oid == label: raise error.NoSuchObjectError( - str='Can\'t resolve node name %s::%s at %s' % - (modName, nodeName, self) - ) + str='Cannot resolve node name %s::%s at ' + '%s' % (modName, nodeName, self)) + debug.logger & debug.FLAG_MIB and debug.logger( - 'getNodeNameByOid: resolved %s:%s -> %s.%s' % (modName, nodeName, label, suffix)) + 'getNodeNameByOid: resolved %s:%s -> ' + '%s.%s' % (modName, nodeName, label, suffix)) + return oid, label, suffix def getNodeNameByDesc(self, nodeName, modName=''): self.indexMib() - if modName in self.__mibSymbolsIdx: - mibMod = self.__mibSymbolsIdx[modName] + + if modName in self._mibSymbolsIdx: + mibMod = self._mibSymbolsIdx[modName] + else: raise error.SmiError('No module %s at %s' % (modName, self)) + if nodeName in mibMod['varToNameIdx']: oid = mibMod['varToNameIdx'][nodeName] + else: raise error.NoSuchObjectError( - str='No such symbol %s::%s at %s' % (modName, nodeName, self) - ) + str='No such symbol %s::%s at ' + '%s' % (modName, nodeName, self)) + debug.logger & debug.FLAG_MIB and debug.logger( - 'getNodeNameByDesc: resolved %s:%s -> %s' % (modName, nodeName, oid)) + 'getNodeNameByDesc: resolved %s:%s -> ' + '%s' % (modName, nodeName, oid)) + return self.getNodeNameByOid(oid, modName) def getNodeName(self, nodeName, modName=''): @@ -238,28 +287,36 @@ class MibViewController(object): try: # First try nodeName as an OID/label return self.getNodeNameByOid(nodeName, modName) + except error.NoSuchObjectError: # ...on failure, try as MIB symbol oid, label, suffix = self.getNodeNameByDesc(nodeName[0], modName) + # ...with trailing suffix return self.getNodeNameByOid(oid + suffix + nodeName[1:], modName) def getOrderedNodeName(self, index, modName=''): self.indexMib() - if modName in self.__mibSymbolsIdx: - mibMod = self.__mibSymbolsIdx[modName] + + if modName in self._mibSymbolsIdx: + mibMod = self._mibSymbolsIdx[modName] + else: raise error.SmiError('No module %s at %s' % (modName, self)) + if not mibMod['oidToLabelIdx']: raise error.NoSuchObjectError( - str='No variables at MIB module %s at %s' % (modName, self) - ) + str='No variables at MIB module %s at ' + '%s' % (modName, self)) + try: oid, label = mibMod['oidToLabelIdx'].items()[index] + except KeyError: raise error.NoSuchObjectError( - str='No symbol at position %s in MIB module %s at %s' % (index, modName, self) - ) + str='No symbol at position %s in MIB module %s at ' + '%s' % (index, modName, self)) + return oid, label, () def getFirstNodeName(self, modName=''): @@ -272,55 +329,67 @@ class MibViewController(object): oid, label, suffix = self.getNodeName(nodeName, modName) try: return self.getNodeName( - self.__mibSymbolsIdx[modName]['oidToLabelIdx'].nextKey(oid) + suffix, modName - ) + self._mibSymbolsIdx[modName]['oidToLabelIdx'].nextKey(oid) + suffix, + modName) + except KeyError: raise error.NoSuchObjectError( - str='No name next to %s::%s at %s' % (modName, nodeName, self) - ) + str='No name next to %s::%s at ' + '%s' % (modName, nodeName, self)) def getParentNodeName(self, nodeName, modName=''): oid, label, suffix = self.getNodeName(nodeName, modName) + if len(oid) < 2: raise error.NoSuchObjectError( - str='No parent name for %s::%s at %s' % - (modName, nodeName, self) - ) + str='No parent name for %s::%s at ' + '%s' % (modName, nodeName, self)) + return oid[:-1], label[:-1], oid[-1:] + suffix def getNodeLocation(self, nodeName, modName=''): oid, label, suffix = self.getNodeName(nodeName, modName) - return self.__mibSymbolsIdx['']['oidToModIdx'][oid], label[-1], suffix + return (self._mibSymbolsIdx['']['oidToModIdx'][oid], label[-1], + suffix) # MIB type management def getTypeName(self, typeName, modName=''): self.indexMib() - if modName in self.__mibSymbolsIdx: - mibMod = self.__mibSymbolsIdx[modName] + + if modName in self._mibSymbolsIdx: + mibMod = self._mibSymbolsIdx[modName] + else: raise error.SmiError( - 'No module %s at %s' % (modName, self) - ) + 'No module %s at %s' % (modName, self)) + if typeName in mibMod['typeToModIdx']: m = mibMod['typeToModIdx'][typeName] + else: raise error.NoSuchObjectError( - str='No such type %s::%s at %s' % (modName, typeName, self) - ) + str='No such type %s::%s at ' + '%s' % (modName, typeName, self)) + return m, typeName def getOrderedTypeName(self, index, modName=''): self.indexMib() - if modName in self.__mibSymbolsIdx: - mibMod = self.__mibSymbolsIdx[modName] + + if modName in self._mibSymbolsIdx: + mibMod = self._mibSymbolsIdx[modName] + else: raise error.SmiError('No module %s at %s' % (modName, self)) + if not mibMod['typeToModIdx']: raise error.NoSuchObjectError( - str='No types at MIB module %s at %s' % (modName, self) - ) - t = mibMod['typeToModIdx'].keys()[index] + str='No types at MIB module %s at ' + '%s' % (modName, self)) + + t = list(mibMod['typeToModIdx'])[index] + return mibMod['typeToModIdx'][t], t def getFirstTypeName(self, modName=''): @@ -331,9 +400,11 @@ class MibViewController(object): def getNextType(self, typeName, modName=''): m, t = self.getTypeName(typeName, modName) + try: - return self.__mibSymbolsIdx[m]['typeToModIdx'].nextKey(t) + return self._mibSymbolsIdx[m]['typeToModIdx'].nextKey(t) + except KeyError: raise error.NoSuchObjectError( - str='No type next to %s::%s at %s' % (modName, typeName, self) - ) + str='No type next to %s::%s at ' + '%s' % (modName, typeName, self)) |