summaryrefslogtreecommitdiff
path: root/pysnmp/entity
diff options
context:
space:
mode:
authorelie <elie>2015-09-27 15:46:12 +0000
committerelie <elie>2015-09-27 15:46:12 +0000
commitd044736ce1ba79e0ec674d4bf814edd843545b8a (patch)
tree23b4df94da24b7878d936d8e2316f8ed39b247e1 /pysnmp/entity
parent120a39ce88723792fef654451c7135461c152b9d (diff)
downloadpysnmp-git-d044736ce1ba79e0ec674d4bf814edd843545b8a.tar.gz
missing compatibility modules revived
Diffstat (limited to 'pysnmp/entity')
-rw-r--r--pysnmp/entity/rfc3413/oneliner/__init__.py1
-rw-r--r--pysnmp/entity/rfc3413/oneliner/cmdgen.py249
-rw-r--r--pysnmp/entity/rfc3413/oneliner/ntforg.py155
3 files changed, 405 insertions, 0 deletions
diff --git a/pysnmp/entity/rfc3413/oneliner/__init__.py b/pysnmp/entity/rfc3413/oneliner/__init__.py
new file mode 100644
index 00000000..8c3066b2
--- /dev/null
+++ b/pysnmp/entity/rfc3413/oneliner/__init__.py
@@ -0,0 +1 @@
+# This file is necessary to make this directory a package.
diff --git a/pysnmp/entity/rfc3413/oneliner/cmdgen.py b/pysnmp/entity/rfc3413/oneliner/cmdgen.py
new file mode 100644
index 00000000..2736d28f
--- /dev/null
+++ b/pysnmp/entity/rfc3413/oneliner/cmdgen.py
@@ -0,0 +1,249 @@
+#
+# All code in this file belongs to obsolete, compatibility wrappers.
+# Never use interfaces below for new applications!
+#
+from pysnmp.hlapi.asyncore import *
+from pysnmp.hlapi.asyncore import sync
+from pysnmp.hlapi.varbinds import *
+from pysnmp.hlapi.lcd import *
+from pyasn1.compat.octets import null
+from pysnmp.entity import config
+from pyasn1.type import univ
+
+__all__ = ['AsynCommandGenerator', 'CommandGenerator', 'MibVariable']
+
+MibVariable = ObjectIdentity
+
+class AsynCommandGenerator:
+ _null = univ.Null('')
+
+ vbProcessor = CommandGeneratorVarBinds()
+ lcd = CommandGeneratorLcdConfigurator()
+
+ def __init__(self, snmpEngine=None):
+ if snmpEngine is None:
+ self.snmpEngine = snmpEngine = SnmpEngine()
+ else:
+ self.snmpEngine = snmpEngine
+
+ self.mibViewController = self.vbProcessor.getMibViewController(self.snmpEngine)
+
+ def __del__(self):
+ self.lcd.unconfigure(self.snmpEngine)
+
+ def cfgCmdGen(self, authData, transportTarget):
+ return self.lcd.configure(
+ self.snmpEngine, authData, transportTarget
+ )
+
+ def uncfgCmdGen(self, authData=None):
+ return self.lcd.unconfigure(
+ self.snmpEngine, authData
+ )
+
+ # compatibility stub
+ def makeReadVarBinds(self, varNames):
+ return self.makeVarBinds(
+ [ (x, self._null) for x in varNames ]
+ )
+
+ def makeVarBinds(self, varBinds):
+ return self.vbProcessor.makeVarBinds(
+ self.snmpEngine, varBinds
+ )
+
+ def unmakeVarBinds(self, varBinds, lookupNames, lookupValues):
+ return self.vbProcessor.unmakeVarBinds(
+ self.snmpEngine, varBinds, lookupNames or lookupValues
+ )
+
+ def getCmd(self, authData, transportTarget, varNames, cbInfo,
+ lookupNames=False, lookupValues=False,
+ contextEngineId=None, contextName=null):
+
+ def __cbFun(snmpEngine, sendRequestHandle,
+ errorIndication, errorStatus, errorIndex,
+ varBindTable, cbInfo):
+ cbFun, cbCtx = cbInfo
+ cbFun(sendRequestHandle,
+ errorIndication, errorStatus, errorIndex,
+ varBindTable, cbCtx)
+
+ # for backward compatibility
+ if contextName is null and authData.contextName:
+ contextName = authData.contextName
+
+ return getCmd(
+ self.snmpEngine,
+ authData, transportTarget,
+ ContextData(contextEngineId, contextName),
+ *[(x, self._null) for x in varNames],
+ **dict(cbFun=__cbFun, cbCtx=cbInfo,
+ lookupMib=lookupNames or lookupValues)
+ )
+
+ asyncGetCmd = getCmd
+
+ def setCmd(self, authData, transportTarget, varBinds, cbInfo,
+ lookupNames=False, lookupValues=False,
+ contextEngineId=None, contextName=null):
+
+ def __cbFun(snmpEngine, sendRequestHandle,
+ errorIndication, errorStatus, errorIndex,
+ varBindTable, cbInfo):
+ cbFun, cbCtx = cbInfo
+ cbFun(sendRequestHandle,
+ errorIndication, errorStatus, errorIndex,
+ varBindTable, cbCtx)
+
+ # for backward compatibility
+ if contextName is null and authData.contextName:
+ contextName = authData.contextName
+
+
+ return setCmd(
+ self.snmpEngine,
+ authData, transportTarget,
+ ContextData(contextEngineId, contextName),
+ *varBinds,
+ **dict(cbFun=__cbFun, cbCtx=cbInfo,
+ lookupMib=lookupNames or lookupValues)
+ )
+
+ asyncSetCmd = setCmd
+
+ def nextCmd(self, authData, transportTarget, varNames, cbInfo,
+ lookupNames=False, lookupValues=False,
+ contextEngineId=None, contextName=null):
+
+ def __cbFun(snmpEngine, sendRequestHandle,
+ errorIndication, errorStatus, errorIndex,
+ varBindTable, cbInfo):
+ cbFun, cbCtx = cbInfo
+ return cbFun(sendRequestHandle,
+ errorIndication, errorStatus, errorIndex,
+ varBindTable, cbCtx)
+
+ # for backward compatibility
+ if contextName is null and authData.contextName:
+ contextName = authData.contextName
+
+ return nextCmd(
+ self.snmpEngine,
+ authData, transportTarget,
+ ContextData(contextEngineId, contextName),
+ *[(x, self._null) for x in varNames],
+ **dict(cbFun=__cbFun, cbCtx=cbInfo,
+ lookupMib=lookupNames or lookupValues)
+ )
+
+ asyncNextCmd = nextCmd
+
+ def bulkCmd(self, authData, transportTarget,
+ nonRepeaters, maxRepetitions, varNames, cbInfo,
+ lookupNames=False, lookupValues=False,
+ contextEngineId=None, contextName=null):
+
+ def __cbFun(snmpEngine, sendRequestHandle,
+ errorIndication, errorStatus, errorIndex,
+ varBindTable, cbInfo):
+ cbFun, cbCtx = cbInfo
+ return cbFun(sendRequestHandle,
+ errorIndication, errorStatus, errorIndex,
+ varBindTable, cbCtx)
+
+ # for backward compatibility
+ if contextName is null and authData.contextName:
+ contextName = authData.contextName
+
+ return bulkCmd(
+ self.snmpEngine,
+ authData, transportTarget,
+ ContextData(contextEngineId, contextName),
+ nonRepeaters, maxRepetitions,
+ *[(x, self._null) for x in varNames],
+ **dict(cbFun=__cbFun, cbCtx=cbInfo,
+ lookupMib=lookupNames or lookupValues)
+ )
+
+ asyncBulkCmd = bulkCmd
+
+class CommandGenerator:
+ _null = univ.Null('')
+ def __init__(self, snmpEngine=None, asynCmdGen=None):
+ # compatibility attributes
+ self.snmpEngine = snmpEngine or SnmpEngine()
+
+ def getCmd(self, authData, transportTarget, *varNames, **kwargs):
+ if 'lookupNames' not in kwargs:
+ kwargs['lookupNames'] = False
+ if 'lookupValues' not in kwargs:
+ kwargs['lookupValues'] = False
+ for x in sync.getCmd(self.snmpEngine, authData, transportTarget,
+ ContextData(kwargs.get('contextEngineId'),
+ kwargs.get('contextName', null)),
+ *[ (x, self._null) for x in varNames ],
+ **kwargs):
+ return x
+
+ def setCmd(self, authData, transportTarget, *varBinds, **kwargs):
+ if 'lookupNames' not in kwargs:
+ kwargs['lookupNames'] = False
+ if 'lookupValues' not in kwargs:
+ kwargs['lookupValues'] = False
+ for x in sync.setCmd(self.snmpEngine, authData, transportTarget,
+ ContextData(kwargs.get('contextEngineId'),
+ kwargs.get('contextName', null)),
+ *varBinds,
+ **kwargs):
+ return x
+
+ def nextCmd(self, authData, transportTarget, *varNames, **kwargs):
+ if 'lookupNames' not in kwargs:
+ kwargs['lookupNames'] = False
+ if 'lookupValues' not in kwargs:
+ kwargs['lookupValues'] = False
+ if 'lexicographicMode' not in kwargs:
+ kwargs['lexicographicMode'] = False
+ varBindTable = []
+ for errorIndication, \
+ errorStatus, errorIndex, \
+ varBinds \
+ in sync.nextCmd(self.snmpEngine, authData, transportTarget,
+ ContextData(kwargs.get('contextEngineId'),
+ kwargs.get('contextName', null)),
+ *[ (x, self._null) for x in varNames ],
+ **kwargs):
+ if errorIndication or errorStatus:
+ return errorIndication, errorStatus, errorIndex, varBinds
+
+ varBindTable.append(varBinds)
+
+ return errorIndication, errorStatus, errorIndex, varBindTable
+
+ def bulkCmd(self, authData, transportTarget,
+ nonRepeaters, maxRepetitions, *varNames, **kwargs):
+ if 'lookupNames' not in kwargs:
+ kwargs['lookupNames'] = False
+ if 'lookupValues' not in kwargs:
+ kwargs['lookupValues'] = False
+ if 'lexicographicMode' not in kwargs:
+ kwargs['lexicographicMode'] = False
+ varBindTable = []
+ for errorIndication, \
+ errorStatus, errorIndex, \
+ varBinds \
+ in sync.bulkCmd(self.snmpEngine, authData,
+ transportTarget,
+ ContextData(kwargs.get('contextEngineId'),
+ kwargs.get('contextName', null)),
+ nonRepeaters, maxRepetitions,
+ *[ (x, self._null) for x in varNames ],
+ **kwargs):
+ if errorIndication or errorStatus:
+ return errorIndication, errorStatus, errorIndex, varBinds
+
+ varBindTable.append(varBinds)
+
+ return errorIndication, errorStatus, errorIndex, varBindTable
+
diff --git a/pysnmp/entity/rfc3413/oneliner/ntforg.py b/pysnmp/entity/rfc3413/oneliner/ntforg.py
new file mode 100644
index 00000000..1d733dac
--- /dev/null
+++ b/pysnmp/entity/rfc3413/oneliner/ntforg.py
@@ -0,0 +1,155 @@
+#
+# All code in this file belongs to obsolete, compatibility wrappers.
+# Never use interfaces below for new applications!
+#
+from pysnmp.hlapi.asyncore import *
+from pysnmp.hlapi.asyncore import sync
+from pysnmp.hlapi.varbinds import *
+from pysnmp.hlapi.lcd import *
+from pyasn1.compat.octets import null
+from pysnmp.entity import config
+from pysnmp.entity.rfc3413 import context
+
+__all__ = ['AsynNotificationOriginator', 'NotificationOriginator',
+ 'MibVariable']
+
+MibVariable = ObjectIdentity
+
+class ErrorIndicationReturn:
+ def __init__(self, *vars): self.__vars = vars
+ def __getitem__(self, i): return self.__vars[i]
+ def __nonzero__(self): return self.__vars[0] and 1 or 0
+ def __bool__(self): return bool(self.__vars[0])
+ def __str__(self): return str(self.__vars[0])
+
+class AsynNotificationOriginator:
+ vbProcessor = NotificationOriginatorVarBinds()
+ lcd = NotificationOriginatorLcdConfigurator()
+ def __init__(self, snmpEngine=None, snmpContext=None):
+ if snmpEngine is None:
+ self.snmpEngine = snmpEngine = SnmpEngine()
+ else:
+ self.snmpEngine = snmpEngine
+
+ if snmpContext is None:
+ self.snmpContext = context.SnmpContext(self.snmpEngine)
+ config.addContext(
+ self.snmpEngine, '' # this is leaky
+ )
+ else:
+ self.snmpContext = snmpContext
+
+ self.mibViewController = self.vbProcessor.getMibViewController(self.snmpEngine)
+
+ def __del__(self): self.uncfgNtfOrg()
+
+ def cfgNtfOrg(self, authData, transportTarget, notifyType):
+ return self.lcd.configure(
+ self.snmpEngine, authData, transportTarget, notifyType
+ )
+
+ def uncfgNtfOrg(self, authData=None):
+ return self.lcd.unconfigure(self.snmpEngine, authData)
+
+ def makeVarBinds(self, varBinds):
+ return self.vbProcessor.makeVarBinds(
+ self.snmpEngine, varBinds
+ )
+
+ def unmakeVarBinds(self, varBinds, lookupNames, lookupValues):
+ return self.vbProcessor.unmakeVarBinds(
+ self.snmpEngine, varBinds, lookupNames or lookupValues
+ )
+
+ def sendNotification(self, authData, transportTarget,
+ notifyType, notificationType,
+ varBinds=(), # legacy, use NotificationType instead
+ cbInfo=(None, None),
+ lookupNames=False, lookupValues=False,
+ contextEngineId=None, # XXX ordering incompatibility
+ contextName=null):
+
+ def __cbFun(snmpEngine, sendRequestHandle, errorIndication,
+ errorStatus, errorIndex, varBinds, cbCtx):
+ cbFun, cbCtx = cbCtx
+ try:
+ # we need to pass response PDU information to user for INFORMs
+ return cbFun and cbFun(
+ sendRequestHandle,
+ errorIndication,
+ errorStatus, errorIndex,
+ varBinds,
+ cbCtx
+ )
+ except TypeError:
+ # a backward compatible way of calling user function
+ return cbFun(
+ sendRequestHandle,
+ errorIndication,
+ cbCtx
+ )
+
+ # for backward compatibility
+ if contextName is null and authData.contextName:
+ contextName = authData.contextName
+
+ if not isinstance(notificationType,
+ (ObjectIdentity, ObjectType, NotificationType)):
+ if isinstance(notificationType[0], tuple):
+ # legacy
+ notificationType = ObjectIdentity(notificationType[0][0], notificationType[0][1], *notificationType[1:])
+ else:
+ notificationType = ObjectIdentity(notificationType)
+
+ if not isinstance(notificationType, NotificationType):
+ notificationType = NotificationType(notificationType)
+
+ return sendNotification(
+ self.snmpEngine,
+ authData, transportTarget,
+ ContextData(contextEngineId or self.snmpContext.contextEngineId,
+ contextName),
+ notifyType, notificationType.addVarBinds(*varBinds),
+ __cbFun,
+ cbInfo,
+ lookupNames or lookupValues
+ )
+
+ asyncSendNotification = sendNotification
+
+class NotificationOriginator:
+ vbProcessor = NotificationOriginatorVarBinds()
+ def __init__(self, snmpEngine=None, snmpContext=None, asynNtfOrg=None):
+ # compatibility attributes
+ self.snmpEngine = snmpEngine or SnmpEngine()
+ self.mibViewController = self.vbProcessor.getMibViewController(self.snmpEngine)
+
+ # the varBinds parameter is legacy, use NotificationType instead
+
+ def sendNotification(self, authData, transportTarget, notifyType,
+ notificationType, *varBinds, **kwargs):
+ if 'lookupNames' not in kwargs:
+ kwargs['lookupNames'] = False
+ if 'lookupValues' not in kwargs:
+ kwargs['lookupValues'] = False
+ if not isinstance(notificationType,
+ (ObjectIdentity, ObjectType, NotificationType)):
+ if isinstance(notificationType[0], tuple):
+ # legacy
+ notificationType = ObjectIdentity(notificationType[0][0], notificationType[0][1], *notificationType[1:])
+ else:
+ notificationType = ObjectIdentity(notificationType)
+
+ if not isinstance(notificationType, NotificationType):
+ notificationType = NotificationType(notificationType)
+ for x in sync.sendNotification(self.snmpEngine, authData,
+ transportTarget,
+ ContextData(kwargs.get('contextEngineId'),
+ kwargs.get('contextName', null)),
+ notifyType,
+ notificationType.addVarBinds(*varBinds),
+ **kwargs):
+ if notifyType == 'inform':
+ return x
+ else:
+ return