From 4d1d9778f9b52752a0bb271a0bc5b69f9b4f50e3 Mon Sep 17 00:00:00 2001 From: elie Date: Sun, 27 Sep 2015 10:26:26 +0000 Subject: * Asyncore and asyncio-based APIs reworked to become functions. * Asyncio and Twisted API moved entirely into high-level domain to be aligned with other high-level APIs. This WILL BREAK backward compatibility for those apps that use Twisted API. --- .../cmdgen/query-agents-from-multuple-threads.py | 127 +++++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 examples/hlapi/asyncore/sync/manager/cmdgen/query-agents-from-multuple-threads.py (limited to 'examples/hlapi/asyncore/sync/manager/cmdgen/query-agents-from-multuple-threads.py') diff --git a/examples/hlapi/asyncore/sync/manager/cmdgen/query-agents-from-multuple-threads.py b/examples/hlapi/asyncore/sync/manager/cmdgen/query-agents-from-multuple-threads.py new file mode 100644 index 0000000..34ead8b --- /dev/null +++ b/examples/hlapi/asyncore/sync/manager/cmdgen/query-agents-from-multuple-threads.py @@ -0,0 +1,127 @@ +""" +Query Agents from multiple threads +++++++++++++++++++++++++++++++++++ + +Send a bunch of SNMP GET requests simultaneously using the following options: + +* process 5 GET requests in 3 parallel threads +* with SNMPv1, community 'public' and + with SNMPv2c, community 'public' and + with SNMPv3, user 'usr-md5-des', MD5 auth and DES privacy +* over IPv4/UDP and + over IPv6/UDP +* to an Agent at demo.snmplabs.com:161 and + to an Agent at [::1]:161 +* for instances of SNMPv2-MIB::sysDescr.0 and + SNMPv2-MIB::sysLocation.0 MIB objects + +"""# +from sys import version_info +if version_info[0] == 2: + from Queue import Queue +else: + from queue import Queue +from threading import Thread +from pysnmp.hlapi import * + +# List of targets in the followin format: +# ( ( authData, transportTarget, varNames ), ... ) +targets = ( + # 1-st target (SNMPv1 over IPv4/UDP) + ( CommunityData('public', mpModel=0), + UdpTransportTarget(('demo.snmplabs.com', 161)), + ( ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)), + ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0)))), + # 2-nd target (SNMPv2c over IPv4/UDP) + ( CommunityData('public'), + UdpTransportTarget(('demo.snmplabs.com', 161)), + ( ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)), + ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0)))), + # 3-nd target (SNMPv2c over IPv4/UDP) - same community and + # different transport address. + ( CommunityData('public'), + UdpTransportTarget(('localhost', 161)), + ( ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysContact', 0)), + ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysName', 0)))), + # 4-nd target (SNMPv3 over IPv4/UDP) + ( UsmUserData('usr-md5-des', 'authkey1', 'privkey1'), + UdpTransportTarget(('demo.snmplabs.com', 161)), + ( ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)), + ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) ))), + # 5-th target (SNMPv3 over IPv6/UDP) + ( UsmUserData('usr-md5-none', 'authkey1'), + Udp6TransportTarget(('::1', 161)), + ( ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)), + ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0)))), + # N-th target + # ... +) + +class Worker(Thread): + def __init__(self, requests, responses): + Thread.__init__(self) + self.snmpEngine = SnmpEngine() + self.requests = requests + self.responses = responses + self.setDaemon(True) + self.start() + + def run(self): + while True: + authData, transportTarget, varBinds = self.requests.get() + self.responses.append( + next( + getCmd( + self.snmpEngine, + authData, transportTarget, ContextData(), *varBinds + ) + ) + ) + if hasattr(self.requests, 'task_done'): # 2.5+ + self.requests.task_done() + +class ThreadPool: + def __init__(self, num_threads): + self.requests = Queue(num_threads) + self.responses = [] + for _ in range(num_threads): + Worker(self.requests, self.responses) + + def addRequest(self, authData, transportTarget, varBinds): + self.requests.put((authData, transportTarget, varBinds)) + + def getResponses(self): return self.responses + + def waitCompletion(self): + if hasattr(self.requests, 'join'): + self.requests.join() # 2.5+ + else: + from time import sleep + # this is a lame substitute for missing .join() + # adding an explicit synchronization might be a better solution + while not self.requests.empty(): + sleep(1) + +pool = ThreadPool(3) + +# Submit GET requests +for authData, transportTarget, varBinds in targets: + pool.addRequest(authData, transportTarget, varBinds) + +# Wait for responses or errors +pool.waitCompletion() + +# Walk through responses +for errorIndication, errorStatus, errorIndex, varBinds in pool.getResponses(): + print('Response for %s from %s:' % (authData, transportTarget)) + if errorIndication: + print(errorIndication) + elif errorStatus: + print('%s at %s' % ( + errorStatus.prettyPrint(), + errorIndex and varBinds[int(errorIndex)-1][0] or '?' + ) + ) + else: + for varBind in varBinds: + print(' = '.join([ x.prettyPrint() for x in varBind ])) -- cgit v1.2.1