summaryrefslogtreecommitdiff
path: root/pysnmp/smi
diff options
context:
space:
mode:
authorIlya Etingof <etingof@gmail.com>2019-02-26 08:56:24 +0100
committerGitHub <noreply@github.com>2019-02-26 08:56:24 +0100
commit3f2f132a9fdf7a48ec6131d5498145dded3cfcad (patch)
tree63e6170b35f6b392bf2e3d3feb6996b886e4d36f /pysnmp/smi
parent2ad26f8bfef0e39b3789d9e6d4fcbf76820c9867 (diff)
downloadpysnmp-git-3f2f132a9fdf7a48ec6131d5498145dded3cfcad.tar.gz
PEP-8 long lines and dunders (#245)
This patch massively reformats the whole codebase mainly wrapping long lines and eliminating dundered private attributes.
Diffstat (limited to 'pysnmp/smi')
-rw-r--r--pysnmp/smi/builder.py321
-rw-r--r--pysnmp/smi/compiler.py33
-rw-r--r--pysnmp/smi/error.py14
-rw-r--r--pysnmp/smi/indices.py86
-rw-r--r--pysnmp/smi/instrum.py84
-rw-r--r--pysnmp/smi/rfc1902.py695
-rw-r--r--pysnmp/smi/view.py255
7 files changed, 908 insertions, 580 deletions
diff --git a/pysnmp/smi/builder.py b/pysnmp/smi/builder.py
index 9d75732b..513bc19a 100644
--- a/pysnmp/smi/builder.py
+++ b/pysnmp/smi/builder.py
@@ -33,25 +33,31 @@ from pysnmp import debug
class __AbstractMibSource(object):
def __init__(self, srcName):
self._srcName = srcName
- self.__magic = imp.get_magic()
- self.__sfx = {}
- self.__inited = None
+ self._magic = imp.get_magic()
+ self._sfx = {}
+ self._inited = None
+
for sfx, mode, typ in imp.get_suffixes():
- if typ not in self.__sfx:
- self.__sfx[typ] = []
- self.__sfx[typ].append((sfx, len(sfx), mode))
- debug.logger & debug.FLAG_BLD and debug.logger('trying %s' % self)
+ if typ not in self._sfx:
+ self._sfx[typ] = []
+
+ self._sfx[typ].append((sfx, len(sfx), mode))
+
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'trying %s' % self)
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self._srcName)
def _uniqNames(self, files):
u = set()
+
for f in files:
if f.startswith('__init__.'):
continue
+
for typ in (imp.PY_SOURCE, imp.PY_COMPILED):
- for sfx, sfxLen, mode in self.__sfx[typ]:
+ for sfx, sfxLen, mode in self._sfx[typ]:
if f[-sfxLen:] == sfx:
u.add(f[:-sfxLen])
return tuple(u)
@@ -59,18 +65,23 @@ class __AbstractMibSource(object):
# MibSource API follows
def fullPath(self, f='', sfx=''):
- return self._srcName + (f and (os.sep + f + sfx) or '')
+ if f:
+ return os.path.join(self._srcName, f) + sfx
+
+ return self._srcName
def init(self):
- if self.__inited is None:
- self.__inited = self._init()
- if self.__inited is self:
- self.__inited = True
- if self.__inited is True:
+ if self._inited is None:
+ self._inited = self._init()
+
+ if self._inited is self:
+ self._inited = True
+
+ if self._inited is True:
return self
else:
- return self.__inited
+ return self._inited
def listdir(self):
return self._listdir()
@@ -78,54 +89,62 @@ class __AbstractMibSource(object):
def read(self, f):
pycTime = pyTime = -1
- for pycSfx, pycSfxLen, pycMode in self.__sfx[imp.PY_COMPILED]:
+ for pycSfx, pycSfxLen, pycMode in self._sfx[imp.PY_COMPILED]:
+
+ pycFile = f + pycSfx
+
try:
- pycData, pycPath = self._getData(f + pycSfx, pycMode)
+ pycData, pycPath = self._getData(pycFile, pycMode)
except IOError as exc:
if ENOENT == -1 or exc.errno == ENOENT:
debug.logger & debug.FLAG_BLD and debug.logger(
- 'file %s access error: %s' % (f + pycSfx, exc)
- )
+ 'file %s access error: %s' % (pycFile, exc))
else:
- raise error.MibLoadError('MIB file %s access error: %s' % (f + pycSfx, exc))
+ raise error.MibLoadError(
+ 'MIB file %s access error: %s' % (pycFile, exc))
else:
- if self.__magic == pycData[:4]:
+ if self._magic == pycData[:4]:
pycData = pycData[4:]
pycTime = struct.unpack('<L', pycData[:4])[0]
pycData = pycData[4:]
+
debug.logger & debug.FLAG_BLD and debug.logger(
- 'file %s mtime %d' % (pycPath, pycTime)
- )
+ 'file %s mtime %d' % (pycPath, pycTime))
+
break
else:
- debug.logger & debug.FLAG_BLD and debug.logger('bad magic in %s' % pycPath)
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'bad magic in %s' % pycPath)
+
+ for pySfx, pySfxLen, pyMode in self._sfx[imp.PY_SOURCE]:
+ pyFile = f + pySfx
- for pySfx, pySfxLen, pyMode in self.__sfx[imp.PY_SOURCE]:
try:
- pyTime = self._getTimestamp(f + pySfx)
+ pyTime = self._getTimestamp(pyFile)
except IOError as exc:
if ENOENT == -1 or exc.errno == ENOENT:
debug.logger & debug.FLAG_BLD and debug.logger(
- 'file %s access error: %s' % (f + pySfx, exc)
- )
+ 'file %s access error: %s' % (pyFile, exc))
else:
- raise error.MibLoadError('MIB file %s access error: %s' % (f + pySfx, exc))
+ raise error.MibLoadError(
+ 'MIB file %s access error: %s' % (pyFile, exc))
else:
- debug.logger & debug.FLAG_BLD and debug.logger('file %s mtime %d' % (f + pySfx, pyTime))
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'file %s mtime %d' % (pyFile, pyTime))
break
if pycTime != -1 and pycTime >= pyTime:
return marshal.loads(pycData), pycSfx
if pyTime != -1:
- modData, pyPath = self._getData(f + pySfx, pyMode)
+ modData, pyPath = self._getData(pyFile, pyMode)
return compile(modData, pyPath, 'exec'), pyPath
raise IOError(ENOENT, 'No suitable module found', f)
@@ -147,16 +166,21 @@ class __AbstractMibSource(object):
class ZipMibSource(__AbstractMibSource):
def _init(self):
try:
- p = __import__(self._srcName, globals(), locals(), ['__init__'])
- if hasattr(p, '__loader__') and hasattr(p.__loader__, '_files'):
- self.__loader = p.__loader__
+ mod = __import__(
+ self._srcName, globals(), locals(), ['__init__'])
+
+ if (hasattr(mod, '__loader__') and
+ hasattr(mod.__loader__, '_files')):
+ self.__loader = mod.__loader__
self._srcName = self._srcName.replace('.', os.sep)
return self
- elif hasattr(p, '__file__'):
+
+ elif hasattr(mod, '__file__'):
# Dir relative to PYTHONPATH
- return DirMibSource(os.path.split(p.__file__)[0]).init()
+ return DirMibSource(os.path.split(mod.__file__)[0]).init()
+
else:
- raise error.MibLoadError('%s access error' % (p,))
+ raise error.MibLoadError('%s access error' % (mod,))
except ImportError:
# Dir relative to CWD
@@ -176,33 +200,40 @@ class ZipMibSource(__AbstractMibSource):
return time.mktime(t)
def _listdir(self):
- l = []
+ dirs = []
+
# noinspection PyProtectedMember
- for f in self.__loader._files.keys():
- d, f = os.path.split(f)
- if d == self._srcName:
- l.append(f)
- return tuple(self._uniqNames(l))
+ for path in self.__loader._files:
+ dr, fl = os.path.split(path)
+ if dr == self._srcName:
+ dirs.append(fl)
+
+ return tuple(self._uniqNames(dirs))
def _getTimestamp(self, f):
- p = os.path.join(self._srcName, f)
+ path = os.path.join(self._srcName, f)
+
# noinspection PyProtectedMember
- if p in self.__loader._files:
+ if path in self.__loader._files:
# noinspection PyProtectedMember
return self._parseDosTime(
- self.__loader._files[p][6], self.__loader._files[p][5]
+ self.__loader._files[path][6], self.__loader._files[path][5]
)
+
else:
- raise IOError(ENOENT, 'No such file in ZIP archive', p)
+ raise IOError(ENOENT, 'No such file in ZIP archive', path)
def _getData(self, f, mode=None):
- p = os.path.join(self._srcName, f)
+ path = os.path.join(self._srcName, f)
+
try:
- return self.__loader.get_data(p), p
+ return self.__loader.get_data(path), path
# ZIP code seems to return all kinds of errors
except Exception as exc:
- raise IOError(ENOENT, 'File or ZIP archive %s access error: %s' % (p, exc))
+ raise IOError(
+ ENOENT, 'File or ZIP archive %s access '
+ 'error: %s' % (path, exc))
class DirMibSource(__AbstractMibSource):
@@ -213,33 +244,35 @@ class DirMibSource(__AbstractMibSource):
def _listdir(self):
try:
return self._uniqNames(os.listdir(self._srcName))
+
except OSError as exc:
debug.logger & debug.FLAG_BLD and debug.logger(
'listdir() failed for %s: %s' % (self._srcName, exc))
return ()
def _getTimestamp(self, f):
- p = os.path.join(self._srcName, f)
+ path = os.path.join(self._srcName, f)
try:
- return os.stat(p)[8]
+ return os.stat(path)[8]
except OSError as exc:
- raise IOError(ENOENT, 'No such file: %s' % exc, p)
+ raise IOError(ENOENT, 'No such file: %s' % exc, path)
+
+ def _getData(self, fl, mode):
+ path = os.path.join(self._srcName, '*')
- def _getData(self, f, mode):
- p = os.path.join(self._srcName, '*')
try:
- if f in os.listdir(self._srcName): # make FS case-sensitive
- p = os.path.join(self._srcName, f)
- fp = open(p, mode)
+ if fl in os.listdir(self._srcName): # make FS case-sensitive
+ path = os.path.join(self._srcName, fl)
+ fp = open(path, mode)
data = fp.read()
fp.close()
- return data, p
+ return data, path
except (IOError, OSError) as exc:
- msg = 'File or directory %s access error: %s' % (p, exc)
+ msg = 'File or directory %s access error: %s' % (path, exc)
else:
- msg = 'No such file or directory: %s' % p
+ msg = 'No such file or directory: %s' % path
raise IOError(ENOENT, msg)
@@ -260,91 +293,108 @@ class MibBuilder(object):
def __init__(self):
self.lastBuildId = self._autoName = 0
+
sources = []
+
for ev in 'PYSNMP_MIB_PKGS', 'PYSNMP_MIB_DIRS', 'PYSNMP_MIB_DIR':
if ev in os.environ:
for m in os.environ[ev].split(os.pathsep):
sources.append(ZipMibSource(m))
+
if not sources and self.DEFAULT_MISC_MIBS:
for m in self.DEFAULT_MISC_MIBS.split(os.pathsep):
sources.append(ZipMibSource(m))
+
for m in self.DEFAULT_CORE_MIBS.split(os.pathsep):
sources.insert(0, ZipMibSource(m))
+
self.mibSymbols = {}
- self.__mibSources = []
- self.__modSeen = {}
- self.__modPathsSeen = set()
- self.__mibCompiler = None
+ self._mibSources = []
+ self._modSeen = {}
+ self._modPathsSeen = set()
+ self._mibCompiler = None
+
self.setMibSources(*sources)
# MIB compiler management
def getMibCompiler(self):
- return self.__mibCompiler
+ return self._mibCompiler
def setMibCompiler(self, mibCompiler, destDir):
self.addMibSources(DirMibSource(destDir))
- self.__mibCompiler = mibCompiler
+ self._mibCompiler = mibCompiler
return self
# MIB modules management
def addMibSources(self, *mibSources):
- self.__mibSources.extend([s.init() for s in mibSources])
- debug.logger & debug.FLAG_BLD and debug.logger('addMibSources: new MIB sources %s' % (self.__mibSources,))
+ self._mibSources.extend([s.init() for s in mibSources])
+
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'addMibSources: new MIB sources %s' % (self._mibSources,))
def setMibSources(self, *mibSources):
- self.__mibSources = [s.init() for s in mibSources]
- debug.logger & debug.FLAG_BLD and debug.logger('setMibSources: new MIB sources %s' % (self.__mibSources,))
+ self._mibSources = [s.init() for s in mibSources]
+
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'setMibSources: new MIB sources %s' % (self._mibSources,))
def getMibSources(self):
- return tuple(self.__mibSources)
+ return tuple(self._mibSources)
def loadModule(self, modName, **userCtx):
"""Load and execute MIB modules as Python code"""
- for mibSource in self.__mibSources:
- debug.logger & debug.FLAG_BLD and debug.logger('loadModule: trying %s at %s' % (modName, mibSource))
+ for mibSource in self._mibSources:
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'loadModule: trying %s at %s' % (modName, mibSource))
+
try:
codeObj, sfx = mibSource.read(modName)
except IOError as exc:
debug.logger & debug.FLAG_BLD and debug.logger(
- 'loadModule: read %s from %s failed: %s' % (modName, mibSource, exc))
+ 'loadModule: read %s from %s failed: '
+ '%s' % (modName, mibSource, exc))
continue
modPath = mibSource.fullPath(modName, sfx)
- if modPath in self.__modPathsSeen:
- debug.logger & debug.FLAG_BLD and debug.logger('loadModule: seen %s' % modPath)
+ if modPath in self._modPathsSeen:
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'loadModule: seen %s' % modPath)
break
else:
- self.__modPathsSeen.add(modPath)
+ self._modPathsSeen.add(modPath)
- debug.logger & debug.FLAG_BLD and debug.logger('loadModule: evaluating %s' % modPath)
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'loadModule: evaluating %s' % modPath)
- g = {'mibBuilder': self, 'userCtx': userCtx}
+ g = {'mibBuilder': self,
+ 'userCtx': userCtx}
try:
exec(codeObj, g)
except Exception:
- self.__modPathsSeen.remove(modPath)
+ self._modPathsSeen.remove(modPath)
raise error.MibLoadError(
- 'MIB module \'%s\' load error: %s' % (modPath, traceback.format_exception(*sys.exc_info()))
- )
+ 'MIB module "%s" load error: '
+ '%s' % (modPath, traceback.format_exception(*sys.exc_info())))
- self.__modSeen[modName] = modPath
+ self._modSeen[modName] = modPath
- debug.logger & debug.FLAG_BLD and debug.logger('loadModule: loaded %s' % modPath)
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'loadModule: loaded %s' % modPath)
break
- if modName not in self.__modSeen:
+ if modName not in self._modSeen:
raise error.MibNotFoundError(
- 'MIB file \"%s\" not found in search path (%s)' % (
- modName and modName + ".py[co]", ', '.join([str(x) for x in self.__mibSources]))
- )
+ 'MIB file "%s" not found in search path '
+ '(%s)' % (modName and modName + ".py[co]", ', '.join(
+ [str(x) for x in self._mibSources])))
return self
@@ -353,28 +403,36 @@ class MibBuilder(object):
# Build a list of available modules
if not modNames:
modNames = {}
- for mibSource in self.__mibSources:
+
+ for mibSource in self._mibSources:
for modName in mibSource.listdir():
modNames[modName] = None
+
modNames = list(modNames)
if not modNames:
raise error.MibNotFoundError(
- 'No MIB module to load at %s' % (self,)
- )
+ 'No MIB module to load at %s' % (self,))
for modName in modNames:
try:
self.loadModule(modName, **userCtx)
except error.MibNotFoundError:
- if self.__mibCompiler:
- debug.logger & debug.FLAG_BLD and debug.logger('loadModules: calling MIB compiler for %s' % modName)
- status = self.__mibCompiler.compile(modName, genTexts=self.loadTexts)
- errs = '; '.join([hasattr(x, 'error') and str(x.error) or x for x in status.values() if
- x in ('failed', 'missing')])
+ if self._mibCompiler:
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'loadModules: calling MIB compiler for %s' % modName)
+
+ status = self._mibCompiler.compile(modName, genTexts=self.loadTexts)
+
+ errs = '; '.join(
+ hasattr(x, 'error') and str(x.error) or x
+ for x in status.values()
+ if x in ('failed', 'missing'))
+
if errs:
- raise error.MibNotFoundError('%s compilation error(s): %s' % (modName, errs))
+ raise error.MibNotFoundError(
+ '%s compilation error(s): %s' % (modName, errs))
# compilation succeeded, MIB might load now
self.loadModule(modName, **userCtx)
@@ -383,84 +441,103 @@ class MibBuilder(object):
def unloadModules(self, *modNames):
if not modNames:
- modNames = list(self.mibSymbols.keys())
+ modNames = list(self.mibSymbols)
+
for modName in modNames:
if modName not in self.mibSymbols:
raise error.MibNotFoundError(
- 'No module %s at %s' % (modName, self)
- )
+ 'No module %s at %s' % (modName, self))
+
self.unexportSymbols(modName)
- self.__modPathsSeen.remove(self.__modSeen[modName])
- del self.__modSeen[modName]
- debug.logger & debug.FLAG_BLD and debug.logger('unloadModules: %s' % modName)
+ self._modPathsSeen.remove(self._modSeen[modName])
+
+ del self._modSeen[modName]
+
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'unloadModules: %s' % modName)
return self
def importSymbols(self, modName, *symNames, **userCtx):
if not modName:
raise error.SmiError(
- 'importSymbols: empty MIB module name'
- )
- r = ()
+ 'importSymbols: empty MIB module name')
+
+ symbols = []
+
for symName in symNames:
if modName not in self.mibSymbols:
self.loadModules(modName, **userCtx)
+
if modName not in self.mibSymbols:
raise error.MibNotFoundError(
- 'No module %s loaded at %s' % (modName, self)
- )
+ 'No module %s loaded at %s' % (modName, self))
+
if symName not in self.mibSymbols[modName]:
raise error.SmiError(
- 'No symbol %s::%s at %s' % (modName, symName, self)
- )
- r = r + (self.mibSymbols[modName][symName],)
- return r
+ 'No symbol %s::%s at %s' % (modName, symName, self))
+
+ symbols.append(self.mibSymbols[modName][symName])
+
+ return symbols
def exportSymbols(self, modName, *anonymousSyms, **namedSyms):
if modName not in self.mibSymbols:
self.mibSymbols[modName] = {}
+
mibSymbols = self.mibSymbols[modName]
for symObj in anonymousSyms:
debug.logger & debug.FLAG_BLD and debug.logger(
- 'exportSymbols: anonymous symbol %s::__pysnmp_%ld' % (modName, self._autoName))
+ 'exportSymbols: anonymous symbol %s::'
+ '__pysnmp_%ld' % (modName, self._autoName))
+
mibSymbols['__pysnmp_%ld' % self._autoName] = symObj
+
self._autoName += 1
+
for symName, symObj in namedSyms.items():
if symName in mibSymbols:
raise error.SmiError(
- 'Symbol %s already exported at %s' % (symName, modName)
- )
+ 'Symbol %s already exported at %s' % (symName, modName))
+
+ if (symName != self.moduleID and
+ not isinstance(symObj, classTypes)):
- if symName != self.moduleID and \
- not isinstance(symObj, classTypes):
label = symObj.getLabel()
+
if label:
symName = label
+
else:
symObj.setLabel(symName)
mibSymbols[symName] = symObj
- debug.logger & debug.FLAG_BLD and debug.logger('exportSymbols: symbol %s::%s' % (modName, symName))
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'exportSymbols: symbol %s::%s' % (modName, symName))
self.lastBuildId += 1
def unexportSymbols(self, modName, *symNames):
if modName not in self.mibSymbols:
raise error.SmiError('No module %s at %s' % (modName, self))
+
mibSymbols = self.mibSymbols[modName]
+
if not symNames:
symNames = list(mibSymbols.keys())
+
for symName in symNames:
if symName not in mibSymbols:
raise error.SmiError(
- 'No symbol %s::%s at %s' % (modName, symName, self)
- )
+ 'No symbol %s::%s at %s' % (modName, symName, self))
+
del mibSymbols[symName]
- debug.logger & debug.FLAG_BLD and debug.logger('unexportSymbols: symbol %s::%s' % (modName, symName))
+ debug.logger & debug.FLAG_BLD and debug.logger(
+ 'unexportSymbols: symbol %s::%s' % (modName, symName))
if not self.mibSymbols[modName]:
del self.mibSymbols[modName]
diff --git a/pysnmp/smi/compiler.py b/pysnmp/smi/compiler.py
index 231685f8..6c4d3d11 100644
--- a/pysnmp/smi/compiler.py
+++ b/pysnmp/smi/compiler.py
@@ -33,9 +33,11 @@ except ImportError as exc:
def addMibCompilerDecorator(errorMsg):
+
def addMibCompiler(mibBuilder, **kwargs):
if not kwargs.get('ifAvailable'):
- raise error.SmiError('MIB compiler not available: %s' % errorMsg)
+ raise error.SmiError(
+ 'MIB compiler not available: %s' % errorMsg)
return addMibCompiler
@@ -48,18 +50,25 @@ else:
if kwargs.get('ifNotAdded') and mibBuilder.getMibCompiler():
return
- compiler = MibCompiler(parserFactory(**smiV1Relaxed)(),
- PySnmpCodeGen(),
- PyFileWriter(kwargs.get('destination') or DEFAULT_DEST))
+ compiler = MibCompiler(
+ parserFactory(**smiV1Relaxed)(),
+ PySnmpCodeGen(),
+ PyFileWriter(kwargs.get('destination') or DEFAULT_DEST))
+
+ compiler.addSources(
+ *getReadersFromUrls(*kwargs.get('sources') or DEFAULT_SOURCES))
+
+ compiler.addSearchers(
+ StubSearcher(*baseMibs))
- compiler.addSources(*getReadersFromUrls(*kwargs.get('sources') or DEFAULT_SOURCES))
+ compiler.addSearchers(
+ *[PyPackageSearcher(x.fullPath()) for x in mibBuilder.getMibSources()])
- compiler.addSearchers(StubSearcher(*baseMibs))
- compiler.addSearchers(*[PyPackageSearcher(x.fullPath()) for x in mibBuilder.getMibSources()])
- compiler.addBorrowers(*[PyFileBorrower(x, genTexts=mibBuilder.loadTexts) for x in
- getReadersFromUrls(*kwargs.get('borrowers') or DEFAULT_BORROWERS,
- lowcaseMatching=False)])
+ compiler.addBorrowers(
+ *[PyFileBorrower(x, genTexts=mibBuilder.loadTexts)
+ for x in getReadersFromUrls(
+ *kwargs.get('borrowers') or DEFAULT_BORROWERS,
+ lowcaseMatching=False)])
mibBuilder.setMibCompiler(
- compiler, kwargs.get('destination') or DEFAULT_DEST
- )
+ compiler, kwargs.get('destination') or DEFAULT_DEST)
diff --git a/pysnmp/smi/error.py b/pysnmp/smi/error.py
index 87909b1e..1322a2c5 100644
--- a/pysnmp/smi/error.py
+++ b/pysnmp/smi/error.py
@@ -23,25 +23,25 @@ class MibNotFoundError(MibLoadError):
class MibOperationError(SmiError):
def __init__(self, **kwargs):
- self.__outArgs = kwargs
+ self._outArgs = kwargs
def __str__(self):
- return '%s(%s)' % (self.__class__.__name__, self.__outArgs)
+ return '%s(%s)' % (self.__class__.__name__, self._outArgs)
def __getitem__(self, key):
- return self.__outArgs[key]
+ return self._outArgs[key]
def __contains__(self, key):
- return key in self.__outArgs
+ return key in self._outArgs
def get(self, key, defVal=None):
- return self.__outArgs.get(key, defVal)
+ return self._outArgs.get(key, defVal)
def keys(self):
- return self.__outArgs.keys()
+ return self._outArgs.keys()
def update(self, d):
- self.__outArgs.update(d)
+ self._outArgs.update(d)
# Aligned with SNMPv2 PDU error-status values
diff --git a/pysnmp/smi/indices.py b/pysnmp/smi/indices.py
index 677ac97c..f55f7e6a 100644
--- a/pysnmp/smi/indices.py
+++ b/pysnmp/smi/indices.py
@@ -11,45 +11,50 @@ class OrderedDict(dict):
"""Ordered dictionary used for indices"""
def __init__(self, *args, **kwargs):
- self.__keys = []
- self.__dirty = True
super(OrderedDict, self).__init__()
+
+ self._keys = []
+ self._dirty = True
+ self._keysLens = []
+
if args:
self.update(*args)
+
if kwargs:
self.update(**kwargs)
def __setitem__(self, key, value):
super(OrderedDict, self).__setitem__(key, value)
- if key not in self.__keys:
- self.__keys.append(key)
- self.__dirty = True
+ if key not in self._keys:
+ self._keys.append(key)
+ self._dirty = True
def __delitem__(self, key):
super(OrderedDict, self).__delitem__(key)
- if key in self.__keys:
- self.__keys.remove(key)
- self.__dirty = True
+ if key in self._keys:
+ self._keys.remove(key)
+ self._dirty = True
def clear(self):
super(OrderedDict, self).clear()
- self.__keys = []
- self.__dirty = True
+ self._keys = []
+ self._dirty = True
def keys(self):
- if self.__dirty:
- self.__order()
- return list(self.__keys)
+ if self._dirty:
+ self._order()
+ return list(self._keys)
def values(self):
- if self.__dirty:
- self.__order()
- return [self[k] for k in self.__keys]
+ if self._dirty:
+ self._order()
+ return [self[k] for k in self._keys]
def items(self):
- if self.__dirty:
- self.__order()
- return [(k, self[k]) for k in self.__keys]
+ if self._dirty:
+ self._order()
+
+ return [(k, self[k]) for k in self._keys]
def update(self, *args, **kwargs):
if args:
@@ -57,6 +62,7 @@ class OrderedDict(dict):
if hasattr(iterable, 'keys'):
for k in iterable:
self[k] = iterable[k]
+
else:
for k, v in iterable:
self[k] = v
@@ -68,16 +74,19 @@ class OrderedDict(dict):
def sortingFun(self, keys):
keys.sort()
- def __order(self):
- self.sortingFun(self.__keys)
- self.__keysLens = sorted(set(len(k) for k in self.__keys), reverse=True)
- self.__dirty = False
+ def _order(self):
+ self.sortingFun(self._keys)
+
+ self._keysLens = sorted(
+ set(len(k) for k in self._keys), reverse=True)
+
+ self._dirty = False
def nextKey(self, key):
- if self.__dirty:
- self.__order()
+ if self._dirty:
+ self._order()
- keys = self.__keys
+ keys = self._keys
if key in keys:
nextIdx = keys.index(key) + 1
@@ -92,30 +101,35 @@ class OrderedDict(dict):
raise KeyError(key)
def getKeysLens(self):
- if self.__dirty:
- self.__order()
- return self.__keysLens
+ if self._dirty:
+ self._order()
+
+ return self._keysLens
class OidOrderedDict(OrderedDict):
"""OID-ordered dictionary used for indices"""
def __init__(self, *args, **kwargs):
- self.__keysCache = {}
OrderedDict.__init__(self, *args, **kwargs)
+ self._keysCache = {}
+
def __setitem__(self, key, value):
OrderedDict.__setitem__(self, key, value)
- if key not in self.__keysCache:
+
+ if key not in self._keysCache:
if isinstance(key, tuple):
- self.__keysCache[key] = key
+ self._keysCache[key] = key
+
else:
- self.__keysCache[key] = [int(x) for x in key.split('.') if x]
+ self._keysCache[key] = [int(x) for x in key.split('.') if x]
def __delitem__(self, key):
OrderedDict.__delitem__(self, key)
- if key in self.__keysCache:
- del self.__keysCache[key]
+
+ if key in self._keysCache:
+ del self._keysCache[key]
def sortingFun(self, keys):
- keys.sort(key=lambda k, d=self.__keysCache: d[k])
+ keys.sort(key=lambda k, d=self._keysCache: d[k])
diff --git a/pysnmp/smi/instrum.py b/pysnmp/smi/instrum.py
index aec25adc..342eeb6b 100644
--- a/pysnmp/smi/instrum.py
+++ b/pysnmp/smi/instrum.py
@@ -38,21 +38,21 @@ class MibInstrumController(AbstractMibInstrumController):
STATE_WRITE_CLEANUP = 'writeCleanup'
STATE_WRITE_UNDO = 'writeUndo'
- fsmReadVar = {
+ FSM_READ_VAR = {
# (state, status) -> newState
(STATE_START, STATUS_OK): STATE_READ_TEST,
(STATE_READ_TEST, STATUS_OK): STATE_READ_GET,
(STATE_READ_GET, STATUS_OK): STATE_STOP,
(STATE_ANY, STATUS_ERROR): STATE_STOP
}
- fsmReadNextVar = {
+ FSM_READ_NEXT_VAR = {
# (state, status) -> newState
(STATE_START, STATUS_OK): STATE_READ_TEST_NEXT,
(STATE_READ_TEST_NEXT, STATUS_OK): STATE_READ_GET_NEXT,
(STATE_READ_GET_NEXT, STATUS_OK): STATE_STOP,
(STATE_ANY, STATUS_ERROR): STATE_STOP
}
- fsmWriteVar = {
+ FSM_WRITE_VAR = {
# (state, status) -> newState
(STATE_START, STATUS_OK): STATE_WRITE_TEST,
(STATE_WRITE_TEST, STATUS_OK): STATE_WRITE_COMMIT,
@@ -79,7 +79,7 @@ class MibInstrumController(AbstractMibInstrumController):
def getMibBuilder(self):
return self.mibBuilder
- def __indexMib(self):
+ def _indexMib(self):
"""Rebuild a tree from MIB objects found at currently loaded modules.
If currently existing tree is out of date, walk over all Managed Objects
@@ -108,7 +108,8 @@ class MibInstrumController(AbstractMibInstrumController):
if self.lastBuildId == self.mibBuilder.lastBuildId:
return
- (MibScalarInstance, MibScalar, MibTableColumn, MibTableRow,
+ (MibScalarInstance, MibScalar,
+ MibTableColumn, MibTableRow,
MibTable) = self.mibBuilder.importSymbols(
'SNMPv2-SMI', 'MibScalarInstance', 'MibScalar',
'MibTableColumn', 'MibTableRow', 'MibTable'
@@ -128,26 +129,36 @@ class MibInstrumController(AbstractMibInstrumController):
mibSymbols.sort(key=lambda x: x[0], reverse=True)
for modName, mibMod in mibSymbols:
+
for symObj in mibMod.values():
+
if isinstance(symObj, MibTable):
tables[symObj.name] = symObj
+
elif isinstance(symObj, MibTableRow):
rows[symObj.name] = symObj
+
elif isinstance(symObj, MibTableColumn):
cols[symObj.name] = symObj
+
elif isinstance(symObj, MibScalarInstance):
instances[symObj.name] = symObj
+
elif isinstance(symObj, MibScalar):
scalars[symObj.name] = symObj
# Detach items from each other
for symName, parentName in self.lastBuildSyms.items():
+
if parentName in scalars:
scalars[parentName].unregisterSubtrees(symName)
+
elif parentName in cols:
cols[parentName].unregisterSubtrees(symName)
+
elif parentName in rows:
rows[parentName].unregisterSubtrees(symName)
+
else:
mibTree.unregisterSubtrees(symName)
@@ -157,23 +168,29 @@ class MibInstrumController(AbstractMibInstrumController):
for inst in instances.values():
if inst.typeName in scalars:
scalars[inst.typeName].registerSubtrees(inst)
+
elif inst.typeName in cols:
cols[inst.typeName].registerSubtrees(inst)
+
else:
raise error.SmiError(
- 'Orphan MIB scalar instance %r at %r' % (inst, self)
- )
+ 'Orphan MIB scalar instance %r at '
+ '%r' % (inst, self))
+
lastBuildSyms[inst.name] = inst.typeName
# Attach Table Columns to Table Rows
for col in cols.values():
rowName = col.name[:-1] # XXX
+
if rowName in rows:
rows[rowName].registerSubtrees(col)
+
else:
raise error.SmiError(
- 'Orphan MIB table column %r at %r' % (col, self)
- )
+ 'Orphan MIB table column %r at '
+ '%r' % (col, self))
+
lastBuildSyms[col.name] = rowName
# Attach Table Rows to MIB tree
@@ -195,7 +212,7 @@ class MibInstrumController(AbstractMibInstrumController):
self.lastBuildId = self.mibBuilder.lastBuildId
- debug.logger & debug.FLAG_INS and debug.logger('__indexMib: rebuilt')
+ debug.logger & debug.FLAG_INS and debug.logger('_indexMib: rebuilt')
def flipFlopFsm(self, fsmTable, *varBinds, **context):
"""Read, modify, create or remove Managed Objects Instances.
@@ -250,12 +267,12 @@ class MibInstrumController(AbstractMibInstrumController):
if err:
# Move other errors into the errors sequence
errors = context['errors']
+
errors.append(
{'error': err,
'idx': idx,
'varbind': varBind,
- 'state': context['state']}
- )
+ 'state': context['state']})
context['status'] = self.STATUS_ERROR
@@ -271,8 +288,8 @@ class MibInstrumController(AbstractMibInstrumController):
count[0] += 1
debug.logger & debug.FLAG_INS and debug.logger(
- '_cbFun: var-bind %d, processed %d, expected %d' % (
- idx, count[0], len(varBinds)))
+ '_cbFun: var-bind %d, processed %d, expected '
+ '%d' % (idx, count[0], len(varBinds)))
if count[0] < len(varBinds):
return
@@ -282,7 +299,8 @@ class MibInstrumController(AbstractMibInstrumController):
self.flipFlopFsm(fsmTable, *varBinds, **dict(context, cbFun=cbFun))
- debug.logger & debug.FLAG_INS and debug.logger('flipFlopFsm: input var-binds %r' % (varBinds,))
+ debug.logger & debug.FLAG_INS and debug.logger(
+ 'flipFlopFsm: input var-binds %r' % (varBinds,))
mibTree, = self.mibBuilder.importSymbols('SNMPv2-SMI', 'iso')
@@ -299,7 +317,7 @@ class MibInstrumController(AbstractMibInstrumController):
errors = []
_varBinds = list(varBinds)
- self.__indexMib()
+ self._indexMib()
debug.logger & debug.FLAG_INS and debug.logger(
'flipFlopFsm: current state %s, status %s' % (state, status))
@@ -312,10 +330,12 @@ class MibInstrumController(AbstractMibInstrumController):
newState = fsmTable[(self.STATE_ANY, status)]
except KeyError:
- raise error.SmiError('Unresolved FSM state %s, %s' % (state, status))
+ raise error.SmiError(
+ 'Unresolved FSM state %s, %s' % (state, status))
debug.logger & debug.FLAG_INS and debug.logger(
- 'flipFlopFsm: state %s status %s -> transitioned into state %s' % (state, status, newState))
+ 'flipFlopFsm: state %s status %s -> transitioned into state '
+ '%s' % (state, status, newState))
state = newState
@@ -324,8 +344,10 @@ class MibInstrumController(AbstractMibInstrumController):
context.pop('status', None)
context.pop('instances', None)
context.pop('varBinds', None)
+
if cbFun:
cbFun(_varBinds, **context)
+
return
# the case of no var-binds
@@ -336,16 +358,15 @@ class MibInstrumController(AbstractMibInstrumController):
actionFun = getattr(mibTree, state, None)
if not actionFun:
raise error.SmiError(
- 'Unsupported state handler %s at %s' % (state, self)
- )
+ 'Unsupported state handler %s at '
+ '%s' % (state, self))
for idx, varBind in enumerate(varBinds):
- actionFun(varBind,
- **dict(context, cbFun=_cbFun,
- state=state, status=status,
- idx=idx, total=len(varBinds),
- instances=instances, errors=errors,
- varBinds=_varBinds, nextName=None))
+ actionFun(
+ varBind,
+ **dict(context, cbFun=_cbFun, state=state, status=status,
+ idx=idx, total=len(varBinds), instances=instances,
+ errors=errors, varBinds=_varBinds, nextName=None))
debug.logger & debug.FLAG_INS and debug.logger(
'flipFlopFsm: func %s initiated for %r' % (actionFun, varBind))
@@ -354,9 +375,10 @@ class MibInstrumController(AbstractMibInstrumController):
def _defaultErrorHandler(varBinds, **context):
"""Raise exception on any error if user callback is missing"""
errors = context.get('errors')
+
if errors:
- error = errors[-1]
- raise error['error']
+ err = errors[-1]
+ raise err['error']
def readMibObjects(self, *varBinds, **context):
"""Read Managed Objects Instances.
@@ -410,7 +432,7 @@ class MibInstrumController(AbstractMibInstrumController):
if 'cbFun' not in context:
context['cbFun'] = self._defaultErrorHandler
- self.flipFlopFsm(self.fsmReadVar, *varBinds, **context)
+ self.flipFlopFsm(self.FSM_READ_VAR, *varBinds, **context)
def readNextMibObjects(self, *varBinds, **context):
"""Read Managed Objects Instances next to the given ones.
@@ -470,7 +492,7 @@ class MibInstrumController(AbstractMibInstrumController):
if 'cbFun' not in context:
context['cbFun'] = self._defaultErrorHandler
- self.flipFlopFsm(self.fsmReadNextVar, *varBinds, **context)
+ self.flipFlopFsm(self.FSM_READ_NEXT_VAR, *varBinds, **context)
def writeMibObjects(self, *varBinds, **context):
"""Create, destroy or modify Managed Objects Instances.
@@ -539,4 +561,4 @@ class MibInstrumController(AbstractMibInstrumController):
if 'cbFun' not in context:
context['cbFun'] = self._defaultErrorHandler
- self.flipFlopFsm(self.fsmWriteVar, *varBinds, **context)
+ self.flipFlopFsm(self.FSM_WRITE_VAR, *varBinds, **context)
diff --git a/pysnmp/smi/rfc1902.py b/pysnmp/smi/rfc1902.py
index 9a3d4507..40249a1e 100644
--- a/pysnmp/smi/rfc1902.py
+++ b/pysnmp/smi/rfc1902.py
@@ -4,9 +4,6 @@
# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
# License: http://snmplabs.com/pysnmp/license.html
#
-from pyasn1.error import PyAsn1Error
-from pyasn1.type.base import AbstractSimpleAsn1Item
-
from pysnmp import debug
from pysnmp.proto import rfc1902
from pysnmp.proto import rfc1905
@@ -15,6 +12,9 @@ from pysnmp.smi.builder import ZipMibSource
from pysnmp.smi.compiler import addMibCompiler
from pysnmp.smi.error import SmiError
+from pyasn1.error import PyAsn1Error
+from pyasn1.type.base import AbstractSimpleAsn1Item
+
__all__ = ['ObjectIdentity', 'ObjectType', 'NotificationType']
@@ -78,19 +78,22 @@ class ObjectIdentity(object):
ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)
>>> ObjectIdentity('IP-MIB', 'ipAdEntAddr', '127.0.0.1', 123)
ObjectIdentity('IP-MIB', 'ipAdEntAddr', '127.0.0.1', 123)
-
"""
ST_DIRTY, ST_CLEAN = 1, 2
def __init__(self, *args, **kwargs):
- self.__args = args
- self.__kwargs = kwargs
- self.__mibSourcesToAdd = self.__modNamesToLoad = None
- self.__asn1SourcesToAdd = self.__asn1SourcesOptions = None
- self.__state = self.ST_DIRTY
- self.__indices = self.__oid = self.__label = ()
- self.__modName = self.__symName = ''
- self.__mibNode = None
+ self._args = args
+ self._kwargs = kwargs
+ self._mibSourcesToAdd = None
+ self._modNamesToLoad = None
+ self._asn1SourcesToAdd = None
+ self._asn1SourcesOptions = None
+ self._state = self.ST_DIRTY
+ self._indices = ()
+ self._oid = ()
+ self._label = ()
+ self._modName = self._symName = ''
+ self._mibNode = None
def getMibSymbol(self):
"""Returns MIB variable symbolic identification.
@@ -116,12 +119,13 @@ class ObjectIdentity(object):
>>> objectIdentity.getMibSymbol()
('SNMPv2-MIB', 'sysDescr', (0,))
>>>
-
"""
- if self.__state & self.ST_CLEAN:
- return self.__modName, self.__symName, self.__indices
+ if self._state & self.ST_CLEAN:
+ return self._modName, self._symName, self._indices
+
else:
- raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+ raise SmiError(
+ '%s object not fully initialized' % self.__class__.__name__)
def getOid(self):
"""Returns OID identifying MIB variable.
@@ -143,12 +147,13 @@ class ObjectIdentity(object):
>>> objectIdentity.getOid()
ObjectName('1.3.6.1.2.1.1.1.0')
>>>
-
"""
- if self.__state & self.ST_CLEAN:
- return self.__oid
+ if self._state & self.ST_CLEAN:
+ return self._oid
+
else:
- raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+ raise SmiError(
+ '%s object not fully initialized' % self.__class__.__name__)
def getLabel(self):
"""Returns symbolic path to this MIB variable.
@@ -179,21 +184,24 @@ class ObjectIdentity(object):
>>> objectIdentity.getOid()
('iso', 'org', 'dod', 'internet', 'mgmt', 'mib-2', 'system', 'sysDescr')
>>>
-
"""
- if self.__state & self.ST_CLEAN:
- return self.__label
+ if self._state & self.ST_CLEAN:
+ return self._label
+
else:
- raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+ raise SmiError(
+ '%s object not fully initialized' % self.__class__.__name__)
def getMibNode(self):
- if self.__state & self.ST_CLEAN:
- return self.__mibNode
+ if self._state & self.ST_CLEAN:
+ return self._mibNode
+
else:
- raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+ raise SmiError(
+ '%s object not fully initialized' % self.__class__.__name__)
def isFullyResolved(self):
- return self.__state & self.ST_CLEAN
+ return self._state & self.ST_CLEAN
#
# A gateway to MIBs manipulation routines
@@ -227,16 +235,19 @@ class ObjectIdentity(object):
>>> ObjectIdentity('SNMPv2-MIB', 'sysDescr').addAsn1Source('http://mibs.snmplabs.com/asn1/@mib@')
ObjectIdentity('SNMPv2-MIB', 'sysDescr')
>>>
-
"""
- if self.__asn1SourcesToAdd is None:
- self.__asn1SourcesToAdd = asn1Sources
+ if self._asn1SourcesToAdd is None:
+ self._asn1SourcesToAdd = asn1Sources
+
else:
- self.__asn1SourcesToAdd += asn1Sources
- if self.__asn1SourcesOptions:
- self.__asn1SourcesOptions.update(kwargs)
+ self._asn1SourcesToAdd += asn1Sources
+
+ if self._asn1SourcesOptions:
+ self._asn1SourcesOptions.update(kwargs)
+
else:
- self.__asn1SourcesOptions = kwargs
+ self._asn1SourcesOptions = kwargs
+
return self
def addMibSource(self, *mibSources):
@@ -266,12 +277,13 @@ class ObjectIdentity(object):
>>> ObjectIdentity('SNMPv2-MIB', 'sysDescr').addMibSource('/opt/pysnmp/mibs', 'pysnmp_mibs')
ObjectIdentity('SNMPv2-MIB', 'sysDescr')
>>>
-
"""
- if self.__mibSourcesToAdd is None:
- self.__mibSourcesToAdd = mibSources
+ if self._mibSourcesToAdd is None:
+ self._mibSourcesToAdd = mibSources
+
else:
- self.__mibSourcesToAdd += mibSources
+ self._mibSourcesToAdd += mibSources
+
return self
# provides deferred MIBs load
@@ -294,12 +306,13 @@ class ObjectIdentity(object):
>>> ObjectIdentity('SNMPv2-MIB', 'sysDescr').loadMibs('IF-MIB', 'TCP-MIB')
ObjectIdentity('SNMPv2-MIB', 'sysDescr')
>>>
-
"""
- if self.__modNamesToLoad is None:
- self.__modNamesToLoad = modNames
+ if self._modNamesToLoad is None:
+ self._modNamesToLoad = modNames
+
else:
- self.__modNamesToLoad += modNames
+ self._modNamesToLoad += modNames
+
return self
# this would eventually be called by an entity which posses a
@@ -344,298 +357,382 @@ class ObjectIdentity(object):
>>> objectIdentity.resolveWithMib(mibViewController)
ObjectIdentity('SNMPv2-MIB', 'sysDescr')
>>>
-
"""
- if self.__mibSourcesToAdd is not None:
- debug.logger & debug.FLAG_MIB and debug.logger('adding MIB sources %s' % ', '.join(self.__mibSourcesToAdd))
+ if self._mibSourcesToAdd is not None:
+ debug.logger & debug.FLAG_MIB and debug.logger(
+ 'adding MIB sources %s' % ', '.join(self._mibSourcesToAdd))
+
mibViewController.mibBuilder.addMibSources(
- *[ZipMibSource(x) for x in self.__mibSourcesToAdd]
- )
- self.__mibSourcesToAdd = None
+ *[ZipMibSource(x) for x in self._mibSourcesToAdd])
+
+ self._mibSourcesToAdd = None
+
+ if self._asn1SourcesToAdd is None:
+ addMibCompiler(
+ mibViewController.mibBuilder,
+ ifAvailable=True, ifNotAdded=True)
- if self.__asn1SourcesToAdd is None:
- addMibCompiler(mibViewController.mibBuilder,
- ifAvailable=True, ifNotAdded=True)
else:
debug.logger & debug.FLAG_MIB and debug.logger(
- 'adding MIB compiler with source paths %s' % ', '.join(self.__asn1SourcesToAdd))
+ 'adding MIB compiler with source paths '
+ '%s' % ', '.join(self._asn1SourcesToAdd))
+
addMibCompiler(
mibViewController.mibBuilder,
- sources=self.__asn1SourcesToAdd,
- searchers=self.__asn1SourcesOptions.get('searchers'),
- borrowers=self.__asn1SourcesOptions.get('borrowers'),
- destination=self.__asn1SourcesOptions.get('destination'),
- ifAvailable=self.__asn1SourcesOptions.get('ifAvailable'),
- ifNotAdded=self.__asn1SourcesOptions.get('ifNotAdded')
+ sources=self._asn1SourcesToAdd,
+ searchers=self._asn1SourcesOptions.get('searchers'),
+ borrowers=self._asn1SourcesOptions.get('borrowers'),
+ destination=self._asn1SourcesOptions.get('destination'),
+ ifAvailable=self._asn1SourcesOptions.get('ifAvailable'),
+ ifNotAdded=self._asn1SourcesOptions.get('ifNotAdded')
)
- self.__asn1SourcesToAdd = self.__asn1SourcesOptions = None
- if self.__modNamesToLoad is not None:
- debug.logger & debug.FLAG_MIB and debug.logger('loading MIB modules %s' % ', '.join(self.__modNamesToLoad))
- mibViewController.mibBuilder.loadModules(*self.__modNamesToLoad)
- self.__modNamesToLoad = None
+ self._asn1SourcesToAdd = self._asn1SourcesOptions = None
+
+ if self._modNamesToLoad is not None:
+ debug.logger & debug.FLAG_MIB and debug.logger(
+ 'loading MIB modules %s' % ', '.join(self._modNamesToLoad))
+
+ mibViewController.mibBuilder.loadModules(*self._modNamesToLoad)
+
+ self._modNamesToLoad = None
- if self.__state & self.ST_CLEAN:
+ if self._state & self.ST_CLEAN:
return self
- MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar',
- 'MibTableColumn')
+ MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols(
+ 'SNMPv2-SMI', 'MibScalar', 'MibTableColumn')
- self.__indices = ()
+ self._indices = ()
- if isinstance(self.__args[0], ObjectIdentity):
- self.__args[0].resolveWithMib(mibViewController)
+ if isinstance(self._args[0], ObjectIdentity):
+ self._args[0].resolveWithMib(mibViewController)
+
+ if len(self._args) == 1: # OID or label or MIB module
+ debug.logger & debug.FLAG_MIB and debug.logger(
+ 'resolving %s as OID or label' % self._args)
- if len(self.__args) == 1: # OID or label or MIB module
- debug.logger & debug.FLAG_MIB and debug.logger('resolving %s as OID or label' % self.__args)
try:
# pyasn1 ObjectIdentifier or sequence of ints or string OID
- self.__oid = rfc1902.ObjectName(self.__args[0]) # OID
+ self._oid = rfc1902.ObjectName(self._args[0]) # OID
+
except PyAsn1Error:
# sequence of sub-OIDs and labels
- if isinstance(self.__args[0], (list, tuple)):
+ if isinstance(self._args[0], (list, tuple)):
prefix, label, suffix = mibViewController.getNodeName(
- self.__args[0]
- )
+ self._args[0])
+
# string label
- elif '.' in self.__args[0]:
+ elif '.' in self._args[0]:
prefix, label, suffix = mibViewController.getNodeNameByOid(
- tuple(self.__args[0].split('.'))
- )
+ tuple(self._args[0].split('.')))
+
# MIB module name
else:
- modName = self.__args[0]
+ modName = self._args[0]
+
mibViewController.mibBuilder.loadModules(modName)
- if self.__kwargs.get('last'):
- prefix, label, suffix = mibViewController.getLastNodeName(modName)
+
+ if self._kwargs.get('last'):
+ (prefix, label,
+ suffix) = mibViewController.getLastNodeName(modName)
+
else:
- prefix, label, suffix = mibViewController.getFirstNodeName(modName)
+ (prefix, label,
+ suffix) = mibViewController.getFirstNodeName(modName)
if suffix:
try:
- suffix = tuple([int(x) for x in suffix])
+ suffix = tuple(int(x) for x in suffix)
+
except ValueError:
- raise SmiError('Unknown object name component %r' % (suffix,))
- self.__oid = rfc1902.ObjectName(prefix + suffix)
+ raise SmiError(
+ 'Unknown object name component %r' % (suffix,))
+
+ self._oid = rfc1902.ObjectName(prefix + suffix)
+
else:
prefix, label, suffix = mibViewController.getNodeNameByOid(
- self.__oid
- )
+ self._oid)
debug.logger & debug.FLAG_MIB and debug.logger(
- 'resolved %r into prefix %r and suffix %r' % (self.__args, prefix, suffix))
+ 'resolved %r into prefix %r and suffix '
+ '%r' % (self._args, prefix, suffix))
modName, symName, _ = mibViewController.getNodeLocation(prefix)
- self.__modName = modName
- self.__symName = symName
+ self._modName = modName
+ self._symName = symName
- self.__label = label
+ self._label = label
mibNode, = mibViewController.mibBuilder.importSymbols(
- modName, symName
- )
+ modName, symName)
- self.__mibNode = mibNode
+ self._mibNode = mibNode
- debug.logger & debug.FLAG_MIB and debug.logger('resolved prefix %r into MIB node %r' % (prefix, mibNode))
+ debug.logger & debug.FLAG_MIB and debug.logger(
+ 'resolved prefix %r into MIB node %r' % (prefix, mibNode))
if isinstance(mibNode, MibTableColumn): # table column
if suffix:
rowModName, rowSymName, _ = mibViewController.getNodeLocation(
mibNode.name[:-1]
)
+
rowNode, = mibViewController.mibBuilder.importSymbols(
rowModName, rowSymName
)
- self.__indices = rowNode.getIndicesFromInstId(suffix)
+
+ self._indices = rowNode.getIndicesFromInstId(suffix)
+
elif isinstance(mibNode, MibScalar): # scalar
if suffix:
- self.__indices = (rfc1902.ObjectName(suffix),)
+ self._indices = (rfc1902.ObjectName(suffix),)
+
else:
if suffix:
- self.__indices = (rfc1902.ObjectName(suffix),)
- self.__state |= self.ST_CLEAN
+ self._indices = (rfc1902.ObjectName(suffix),)
+
+ self._state |= self.ST_CLEAN
- debug.logger & debug.FLAG_MIB and debug.logger('resolved indices are %r' % (self.__indices,))
+ debug.logger & debug.FLAG_MIB and debug.logger(
+ 'resolved indices are %r' % (self._indices,))
return self
- elif len(self.__args) > 1: # MIB, symbol[, index, index ...]
+
+ elif len(self._args) > 1: # MIB, symbol[, index, index ...]
# MIB, symbol, index, index
- if self.__args[0] and self.__args[1]:
- self.__modName = self.__args[0]
- self.__symName = self.__args[1]
+ if self._args[0] and self._args[1]:
+ self._modName = self._args[0]
+ self._symName = self._args[1]
+
# MIB, ''
- elif self.__args[0]:
- mibViewController.mibBuilder.loadModules(self.__args[0])
- if self.__kwargs.get('last'):
- prefix, label, suffix = mibViewController.getLastNodeName(self.__args[0])
+ elif self._args[0]:
+ mibViewController.mibBuilder.loadModules(self._args[0])
+
+ if self._kwargs.get('last'):
+ (prefix, label,
+ suffix) = mibViewController.getLastNodeName(self._args[0])
+
else:
- prefix, label, suffix = mibViewController.getFirstNodeName(self.__args[0])
- self.__modName, self.__symName, _ = mibViewController.getNodeLocation(prefix)
+ (prefix, label,
+ suffix) = mibViewController.getFirstNodeName(self._args[0])
+
+ (self._modName,
+ self._symName, _) = mibViewController.getNodeLocation(prefix)
+
# '', symbol, index, index
else:
- prefix, label, suffix = mibViewController.getNodeName(self.__args[1:])
- self.__modName, self.__symName, _ = mibViewController.getNodeLocation(prefix)
+ (prefix, label,
+ suffix) = mibViewController.getNodeName(self._args[1:])
+
+ (self._modName,
+ self._symName, _) = mibViewController.getNodeLocation(prefix)
mibNode, = mibViewController.mibBuilder.importSymbols(
- self.__modName, self.__symName
- )
+ self._modName, self._symName)
- self.__mibNode = mibNode
+ self._mibNode = mibNode
- self.__oid = rfc1902.ObjectName(mibNode.getName())
+ self._oid = rfc1902.ObjectName(mibNode.getName())
- prefix, label, suffix = mibViewController.getNodeNameByOid(
- self.__oid
- )
- self.__label = label
+ (prefix, label,
+ suffix) = mibViewController.getNodeNameByOid(
+ self._oid)
+
+ self._label = label
debug.logger & debug.FLAG_MIB and debug.logger(
- 'resolved %r into prefix %r and suffix %r' % (self.__args, prefix, suffix))
+ 'resolved %r into prefix %r and suffix '
+ '%r' % (self._args, prefix, suffix))
if isinstance(mibNode, MibTableColumn): # table
rowModName, rowSymName, _ = mibViewController.getNodeLocation(
- mibNode.name[:-1]
- )
+ mibNode.name[:-1])
+
rowNode, = mibViewController.mibBuilder.importSymbols(
- rowModName, rowSymName
- )
- if self.__args[2:]:
+ rowModName, rowSymName)
+
+ if self._args[2:]:
try:
- instIds = rowNode.getInstIdFromIndices(*self.__args[2:])
- self.__oid += instIds
- self.__indices = rowNode.getIndicesFromInstId(instIds)
+ instIds = rowNode.getInstIdFromIndices(*self._args[2:])
+ self._oid += instIds
+ self._indices = rowNode.getIndicesFromInstId(instIds)
+
except PyAsn1Error as exc:
- raise SmiError('Instance index %r to OID conversion failure at object %r: %s' % (
- self.__args[2:], mibNode.getLabel(), exc))
- elif self.__args[2:]: # any other kind of MIB node with indices
- if self.__args[2:]:
+ raise SmiError(
+ 'Instance index %r to OID conversion failure '
+ 'at object %r: '
+ '%s' % (self._args[2:], mibNode.getLabel(), exc))
+
+ elif self._args[2:]: # any other kind of MIB node with indices
+ if self._args[2:]:
instId = rfc1902.ObjectName(
- '.'.join([str(x) for x in self.__args[2:]])
- )
- self.__oid += instId
- self.__indices = (instId,)
- self.__state |= self.ST_CLEAN
+ '.'.join(str(x) for x in self._args[2:]))
+
+ self._oid += instId
+
+ self._indices = (instId,)
- debug.logger & debug.FLAG_MIB and debug.logger('resolved indices are %r' % (self.__indices,))
+ self._state |= self.ST_CLEAN
+
+ debug.logger & debug.FLAG_MIB and debug.logger(
+ 'resolved indices are %r' % (self._indices,))
return self
+
else:
raise SmiError('Non-OID, label or MIB symbol')
def prettyPrint(self):
- if self.__state & self.ST_CLEAN:
+ if self._state & self.ST_CLEAN:
s = rfc1902.OctetString()
+
return '%s::%s%s%s' % (
- self.__modName, self.__symName,
- self.__indices and '.' or '',
- '.'.join([x.isSuperTypeOf(s, matchConstraints=False) and '"%s"' % x.prettyPrint() or x.prettyPrint() for x in self.__indices])
- )
+ self._modName, self._symName,
+ self._indices and '.' or '',
+ '.'.join((x.isSuperTypeOf(s, matchConstraints=False) and
+ '"%s"' % x.prettyPrint() or
+ x.prettyPrint()) for x in self._indices))
+
else:
- raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+ raise SmiError(
+ '%s object not fully initialized' % self.__class__.__name__)
def __repr__(self):
- return '%s(%s)' % (self.__class__.__name__, ', '.join([repr(x) for x in self.__args]))
+ return '%s(%s)' % (self.__class__.__name__,
+ ', '.join(repr(x) for x in self._args))
# Redirect some attrs access to the OID object to behave alike
def __str__(self):
- if self.__state & self.ST_CLEAN:
- return str(self.__oid)
+ if self._state & self.ST_CLEAN:
+ return str(self._oid)
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __eq__(self, other):
- if self.__state & self.ST_CLEAN:
- return self.__oid == other
+ if self._state & self.ST_CLEAN:
+ return self._oid == other
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __ne__(self, other):
- if self.__state & self.ST_CLEAN:
- return self.__oid != other
+ if self._state & self.ST_CLEAN:
+ return self._oid != other
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __lt__(self, other):
- if self.__state & self.ST_CLEAN:
- return self.__oid < other
+ if self._state & self.ST_CLEAN:
+ return self._oid < other
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __le__(self, other):
- if self.__state & self.ST_CLEAN:
- return self.__oid <= other
+ if self._state & self.ST_CLEAN:
+ return self._oid <= other
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __gt__(self, other):
- if self.__state & self.ST_CLEAN:
- return self.__oid > other
+ if self._state & self.ST_CLEAN:
+ return self._oid > other
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __ge__(self, other):
- if self.__state & self.ST_CLEAN:
- return self.__oid > other
+ if self._state & self.ST_CLEAN:
+ return self._oid > other
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __nonzero__(self):
- if self.__state & self.ST_CLEAN:
- return self.__oid != 0
+ if self._state & self.ST_CLEAN:
+ return self._oid != 0
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __bool__(self):
- if self.__state & self.ST_CLEAN:
- return bool(self.__oid)
+ if self._state & self.ST_CLEAN:
+ return bool(self._oid)
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __getitem__(self, i):
- if self.__state & self.ST_CLEAN:
- return self.__oid[i]
+ if self._state & self.ST_CLEAN:
+ return self._oid[i]
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __len__(self):
- if self.__state & self.ST_CLEAN:
- return len(self.__oid)
+ if self._state & self.ST_CLEAN:
+ return len(self._oid)
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __add__(self, other):
- if self.__state & self.ST_CLEAN:
- return self.__oid + other
+ if self._state & self.ST_CLEAN:
+ return self._oid + other
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __radd__(self, other):
- if self.__state & self.ST_CLEAN:
- return other + self.__oid
+ if self._state & self.ST_CLEAN:
+ return other + self._oid
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __hash__(self):
- if self.__state & self.ST_CLEAN:
- return hash(self.__oid)
+ if self._state & self.ST_CLEAN:
+ return hash(self._oid)
+
else:
- raise SmiError('%s object not properly initialized' % self.__class__.__name__)
+ raise SmiError('%s object not properly '
+ 'initialized' % self.__class__.__name__)
def __getattr__(self, attr):
- if self.__state & self.ST_CLEAN:
+ if self._state & self.ST_CLEAN:
if attr in ('asTuple', 'clone', 'subtype', 'isPrefixOf',
'isSameTypeWith', 'isSuperTypeOf', 'getTagSet',
'getEffectiveTagSet', 'getTagMap', 'tagSet', 'index'):
- return getattr(self.__oid, attr)
+ return getattr(self._oid, attr)
+
raise AttributeError(attr)
+
else:
- raise SmiError('%s object not properly initialized for accessing %s' % (self.__class__.__name__, attr))
+ raise SmiError(
+ '%s object not properly initialized for accessing '
+ '%s' % (self.__class__.__name__, attr))
-# A two-element sequence of ObjectIdentity and SNMP data type object
class ObjectType(object):
"""Create an object representing MIB variable.
@@ -692,30 +789,35 @@ class ObjectType(object):
ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'), Null(''))
>>> ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0), 'Linux i386')
ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0), 'Linux i386')
-
"""
ST_DIRTY, ST_CLEAM = 1, 2
def __init__(self, objectIdentity, objectSyntax=rfc1905.unSpecified):
if not isinstance(objectIdentity, ObjectIdentity):
- raise SmiError('initializer should be ObjectIdentity instance, not %r' % (objectIdentity,))
- self.__args = [objectIdentity, objectSyntax]
- self.__state = self.ST_DIRTY
+ raise SmiError(
+ 'initializer should be ObjectIdentity instance, '
+ 'not %r' % (objectIdentity,))
+
+ self._args = [objectIdentity, objectSyntax]
+ self._state = self.ST_DIRTY
def __getitem__(self, i):
- if self.__state & self.ST_CLEAM:
- return self.__args[i]
+ if self._state & self.ST_CLEAM:
+ return self._args[i]
+
else:
- raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+ raise SmiError(
+ '%s object not fully initialized' % self.__class__.__name__)
def __str__(self):
return self.prettyPrint()
def __repr__(self):
- return '%s(%s)' % (self.__class__.__name__, ', '.join([repr(x) for x in self.__args]))
+ return '%s(%s)' % (self.__class__.__name__,
+ ', '.join([repr(x) for x in self._args]))
def isFullyResolved(self):
- return self.__state & self.ST_CLEAM
+ return self._state & self.ST_CLEAM
def addAsn1MibSource(self, *asn1Sources, **kwargs):
"""Adds path to a repository to search ASN.1 MIB files.
@@ -747,7 +849,7 @@ class ObjectType(object):
>>>
"""
- self.__args[0].addAsn1MibSource(*asn1Sources, **kwargs)
+ self._args[0].addAsn1MibSource(*asn1Sources, **kwargs)
return self
def addMibSource(self, *mibSources):
@@ -779,7 +881,7 @@ class ObjectType(object):
>>>
"""
- self.__args[0].addMibSource(*mibSources)
+ self._args[0].addMibSource(*mibSources)
return self
def loadMibs(self, *modNames):
@@ -803,7 +905,7 @@ class ObjectType(object):
>>>
"""
- self.__args[0].loadMibs(*modNames)
+ self._args[0].loadMibs(*modNames)
return self
def resolveWithMib(self, mibViewController):
@@ -842,50 +944,61 @@ class ObjectType(object):
>>>
"""
- if self.__state & self.ST_CLEAM:
+ if self._state & self.ST_CLEAM:
return self
- self.__args[0].resolveWithMib(mibViewController)
+ self._args[0].resolveWithMib(mibViewController)
- MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'MibScalar',
- 'MibTableColumn')
+ MibScalar, MibTableColumn = mibViewController.mibBuilder.importSymbols(
+ 'SNMPv2-SMI', 'MibScalar', 'MibTableColumn')
- if not isinstance(self.__args[0].getMibNode(),
+ if not isinstance(self._args[0].getMibNode(),
(MibScalar, MibTableColumn)):
- if not isinstance(self.__args[1], AbstractSimpleAsn1Item):
- raise SmiError('MIB object %r is not OBJECT-TYPE (MIB not loaded?)' % (self.__args[0],))
- self.__state |= self.ST_CLEAM
+ if not isinstance(self._args[1], AbstractSimpleAsn1Item):
+ raise SmiError('MIB object %r is not OBJECT-TYPE '
+ '(MIB not loaded?)' % (self._args[0],))
+
+ self._state |= self.ST_CLEAM
+
return self
- if isinstance(self.__args[1], (rfc1905.UnSpecified,
- rfc1905.NoSuchObject,
- rfc1905.NoSuchInstance,
- rfc1905.EndOfMibView)):
- self.__state |= self.ST_CLEAM
+ if isinstance(self._args[1], (rfc1905.UnSpecified,
+ rfc1905.NoSuchObject,
+ rfc1905.NoSuchInstance,
+ rfc1905.EndOfMibView)):
+ self._state |= self.ST_CLEAM
return self
+ syntax = self._args[0].getMibNode().getSyntax()
+
try:
- self.__args[1] = self.__args[0].getMibNode().getSyntax().clone(self.__args[1])
+ self._args[1] = syntax.clone(self._args[1])
+
except PyAsn1Error as exc:
- raise SmiError('MIB object %r having type %r failed to cast value %r: %s' % (
- self.__args[0].prettyPrint(), self.__args[0].getMibNode().getSyntax().__class__.__name__, self.__args[1],
- exc))
+ raise SmiError(
+ 'MIB object %r having type %r failed to cast value '
+ '%r: %s' % (self._args[0].prettyPrint(),
+ syntax.__class__.__name__, self._args[1], exc))
- if rfc1902.ObjectIdentifier().isSuperTypeOf(self.__args[1], matchConstraints=False):
- self.__args[1] = ObjectIdentity(self.__args[1]).resolveWithMib(mibViewController)
+ if rfc1902.ObjectIdentifier().isSuperTypeOf(
+ self._args[1], matchConstraints=False):
+ self._args[1] = ObjectIdentity(
+ self._args[1]).resolveWithMib(mibViewController)
- self.__state |= self.ST_CLEAM
+ self._state |= self.ST_CLEAM
- debug.logger & debug.FLAG_MIB and debug.logger('resolved %r syntax is %r' % (self.__args[0], self.__args[1]))
+ debug.logger & debug.FLAG_MIB and debug.logger(
+ 'resolved %r syntax is %r' % (self._args[0], self._args[1]))
return self
def prettyPrint(self):
- if self.__state & self.ST_CLEAM:
- return '%s = %s' % (self.__args[0].prettyPrint(),
- self.__args[1].prettyPrint())
+ if self._state & self.ST_CLEAM:
+ return '%s = %s' % (self._args[0].prettyPrint(),
+ self._args[1].prettyPrint())
else:
- raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+ raise SmiError(
+ '%s object not fully initialized' % self.__class__.__name__)
class NotificationType(object):
@@ -956,22 +1069,29 @@ class NotificationType(object):
def __init__(self, objectIdentity, instanceIndex=(), objects={}):
if not isinstance(objectIdentity, ObjectIdentity):
- raise SmiError('initializer should be ObjectIdentity instance, not %r' % (objectIdentity,))
- self.__objectIdentity = objectIdentity
- self.__instanceIndex = instanceIndex
- self.__objects = objects
- self.__varBinds = []
- self.__additionalVarBinds = []
- self.__state = self.ST_DIRTY
+ raise SmiError(
+ 'initializer should be ObjectIdentity instance, not '
+ '%r' % (objectIdentity,))
+
+ self._objectIdentity = objectIdentity
+ self._instanceIndex = instanceIndex
+ self._objects = objects
+ self._varBinds = []
+ self._additionalVarBinds = []
+ self._state = self.ST_DIRTY
def __getitem__(self, i):
- if self.__state & self.ST_CLEAN:
- return self.__varBinds[i]
+ if self._state & self.ST_CLEAN:
+ return self._varBinds[i]
+
else:
- raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+ raise SmiError(
+ '%s object not fully initialized' % self.__class__.__name__)
def __repr__(self):
- return '%s(%r, %r, %r)' % (self.__class__.__name__, self.__objectIdentity, self.__instanceIndex, self.__objects)
+ return '%s(%r, %r, %r)' % (
+ self.__class__.__name__, self._objectIdentity,
+ self._instanceIndex, self._objects)
def addVarBinds(self, *varBinds):
"""Appends variable-binding to notification.
@@ -999,13 +1119,17 @@ class NotificationType(object):
>>> nt.addVarBinds(ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)))
NotificationType(ObjectIdentity('IP-MIB', 'linkDown'), (), {})
>>>
-
"""
- debug.logger & debug.FLAG_MIB and debug.logger('additional var-binds: %r' % (varBinds,))
- if self.__state & self.ST_CLEAN:
- raise SmiError('%s object is already sealed' % self.__class__.__name__)
+ debug.logger & debug.FLAG_MIB and debug.logger(
+ 'additional var-binds: %r' % (varBinds,))
+
+ if self._state & self.ST_CLEAN:
+ raise SmiError(
+ '%s object is already sealed' % self.__class__.__name__)
+
else:
- self.__additionalVarBinds.extend(varBinds)
+ self._additionalVarBinds.extend(varBinds)
+
return self
def addAsn1MibSource(self, *asn1Sources, **kwargs):
@@ -1036,9 +1160,8 @@ class NotificationType(object):
>>> NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {}).addAsn1Source('http://mibs.snmplabs.com/asn1/@mib@')
NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {})
>>>
-
"""
- self.__objectIdentity.addAsn1MibSource(*asn1Sources, **kwargs)
+ self._objectIdentity.addAsn1MibSource(*asn1Sources, **kwargs)
return self
def addMibSource(self, *mibSources):
@@ -1068,9 +1191,8 @@ class NotificationType(object):
>>> NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {}).addMibSource('/opt/pysnmp/mibs', 'pysnmp_mibs')
NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {})
>>>
-
"""
- self.__objectIdentity.addMibSource(*mibSources)
+ self._objectIdentity.addMibSource(*mibSources)
return self
def loadMibs(self, *modNames):
@@ -1092,13 +1214,12 @@ class NotificationType(object):
>>> NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {}).loadMibs('IF-MIB', 'TCP-MIB')
NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {})
>>>
-
"""
- self.__objectIdentity.loadMibs(*modNames)
+ self._objectIdentity.loadMibs(*modNames)
return self
def isFullyResolved(self):
- return self.__state & self.ST_CLEAN
+ return self._state & self.ST_CLEAN
def resolveWithMib(self, mibViewController):
"""Perform MIB variable ID conversion and notification objects expansion.
@@ -1136,57 +1257,71 @@ class NotificationType(object):
>>> notificationType.resolveWithMib(mibViewController)
NotificationType(ObjectIdentity('IF-MIB', 'linkDown'), (), {})
>>>
-
"""
- if self.__state & self.ST_CLEAN:
+ if self._state & self.ST_CLEAN:
return self
- self.__objectIdentity.resolveWithMib(mibViewController)
+ self._objectIdentity.resolveWithMib(mibViewController)
- self.__varBinds.append(
+ self._varBinds.append(
ObjectType(ObjectIdentity(v2c.apiTrapPDU.snmpTrapOID),
- self.__objectIdentity).resolveWithMib(mibViewController)
- )
+ self._objectIdentity).resolveWithMib(mibViewController))
- SmiNotificationType, = mibViewController.mibBuilder.importSymbols('SNMPv2-SMI', 'NotificationType')
+ SmiNotificationType, = mibViewController.mibBuilder.importSymbols(
+ 'SNMPv2-SMI', 'NotificationType')
- mibNode = self.__objectIdentity.getMibNode()
+ mibNode = self._objectIdentity.getMibNode()
varBindsLocation = {}
if isinstance(mibNode, SmiNotificationType):
+
for notificationObject in mibNode.getObjects():
- objectIdentity = ObjectIdentity(*notificationObject + self.__instanceIndex).resolveWithMib(
- mibViewController)
- self.__varBinds.append(
- ObjectType(objectIdentity,
- self.__objects.get(notificationObject, rfc1905.unSpecified)).resolveWithMib(
- mibViewController)
- )
- varBindsLocation[objectIdentity] = len(self.__varBinds) - 1
+ objectIdentity = ObjectIdentity(
+ *notificationObject + self._instanceIndex)
+
+ objectIdentity.resolveWithMib(mibViewController)
+
+ objectType = ObjectType(
+ objectIdentity, self._objects.get(
+ notificationObject, rfc1905.unSpecified))
+
+ objectType.resolveWithMib(mibViewController)
+
+ self._varBinds.append(objectType)
+
+ varBindsLocation[objectIdentity] = len(self._varBinds) - 1
+
else:
debug.logger & debug.FLAG_MIB and debug.logger(
- 'WARNING: MIB object %r is not NOTIFICATION-TYPE (MIB not loaded?)' % (self.__objectIdentity,))
+ 'WARNING: MIB object %r is not NOTIFICATION-TYPE (MIB not '
+ 'loaded?)' % (self._objectIdentity,))
- for varBinds in self.__additionalVarBinds:
+ for varBinds in self._additionalVarBinds:
if not isinstance(varBinds, ObjectType):
varBinds = ObjectType(ObjectIdentity(varBinds[0]), varBinds[1])
+
varBinds.resolveWithMib(mibViewController)
+
if varBinds[0] in varBindsLocation:
- self.__varBinds[varBindsLocation[varBinds[0]]] = varBinds
+ self._varBinds[varBindsLocation[varBinds[0]]] = varBinds
+
else:
- self.__varBinds.append(varBinds)
+ self._varBinds.append(varBinds)
- self.__additionalVarBinds = []
+ self._additionalVarBinds = []
- self.__state |= self.ST_CLEAN
+ self._state |= self.ST_CLEAN
- debug.logger & debug.FLAG_MIB and debug.logger('resolved %r into %r' % (self.__objectIdentity, self.__varBinds))
+ debug.logger & debug.FLAG_MIB and debug.logger(
+ 'resolved %r into %r' % (self._objectIdentity, self._varBinds))
return self
def prettyPrint(self):
- if self.__state & self.ST_CLEAN:
- return ' '.join(['%s = %s' % (x[0].prettyPrint(), x[1].prettyPrint()) for x in self.__varBinds])
- else:
- raise SmiError('%s object not fully initialized' % self.__class__.__name__)
+ if self._state & self.ST_CLEAN:
+ return ' '.join('%s = %s' % (x[0].prettyPrint(), x[1].prettyPrint())
+ for x in self._varBinds)
+
+ raise SmiError(
+ '%s object not fully initialized' % self.__class__.__name__)
diff --git a/pysnmp/smi/view.py b/pysnmp/smi/view.py
index e9fd405e..46ce5991 100644
--- a/pysnmp/smi/view.py
+++ b/pysnmp/smi/view.py
@@ -6,13 +6,6 @@
#
import sys
-from pysnmp import debug
-from pysnmp.smi import error
-from pysnmp.smi.indices import OidOrderedDict
-from pysnmp.smi.indices import OrderedDict
-
-__all__ = ['MibViewController']
-
if sys.version_info[0] <= 2:
import types
@@ -22,52 +15,69 @@ else:
classTypes = (type,)
instanceTypes = (object,)
+from pysnmp import debug
+from pysnmp.smi import error
+from pysnmp.smi.indices import OidOrderedDict
+from pysnmp.smi.indices import OrderedDict
+
+__all__ = ['MibViewController']
+
class MibViewController(object):
def __init__(self, mibBuilder):
self.mibBuilder = mibBuilder
self.lastBuildId = -1
- self.__mibSymbolsIdx = OrderedDict()
+ self._mibSymbolsIdx = OrderedDict()
# Indexing part
+ def _sortFun(self, name):
+ # This is potentially ambiguous mapping. Sort modules in
+ # ascending age for resolution
+ mb = self.mibBuilder
+
+ if mb.moduleID in mb.mibSymbols[name]:
+ mib = mb.mibSymbols[name][mb.moduleID]
+ revs = mib.getRevisions()
+ if revs:
+ return revs[0]
+
+ return '1970-01-01 00:00'
+
def indexMib(self):
if self.lastBuildId == self.mibBuilder.lastBuildId:
return
- debug.logger & debug.FLAG_MIB and debug.logger('indexMib: re-indexing MIB view')
+ debug.logger & debug.FLAG_MIB and debug.logger(
+ 'indexMib: re-indexing MIB view')
MibScalarInstance, = self.mibBuilder.importSymbols(
- 'SNMPv2-SMI', 'MibScalarInstance'
- )
+ 'SNMPv2-SMI', 'MibScalarInstance')
#
# Create indices
#
# Module name -> module-scope indices
- self.__mibSymbolsIdx.clear()
+ self._mibSymbolsIdx.clear()
- # Oid <-> label indices
+ globMibMod = {
+ 'oidToLabelIdx': OidOrderedDict(),
+ 'labelToOidIdx': {},
+ 'varToNameIdx': {},
+ 'typeToModIdx': OrderedDict(),
+ 'oidToModIdx': {}
+ }
- # This is potentially ambiguous mapping. Sort modules in
- # ascending age for resolution
- def __sortFun(x, b=self.mibBuilder):
- if b.moduleID in b.mibSymbols[x]:
- m = b.mibSymbols[x][b.moduleID]
- r = m.getRevisions()
- if r:
- return r[0]
+ self._mibSymbolsIdx[''] = globMibMod
- return "1970-01-01 00:00"
+ # Oid <-> label indices
- modNames = list(self.mibBuilder.mibSymbols.keys())
- modNames.sort(key=__sortFun)
+ modNames = sorted(self.mibBuilder.mibSymbols, key=self._sortFun)
# Index modules names
- for modName in [''] + modNames:
- # Modules index
- self.__mibSymbolsIdx[modName] = mibMod = {
+ for modName in modNames:
+ mibMod = {
'oidToLabelIdx': OidOrderedDict(),
'labelToOidIdx': {},
'varToNameIdx': {},
@@ -75,76 +85,93 @@ class MibViewController(object):
'oidToModIdx': {}
}
- if not modName:
- globMibMod = mibMod
- continue
+ self._mibSymbolsIdx[modName] = mibMod
# Types & MIB vars indices
for n, v in self.mibBuilder.mibSymbols[modName].items():
if n == self.mibBuilder.moduleID: # do not index this
continue # special symbol
+
if isinstance(v, classTypes):
if n in mibMod['typeToModIdx']:
raise error.SmiError(
- 'Duplicate SMI type %s::%s, has %s' % (modName, n, mibMod['typeToModIdx'][n])
- )
+ 'Duplicate SMI type %s::%s, has '
+ '%s' % (modName, n, mibMod['typeToModIdx'][n]))
+
globMibMod['typeToModIdx'][n] = modName
mibMod['typeToModIdx'][n] = modName
+
elif isinstance(v, instanceTypes):
if isinstance(v, MibScalarInstance):
continue
+
if n in mibMod['varToNameIdx']:
raise error.SmiError(
- 'Duplicate MIB variable %s::%s has %s' % (modName, n, mibMod['varToNameIdx'][n])
- )
+ 'Duplicate MIB variable %s::%s has '
+ '%s' % (modName, n, mibMod['varToNameIdx'][n]))
+
globMibMod['varToNameIdx'][n] = v.name
mibMod['varToNameIdx'][n] = v.name
- # Potentionally ambiguous mapping ahead
+
+ # Potentially ambiguous mapping ahead
globMibMod['oidToModIdx'][v.name] = modName
mibMod['oidToModIdx'][v.name] = modName
globMibMod['oidToLabelIdx'][v.name] = (n,)
mibMod['oidToLabelIdx'][v.name] = (n,)
+
else:
raise error.SmiError(
- 'Unexpected object %s::%s' % (modName, n)
- )
+ 'Unexpected object %s::%s' % (modName, n))
# Build oid->long-label index
- oidToLabelIdx = self.__mibSymbolsIdx['']['oidToLabelIdx']
- labelToOidIdx = self.__mibSymbolsIdx['']['labelToOidIdx']
+ oidToLabelIdx = self._mibSymbolsIdx['']['oidToLabelIdx']
+ labelToOidIdx = self._mibSymbolsIdx['']['labelToOidIdx']
+
prevOid = ()
baseLabel = ()
- for key in oidToLabelIdx.keys():
+
+ for key in oidToLabelIdx:
keydiff = len(key) - len(prevOid)
+
if keydiff > 0:
if prevOid:
if keydiff == 1:
baseLabel = oidToLabelIdx[prevOid]
+
else:
baseLabel += key[-keydiff:-1]
else:
baseLabel = ()
+
elif keydiff < 0:
baseLabel = ()
keyLen = len(key)
+
i = keyLen - 1
+
while i:
k = key[:i]
+
if k in oidToLabelIdx:
baseLabel = oidToLabelIdx[k]
+
if i != keyLen - 1:
baseLabel += key[i:-1]
+
break
+
i -= 1
+
# Build oid->long-label index
oidToLabelIdx[key] = baseLabel + oidToLabelIdx[key]
+
# Build label->oid index
labelToOidIdx[oidToLabelIdx[key]] = key
prevOid = key
# Build module-scope oid->long-label index
- for mibMod in self.__mibSymbolsIdx.values():
- for oid in mibMod['oidToLabelIdx'].keys():
+ for mibMod in self._mibSymbolsIdx.values():
+ for oid in mibMod['oidToLabelIdx']:
mibMod['oidToLabelIdx'][oid] = oidToLabelIdx[oid]
mibMod['labelToOidIdx'][oidToLabelIdx[oid]] = oid
@@ -154,9 +181,11 @@ class MibViewController(object):
def getOrderedModuleName(self, index):
self.indexMib()
- modNames = self.__mibSymbolsIdx.keys()
+
+ modNames = self._mibSymbolsIdx
if modNames:
return modNames[index]
+
raise error.SmiError('No modules loaded at %s' % self)
def getFirstModuleName(self):
@@ -167,69 +196,89 @@ class MibViewController(object):
def getNextModuleName(self, modName):
self.indexMib()
+
try:
- return self.__mibSymbolsIdx.nextKey(modName)
+ return self._mibSymbolsIdx.nextKey(modName)
+
except KeyError:
raise error.SmiError(
- 'No module next to %s at %s' % (modName, self)
- )
+ 'No module next to %s at %s' % (modName, self))
# MIB tree node management
- def __getOidLabel(self, nodeName, oidToLabelIdx, labelToOidIdx):
+ def _getOidLabel(self, nodeName, oidToLabelIdx, labelToOidIdx):
"""getOidLabel(nodeName) -> (oid, label, suffix)"""
if not nodeName:
return nodeName, nodeName, ()
+
if nodeName in labelToOidIdx:
return labelToOidIdx[nodeName], nodeName, ()
+
if nodeName in oidToLabelIdx:
return nodeName, oidToLabelIdx[nodeName], ()
+
if len(nodeName) < 2:
return nodeName, nodeName, ()
- oid, label, suffix = self.__getOidLabel(
- nodeName[:-1], oidToLabelIdx, labelToOidIdx
- )
+
+ oid, label, suffix = self._getOidLabel(
+ nodeName[:-1], oidToLabelIdx, labelToOidIdx)
+
suffix = suffix + nodeName[-1:]
+
resLabel = label + tuple([str(x) for x in suffix])
if resLabel in labelToOidIdx:
return labelToOidIdx[resLabel], resLabel, ()
+
resOid = oid + suffix
if resOid in oidToLabelIdx:
return resOid, oidToLabelIdx[resOid], ()
+
return oid, label, suffix
def getNodeNameByOid(self, nodeName, modName=''):
self.indexMib()
- if modName in self.__mibSymbolsIdx:
- mibMod = self.__mibSymbolsIdx[modName]
+
+ if modName in self._mibSymbolsIdx:
+ mibMod = self._mibSymbolsIdx[modName]
+
else:
raise error.SmiError('No module %s at %s' % (modName, self))
- oid, label, suffix = self.__getOidLabel(
- nodeName, mibMod['oidToLabelIdx'], mibMod['labelToOidIdx']
- )
+
+ oid, label, suffix = self._getOidLabel(
+ nodeName, mibMod['oidToLabelIdx'], mibMod['labelToOidIdx'])
+
if oid == label:
raise error.NoSuchObjectError(
- str='Can\'t resolve node name %s::%s at %s' %
- (modName, nodeName, self)
- )
+ str='Cannot resolve node name %s::%s at '
+ '%s' % (modName, nodeName, self))
+
debug.logger & debug.FLAG_MIB and debug.logger(
- 'getNodeNameByOid: resolved %s:%s -> %s.%s' % (modName, nodeName, label, suffix))
+ 'getNodeNameByOid: resolved %s:%s -> '
+ '%s.%s' % (modName, nodeName, label, suffix))
+
return oid, label, suffix
def getNodeNameByDesc(self, nodeName, modName=''):
self.indexMib()
- if modName in self.__mibSymbolsIdx:
- mibMod = self.__mibSymbolsIdx[modName]
+
+ if modName in self._mibSymbolsIdx:
+ mibMod = self._mibSymbolsIdx[modName]
+
else:
raise error.SmiError('No module %s at %s' % (modName, self))
+
if nodeName in mibMod['varToNameIdx']:
oid = mibMod['varToNameIdx'][nodeName]
+
else:
raise error.NoSuchObjectError(
- str='No such symbol %s::%s at %s' % (modName, nodeName, self)
- )
+ str='No such symbol %s::%s at '
+ '%s' % (modName, nodeName, self))
+
debug.logger & debug.FLAG_MIB and debug.logger(
- 'getNodeNameByDesc: resolved %s:%s -> %s' % (modName, nodeName, oid))
+ 'getNodeNameByDesc: resolved %s:%s -> '
+ '%s' % (modName, nodeName, oid))
+
return self.getNodeNameByOid(oid, modName)
def getNodeName(self, nodeName, modName=''):
@@ -238,28 +287,36 @@ class MibViewController(object):
try:
# First try nodeName as an OID/label
return self.getNodeNameByOid(nodeName, modName)
+
except error.NoSuchObjectError:
# ...on failure, try as MIB symbol
oid, label, suffix = self.getNodeNameByDesc(nodeName[0], modName)
+
# ...with trailing suffix
return self.getNodeNameByOid(oid + suffix + nodeName[1:], modName)
def getOrderedNodeName(self, index, modName=''):
self.indexMib()
- if modName in self.__mibSymbolsIdx:
- mibMod = self.__mibSymbolsIdx[modName]
+
+ if modName in self._mibSymbolsIdx:
+ mibMod = self._mibSymbolsIdx[modName]
+
else:
raise error.SmiError('No module %s at %s' % (modName, self))
+
if not mibMod['oidToLabelIdx']:
raise error.NoSuchObjectError(
- str='No variables at MIB module %s at %s' % (modName, self)
- )
+ str='No variables at MIB module %s at '
+ '%s' % (modName, self))
+
try:
oid, label = mibMod['oidToLabelIdx'].items()[index]
+
except KeyError:
raise error.NoSuchObjectError(
- str='No symbol at position %s in MIB module %s at %s' % (index, modName, self)
- )
+ str='No symbol at position %s in MIB module %s at '
+ '%s' % (index, modName, self))
+
return oid, label, ()
def getFirstNodeName(self, modName=''):
@@ -272,55 +329,67 @@ class MibViewController(object):
oid, label, suffix = self.getNodeName(nodeName, modName)
try:
return self.getNodeName(
- self.__mibSymbolsIdx[modName]['oidToLabelIdx'].nextKey(oid) + suffix, modName
- )
+ self._mibSymbolsIdx[modName]['oidToLabelIdx'].nextKey(oid) + suffix,
+ modName)
+
except KeyError:
raise error.NoSuchObjectError(
- str='No name next to %s::%s at %s' % (modName, nodeName, self)
- )
+ str='No name next to %s::%s at '
+ '%s' % (modName, nodeName, self))
def getParentNodeName(self, nodeName, modName=''):
oid, label, suffix = self.getNodeName(nodeName, modName)
+
if len(oid) < 2:
raise error.NoSuchObjectError(
- str='No parent name for %s::%s at %s' %
- (modName, nodeName, self)
- )
+ str='No parent name for %s::%s at '
+ '%s' % (modName, nodeName, self))
+
return oid[:-1], label[:-1], oid[-1:] + suffix
def getNodeLocation(self, nodeName, modName=''):
oid, label, suffix = self.getNodeName(nodeName, modName)
- return self.__mibSymbolsIdx['']['oidToModIdx'][oid], label[-1], suffix
+ return (self._mibSymbolsIdx['']['oidToModIdx'][oid], label[-1],
+ suffix)
# MIB type management
def getTypeName(self, typeName, modName=''):
self.indexMib()
- if modName in self.__mibSymbolsIdx:
- mibMod = self.__mibSymbolsIdx[modName]
+
+ if modName in self._mibSymbolsIdx:
+ mibMod = self._mibSymbolsIdx[modName]
+
else:
raise error.SmiError(
- 'No module %s at %s' % (modName, self)
- )
+ 'No module %s at %s' % (modName, self))
+
if typeName in mibMod['typeToModIdx']:
m = mibMod['typeToModIdx'][typeName]
+
else:
raise error.NoSuchObjectError(
- str='No such type %s::%s at %s' % (modName, typeName, self)
- )
+ str='No such type %s::%s at '
+ '%s' % (modName, typeName, self))
+
return m, typeName
def getOrderedTypeName(self, index, modName=''):
self.indexMib()
- if modName in self.__mibSymbolsIdx:
- mibMod = self.__mibSymbolsIdx[modName]
+
+ if modName in self._mibSymbolsIdx:
+ mibMod = self._mibSymbolsIdx[modName]
+
else:
raise error.SmiError('No module %s at %s' % (modName, self))
+
if not mibMod['typeToModIdx']:
raise error.NoSuchObjectError(
- str='No types at MIB module %s at %s' % (modName, self)
- )
- t = mibMod['typeToModIdx'].keys()[index]
+ str='No types at MIB module %s at '
+ '%s' % (modName, self))
+
+ t = list(mibMod['typeToModIdx'])[index]
+
return mibMod['typeToModIdx'][t], t
def getFirstTypeName(self, modName=''):
@@ -331,9 +400,11 @@ class MibViewController(object):
def getNextType(self, typeName, modName=''):
m, t = self.getTypeName(typeName, modName)
+
try:
- return self.__mibSymbolsIdx[m]['typeToModIdx'].nextKey(t)
+ return self._mibSymbolsIdx[m]['typeToModIdx'].nextKey(t)
+
except KeyError:
raise error.NoSuchObjectError(
- str='No type next to %s::%s at %s' % (modName, typeName, self)
- )
+ str='No type next to %s::%s at '
+ '%s' % (modName, typeName, self))