""" Listen for notifications at IPv4 & IPv6 interfaces ++++++++++++++++++++++++++++++++++++++++++++++++++ Receive SNMP TRAP messages with the following options: * SNMPv1/SNMPv2c * with SNMP community "public" * over IPv4/UDP, listening at 127.0.0.1:162 * over IPv6/UDP, listening at [::1]:162 * print received data on stdout Either of the following Net-SNMP commands will send notifications to this receiver: | $ snmptrap -v1 -c public 127.0.0.1 1.3.6.1.4.1.20408.4.1.1.2 127.0.0.1 1 1 123 1.3.6.1.2.1.1.1.0 s test | $ snmptrap -v2c -c public udp6:[::1] 123 1.3.6.1.6.3.1.1.5.1 1.3.6.1.2.1.1.5.0 s test Notification Receiver below uses two different transports for communication with Notification Originators - UDP over IPv4 and UDP over IPv6. """# from pysnmp.carrier.asyncore.dispatch import AsyncoreDispatcher from pysnmp.carrier.asyncore.dgram import udp, udp6, unix from pyasn1.codec.ber import decoder from pysnmp.proto import api # noinspection PyUnusedLocal def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg): while wholeMsg: msgVer = int(api.decodeMessageVersion(wholeMsg)) if msgVer in api.protoModules: pMod = api.protoModules[msgVer] else: print('Unsupported SNMP version %s' % msgVer) return reqMsg, wholeMsg = decoder.decode( wholeMsg, asn1Spec=pMod.Message(), ) print('Notification message from %s:%s: ' % ( transportDomain, transportAddress ) ) reqPDU = pMod.apiMessage.getPDU(reqMsg) if reqPDU.isSameTypeWith(pMod.TrapPDU()): if msgVer == api.protoVersion1: print('Enterprise: %s' % (pMod.apiTrapPDU.getEnterprise(reqPDU).prettyPrint())) print('Agent Address: %s' % (pMod.apiTrapPDU.getAgentAddr(reqPDU).prettyPrint())) print('Generic Trap: %s' % (pMod.apiTrapPDU.getGenericTrap(reqPDU).prettyPrint())) print('Specific Trap: %s' % (pMod.apiTrapPDU.getSpecificTrap(reqPDU).prettyPrint())) print('Uptime: %s' % (pMod.apiTrapPDU.getTimeStamp(reqPDU).prettyPrint())) varBinds = pMod.apiTrapPDU.getVarBinds(reqPDU) else: varBinds = pMod.apiPDU.getVarBinds(reqPDU) print('Var-binds:') for oid, val in varBinds: print('%s = %s' % (oid.prettyPrint(), val.prettyPrint())) return wholeMsg transportDispatcher = AsyncoreDispatcher() transportDispatcher.registerRecvCbFun(cbFun) # UDP/IPv4 transportDispatcher.registerTransport( udp.domainName, udp.UdpSocketTransport().openServerMode(('localhost', 162)) ) # UDP/IPv6 transportDispatcher.registerTransport( udp6.domainName, udp6.Udp6SocketTransport().openServerMode(('::1', 162)) ) ## Local domain socket # transportDispatcher.registerTransport( # unix.domainName, unix.UnixSocketTransport().openServerMode('/tmp/snmp-manager') # ) transportDispatcher.jobStarted(1) try: # Dispatcher will never finish as job#1 never reaches zero transportDispatcher.runDispatcher() except: transportDispatcher.closeDispatcher() raise