diff options
author | elie <elie> | 2009-09-05 15:19:42 +0000 |
---|---|---|
committer | elie <elie> | 2009-09-05 15:19:42 +0000 |
commit | 8ff69d259701248ca38380b0eb43d3e3aae88c25 (patch) | |
tree | 64b2eb38fa2b18bd0866110db31ca168f298dc1d | |
parent | 87cc4781abe9751be8c463bf588cb9c2912984f6 (diff) | |
download | pysnmp-8ff69d259701248ca38380b0eb43d3e3aae88c25.tar.gz |
MIB code objects now supported
-rw-r--r-- | pysnmp/smi/builder.py | 136 |
1 files changed, 105 insertions, 31 deletions
diff --git a/pysnmp/smi/builder.py b/pysnmp/smi/builder.py index c50d6de..cc5d463 100644 --- a/pysnmp/smi/builder.py +++ b/pysnmp/smi/builder.py @@ -1,27 +1,83 @@ # MIB modules loader import os, types, string +import imp, struct, marshal, time from pysnmp.smi import error from pysnmp import debug -class __BaseMibSource: +class __AbstractMibSource: def __init__(self, srcName): - self._srcName = srcName + self._srcName = srcName + self.__magic = imp.get_magic() + self.__sfx = {} + for sfx, mode, typ in imp.get_suffixes(): + self.__sfx[typ] = (sfx, len(sfx), mode) debug.logger & debug.flagBld and debug.logger('trying %s' % self) def __repr__(self): return '%s(\'%s\')' % (self.__class__.__name__, self._srcName) - def fullPath(self, f=''): - return self._srcName + (f and (os.sep + f + '.py') or '') + def _uniqNames(self, files): + u = {} + for f in files: + if f[:9] == '__init__.': + continue + for typ in (imp.PY_SOURCE, imp.PY_COMPILED): + sfx, sfxLen, mode = self.__sfx[typ] + if f[-sfxLen:] == sfx: + u[f[:-sfxLen]] = None + return tuple(u.keys()) - def init(self): raise Exception('Method not implemented') - def listdir(self): raise Exception('Method not implemented') - def read(self, path): raise Exception('Method not implemented') + # MibSource API follows + + def fullPath(self, f='', sfx=''): + return self._srcName + (f and (os.sep + f + sfx) or '') + + def init(self): return self._init() + def listdir(self): return self._listdir() + def read(self, f): + pycSfx, pycSfxLen, pycMode = self.__sfx[imp.PY_COMPILED] + p = os.path.join(self._srcName, f) + pycSfx + try: + pycData = self._getData(p, pycMode) + except IOError: + pycTime = -1 + else: + if self.__magic == pycData[:4]: + pycData = pycData[4:] + pycTime = struct.unpack('L', pycData[:4])[0] + pycData = pycData[4:] + else: + debug.logger & debug.flagBld and debug.logger( + 'bad magic in %s' % p + ) + pycTime = -1 -class ZipMibSource(__BaseMibSource): - def init(self): + debug.logger & debug.flagBld and debug.logger( + 'file %s mtime %d' % (p, pycTime) + ) + + pySfx, pySfxLen, pyMode = self.__sfx[imp.PY_SOURCE] + p = os.path.join(self._srcName, f) + pySfx + try: + pyTime = self._getTimestamp(p) + except (IOError, OSError): + pyTime = -1 + + debug.logger & debug.flagBld and debug.logger( + 'file %s mtime %d' % (p, pyTime) + ) + + if pycTime != -1 and pycTime >= pyTime: + return marshal.loads(pycData), pycSfx + if pyTime != -1: + return self._getData(p, pyMode), pySfx + + raise IOError('No suitable module found') + +class ZipMibSource(__AbstractMibSource): + def _init(self): p = __import__( - self._srcName, globals(), locals(), string.split(self._srcName, '.') + self._srcName, globals(),locals(), string.split(self._srcName, '.') ) if hasattr(p, '__loader__'): self.__loader = p.__loader__ @@ -29,32 +85,50 @@ class ZipMibSource(__BaseMibSource): return self else: return DirMibSource(os.path.split(p.__file__)[0]).init() - - def listdir(self): + + def _parseDosTime(self, dosdate, dostime): + t = ( ((dosdate >> 9) & 0x7f) + 1980, # year + ((dosdate >> 5) & 0x0f), # month + dosdate & 0x1f, # mday + (dostime >> 11) & 0x1f, # hour + (dostime >> 5) & 0x3f, # min + (dostime & 0x1f) * 2, # sec + -1, # wday + -1, # yday + -1 ) # dst + return time.mktime(t) + + def _listdir(self): l = [] for f in self.__loader._files.keys(): d, f = os.path.split(f) - if d == self._srcName and f != '__init__.py' and f[-3:] == '.py': - l.append(f[:-3]) - return tuple(l) + if d == self._srcName: + l.append(f) + return tuple(self._uniqNames(l)) - def read(self, f): - return self.__loader.get_data(os.path.join(self._srcName, f) + '.py') + def _getTimestamp(self, p): + if self.__loader._files.has_key(p): + return self._parseDosTime( + self.__loader._files[p][6], + self.__loader._files[p][5] + ) + else: + raise IOError('No file in ZIP: %s' % p) + + def _getData(self, p, mode=None): return self.__loader.get_data(p) -class DirMibSource(__BaseMibSource): - def init(self): +class DirMibSource(__AbstractMibSource): + def _init(self): self._srcName = os.path.normpath(self._srcName) return self - def listdir(self): - l = [] - for f in os.listdir(self._srcName): - if f != '__init__.py' and f[-3:] == '.py': - l.append(f[:-3]) - return tuple(l) - - def read(self, f): - return open(os.path.join(self._srcName, f) + '.py').read() + def _listdir(self): + return self._uniqNames(os.listdir(self._srcName)) + + def _getTimestamp(self, p): + return os.stat(p)[8] + + def _getData(self, p, mode): return open(p, mode).read() class MibBuilder: loadTexts = 0 @@ -120,12 +194,12 @@ class MibBuilder: for mibSource in self.__mibSources: debug.logger & debug.flagBld and debug.logger('loadModules: trying %s at %s' % (modName, mibSource)) try: - modData = mibSource.read(modName) + modData, sfx = mibSource.read(modName) except IOError, why: debug.logger & debug.flagBld and debug.logger('loadModules: read %s from %s failed: %s' % (modName, mibSource, why)) continue - modPath = mibSource.fullPath(modName) + modPath = mibSource.fullPath(modName, sfx) if self.__modPathsSeen.has_key(modPath): debug.logger & debug.flagBld and debug.logger('loadModules: seen %s' % modPath) @@ -153,7 +227,7 @@ class MibBuilder: if not self.__modSeen.has_key(modName): raise error.SmiError( - 'MIB file \"%s\" not found in search path' % (modName and modName + ".py") + 'MIB file \"%s\" not found in search path' % (modName and modName + ".py[co]") ) return self |