summaryrefslogtreecommitdiff
path: root/pysnmp/hlapi/lcd.py
diff options
context:
space:
mode:
Diffstat (limited to 'pysnmp/hlapi/lcd.py')
-rw-r--r--pysnmp/hlapi/lcd.py259
1 files changed, 259 insertions, 0 deletions
diff --git a/pysnmp/hlapi/lcd.py b/pysnmp/hlapi/lcd.py
new file mode 100644
index 0000000..e62eaf1
--- /dev/null
+++ b/pysnmp/hlapi/lcd.py
@@ -0,0 +1,259 @@
+from pysnmp.entity import config
+from pysnmp import nextid, error
+from pysnmp.hlapi.auth import *
+
+__all__ = ['CommandGeneratorLcdConfigurator',
+ 'NotificationOriginatorLcdConfigurator' ]
+
+class AbstractLcdConfigurator:
+ nextID = nextid.Integer(0xffffffff)
+
+ def _getCache(self, snmpEngine):
+ cache = snmpEngine.getUserContext(self.__class__.__name__)
+ if cache is None:
+ cache = {
+ 'auth': {}, 'parm': {}, 'tran': {}, 'addr': {}
+ }
+ snmpEngine.setUserContext(cmdgen_cache=cache)
+ return cache
+
+ def configure(self, snmpEngine, authData, transportTarget): pass
+ def unconfigure(self, snmpEngine, authData=None): pass
+
+class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator):
+ def configure(self, snmpEngine, authData, transportTarget):
+ cache = self._getCache(snmpEngine)
+ if isinstance(authData, CommunityData):
+ if authData.communityIndex not in cache['auth']:
+ config.addV1System(
+ snmpEngine,
+ authData.communityIndex,
+ authData.communityName,
+ authData.contextEngineId,
+ authData.contextName,
+ authData.tag,
+ authData.securityName
+ )
+ cache['auth'][authData.communityIndex] = authData
+ elif isinstance(authData, UsmUserData):
+ authDataKey = authData.userName, authData.securityEngineId
+ if authDataKey not in cache['auth']:
+ config.addV3User(
+ snmpEngine,
+ authData.userName,
+ authData.authProtocol, authData.authKey,
+ authData.privProtocol, authData.privKey,
+ authData.securityEngineId,
+ securityName=authData.securityName
+ )
+ cache['auth'][authDataKey] = authData
+ else:
+ raise error.PySnmpError('Unsupported authentication object')
+
+ paramsKey = authData.securityName, \
+ authData.securityLevel, \
+ authData.mpModel
+ if paramsKey in cache['parm']:
+ paramsName, useCount = cache['parm'][paramsKey]
+ cache['parm'][paramsKey] = paramsName, useCount + 1
+ else:
+ paramsName = 'p%s' % self.nextID()
+ config.addTargetParams(
+ snmpEngine, paramsName,
+ authData.securityName, authData.securityLevel, authData.mpModel
+ )
+ cache['parm'][paramsKey] = paramsName, 1
+
+ if transportTarget.transportDomain in cache['tran']:
+ transport, useCount = cache['tran'][transportTarget.transportDomain]
+ transportTarget.verifyDispatcherCompatibility(snmpEngine)
+ cache['tran'][transportTarget.transportDomain] = transport, useCount + 1
+ elif config.getTransport(snmpEngine, transportTarget.transportDomain):
+ transportTarget.verifyDispatcherCompatibility(snmpEngine)
+ else:
+ transport = transportTarget.openClientMode()
+ config.addTransport(
+ snmpEngine,
+ transportTarget.transportDomain,
+ transport
+ )
+ cache['tran'][transportTarget.transportDomain] = transport, 1
+
+ transportKey = ( paramsName,
+ transportTarget.transportDomain,
+ transportTarget.transportAddr,
+ transportTarget.tagList )
+
+ if transportKey in cache['addr']:
+ addrName, useCount = cache['addr'][transportKey]
+ cache['addr'][transportKey] = addrName, useCount + 1
+ else:
+ addrName = 'a%s' % self.nextID()
+ config.addTargetAddr(
+ snmpEngine, addrName,
+ transportTarget.transportDomain,
+ transportTarget.transportAddr,
+ paramsName,
+ transportTarget.timeout * 100,
+ transportTarget.retries,
+ transportTarget.tagList
+ )
+ cache['addr'][transportKey] = addrName, 1
+
+ return addrName, paramsName
+
+ def unconfigure(self, snmpEngine, authData=None):
+ cache = _getCache(snmpEngine)
+ if authData:
+ if isinstance(authData, CommunityData):
+ authDataKey = authData.communityIndex
+ elif isinstance(authData, UsmUserData):
+ authDataKey = authData.userName, authData.securityEngineId
+ else:
+ raise error.PySnmpError('Unsupported authentication object')
+ if authDataKey in cache['auth']:
+ authDataKeys = ( authDataKey, )
+ else:
+ raise error.PySnmpError('Unknown authData %s' % (authData,))
+ else:
+ authDataKeys = list(cache['auth'].keys())
+
+ addrNames, paramsNames = set(), set()
+
+ for authDataKey in authDataKeys:
+ authDataX = cache['auth'][authDataKey]
+ del cache['auth'][authDataKey]
+ if isinstance(authDataX, CommunityData):
+ config.delV1System(
+ snmpEngine,
+ authDataX.communityIndex
+ )
+ elif isinstance(authDataX, UsmUserData):
+ config.delV3User(
+ snmpEngine,
+ authDataX.userName,
+ authDataX.securityEngineId
+ )
+ else:
+ raise error.PySnmpError('Unsupported authentication object')
+
+ paramsKey = authDataX.securityName, \
+ authDataX.securityLevel, \
+ authDataX.mpModel
+ if paramsKey in cache['parm']:
+ paramsName, useCount = cache['parm'][paramsKey]
+ useCount -= 1
+ if useCount:
+ cache['parm'][paramsKey] = paramsName, useCount
+ else:
+ del cache['parm'][paramsKey]
+ config.delTargetParams(
+ snmpEngine, paramsName
+ )
+ paramsNames.add(paramsName)
+ else:
+ raise error.PySnmpError('Unknown target %s' % (paramsKey,))
+
+ addrKeys = [ x for x in cache['addr'] if x[0] == paramsName ]
+
+ for addrKey in addrKeys:
+ addrName, useCount = cache['addr'][addrKey]
+ useCount -= 1
+ if useCount:
+ cache['addr'][addrKey] = addrName, useCount
+ else:
+ config.delTargetAddr(snmpEngine, addrName)
+
+ addrNames.add(addrKey)
+
+ if addrKey[1] in cache['tran']:
+ transport, useCount = cache['tran'][addrKey[1]]
+ if useCount > 1:
+ useCount -= 1
+ cache['tran'][addrKey[1]] = transport, useCount
+ else:
+ config.delTransport(snmpEngine, addrKey[1])
+ transport.closeTransport()
+ del cache['tran'][addrKey[1]]
+
+ return addrNames, paramsNames
+
+class NotificationOriginatorLcdConfigurator(CommandGeneratorLcdConfigurator):
+ def configure(self, snmpEngine, authData, transportTarget, notifyType):
+ cache = self._getCache(snmpEngine)
+ addrName, paramsName = CommandGeneratorLcdConfigurator.configure(self, snmpEngine, authData, transportTarget)
+ tagList = transportTarget.tagList.split()
+ if not tagList:
+ tagList = ['']
+ for tag in tagList:
+ notifyNameKey = paramsName, tag, notifyType
+ if notifyNameKey in cache['name']:
+ notifyName, paramsName, useCount = cache['name'][notifyNameKey]
+ cache['name'][notifyNameKey] = notifyName, paramsName, useCount + 1
+ else:
+ notifyName = 'n%s' % self.nextID()
+ config.addNotificationTarget(
+ snmpEngine,
+ notifyName,
+ paramsName,
+ tag,
+ notifyType
+ )
+ cache['name'][notifyNameKey] = notifyName, paramsName, 1
+ authDataKey = authData.securityName, authData.securityModel
+ if authDataKey in cache['auth']:
+ authDataX, subTree, useCount = cache['auth'][authDataKey]
+ cache['auth'][authDataKey] = authDataX, subTree, useCount + 1
+ else:
+ subTree = (1,3,6)
+ config.addTrapUser(
+ snmpEngine,
+ authData.securityModel,
+ authData.securityName,
+ authData.securityLevel,
+ subTree
+ )
+ cache['auth'][authDataKey] = authData, subTree, 1
+
+ return notifyName
+
+ def unconfigure(self, snmpEngine, authData=None):
+ cache = _getCache(snmpEngine)
+ if authData:
+ authDataKey = authData.securityName, authData.securityModel
+ if authDataKey in cache['auth']:
+ authDataKeys = ( authDataKey, )
+ else:
+ raise error.PySnmpError('Unknown authData %s' % (authData,))
+ else:
+ authDataKeys = tuple(cache['auth'].keys())
+
+ addrNames, paramsNames = CommandGeneratorLcdConfigurator.unconfigure(self, snmpEngine, authData)
+
+ notifyAndParamsNames = [ (cache['name'][x], x) for x in cache['name'].keys() if x[0] in paramsNames ]
+
+ for (notifyName, paramsName, useCount), notifyNameKey in notifyAndParamsNames:
+ useCount -= 1
+ if useCount:
+ cache['name'][notifyNameKey] = notifyName, paramsName, useCount
+ else:
+ config.delNotificationTarget(
+ snmpEngine, notifyName, paramsName
+ )
+ del cache['name'][notifyNameKey]
+
+ for authDataKey in authDataKeys:
+ authDataX, subTree, useCount = cache['auth'][authDataKey]
+ useCount -= 1
+ if useCount:
+ cache['auth'][authDataKey] = authDataX, subTree, useCount
+ else:
+ config.delTrapUser(
+ snmpEngine,
+ authDataX.securityModel,
+ authDataX.securityName,
+ authDataX.securityLevel,
+ subTree
+ )
+ del cache['auth'][authDataKey]
+