summaryrefslogtreecommitdiff
path: root/examples/v3arch/asyncore/oneliner/manager/cmdgen/get-threaded-multiple-transports-and-protocols.py
blob: 47630ea5ac447539e9a38fdf689b3dc507f8f45f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""
Query Agents from multiple threads
++++++++++++++++++++++++++++++++++

Send a bunch of SNMP GET requests simultaneously using the following options:

* process 5 GET requests in 3 parallel threads
* with SNMPv1, community 'public' and 
  with SNMPv2c, community 'public' and
  with SNMPv3, user 'usr-md5-des', MD5 auth and DES privacy
* over IPv4/UDP and 
  over IPv6/UDP
* to an Agent at demo.snmplabs.com:161 and
  to an Agent at [::1]:161
* for instances of SNMPv2-MIB::sysDescr.0 and
  SNMPv2-MIB::sysLocation.0 MIB objects

"""#
from sys import version_info
if version_info[0] == 2:
    from Queue import Queue
else:
    from queue import Queue
from threading import Thread
from pysnmp.entity.rfc3413.oneliner.cmdgen import *

# List of targets in the followin format:
# ( ( authData, transportTarget, varNames ), ... )
targets = (
    # 1-st target (SNMPv1 over IPv4/UDP)
    ( CommunityData('public', mpModel=0),
      UdpTransportTarget(('demo.snmplabs.com', 161)),
      ( ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
        ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0)))),
    # 2-nd target (SNMPv2c over IPv4/UDP)
    ( CommunityData('public'),
      UdpTransportTarget(('demo.snmplabs.com', 161)),
      ( ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
        ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0)))),
    # 3-nd target (SNMPv2c over IPv4/UDP) - same community and 
    # different transport address.
    ( CommunityData('public'),
      UdpTransportTarget(('localhost', 161)),
      ( ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysContact', 0)),
        ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysName', 0)))),
    # 4-nd target (SNMPv3 over IPv4/UDP)
    ( UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
      UdpTransportTarget(('demo.snmplabs.com', 161)),
      ( ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
        ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0) ))),
    # 5-th target (SNMPv3 over IPv6/UDP)
    ( UsmUserData('usr-md5-none', 'authkey1'),
      Udp6TransportTarget(('::1', 161)),
      ( ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
        ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0)))),
    # N-th target
    # ...
)

class Worker(Thread):
    def __init__(self, requests, responses):
        Thread.__init__(self)
        self.snmpEngine = SnmpEngine()
        self.requests = requests
        self.responses = responses
        self.setDaemon(True)
        self.start()
    
    def run(self):
        while True:
            authData, transportTarget, varBinds = self.requests.get()
            self.responses.append(
                next(
                    getCmd(
                        self.snmpEngine,
                        authData, transportTarget, ContextData(), *varBinds
                    )
                )
            )
            if hasattr(self.requests, 'task_done'):  # 2.5+
                self.requests.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        self.requests = Queue(num_threads)
        self.responses = []
        for _ in range(num_threads):
            Worker(self.requests, self.responses)

    def addRequest(self, authData, transportTarget, varBinds):
        self.requests.put((authData, transportTarget, varBinds))

    def getResponses(self): return self.responses

    def waitCompletion(self):
        if hasattr(self.requests, 'join'):
            self.requests.join()  # 2.5+
        else:
            from time import sleep
            # this is a lame substitute for missing .join()
            # adding an explicit synchronization might be a better solution
            while not self.requests.empty():  
                sleep(1) 

pool = ThreadPool(3)

# Submit GET requests
for authData, transportTarget, varBinds in targets:
    pool.addRequest(authData, transportTarget, varBinds)
 
# Wait for responses or errors
pool.waitCompletion()

# Walk through responses
for errorIndication, errorStatus, errorIndex, varBinds in pool.getResponses():
    print('Response for %s from %s:' % (authData, transportTarget))
    if errorIndication:
        print(errorIndication)
    elif errorStatus:
        print('%s at %s' % (
                errorStatus.prettyPrint(),
                errorIndex and varBinds[int(errorIndex)-1][0] or '?'
            )
        )
    else:
        for varBind in varBinds:
            print(' = '.join([ x.prettyPrint() for x in varBind ]))