From c54a3f6dc8ee433a55001e1cd97f6801bd6e52b7 Mon Sep 17 00:00:00 2001 From: Ilya Etingof Date: Sat, 5 Nov 2016 22:59:31 +0100 Subject: WIP: gracefully shutdown asyncio dispatcher --- TODO.txt | 12 ++++ .../hlapi/asyncio/agent/ntforg/default-v1-trap.py | 3 +- .../agent/ntforg/multiple-notifications-at-once.py | 5 ++ .../hlapi/asyncio/manager/cmdgen/getbulk-to-eom.py | 18 +++--- .../manager/cmdgen/multiple-concurrent-queries.py | 5 ++ .../manager/cmdgen/multiple-sequential-queries.py | 6 +- pysnmp/carrier/asyncio/dispatch.py | 35 ++++++------ pysnmp/hlapi/asyncio/cmdgen.py | 17 +++++- pysnmp/hlapi/asyncio/ntforg.py | 17 +++++- pysnmp/hlapi/lcd.py | 65 +++++++++++----------- 10 files changed, 116 insertions(+), 67 deletions(-) diff --git a/TODO.txt b/TODO.txt index c7a4fae3..ce68a2b4 100644 --- a/TODO.txt +++ b/TODO.txt @@ -58,3 +58,15 @@ Sparse notes on major existing problems/plans * add RowStatus checks when reading MIB tables (LCD) * Disallow empty SET value reaching scalar MIB object + +---- + +- Make pyasn1 classes calling .super() for MI to work +- Release pyasn1 and make pysnmp dependent on it +- Make all pysnmp classes new-style +- Make pysnmp's TextualConvention leading MI +- Fix pysmi to put TextualConvention at the first position in MI +- Make SNMP NULL objects singletones +- Allow MIB resolution to fail gracefully on constraints violation +- add AES192/256 examples + snmpsim +- docstrings for USM protocol constants not rendered in sphinx docs diff --git a/examples/hlapi/asyncio/agent/ntforg/default-v1-trap.py b/examples/hlapi/asyncio/agent/ntforg/default-v1-trap.py index 92d3e70e..94926f55 100644 --- a/examples/hlapi/asyncio/agent/ntforg/default-v1-trap.py +++ b/examples/hlapi/asyncio/agent/ntforg/default-v1-trap.py @@ -27,6 +27,7 @@ from pysnmp.hlapi.asyncio import * @asyncio.coroutine def run(): snmpEngine = SnmpEngine() + errorIndication, errorStatus, errorIndex, varBinds = yield from sendNotification( snmpEngine, CommunityData('public', mpModel=0), @@ -44,7 +45,7 @@ def run(): if errorIndication: print(errorIndication) - snmpEngine.transportDispatcher.closeDispatcher() + yield from unconfigureNtfOrg(snmpEngine) asyncio.get_event_loop().run_until_complete(run()) diff --git a/examples/hlapi/asyncio/agent/ntforg/multiple-notifications-at-once.py b/examples/hlapi/asyncio/agent/ntforg/multiple-notifications-at-once.py index 027d3ac6..5345e98b 100644 --- a/examples/hlapi/asyncio/agent/ntforg/multiple-notifications-at-once.py +++ b/examples/hlapi/asyncio/agent/ntforg/multiple-notifications-at-once.py @@ -56,7 +56,12 @@ def sendone(snmpEngine, hostname, notifyType): snmpEngine = SnmpEngine() loop = asyncio.get_event_loop() + +# send notifications concurrently loop.run_until_complete( asyncio.wait([sendone(snmpEngine, 'demo.snmplabs.com', 'trap'), sendone(snmpEngine, 'demo.snmplabs.com', 'inform')]) ) + +# this will cancel internal timer +loop.run_until_complete(unconfigureNtfOrg(snmpEngine)) \ No newline at end of file diff --git a/examples/hlapi/asyncio/manager/cmdgen/getbulk-to-eom.py b/examples/hlapi/asyncio/manager/cmdgen/getbulk-to-eom.py index b45057e3..e5f493c0 100644 --- a/examples/hlapi/asyncio/manager/cmdgen/getbulk-to-eom.py +++ b/examples/hlapi/asyncio/manager/cmdgen/getbulk-to-eom.py @@ -22,11 +22,9 @@ from pysnmp.hlapi.asyncio import * @asyncio.coroutine -def run(varBinds): - snmpEngine = SnmpEngine() +def run(snmpEngine, varBinds): while True: - errorIndication, errorStatus, errorIndex, \ - varBindTable = yield from bulkCmd( + errorIndication, errorStatus, errorIndex, varBindTable = yield from bulkCmd( snmpEngine, UsmUserData('usr-none-none'), UdpTransportTarget(('demo.snmplabs.com', 161)), @@ -38,11 +36,8 @@ def run(varBinds): print(errorIndication) break elif errorStatus: - print('%s at %s' % ( - errorStatus.prettyPrint(), - errorIndex and varBinds[int(errorIndex) - 1][0] or '?' - ) - ) + print('%s at %s' % (errorStatus.prettyPrint(), + errorIndex and varBinds[int(errorIndex) - 1][0] or '?')) else: for varBindRow in varBindTable: for varBind in varBindRow: @@ -52,10 +47,11 @@ def run(varBinds): if isEndOfMib(varBinds): break - snmpEngine.transportDispatcher.closeDispatcher() + yield from unconfigureCmdGen(snmpEngine) +snmpEngine = SnmpEngine() loop = asyncio.get_event_loop() loop.run_until_complete( - run([ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr'))]) + run(snmpEngine, [ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr'))]) ) diff --git a/examples/hlapi/asyncio/manager/cmdgen/multiple-concurrent-queries.py b/examples/hlapi/asyncio/manager/cmdgen/multiple-concurrent-queries.py index ad9c441b..b9de2660 100644 --- a/examples/hlapi/asyncio/manager/cmdgen/multiple-concurrent-queries.py +++ b/examples/hlapi/asyncio/manager/cmdgen/multiple-concurrent-queries.py @@ -47,8 +47,13 @@ def getone(snmpEngine, hostname): snmpEngine = SnmpEngine() loop = asyncio.get_event_loop() + +# run parallel queries loop.run_until_complete( asyncio.wait([getone(snmpEngine, ('demo.snmplabs.com', 1161)), getone(snmpEngine, ('demo.snmplabs.com', 2161)), getone(snmpEngine, ('demo.snmplabs.com', 3161))]) ) + +# unconfigure SNMP engine +loop.run_until_complete(unconfigureCmdGen(snmpEngine)) diff --git a/examples/hlapi/asyncio/manager/cmdgen/multiple-sequential-queries.py b/examples/hlapi/asyncio/manager/cmdgen/multiple-sequential-queries.py index 4a43bc49..b9416366 100644 --- a/examples/hlapi/asyncio/manager/cmdgen/multiple-sequential-queries.py +++ b/examples/hlapi/asyncio/manager/cmdgen/multiple-sequential-queries.py @@ -37,8 +37,8 @@ def getone(snmpEngine, hostname): print('%s at %s' % ( errorStatus.prettyPrint(), errorIndex and varBinds[int(errorIndex) - 1][0] or '?' + ) ) - ) else: for varBind in varBinds: print(' = '.join([x.prettyPrint() for x in varBind])) @@ -48,11 +48,11 @@ def getone(snmpEngine, hostname): def getall(snmpEngine, hostnames): for hostname in hostnames: yield from getone(snmpEngine, hostname) - + yield from unconfigureCmdGen(snmpEngine) snmpEngine = SnmpEngine() loop = asyncio.get_event_loop() loop.run_until_complete(getall(snmpEngine, [('demo.snmplabs.com', 1161), ('demo.snmplabs.com', 2161), - ('demo.snmplabs.com', 3161)])) + ('demo.snmplabs.com', 3161)])) \ No newline at end of file diff --git a/pysnmp/carrier/asyncio/dispatch.py b/pysnmp/carrier/asyncio/dispatch.py index cb4c0941..4227d100 100644 --- a/pysnmp/carrier/asyncio/dispatch.py +++ b/pysnmp/carrier/asyncio/dispatch.py @@ -51,13 +51,14 @@ class AsyncioDispatcher(AbstractTransportDispatcher): self.__transportCount = 0 if 'timeout' in kwargs: self.setTimerResolution(kwargs['timeout']) - self.loopingcall = None + self._futureTimer = None @asyncio.coroutine - def handle_timeout(self): - while True: - yield asyncio.From(asyncio.sleep(self.getTimerResolution())) - self.handleTimerTick(loop.time()) + def fireTimer(self): + yield asyncio.From(asyncio.sleep(self.getTimerResolution())) + self.handleTimerTick(loop.time()) + if self._futureTimer: + self._futureTimer = asyncio.async(self.fireTimer()) def runDispatcher(self, timeout=0.0): if not loop.is_running(): @@ -69,8 +70,8 @@ class AsyncioDispatcher(AbstractTransportDispatcher): raise PySnmpError(';'.join(traceback.format_exception(*sys.exc_info()))) def registerTransport(self, tDomain, transport): - if self.loopingcall is None and self.getTimerResolution() > 0: - self.loopingcall = asyncio.async(self.handle_timeout()) + if not self._futureTimer and self.getTimerResolution() > 0: + self._futureTimer = asyncio.async(self.fireTimer()) AbstractTransportDispatcher.registerTransport( self, tDomain, transport ) @@ -83,18 +84,20 @@ class AsyncioDispatcher(AbstractTransportDispatcher): self.__transportCount -= 1 # The last transport has been removed, stop the timeout - if self.__transportCount == 0 and not self.loopingcall.done(): - self.loopingcall.cancel() - self.loopingcall = None + if self.__transportCount == 0: + if self._futureTimer: + self._futureTimer.cancel() + self._futureTimer = None # Trollius or Tulip? if not hasattr(asyncio, "From"): - exec ("""\ + exec("""\ @asyncio.coroutine -def handle_timeout(self): - while True: - yield from asyncio.sleep(self.getTimerResolution()) - self.handleTimerTick(loop.time()) -AsyncioDispatcher.handle_timeout = handle_timeout\ +def fireTimer(self): + yield from asyncio.sleep(self.getTimerResolution()) + self.handleTimerTick(loop.time()) + if self._futureTimer: + self._futureTimer = asyncio.async(self.fireTimer()) +AsyncioDispatcher.fireTimer = fireTimer\ """) diff --git a/pysnmp/hlapi/asyncio/cmdgen.py b/pysnmp/hlapi/asyncio/cmdgen.py index d9a8d4d6..ea65961f 100644 --- a/pysnmp/hlapi/asyncio/cmdgen.py +++ b/pysnmp/hlapi/asyncio/cmdgen.py @@ -44,7 +44,7 @@ try: except ImportError: import trollius as asyncio -__all__ = ['getCmd', 'nextCmd', 'setCmd', 'bulkCmd', 'isEndOfMib'] +__all__ = ['getCmd', 'nextCmd', 'setCmd', 'bulkCmd', 'isEndOfMib', 'unconfigureCmdGen'] vbProcessor = CommandGeneratorVarBinds() lcd = CommandGeneratorLcdConfigurator() @@ -52,6 +52,21 @@ lcd = CommandGeneratorLcdConfigurator() isEndOfMib = lambda x: not cmdgen.getNextVarBinds(x)[1] +@asyncio.coroutine +def unconfigureCmdGen(snmpEngine, authData=None): + """Remove LCD configuration entry. + + If `authData` is not given, all currently configured LCD entries will be + removed. + + Note + ---- + Configuration entry removal may have a side effect of removing unused transport + and shutting down unused transport dispatcher. + """ + lcd.unconfigure(snmpEngine, authData) + + @asyncio.coroutine def getCmd(snmpEngine, authData, transportTarget, contextData, *varBinds, **options): diff --git a/pysnmp/hlapi/asyncio/ntforg.py b/pysnmp/hlapi/asyncio/ntforg.py index 13ec63bd..0541684e 100644 --- a/pysnmp/hlapi/asyncio/ntforg.py +++ b/pysnmp/hlapi/asyncio/ntforg.py @@ -21,12 +21,27 @@ try: except ImportError: import trollius as asyncio -__all__ = ['sendNotification'] +__all__ = ['sendNotification', 'unconfigureNtfOrg'] vbProcessor = NotificationOriginatorVarBinds() lcd = NotificationOriginatorLcdConfigurator() +@asyncio.coroutine +def unconfigureNtfOrg(snmpEngine, authData=None): + """Remove LCD configuration entry. + + If `authData` is not given, all currently configured LCD entries will be + removed. + + Note + ---- + Configuration entry removal may have a side effect of removing unused transport + and shutting down unused transport dispatcher. + """ + lcd.unconfigure(snmpEngine, authData) + + @asyncio.coroutine def sendNotification(snmpEngine, authData, transportTarget, contextData, notifyType, varBinds, **options): diff --git a/pysnmp/hlapi/lcd.py b/pysnmp/hlapi/lcd.py index 91042645..c276eb3b 100644 --- a/pysnmp/hlapi/lcd.py +++ b/pysnmp/hlapi/lcd.py @@ -37,7 +37,8 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator): def configure(self, snmpEngine, authData, transportTarget, *options): cache = self._getCache(snmpEngine) if isinstance(authData, CommunityData): - if authData.communityIndex not in cache['auth']: + authDataKey = authData.communityIndex + if authDataKey not in cache['auth']: config.addV1System( snmpEngine, authData.communityIndex, @@ -66,21 +67,26 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator): paramsKey = (authData.securityName, authData.securityLevel, authData.mpModel) + if paramsKey in cache['parm']: paramsName, useCount = cache['parm'][paramsKey] - cache['parm'][paramsKey] = paramsName, useCount + 1 + useCount.add(authDataKey) else: paramsName = 'p%s' % self.nextID() config.addTargetParams( snmpEngine, paramsName, authData.securityName, authData.securityLevel, authData.mpModel ) - cache['parm'][paramsKey] = paramsName, 1 + cache['parm'][paramsKey] = paramsName, set([authDataKey]) + + transportKey = (paramsName, transportTarget.transportDomain, + transportTarget.transportAddr, + transportTarget.tagList) if transportTarget.transportDomain in cache['tran']: transport, useCount = cache['tran'][transportTarget.transportDomain] transportTarget.verifyDispatcherCompatibility(snmpEngine) - cache['tran'][transportTarget.transportDomain] = transport, useCount + 1 + useCount.add(transportKey) elif config.getTransport(snmpEngine, transportTarget.transportDomain): transportTarget.verifyDispatcherCompatibility(snmpEngine) else: @@ -90,15 +96,10 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator): transportTarget.transportDomain, transport ) - cache['tran'][transportTarget.transportDomain] = transport, 1 - - transportKey = (paramsName, transportTarget.transportDomain, - transportTarget.transportAddr, - transportTarget.tagList) + cache['tran'][transportTarget.transportDomain] = transport, set([transportKey]) if transportKey in cache['addr']: - addrName, useCount = cache['addr'][transportKey] - cache['addr'][transportKey] = addrName, useCount + 1 + addrName = cache['addr'][transportKey] else: addrName = 'a%s' % self.nextID() config.addTargetAddr( @@ -110,7 +111,7 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator): transportTarget.retries, transportTarget.tagList ) - cache['addr'][transportKey] = addrName, 1 + cache['addr'][transportKey] = addrName return addrName, paramsName @@ -152,12 +153,12 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator): paramsKey = (authDataX.securityName, authDataX.securityLevel, authDataX.mpModel) + if paramsKey in cache['parm']: paramsName, useCount = cache['parm'][paramsKey] - useCount -= 1 - if useCount: - cache['parm'][paramsKey] = paramsName, useCount - else: + if authDataKey in useCount: + useCount.remove(authDataKey) + if not useCount: del cache['parm'][paramsKey] config.delTargetParams( snmpEngine, paramsName @@ -169,24 +170,20 @@ class CommandGeneratorLcdConfigurator(AbstractLcdConfigurator): addrKeys = [x for x in cache['addr'] if x[0] == paramsName] for addrKey in addrKeys: - addrName, useCount = cache['addr'][addrKey] - useCount -= 1 - if useCount: - cache['addr'][addrKey] = addrName, useCount - else: - config.delTargetAddr(snmpEngine, addrName) - - addrNames.add(addrKey) - - if addrKey[1] in cache['tran']: - transport, useCount = cache['tran'][addrKey[1]] - if useCount > 1: - useCount -= 1 - cache['tran'][addrKey[1]] = transport, useCount - else: - config.delTransport(snmpEngine, addrKey[1]) - transport.closeTransport() - del cache['tran'][addrKey[1]] + addrName = cache['addr'][addrKey] + + config.delTargetAddr(snmpEngine, addrName) + + addrNames.add(addrKey) + + if addrKey[1] in cache['tran']: + transport, useCount = cache['tran'][addrKey[1]] + if addrKey in useCount: + useCount.remove(addrKey) + if not useCount: + config.delTransport(snmpEngine, addrKey[1]) + transport.closeTransport() + del cache['tran'][addrKey[1]] return addrNames, paramsNames -- cgit v1.2.1