diff options
Diffstat (limited to 'pysnmp/entity')
-rw-r--r-- | pysnmp/entity/config.py | 49 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/cmdgen.py | 39 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/cmdrsp.py | 3 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/mibvar.py | 82 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/ntforg.py | 61 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/ntfrcv.py | 15 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/oneliner/__init__.py | 1 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/oneliner/cmdgen.py | 254 | ||||
-rw-r--r-- | pysnmp/entity/rfc3413/oneliner/ntforg.py | 180 |
9 files changed, 5 insertions, 679 deletions
diff --git a/pysnmp/entity/config.py b/pysnmp/entity/config.py index 06d6c14d..d34c3773 100644 --- a/pysnmp/entity/config.py +++ b/pysnmp/entity/config.py @@ -5,7 +5,7 @@ # License: http://snmplabs.com/pysnmp/license.html # from pyasn1.compat.octets import null -from pysnmp.carrier.asyncore.dgram import udp, udp6, unix +from pysnmp.carrier.asyncore.dgram import udp, udp6 from pysnmp.proto.secmod.rfc3414.auth import hmacmd5, hmacsha, noauth from pysnmp.proto.secmod.rfc3414.priv import des, nopriv from pysnmp.proto.secmod.rfc3826.priv import aes @@ -19,7 +19,6 @@ from pysnmp import error # Transports snmpUDPDomain = udp.snmpUDPDomain snmpUDP6Domain = udp6.snmpUDP6Domain -snmpLocalDomain = unix.snmpLocalDomain # Auth protocol usmHMACMD5AuthProtocol = hmacmd5.HmacMd5.serviceID @@ -129,15 +128,11 @@ def addV3User(snmpEngine, userName, authProtocol=usmNoAuthProtocol, authKey=None, privProtocol=usmNoPrivProtocol, privKey=None, securityEngineId=None, - securityName=None, - # deprecated parameters follow - contextEngineId=None): + securityName=None): mibBuilder = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder if securityName is None: securityName = userName - if securityEngineId is None: # backward compatibility - securityEngineId = contextEngineId (snmpEngineID, usmUserEntry, tblIdx1, pysnmpUsmSecretEntry, tblIdx2) = __cookV3UserInfo(snmpEngine, userName, securityEngineId) @@ -203,11 +198,7 @@ def addV3User(snmpEngine, userName, def delV3User(snmpEngine, userName, - securityEngineId=None, - # deprecated parameters follow - contextEngineId=None): - if securityEngineId is None: # backward compatibility - securityEngineId = contextEngineId + securityEngineId=None): (snmpEngineID, usmUserEntry, tblIdx1, pysnmpUsmSecretEntry, tblIdx2) = __cookV3UserInfo(snmpEngine, userName, securityEngineId) snmpEngine.msgAndPduDsp.mibInstrumController.writeVars( @@ -558,40 +549,6 @@ def delVacmUser(snmpEngine, securityModel, securityName, securityLevel, ) -# Obsolete shortcuts for add/delVacmUser() wrappers - -def addRoUser(snmpEngine, securityModel, securityName, securityLevel, subTree): - addVacmUser(snmpEngine, securityModel, securityName, - securityLevel, subTree) - - -def delRoUser(snmpEngine, securityModel, securityName, securityLevel, subTree): - delVacmUser(snmpEngine, securityModel, securityName, securityLevel, - subTree) - - -def addRwUser(snmpEngine, securityModel, securityName, securityLevel, subTree): - addVacmUser(snmpEngine, securityModel, securityName, securityLevel, - subTree, subTree) - - -def delRwUser(snmpEngine, securityModel, securityName, securityLevel, subTree): - delVacmUser(snmpEngine, securityModel, securityName, securityLevel, - subTree, subTree) - - -def addTrapUser(snmpEngine, securityModel, securityName, - securityLevel, subTree): - addVacmUser(snmpEngine, securityModel, securityName, securityLevel, - (), (), subTree) - - -def delTrapUser(snmpEngine, securityModel, securityName, - securityLevel, subTree): - delVacmUser(snmpEngine, securityModel, securityName, securityLevel, - (), (), subTree) - - # Notification target setup def __cookNotificationTargetInfo(snmpEngine, notificationName, paramsName, diff --git a/pysnmp/entity/rfc3413/cmdgen.py b/pysnmp/entity/rfc3413/cmdgen.py index be7eee0d..90180c67 100644 --- a/pysnmp/entity/rfc3413/cmdgen.py +++ b/pysnmp/entity/rfc3413/cmdgen.py @@ -205,10 +205,6 @@ class CommandGenerator(object): return sendRequestHandle -# backward compatibility stub -CommandGeneratorBase = CommandGenerator - - class GetCommandGenerator(CommandGenerator): def processResponseVarBinds(self, snmpEngine, sendRequestHandle, errorIndication, PDU, cbCtx): @@ -410,38 +406,3 @@ class BulkCommandGenerator(BulkCommandGeneratorSingleRun): sendRequestHandle, statusInformation)) cbFun(snmpEngine, sendRequestHandle, statusInformation['errorIndication'], 0, 0, (), cbCtx) - - -# -# Obsolete, compatibility interfaces. -# - -def __sendReqCbFun(snmpEngine, sendRequestHandle, errorIndication, - errorStatus, errorIndex, varBinds, cbCtx): - cbFun, cbCtx = cbCtx - return cbFun(sendRequestHandle, errorIndication, errorStatus, - errorIndex, varBinds, cbCtx) - - -def _sendReq(self, snmpEngine, targetName, varBinds, cbFun, - cbCtx=None, contextEngineId=None, contextName=''): - return self.sendVarBinds(snmpEngine, targetName, contextEngineId, - contextName, varBinds, __sendReqCbFun, - (cbFun, cbCtx)) - - -def _sendBulkReq(self, snmpEngine, targetName, nonRepeaters, maxRepetitions, - varBinds, cbFun, cbCtx=None, contextEngineId=None, - contextName=''): - return self.sendVarBinds(snmpEngine, targetName, contextEngineId, - contextName, nonRepeaters, maxRepetitions, - varBinds, __sendReqCbFun, (cbFun, cbCtx)) - - -# install compatibility wrappers -GetCommandGenerator.sendReq = _sendReq -SetCommandGenerator.sendReq = _sendReq -NextCommandGenerator.sendReq = _sendReq -NextCommandGeneratorSingleRun.sendReq = _sendReq -BulkCommandGenerator.sendReq = _sendBulkReq -BulkCommandGeneratorSingleRun.sendReq = _sendBulkReq diff --git a/pysnmp/entity/rfc3413/cmdrsp.py b/pysnmp/entity/rfc3413/cmdrsp.py index b260918b..3f99f004 100644 --- a/pysnmp/entity/rfc3413/cmdrsp.py +++ b/pysnmp/entity/rfc3413/cmdrsp.py @@ -52,9 +52,6 @@ class CommandResponderBase(object): self.sendPdu(snmpEngine, stateReference, PDU) - # backward compatibility - sendRsp = sendVarBinds - def sendPdu(self, snmpEngine, stateReference, PDU): (messageProcessingModel, securityModel, securityName, securityLevel, contextEngineId, contextName, diff --git a/pysnmp/entity/rfc3413/mibvar.py b/pysnmp/entity/rfc3413/mibvar.py deleted file mode 100644 index 02051b5c..00000000 --- a/pysnmp/entity/rfc3413/mibvar.py +++ /dev/null @@ -1,82 +0,0 @@ -# -# This file is part of pysnmp software. -# -# Copyright (c) 2005-2018, Ilya Etingof <etingof@gmail.com> -# License: http://snmplabs.com/pysnmp/license.html -# -# THESE FUNCTIONS ARE OBSOLETE AND MUST NOT BE USED! -# USE pysnmp.entity.rfc3413.oneliner.mibvar INSTEAD -# -from pyasn1.type import univ -from pysnmp.smi.error import NoSuchObjectError - - -# Name - -def mibNameToOid(mibView, name): - if isinstance(name[0], tuple): - f = lambda x='', y='': (x, y) - modName, symName = f(*name[0]) - if modName: # load module if needed - mibView.mibBuilder.loadModules(modName) - else: - mibView.mibBuilder.loadModules() # load all (slow) - if symName: - oid, label, suffix = mibView.getNodeNameByDesc(symName, modName) - else: - oid, label, suffix = mibView.getFirstNodeName(modName) - suffix = name[1:] - modName, symName, _s = mibView.getNodeLocation(oid) - mibNode, = mibView.mibBuilder.importSymbols( - modName, symName - ) - if hasattr(mibNode, 'createTest'): # table column XXX - modName, symName, _s = mibView.getNodeLocation(oid[:-1]) - rowNode, = mibView.mibBuilder.importSymbols(modName, symName) - return oid, rowNode.getInstIdFromIndices(*suffix) - else: # scalar or incomplete spec - return oid, suffix - elif not isinstance(name, tuple): - name = tuple(univ.ObjectIdentifier(name)) - - oid, label, suffix = mibView.getNodeNameByOid(name) - - return oid, suffix - - -__scalarSuffix = (univ.Integer(0),) - - -def oidToMibName(mibView, oid): - if not isinstance(oid, tuple): - oid = tuple(univ.ObjectIdentifier(oid)) - _oid, label, suffix = mibView.getNodeNameByOid(oid) - modName, symName, __suffix = mibView.getNodeLocation(_oid) - mibNode, = mibView.mibBuilder.importSymbols( - modName, symName - ) - if hasattr(mibNode, 'createTest'): # table column - __modName, __symName, __s = mibView.getNodeLocation(_oid[:-1]) - rowNode, = mibView.mibBuilder.importSymbols(__modName, __symName) - return (symName, modName), rowNode.getIndicesFromInstId(suffix) - elif not suffix: # scalar - return (symName, modName), suffix - elif suffix == (0,): # scalar - return (symName, modName), __scalarSuffix - else: - raise NoSuchObjectError( - str='No MIB registered that defines %s object, closest known parent is %s (%s::%s)' % ( - univ.ObjectIdentifier(oid), univ.ObjectIdentifier(mibNode.name), modName, symName) - ) - - -# Value - -def cloneFromMibValue(mibView, modName, symName, value): - mibNode, = mibView.mibBuilder.importSymbols( - modName, symName - ) - if hasattr(mibNode, 'syntax'): # scalar - return mibNode.syntax.clone(value) - else: - return # identifier diff --git a/pysnmp/entity/rfc3413/ntforg.py b/pysnmp/entity/rfc3413/ntforg.py index abf1a47f..85bdb9d0 100644 --- a/pysnmp/entity/rfc3413/ntforg.py +++ b/pysnmp/entity/rfc3413/ntforg.py @@ -333,66 +333,5 @@ class NotificationOriginator(object): return notificationHandle - -# -# Obsolete, compatibility interfaces. -# - -def _sendNotificationCbFun(snmpEngine, sendRequestHandle, errorIndication, - errorStatus, errorIndex, varBinds, cbCtx): - cbFun, cbCtx = cbCtx - - try: - # we need to pass response PDU information to user for INFORMs - cbFun(sendRequestHandle, errorIndication, - errorStatus, errorIndex, varBinds, cbCtx) - except TypeError: - # a backward compatible way of calling user function - cbFun(sendRequestHandle, errorIndication, cbCtx) - - -def _sendNotification(self, snmpEngine, notificationTarget, notificationName, - additionalVarBinds=(), cbFun=None, cbCtx=None, - contextName=null, instanceIndex=None): - if self.snmpContext is None: - raise error.ProtocolError('SNMP context not specified') - - # - # Here we first expand trap OID into associated OBJECTS - # and then look them up at context-specific MIB - # - - mibViewController = snmpEngine.getUserContext('mibViewController') - if not mibViewController: - mibViewController = view.MibViewController(snmpEngine.getMibBuilder()) - snmpEngine.setUserContext(mibViewController=mibViewController) - - # Support the following syntax: - # '1.2.3.4' - # (1,2,3,4) - # ('MIB', 'symbol') - if isinstance(notificationName, (tuple, list)) and \ - notificationName and isinstance(notificationName[0], str): - notificationName = rfc1902.ObjectIdentity(*notificationName) - else: - notificationName = rfc1902.ObjectIdentity(notificationName) - - varBinds = rfc1902.NotificationType(notificationName, - instanceIndex=instanceIndex) - varBinds.resolveWithMib(mibViewController) - - mibInstrumController = self.snmpContext.getMibInstrum(contextName) - - varBinds = varBinds[:1] + mibInstrumController.readVars(varBinds[1:]) - - return self.sendVarBinds(snmpEngine, notificationTarget, - self.snmpContext.contextEngineId, - contextName, varBinds + list(additionalVarBinds), - _sendNotificationCbFun, (cbFun, cbCtx)) - - -# install compatibility wrapper -NotificationOriginator.sendNotification = _sendNotification - # XXX # move/group/implement config setting/retrieval at a stand-alone module diff --git a/pysnmp/entity/rfc3413/ntfrcv.py b/pysnmp/entity/rfc3413/ntfrcv.py index 841ca793..34192ad3 100644 --- a/pysnmp/entity/rfc3413/ntfrcv.py +++ b/pysnmp/entity/rfc3413/ntfrcv.py @@ -23,7 +23,6 @@ class NotificationReceiver(object): ) self.__snmpTrapCommunity = '' - self.__cbFunVer = 0 self.__cbFun = cbFun self.__cbCtx = cbCtx @@ -101,15 +100,5 @@ class NotificationReceiver(object): 'processPdu: stateReference %s, user cbFun %s, cbCtx %s, varBinds %s' % ( stateReference, self.__cbFun, self.__cbCtx, varBinds)) - if self.__cbFunVer: - self.__cbFun(snmpEngine, stateReference, contextEngineId, - contextName, varBinds, self.__cbCtx) - else: - # Compatibility stub (handle legacy cbFun interface) - try: - self.__cbFun(snmpEngine, contextEngineId, contextName, - varBinds, self.__cbCtx) - except TypeError: - self.__cbFunVer = 1 - self.__cbFun(snmpEngine, stateReference, contextEngineId, - contextName, varBinds, self.__cbCtx) + self.__cbFun(snmpEngine, stateReference, contextEngineId, + contextName, varBinds, self.__cbCtx) diff --git a/pysnmp/entity/rfc3413/oneliner/__init__.py b/pysnmp/entity/rfc3413/oneliner/__init__.py deleted file mode 100644 index 8c3066b2..00000000 --- a/pysnmp/entity/rfc3413/oneliner/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# This file is necessary to make this directory a package. diff --git a/pysnmp/entity/rfc3413/oneliner/cmdgen.py b/pysnmp/entity/rfc3413/oneliner/cmdgen.py deleted file mode 100644 index ecbca451..00000000 --- a/pysnmp/entity/rfc3413/oneliner/cmdgen.py +++ /dev/null @@ -1,254 +0,0 @@ -# -# This file is part of pysnmp software. -# -# Copyright (c) 2005-2018, Ilya Etingof <etingof@gmail.com> -# License: http://snmplabs.com/pysnmp/license.html -# -# All code in this file belongs to obsolete, compatibility wrappers. -# Never use interfaces below for new applications! -# -from pysnmp.hlapi.asyncore import * -from pysnmp.hlapi.asyncore import sync -from pysnmp.hlapi.varbinds import * -from pysnmp.hlapi.lcd import * -from pyasn1.compat.octets import null -from pyasn1.type import univ - -__all__ = ['AsynCommandGenerator', 'CommandGenerator', 'MibVariable'] - -MibVariable = ObjectIdentity - - -class AsynCommandGenerator(object): - _null = univ.Null('') - - vbProcessor = CommandGeneratorVarBinds() - lcd = CommandGeneratorLcdConfigurator() - - def __init__(self, snmpEngine=None): - if snmpEngine is None: - self.snmpEngine = SnmpEngine() - else: - self.snmpEngine = snmpEngine - - self.mibViewController = self.vbProcessor.getMibViewController(self.snmpEngine) - - def __del__(self): - self.lcd.unconfigure(self.snmpEngine) - - def cfgCmdGen(self, authData, transportTarget): - return self.lcd.configure(self.snmpEngine, authData, transportTarget) - - def uncfgCmdGen(self, authData=None): - return self.lcd.unconfigure(self.snmpEngine, authData) - - # compatibility stub - def makeReadVarBinds(self, varNames): - return self.makeVarBinds([(x, self._null) for x in varNames]) - - def makeVarBinds(self, varBinds): - return self.vbProcessor.makeVarBinds(self.snmpEngine, varBinds) - - def unmakeVarBinds(self, varBinds, lookupNames, lookupValues): - return self.vbProcessor.unmakeVarBinds( - self.snmpEngine, varBinds, lookupNames or lookupValues - ) - - def getCmd(self, authData, transportTarget, varNames, cbInfo, - lookupNames=False, lookupValues=False, - contextEngineId=None, contextName=null): - - def __cbFun(snmpEngine, sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBindTable, cbInfo): - cbFun, cbCtx = cbInfo - cbFun(sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBindTable, cbCtx) - - # for backward compatibility - if contextName is null and authData.contextName: - contextName = authData.contextName - - return getCmd( - self.snmpEngine, authData, transportTarget, - ContextData(contextEngineId, contextName), - *[(x, self._null) for x in varNames], - **dict(cbFun=__cbFun, cbCtx=cbInfo, - lookupMib=lookupNames or lookupValues) - ) - - asyncGetCmd = getCmd - - def setCmd(self, authData, transportTarget, varBinds, cbInfo, - lookupNames=False, lookupValues=False, - contextEngineId=None, contextName=null): - - def __cbFun(snmpEngine, sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBindTable, cbInfo): - cbFun, cbCtx = cbInfo - cbFun(sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBindTable, cbCtx) - - # for backward compatibility - if contextName is null and authData.contextName: - contextName = authData.contextName - - return setCmd( - self.snmpEngine, authData, transportTarget, - ContextData(contextEngineId, contextName), *varBinds, - **dict(cbFun=__cbFun, cbCtx=cbInfo, - lookupMib=lookupNames or lookupValues) - ) - - asyncSetCmd = setCmd - - def nextCmd(self, authData, transportTarget, varNames, cbInfo, - lookupNames=False, lookupValues=False, - contextEngineId=None, contextName=null): - - def __cbFun(snmpEngine, sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBindTable, cbInfo): - cbFun, cbCtx = cbInfo - return cbFun(sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBindTable, cbCtx) - - # for backward compatibility - if contextName is null and authData.contextName: - contextName = authData.contextName - - return nextCmd( - self.snmpEngine, authData, transportTarget, - ContextData(contextEngineId, contextName), - *[(x, self._null) for x in varNames], - **dict(cbFun=__cbFun, cbCtx=cbInfo, - lookupMib=lookupNames or lookupValues) - ) - - asyncNextCmd = nextCmd - - def bulkCmd(self, authData, transportTarget, - nonRepeaters, maxRepetitions, varNames, cbInfo, - lookupNames=False, lookupValues=False, - contextEngineId=None, contextName=null): - - def __cbFun(snmpEngine, sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBindTable, cbInfo): - cbFun, cbCtx = cbInfo - return cbFun(sendRequestHandle, - errorIndication, errorStatus, errorIndex, - varBindTable, cbCtx) - - # for backward compatibility - if contextName is null and authData.contextName: - contextName = authData.contextName - - return bulkCmd( - self.snmpEngine, authData, transportTarget, - ContextData(contextEngineId, contextName), - nonRepeaters, maxRepetitions, - *[(x, self._null) for x in varNames], - **dict(cbFun=__cbFun, cbCtx=cbInfo, - lookupMib=lookupNames or lookupValues) - ) - - asyncBulkCmd = bulkCmd - - -class CommandGenerator(object): - _null = univ.Null('') - - def __init__(self, snmpEngine=None, asynCmdGen=None): - # compatibility attributes - self.snmpEngine = snmpEngine or SnmpEngine() - - def getCmd(self, authData, transportTarget, *varNames, **kwargs): - if 'lookupNames' not in kwargs: - kwargs['lookupNames'] = False - if 'lookupValues' not in kwargs: - kwargs['lookupValues'] = False - errorIndication, errorStatus, errorIndex, varBinds = None, 0, 0, [] - for (errorIndication, - errorStatus, - errorIndex, - varBinds) in sync.getCmd(self.snmpEngine, authData, transportTarget, - ContextData(kwargs.get('contextEngineId'), - kwargs.get('contextName', null)), - *[(x, self._null) for x in varNames], - **kwargs): - break - return errorIndication, errorStatus, errorIndex, varBinds - - def setCmd(self, authData, transportTarget, *varBinds, **kwargs): - if 'lookupNames' not in kwargs: - kwargs['lookupNames'] = False - if 'lookupValues' not in kwargs: - kwargs['lookupValues'] = False - errorIndication, errorStatus, errorIndex, rspVarBinds = None, 0, 0, [] - for (errorIndication, - errorStatus, - errorIndex, - rspVarBinds) in sync.setCmd(self.snmpEngine, authData, transportTarget, - ContextData(kwargs.get('contextEngineId'), - kwargs.get('contextName', null)), - *varBinds, - **kwargs): - break - - return errorIndication, errorStatus, errorIndex, rspVarBinds - - def nextCmd(self, authData, transportTarget, *varNames, **kwargs): - if 'lookupNames' not in kwargs: - kwargs['lookupNames'] = False - if 'lookupValues' not in kwargs: - kwargs['lookupValues'] = False - if 'lexicographicMode' not in kwargs: - kwargs['lexicographicMode'] = False - errorIndication, errorStatus, errorIndex = None, 0, 0 - varBindTable = [] - for (errorIndication, - errorStatus, - errorIndex, - varBinds) in sync.nextCmd(self.snmpEngine, authData, transportTarget, - ContextData(kwargs.get('contextEngineId'), - kwargs.get('contextName', null)), - *[(x, self._null) for x in varNames], - **kwargs): - if errorIndication or errorStatus: - return errorIndication, errorStatus, errorIndex, varBinds - - varBindTable.append(varBinds) - - return errorIndication, errorStatus, errorIndex, varBindTable - - def bulkCmd(self, authData, transportTarget, - nonRepeaters, maxRepetitions, *varNames, **kwargs): - if 'lookupNames' not in kwargs: - kwargs['lookupNames'] = False - if 'lookupValues' not in kwargs: - kwargs['lookupValues'] = False - if 'lexicographicMode' not in kwargs: - kwargs['lexicographicMode'] = False - errorIndication, errorStatus, errorIndex = None, 0, 0 - varBindTable = [] - for (errorIndication, - errorStatus, - errorIndex, - varBinds) in sync.bulkCmd(self.snmpEngine, authData, - transportTarget, - ContextData(kwargs.get('contextEngineId'), - kwargs.get('contextName', null)), - nonRepeaters, maxRepetitions, - *[(x, self._null) for x in varNames], - **kwargs): - if errorIndication or errorStatus: - return errorIndication, errorStatus, errorIndex, varBinds - - varBindTable.append(varBinds) - - return errorIndication, errorStatus, errorIndex, varBindTable diff --git a/pysnmp/entity/rfc3413/oneliner/ntforg.py b/pysnmp/entity/rfc3413/oneliner/ntforg.py deleted file mode 100644 index 8f24d84a..00000000 --- a/pysnmp/entity/rfc3413/oneliner/ntforg.py +++ /dev/null @@ -1,180 +0,0 @@ -# -# This file is part of pysnmp software. -# -# Copyright (c) 2005-2018, Ilya Etingof <etingof@gmail.com> -# License: http://snmplabs.com/pysnmp/license.html -# -# All code in this file belongs to obsolete, compatibility wrappers. -# Never use interfaces below for new applications! -# -from pysnmp.hlapi.asyncore import * -from pysnmp.hlapi.asyncore import sync -from pysnmp.hlapi.varbinds import * -from pysnmp.hlapi.lcd import * -from pyasn1.compat.octets import null -from pysnmp.entity import config -from pysnmp.entity.rfc3413 import context - -__all__ = ['AsynNotificationOriginator', - 'NotificationOriginator', - 'MibVariable'] - -MibVariable = ObjectIdentity - - -class ErrorIndicationReturn(object): - def __init__(self, *vars): - self.__vars = vars - - def __getitem__(self, i): - return self.__vars[i] - - def __nonzero__(self): - return bool(self) - - def __bool__(self): - return bool(self.__vars[0]) - - def __str__(self): - return str(self.__vars[0]) - - -class AsynNotificationOriginator(object): - vbProcessor = NotificationOriginatorVarBinds() - lcd = NotificationOriginatorLcdConfigurator() - - def __init__(self, snmpEngine=None, snmpContext=None): - if snmpEngine is None: - self.snmpEngine = snmpEngine = SnmpEngine() - else: - self.snmpEngine = snmpEngine - - if snmpContext is None: - self.snmpContext = context.SnmpContext(self.snmpEngine) - config.addContext( - self.snmpEngine, '' # this is leaky - ) - else: - self.snmpContext = snmpContext - - self.mibViewController = self.vbProcessor.getMibViewController(self.snmpEngine) - - def __del__(self): - self.uncfgNtfOrg() - - def cfgNtfOrg(self, authData, transportTarget, notifyType): - return self.lcd.configure( - self.snmpEngine, authData, transportTarget, notifyType - ) - - def uncfgNtfOrg(self, authData=None): - return self.lcd.unconfigure(self.snmpEngine, authData) - - def makeVarBinds(self, varBinds): - return self.vbProcessor.makeVarBinds( - self.snmpEngine, varBinds - ) - - def unmakeVarBinds(self, varBinds, lookupNames, lookupValues): - return self.vbProcessor.unmakeVarBinds( - self.snmpEngine, varBinds, lookupNames or lookupValues - ) - - def sendNotification(self, authData, transportTarget, - notifyType, notificationType, - varBinds=(), # legacy, use NotificationType instead - cbInfo=(None, None), - lookupNames=False, lookupValues=False, - contextEngineId=None, # XXX ordering incompatibility - contextName=null): - - def __cbFun(snmpEngine, sendRequestHandle, errorIndication, - errorStatus, errorIndex, varBinds, cbCtx): - cbFun, cbCtx = cbCtx - try: - # we need to pass response PDU information to user for INFORMs - return cbFun and cbFun( - sendRequestHandle, - errorIndication, - errorStatus, errorIndex, - varBinds, - cbCtx - ) - except TypeError: - # a backward compatible way of calling user function - return cbFun( - sendRequestHandle, - errorIndication, - cbCtx - ) - - # for backward compatibility - if contextName is null and authData.contextName: - contextName = authData.contextName - - if not isinstance(notificationType, - (ObjectIdentity, ObjectType, NotificationType)): - if isinstance(notificationType[0], tuple): - # legacy - notificationType = ObjectIdentity(notificationType[0][0], notificationType[0][1], *notificationType[1:]) - else: - notificationType = ObjectIdentity(notificationType) - - if not isinstance(notificationType, NotificationType): - notificationType = NotificationType(notificationType) - - return sendNotification( - self.snmpEngine, - authData, transportTarget, - ContextData(contextEngineId or self.snmpContext.contextEngineId, - contextName), - notifyType, notificationType.addVarBinds(*varBinds), - __cbFun, - cbInfo, - lookupNames or lookupValues - ) - - asyncSendNotification = sendNotification - - -class NotificationOriginator(object): - vbProcessor = NotificationOriginatorVarBinds() - - def __init__(self, snmpEngine=None, snmpContext=None, asynNtfOrg=None): - # compatibility attributes - self.snmpEngine = snmpEngine or SnmpEngine() - self.mibViewController = self.vbProcessor.getMibViewController(self.snmpEngine) - - # the varBinds parameter is legacy, use NotificationType instead - - def sendNotification(self, authData, transportTarget, notifyType, - notificationType, *varBinds, **kwargs): - if 'lookupNames' not in kwargs: - kwargs['lookupNames'] = False - if 'lookupValues' not in kwargs: - kwargs['lookupValues'] = False - if not isinstance(notificationType, - (ObjectIdentity, ObjectType, NotificationType)): - if isinstance(notificationType[0], tuple): - # legacy - notificationType = ObjectIdentity(notificationType[0][0], notificationType[0][1], *notificationType[1:]) - else: - notificationType = ObjectIdentity(notificationType) - - if not isinstance(notificationType, NotificationType): - notificationType = NotificationType(notificationType) - - for (errorIndication, - errorStatus, - errorIndex, - rspVarBinds) in sync.sendNotification(self.snmpEngine, authData, - transportTarget, - ContextData(kwargs.get('contextEngineId'), - kwargs.get('contextName', null)), - notifyType, - notificationType.addVarBinds(*varBinds), - **kwargs): - if notifyType == 'inform': - return errorIndication, errorStatus, errorIndex, rspVarBinds - else: - break |