summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorIlya Etingof <etingof@gmail.com>2016-04-01 01:08:28 +0200
committerIlya Etingof <etingof@gmail.com>2016-04-01 01:08:28 +0200
commit9541f201e33cd613995cee2e7a7656aa687deb51 (patch)
treefdc955b26681f10d7755a2f2aff115c3bcf85bfa /examples
parent18f39e2228d370a336500c5ecc539658d890f06d (diff)
downloadpysnmp-git-9541f201e33cd613995cee2e7a7656aa687deb51.tar.gz
pep8 reformatted
Diffstat (limited to 'examples')
-rw-r--r--examples/smi/manager/var-binds-mib-resolution.py13
-rw-r--r--examples/v1arch/asyncore/agent/cmdrsp/implementing-scalar-mib-objects.py54
-rw-r--r--examples/v1arch/asyncore/agent/ntforg/send-inform-over-ipv4-and-ipv6.py16
-rw-r--r--examples/v1arch/asyncore/agent/ntforg/send-trap-over-ipv4-and-ipv6.py14
-rw-r--r--examples/v1arch/asyncore/manager/cmdgen/broadcast-agent-discovery.py15
-rw-r--r--examples/v1arch/asyncore/manager/cmdgen/fetch-scalar-value.py34
-rw-r--r--examples/v1arch/asyncore/manager/cmdgen/getbulk-pull-whole-mib.py22
-rw-r--r--examples/v1arch/asyncore/manager/cmdgen/getnext-pull-whole-mib.py22
-rw-r--r--examples/v1arch/asyncore/manager/cmdgen/spoof-source-address.py15
-rw-r--r--examples/v1arch/asyncore/manager/cmdgen/v2c-set.py14
-rw-r--r--examples/v1arch/asyncore/manager/ntfrcv/listen-on-ipv4-and-ipv6-interfaces.py34
11 files changed, 137 insertions, 116 deletions
diff --git a/examples/smi/manager/var-binds-mib-resolution.py b/examples/smi/manager/var-binds-mib-resolution.py
index ffbd2f92..cee8b7c0 100644
--- a/examples/smi/manager/var-binds-mib-resolution.py
+++ b/examples/smi/manager/var-binds-mib-resolution.py
@@ -41,7 +41,7 @@ mibVar = rfc1902.ObjectIdentity(str(mibVar)).resolveWithMib(mibView)
print(mibVar.prettyPrint(), tuple(mibVar), str(mibVar))
# Obtain MIB object information by a mix of OID/label parts
-mibVar = rfc1902.ObjectIdentity((1,3,6,1,2,'mib-2',1,'sysDescr')).resolveWithMib(mibView)
+mibVar = rfc1902.ObjectIdentity((1, 3, 6, 1, 2, 'mib-2', 1, 'sysDescr')).resolveWithMib(mibView)
print(mibVar.prettyPrint(), tuple(mibVar), str(mibVar))
@@ -85,18 +85,17 @@ varBind = rfc1902.ObjectType(
print(varBind[0].prettyPrint(), varBind[1].__class__.__name__, varBind[1].prettyPrint())
# Create var-binds from MIB notification object (without OBJECTS clause)
-varBinds = rfc1902.NotificationType(
+varBinds = rfc1902.NotificationType(
rfc1902.ObjectIdentity('SNMPv2-MIB', 'coldStart')
).resolveWithMib(mibView)
-print([ '%s = %s(%s)' % (x[0].prettyPrint(), x[1].__class__.__name__, x[1].prettyPrint()) for x in varBinds])
+print(['%s = %s(%s)' % (x[0].prettyPrint(), x[1].__class__.__name__, x[1].prettyPrint()) for x in varBinds])
# Create var-binds from MIB notification object (with OBJECTS clause)
-varBinds = rfc1902.NotificationType(
+varBinds = rfc1902.NotificationType(
rfc1902.ObjectIdentity('IF-MIB', 'linkUp'),
- instanceIndex = (1,),
- objects= { ('IF-MIB', 'ifOperStatus'): 'down' }
+ instanceIndex=(1,),
+ objects={('IF-MIB', 'ifOperStatus'): 'down'}
).resolveWithMib(mibView)
print(varBinds.prettyPrint())
-
diff --git a/examples/v1arch/asyncore/agent/cmdrsp/implementing-scalar-mib-objects.py b/examples/v1arch/asyncore/agent/cmdrsp/implementing-scalar-mib-objects.py
index 5119a159..e7f4eca0 100644
--- a/examples/v1arch/asyncore/agent/cmdrsp/implementing-scalar-mib-objects.py
+++ b/examples/v1arch/asyncore/agent/cmdrsp/implementing-scalar-mib-objects.py
@@ -25,41 +25,59 @@ from pyasn1.codec.ber import encoder, decoder
from pysnmp.proto import api
import time, bisect
+
class SysDescr:
- name = (1,3,6,1,2,1,1,1,0)
+ name = (1, 3, 6, 1, 2, 1, 1, 1, 0)
+
def __eq__(self, other): return self.name == other
+
def __ne__(self, other): return self.name != other
+
def __lt__(self, other): return self.name < other
+
def __le__(self, other): return self.name <= other
+
def __gt__(self, other): return self.name > other
+
def __ge__(self, other): return self.name >= other
+
def __call__(self, protoVer):
return api.protoModules[protoVer].OctetString(
'PySNMP example command responder'
- )
+ )
+
class Uptime:
- name = (1,3,6,1,2,1,1,3,0)
+ name = (1, 3, 6, 1, 2, 1, 1, 3, 0)
birthday = time.time()
+
def __eq__(self, other): return self.name == other
+
def __ne__(self, other): return self.name != other
+
def __lt__(self, other): return self.name < other
+
def __le__(self, other): return self.name <= other
+
def __gt__(self, other): return self.name > other
- def __ge__(self, other): return self.name >= other
+
+ def __ge__(self, other): return self.name >= other
+
def __call__(self, protoVer):
return api.protoModules[protoVer].TimeTicks(
- (time.time()-self.birthday)*100
- )
+ (time.time() - self.birthday) * 100
+ )
+
mibInstr = (
- SysDescr(), Uptime() # sorted by object name
- )
+ SysDescr(), Uptime() # sorted by object name
+)
mibInstrIdx = {}
for mibVar in mibInstr:
mibInstrIdx[mibVar.name] = mibVar
+
def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
while wholeMsg:
msgVer = api.decodeMessageVersion(wholeMsg)
@@ -70,11 +88,12 @@ def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
return
reqMsg, wholeMsg = decoder.decode(
wholeMsg, asn1Spec=pMod.Message(),
- )
+ )
rspMsg = pMod.apiMessage.getResponse(reqMsg)
- rspPDU = pMod.apiMessage.getPDU(rspMsg)
+ rspPDU = pMod.apiMessage.getPDU(rspMsg)
reqPDU = pMod.apiMessage.getPDU(reqMsg)
- varBinds = []; pendingErrors = []
+ varBinds = [];
+ pendingErrors = []
errorIndex = 0
# GETNEXT PDU
if reqPDU.isSameTypeWith(pMod.GetNextRequestPDU()):
@@ -88,12 +107,12 @@ def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
varBinds.append((oid, val))
pendingErrors.append(
(pMod.apiPDU.setEndOfMibError, errorIndex)
- )
+ )
else:
# Report value if OID is found
varBinds.append(
(mibInstr[nextIdx].name, mibInstr[nextIdx](msgVer))
- )
+ )
elif reqPDU.isSameTypeWith(pMod.GetRequestPDU()):
for oid, val in pMod.apiPDU.getVarBinds(reqPDU):
if oid in mibInstrIdx:
@@ -103,7 +122,7 @@ def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
varBinds.append((oid, val))
pendingErrors.append(
(pMod.apiPDU.setNoSuchInstanceError, errorIndex)
- )
+ )
break
else:
# Report unsupported request type
@@ -114,9 +133,10 @@ def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
f(rspPDU, i)
transportDispatcher.sendMessage(
encoder.encode(rspMsg), transportDomain, transportAddress
- )
+ )
return wholeMsg
+
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbFun)
@@ -131,9 +151,9 @@ transportDispatcher.registerTransport(
)
## Local domain socket
-#transportDispatcher.registerTransport(
+# transportDispatcher.registerTransport(
# unix.domainName, unix.UnixSocketTransport().openServerMode('/tmp/snmp-agent')
-#)
+# )
transportDispatcher.jobStarted(1)
diff --git a/examples/v1arch/asyncore/agent/ntforg/send-inform-over-ipv4-and-ipv6.py b/examples/v1arch/asyncore/agent/ntforg/send-inform-over-ipv4-and-ipv6.py
index 528d4fe0..138da0c4 100644
--- a/examples/v1arch/asyncore/agent/ntforg/send-inform-over-ipv4-and-ipv6.py
+++ b/examples/v1arch/asyncore/agent/ntforg/send-inform-over-ipv4-and-ipv6.py
@@ -24,7 +24,7 @@ from pyasn1.codec.ber import encoder, decoder
from pysnmp.proto.api import v2c as pMod
# Build PDU
-reqPDU = pMod.InformRequestPDU()
+reqPDU = pMod.InformRequestPDU()
pMod.apiTrapPDU.setDefaults(reqPDU)
# Build message
@@ -35,6 +35,7 @@ pMod.apiMessage.setPDU(trapMsg, reqPDU)
startedAt = time()
+
def cbTimerFun(timeNow):
if timeNow - startedAt > 3:
raise Exception("Request timed out")
@@ -47,7 +48,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message())
rspPDU = pMod.apiMessage.getPDU(rspMsg)
# Match response to request
- if pMod.apiPDU.getRequestID(reqPDU)==pMod.apiPDU.getRequestID(rspPDU):
+ if pMod.apiPDU.getRequestID(reqPDU) == pMod.apiPDU.getRequestID(rspPDU):
# Check for SNMP errors reported
errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
if errorStatus:
@@ -59,6 +60,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
transportDispatcher.jobFinished(1)
return wholeMsg
+
transportDispatcher = AsynsockDispatcher()
transportDispatcher.registerRecvCbFun(cbRecvFun)
@@ -74,13 +76,13 @@ transportDispatcher.sendMessage(
transportDispatcher.jobStarted(1)
# UDP/IPv6
-#transportDispatcher.registerTransport(
+# transportDispatcher.registerTransport(
# udp6.domainName, udp6.Udp6SocketTransport().openClientMode()
-#)
-#transportDispatcher.sendMessage(
+# )
+# transportDispatcher.sendMessage(
# encoder.encode(trapMsg), udp6.domainName, ('::1', 162)
-#)
-#transportDispatcher.jobStarted(1)
+# )
+# transportDispatcher.jobStarted(1)
# Dispatcher will finish as all scheduled messages are sent
transportDispatcher.runDispatcher()
diff --git a/examples/v1arch/asyncore/agent/ntforg/send-trap-over-ipv4-and-ipv6.py b/examples/v1arch/asyncore/agent/ntforg/send-trap-over-ipv4-and-ipv6.py
index e662fc72..314e0e61 100644
--- a/examples/v1arch/asyncore/agent/ntforg/send-trap-over-ipv4-and-ipv6.py
+++ b/examples/v1arch/asyncore/agent/ntforg/send-trap-over-ipv4-and-ipv6.py
@@ -29,15 +29,15 @@ from pysnmp.proto import api
# Protocol version to use
pMod = api.protoModules[api.protoVersion1]
-#pMod = api.protoModules[api.protoVersion2c]
+# pMod = api.protoModules[api.protoVersion2c]
# Build PDU
-trapPDU = pMod.TrapPDU()
+trapPDU = pMod.TrapPDU()
pMod.apiTrapPDU.setDefaults(trapPDU)
# Traps have quite different semantics across proto versions
if pMod == api.protoModules[api.protoVersion1]:
- pMod.apiTrapPDU.setEnterprise(trapPDU, (1,3,6,1,1,2,3,4,1))
+ pMod.apiTrapPDU.setEnterprise(trapPDU, (1, 3, 6, 1, 1, 2, 3, 4, 1))
pMod.apiTrapPDU.setGenericTrap(trapPDU, 'coldStart')
# Build message
@@ -65,12 +65,12 @@ transportDispatcher.sendMessage(
)
## Local domain socket
-#transportDispatcher.registerTransport(
+# transportDispatcher.registerTransport(
# unix.domainName, unix.UnixSocketTransport().openClientMode()
-#)
-#transportDispatcher.sendMessage(
+# )
+# transportDispatcher.sendMessage(
# encoder.encode(trapMsg), unix.domainName, '/tmp/snmp-manager'
-#)
+# )
# Dispatcher will finish as all scheduled messages are sent
transportDispatcher.runDispatcher()
diff --git a/examples/v1arch/asyncore/manager/cmdgen/broadcast-agent-discovery.py b/examples/v1arch/asyncore/manager/cmdgen/broadcast-agent-discovery.py
index c2452282..b7f67da2 100644
--- a/examples/v1arch/asyncore/manager/cmdgen/broadcast-agent-discovery.py
+++ b/examples/v1arch/asyncore/manager/cmdgen/broadcast-agent-discovery.py
@@ -31,16 +31,16 @@ maxWaitForResponses = 5
maxNumberResponses = 10
# Protocol version to use
-#pMod = api.protoModules[api.protoVersion1]
+# pMod = api.protoModules[api.protoVersion1]
pMod = api.protoModules[api.protoVersion2c]
# Build PDU
-reqPDU = pMod.GetRequestPDU()
+reqPDU = pMod.GetRequestPDU()
pMod.apiPDU.setDefaults(reqPDU)
pMod.apiPDU.setVarBinds(
- reqPDU, ( ('1.3.6.1.2.1.1.1.0', pMod.Null('')),
- ('1.3.6.1.2.1.1.3.0', pMod.Null('')) )
- )
+ reqPDU, (('1.3.6.1.2.1.1.1.0', pMod.Null('')),
+ ('1.3.6.1.2.1.1.3.0', pMod.Null('')))
+)
# Build message
reqMsg = pMod.Message()
@@ -50,8 +50,10 @@ pMod.apiMessage.setPDU(reqMsg, reqPDU)
startedAt = time()
+
class StopWaiting(Exception): pass
+
def cbTimerFun(timeNow):
if timeNow - startedAt > maxWaitForResponses:
raise StopWaiting()
@@ -64,7 +66,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message())
rspPDU = pMod.apiMessage.getPDU(rspMsg)
# Match response to request
- if pMod.apiPDU.getRequestID(reqPDU)==pMod.apiPDU.getRequestID(rspPDU):
+ if pMod.apiPDU.getRequestID(reqPDU) == pMod.apiPDU.getRequestID(rspPDU):
# Check for SNMP errors reported
errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
if errorStatus:
@@ -75,6 +77,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
transportDispatcher.jobFinished(1)
return wholeMsg
+
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbRecvFun)
diff --git a/examples/v1arch/asyncore/manager/cmdgen/fetch-scalar-value.py b/examples/v1arch/asyncore/manager/cmdgen/fetch-scalar-value.py
index c350f85b..a61b51fa 100644
--- a/examples/v1arch/asyncore/manager/cmdgen/fetch-scalar-value.py
+++ b/examples/v1arch/asyncore/manager/cmdgen/fetch-scalar-value.py
@@ -22,15 +22,15 @@ from time import time
# Protocol version to use
pMod = api.protoModules[api.protoVersion1]
-#pMod = api.protoModules[api.protoVersion2c]
+# pMod = api.protoModules[api.protoVersion2c]
# Build PDU
-reqPDU = pMod.GetRequestPDU()
+reqPDU = pMod.GetRequestPDU()
pMod.apiPDU.setDefaults(reqPDU)
pMod.apiPDU.setVarBinds(
- reqPDU, ( ('1.3.6.1.2.1.1.1.0', pMod.Null('')),
- ('1.3.6.1.2.1.1.3.0', pMod.Null('')) )
- )
+ reqPDU, (('1.3.6.1.2.1.1.1.0', pMod.Null('')),
+ ('1.3.6.1.2.1.1.3.0', pMod.Null('')))
+)
# Build message
reqMsg = pMod.Message()
@@ -40,6 +40,7 @@ pMod.apiMessage.setPDU(reqMsg, reqPDU)
startedAt = time()
+
def cbTimerFun(timeNow):
if timeNow - startedAt > 3:
raise Exception("Request timed out")
@@ -52,7 +53,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message())
rspPDU = pMod.apiMessage.getPDU(rspMsg)
# Match response to request
- if pMod.apiPDU.getRequestID(reqPDU)==pMod.apiPDU.getRequestID(rspPDU):
+ if pMod.apiPDU.getRequestID(reqPDU) == pMod.apiPDU.getRequestID(rspPDU):
# Check for SNMP errors reported
errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
if errorStatus:
@@ -63,6 +64,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
transportDispatcher.jobFinished(1)
return wholeMsg
+
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbRecvFun)
@@ -80,26 +82,26 @@ transportDispatcher.sendMessage(
transportDispatcher.jobStarted(1)
## UDP/IPv6 (second copy of the same PDU will be sent)
-#transportDispatcher.registerTransport(
+# transportDispatcher.registerTransport(
# udp6.domainName, udp6.Udp6SocketTransport().openClientMode()
-#)
+# )
# Pass message to dispatcher
-#transportDispatcher.sendMessage(
+# transportDispatcher.sendMessage(
# encoder.encode(reqMsg), udp6.domainName, ('::1', 161)
-#)
-#transportDispatcher.jobStarted(1)
+# )
+# transportDispatcher.jobStarted(1)
## Local domain socket
-#transportDispatcher.registerTransport(
+# transportDispatcher.registerTransport(
# unix.domainName, unix.UnixSocketTransport().openClientMode()
-#)
+# )
#
# Pass message to dispatcher
-#transportDispatcher.sendMessage(
+# transportDispatcher.sendMessage(
# encoder.encode(reqMsg), unix.domainName, '/tmp/snmp-agent'
-#)
-#transportDispatcher.jobStarted(1)
+# )
+# transportDispatcher.jobStarted(1)
# Dispatcher will finish as job#1 counter reaches zero
transportDispatcher.runDispatcher()
diff --git a/examples/v1arch/asyncore/manager/cmdgen/getbulk-pull-whole-mib.py b/examples/v1arch/asyncore/manager/cmdgen/getbulk-pull-whole-mib.py
index c19194d5..8bb91c2c 100644
--- a/examples/v1arch/asyncore/manager/cmdgen/getbulk-pull-whole-mib.py
+++ b/examples/v1arch/asyncore/manager/cmdgen/getbulk-pull-whole-mib.py
@@ -22,14 +22,14 @@ from pysnmp.proto.api import v2c
from time import time
# SNMP table header
-headVars = [ v2c.ObjectIdentifier((1,3,6)) ]
+headVars = [v2c.ObjectIdentifier((1, 3, 6))]
# Build PDU
-reqPDU = v2c.GetBulkRequestPDU()
+reqPDU = v2c.GetBulkRequestPDU()
v2c.apiBulkPDU.setDefaults(reqPDU)
v2c.apiBulkPDU.setNonRepeaters(reqPDU, 0)
v2c.apiBulkPDU.setMaxRepetitions(reqPDU, 25)
-v2c.apiBulkPDU.setVarBinds(reqPDU, [ (x, v2c.null) for x in headVars ])
+v2c.apiBulkPDU.setVarBinds(reqPDU, [(x, v2c.null) for x in headVars])
# Build message
reqMsg = v2c.Message()
@@ -39,6 +39,7 @@ v2c.apiMessage.setPDU(reqMsg, reqPDU)
startedAt = time()
+
def cbTimerFun(timeNow):
if timeNow - startedAt > 3:
raise Exception("Request timed out")
@@ -53,7 +54,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
rspPDU = v2c.apiMessage.getPDU(rspMsg)
# Match response to request
- if v2c.apiBulkPDU.getRequestID(reqPDU)==v2c.apiBulkPDU.getRequestID(rspPDU):
+ if v2c.apiBulkPDU.getRequestID(reqPDU) == v2c.apiBulkPDU.getRequestID(rspPDU):
# Format var-binds table
varBindTable = v2c.apiBulkPDU.getVarBindTable(reqPDU, rspPDU)
@@ -62,7 +63,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
if errorStatus and errorStatus != 2:
errorIndex = v2c.apiBulkPDU.getErrorIndex(rspPDU)
print('%s at %s' % (errorStatus.prettyPrint(),
- errorIndex and varBindTable[int(errorIndex)-1] or '?'))
+ errorIndex and varBindTable[int(errorIndex) - 1] or '?'))
transportDispatcher.jobFinished(1)
break
@@ -71,8 +72,8 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
for name, val in tableRow:
print('from: %s, %s = %s' % (
transportAddress, name.prettyPrint(), val.prettyPrint()
- )
)
+ )
# Stop on EOM
for oid, val in varBindTable[-1]:
@@ -80,21 +81,22 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
break
else:
transportDispatcher.jobFinished(1)
-
+
# Generate request for next row
v2c.apiBulkPDU.setVarBinds(
- reqPDU, [ (x, v2c.null) for x,y in varBindTable[-1] ]
- )
+ reqPDU, [(x, v2c.null) for x, y in varBindTable[-1]]
+ )
v2c.apiBulkPDU.setRequestID(reqPDU, v2c.getNextRequestID())
transportDispatcher.sendMessage(
encoder.encode(reqMsg), transportDomain, transportAddress
- )
+ )
global startedAt
if time() - startedAt > 3:
raise Exception('Request timed out')
startedAt = time()
return wholeMsg
+
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbRecvFun)
diff --git a/examples/v1arch/asyncore/manager/cmdgen/getnext-pull-whole-mib.py b/examples/v1arch/asyncore/manager/cmdgen/getnext-pull-whole-mib.py
index 9dfc52c3..84a2b09b 100644
--- a/examples/v1arch/asyncore/manager/cmdgen/getnext-pull-whole-mib.py
+++ b/examples/v1arch/asyncore/manager/cmdgen/getnext-pull-whole-mib.py
@@ -22,15 +22,15 @@ from time import time
# Protocol version to use
pMod = api.protoModules[api.protoVersion1]
-#pMod = api.protoModules[api.protoVersion2c]
+# pMod = api.protoModules[api.protoVersion2c]
# SNMP table header
-headVars = [ pMod.ObjectIdentifier((1,3,6)) ]
+headVars = [pMod.ObjectIdentifier((1, 3, 6))]
# Build PDU
-reqPDU = pMod.GetNextRequestPDU()
+reqPDU = pMod.GetNextRequestPDU()
pMod.apiPDU.setDefaults(reqPDU)
-pMod.apiPDU.setVarBinds(reqPDU, [ (x, pMod.null) for x in headVars ])
+pMod.apiPDU.setVarBinds(reqPDU, [(x, pMod.null) for x in headVars])
# Build message
reqMsg = pMod.Message()
@@ -40,6 +40,7 @@ pMod.apiMessage.setPDU(reqMsg, reqPDU)
startedAt = time()
+
def cbTimerFun(timeNow):
if timeNow - startedAt > 3:
raise Exception("Request timed out")
@@ -52,7 +53,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message())
rspPDU = pMod.apiMessage.getPDU(rspMsg)
# Match response to request
- if pMod.apiPDU.getRequestID(reqPDU)==pMod.apiPDU.getRequestID(rspPDU):
+ if pMod.apiPDU.getRequestID(reqPDU) == pMod.apiPDU.getRequestID(rspPDU):
# Check for SNMP errors reported
errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
if errorStatus and errorStatus != 2:
@@ -64,29 +65,30 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
for name, val in tableRow:
print('from: %s, %s = %s' % (
transportAddress, name.prettyPrint(), val.prettyPrint()
- )
)
+ )
# Stop on EOM
for oid, val in varBindTable[-1]:
if not isinstance(val, pMod.Null):
break
else:
transportDispatcher.jobFinished(1)
-
+
# Generate request for next row
pMod.apiPDU.setVarBinds(
- reqPDU, [ (x, pMod.null) for x,y in varBindTable[-1] ]
- )
+ reqPDU, [(x, pMod.null) for x, y in varBindTable[-1]]
+ )
pMod.apiPDU.setRequestID(reqPDU, pMod.getNextRequestID())
transportDispatcher.sendMessage(
encoder.encode(reqMsg), transportDomain, transportAddress
- )
+ )
global startedAt
if time() - startedAt > 3:
raise Exception('Request timed out')
startedAt = time()
return wholeMsg
+
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbRecvFun)
diff --git a/examples/v1arch/asyncore/manager/cmdgen/spoof-source-address.py b/examples/v1arch/asyncore/manager/cmdgen/spoof-source-address.py
index 332bb8d2..929ff771 100644
--- a/examples/v1arch/asyncore/manager/cmdgen/spoof-source-address.py
+++ b/examples/v1arch/asyncore/manager/cmdgen/spoof-source-address.py
@@ -41,16 +41,16 @@ transportAddress = udp.UdpTransportAddress(('195.218.195.228', 161))
transportAddress.setLocalAddress(('1.2.3.4', 0))
# Protocol version to use
-#pMod = api.protoModules[api.protoVersion1]
+# pMod = api.protoModules[api.protoVersion1]
pMod = api.protoModules[api.protoVersion2c]
# Build PDU
-reqPDU = pMod.GetRequestPDU()
+reqPDU = pMod.GetRequestPDU()
pMod.apiPDU.setDefaults(reqPDU)
pMod.apiPDU.setVarBinds(
- reqPDU, ( ('1.3.6.1.2.1.1.1.0', pMod.Null('')),
- ('1.3.6.1.2.1.1.3.0', pMod.Null('')) )
- )
+ reqPDU, (('1.3.6.1.2.1.1.1.0', pMod.Null('')),
+ ('1.3.6.1.2.1.1.3.0', pMod.Null('')))
+)
# Build message
reqMsg = pMod.Message()
@@ -60,8 +60,10 @@ pMod.apiMessage.setPDU(reqMsg, reqPDU)
startedAt = time()
+
class StopWaiting(Exception): pass
+
def cbTimerFun(timeNow):
if timeNow - startedAt > 3:
raise StopWaiting()
@@ -74,7 +76,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message())
rspPDU = pMod.apiMessage.getPDU(rspMsg)
# Match response to request
- if pMod.apiPDU.getRequestID(reqPDU)==pMod.apiPDU.getRequestID(rspPDU):
+ if pMod.apiPDU.getRequestID(reqPDU) == pMod.apiPDU.getRequestID(rspPDU):
# Check for SNMP errors reported
errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
if errorStatus:
@@ -85,6 +87,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
transportDispatcher.jobFinished(1)
return wholeMsg
+
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbRecvFun)
diff --git a/examples/v1arch/asyncore/manager/cmdgen/v2c-set.py b/examples/v1arch/asyncore/manager/cmdgen/v2c-set.py
index 5c8c8318..08e277a1 100644
--- a/examples/v1arch/asyncore/manager/cmdgen/v2c-set.py
+++ b/examples/v1arch/asyncore/manager/cmdgen/v2c-set.py
@@ -21,18 +21,18 @@ from pysnmp.proto import api
from time import time
# Protocol version to use
-#pMod = api.protoModules[api.protoVersion1]
+# pMod = api.protoModules[api.protoVersion1]
pMod = api.protoModules[api.protoVersion2c]
# Build PDU
-reqPDU = pMod.SetRequestPDU()
+reqPDU = pMod.SetRequestPDU()
pMod.apiPDU.setDefaults(reqPDU)
pMod.apiPDU.setVarBinds(
reqPDU,
# A list of Var-Binds to SET
- ( ('1.3.6.1.2.1.1.9.1.3.1', pMod.OctetString('New system description')),
- ('1.3.6.1.2.1.1.9.1.4.1', pMod.TimeTicks(12)) )
- )
+ (('1.3.6.1.2.1.1.9.1.3.1', pMod.OctetString('New system description')),
+ ('1.3.6.1.2.1.1.9.1.4.1', pMod.TimeTicks(12)))
+)
# Build message
reqMsg = pMod.Message()
@@ -42,6 +42,7 @@ pMod.apiMessage.setPDU(reqMsg, reqPDU)
startedAt = time()
+
def cbTimerFun(timeNow):
if timeNow - startedAt > 3:
raise Exception("Request timed out")
@@ -54,7 +55,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message())
rspPDU = pMod.apiMessage.getPDU(rspMsg)
# Match response to request
- if pMod.apiPDU.getRequestID(reqPDU)==pMod.apiPDU.getRequestID(rspPDU):
+ if pMod.apiPDU.getRequestID(reqPDU) == pMod.apiPDU.getRequestID(rspPDU):
# Check for SNMP errors reported
errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
if errorStatus:
@@ -65,6 +66,7 @@ def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
transportDispatcher.jobFinished(1)
return wholeMsg
+
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbRecvFun)
diff --git a/examples/v1arch/asyncore/manager/ntfrcv/listen-on-ipv4-and-ipv6-interfaces.py b/examples/v1arch/asyncore/manager/ntfrcv/listen-on-ipv4-and-ipv6-interfaces.py
index 0dd60faa..9e3cec2b 100644
--- a/examples/v1arch/asyncore/manager/ntfrcv/listen-on-ipv4-and-ipv6-interfaces.py
+++ b/examples/v1arch/asyncore/manager/ntfrcv/listen-on-ipv4-and-ipv6-interfaces.py
@@ -37,34 +37,19 @@ def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
return
reqMsg, wholeMsg = decoder.decode(
wholeMsg, asn1Spec=pMod.Message(),
- )
+ )
print('Notification message from %s:%s: ' % (
transportDomain, transportAddress
- )
)
+ )
reqPDU = pMod.apiMessage.getPDU(reqMsg)
if reqPDU.isSameTypeWith(pMod.TrapPDU()):
if msgVer == api.protoVersion1:
- print('Enterprise: %s' % (
- pMod.apiTrapPDU.getEnterprise(reqPDU).prettyPrint()
- )
- )
- print('Agent Address: %s' % (
- pMod.apiTrapPDU.getAgentAddr(reqPDU).prettyPrint()
- )
- )
- print('Generic Trap: %s' % (
- pMod.apiTrapPDU.getGenericTrap(reqPDU).prettyPrint()
- )
- )
- print('Specific Trap: %s' % (
- pMod.apiTrapPDU.getSpecificTrap(reqPDU).prettyPrint()
- )
- )
- print('Uptime: %s' % (
- pMod.apiTrapPDU.getTimeStamp(reqPDU).prettyPrint()
- )
- )
+ print('Enterprise: %s' % (pMod.apiTrapPDU.getEnterprise(reqPDU).prettyPrint()))
+ print('Agent Address: %s' % (pMod.apiTrapPDU.getAgentAddr(reqPDU).prettyPrint()))
+ print('Generic Trap: %s' % (pMod.apiTrapPDU.getGenericTrap(reqPDU).prettyPrint()))
+ print('Specific Trap: %s' % (pMod.apiTrapPDU.getSpecificTrap(reqPDU).prettyPrint()))
+ print('Uptime: %s' % (pMod.apiTrapPDU.getTimeStamp(reqPDU).prettyPrint()))
varBinds = pMod.apiTrapPDU.getVarBindList(reqPDU)
else:
varBinds = pMod.apiPDU.getVarBindList(reqPDU)
@@ -73,6 +58,7 @@ def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
print('%s = %s' % (oid.prettyPrint(), val.prettyPrint()))
return wholeMsg
+
transportDispatcher = AsyncoreDispatcher()
transportDispatcher.registerRecvCbFun(cbFun)
@@ -88,9 +74,9 @@ transportDispatcher.registerTransport(
)
## Local domain socket
-#transportDispatcher.registerTransport(
+# transportDispatcher.registerTransport(
# unix.domainName, unix.UnixSocketTransport().openServerMode('/tmp/snmp-manager')
-#)
+# )
transportDispatcher.jobStarted(1)