summaryrefslogtreecommitdiff
path: root/examples/v3arch/asyncore/oneliner/manager/cmdgen/get-v3-with-specific-engine-id.py
blob: 18b3a47a413f984f41aa04cc3ccfc7e156b0b7b7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""
Discover SNMPv3 SecurityEngineId
++++++++++++++++++++++++++++++++

Send SNMP GET request using the following scenario and options:

* try to communicate with a SNMPv3 Engine using:

* a non-existing user
* over IPv4/UDP
* to an Agent at demo.snmplabs.com:161

* if remote SNMP Engine ID is discovered, send SNMP GET request:

* with SNMPv3, user 'usr-md5-none', MD5 authentication, no privacy
   at discovered securityEngineId
* to the same SNMP Engine ID
* for an OID in text form

"""#
from pysnmp.entity.rfc3413.oneliner.cmdgen import *

snmpEngine = SnmpEngine()

transportTarget = UdpTransportTarget(('demo.snmplabs.com', 161))

#
# To discover remote SNMP EngineID we will tap on SNMP engine inner workings
# by setting up execution point observer setup on INTERNAL class PDU processing
#

observerContext = {}

# Register a callback to be invoked at specified execution point of 
# SNMP Engine and passed local variables at execution point's local scope
snmpEngine.observer.registerObserver(
    lambda e,p,v,c: c.update(securityEngineId=v['securityEngineId']),
    'rfc3412.prepareDataElements:internal',
    cbCtx=observerContext
)

# Send probe SNMP request with invalid credentials

authData = UsmUserData('non-existing-user')

errorIndication, errorStatus, errorIndex, varBinds = next(
    getCmd(snmpEngine, authData, transportTarget, ContextData(),
           ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)))
)

# See if our SNMP engine received REPORT PDU containing securityEngineId

if 'securityEngineId' not in observerContext:
    print("Can't discover peer EngineID, errorIndication: %s" % errorIndication)
    raise Exception()

securityEngineId = observerContext.pop('securityEngineId')

print('Remote securityEngineId = %s' % securityEngineId.prettyPrint())

#
# Query remote SNMP Engine using usmUserTable entry configured for it
#

authData = UsmUserData('usr-md5-none', 'authkey1', 
                       securityEngineId=securityEngineId)

errorIndication, errorStatus, errorIndex, varBinds = next(
    getCmd(snmpEngine,
           authData,
           transportTarget,
           ContextData(),
           ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0')))
)

if errorIndication:
    print(errorIndication)
elif errorStatus:
    print('%s at %s' % (
        errorStatus.prettyPrint(),
        errorIndex and varBinds[int(errorIndex)-1][0] or '?'
        )
    )
else:
    for name, val in varBinds:
        print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))