""" Implementing scalar MIB objects +++++++++++++++++++++++++++++++ Listen and respond to SNMP GET/GETNEXT queries with the following options: * SNMPv1 or SNMPv2c * with SNMP community "public" * over IPv4/UDP, listening at * over IPv6/UDP, listening at [::1]:161 * serving two Managed Objects Instances (sysDescr.0 and sysUptime.0) Either of the following Net-SNMP commands will walk this Agent: | $ snmpwalk -v2c -c public .1.3.6 | $ snmpwalk -v2c -c public udp6:[::1] .1.3.6 The Command Receiver below uses two distinct transports for communication with Command Generators - UDP over IPv4 and UDP over IPv6. """# from pysnmp.carrier.asyncore.dispatch import AsyncoreDispatcher from pysnmp.carrier.asyncore.dgram import udp, udp6 from pyasn1.codec.ber import encoder, decoder from pysnmp.proto import api import time, bisect class SysDescr(object): name = (1, 3, 6, 1, 2, 1, 1, 1, 0) def __eq__(self, other): return self.name == other def __ne__(self, other): return self.name != other def __lt__(self, other): return self.name < other def __le__(self, other): return self.name <= other def __gt__(self, other): return self.name > other def __ge__(self, other): return self.name >= other def __call__(self, protoVer): return api.PROTOCOL_MODULES[protoVer].OctetString( 'PySNMP example command responder' ) class Uptime(object): name = (1, 3, 6, 1, 2, 1, 1, 3, 0) birthday = time.time() def __eq__(self, other): return self.name == other def __ne__(self, other): return self.name != other def __lt__(self, other): return self.name < other def __le__(self, other): return self.name <= other def __gt__(self, other): return self.name > other def __ge__(self, other): return self.name >= other def __call__(self, protoVer): return api.PROTOCOL_MODULES[protoVer].TimeTicks( (time.time() - self.birthday) * 100 ) mibInstr = ( SysDescr(), Uptime() # sorted by object name ) mibInstrIdx = {} for mibVar in mibInstr: mibInstrIdx[mibVar.name] = mibVar def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg): while wholeMsg: msgVer = api.decodeMessageVersion(wholeMsg) if msgVer in api.PROTOCOL_MODULES: pMod = api.PROTOCOL_MODULES[msgVer] else: print('Unsupported SNMP version %s' % msgVer) return reqMsg, wholeMsg = decoder.decode( wholeMsg, asn1Spec=pMod.Message(), ) rspMsg = pMod.apiMessage.getResponse(reqMsg) rspPDU = pMod.apiMessage.getPDU(rspMsg) reqPDU = pMod.apiMessage.getPDU(reqMsg) varBinds = [] pendingErrors = [] errorIndex = 0 # GETNEXT PDU if reqPDU.isSameTypeWith(pMod.GetNextRequestPDU()): # Produce response var-binds for oid, val in pMod.apiPDU.getVarBinds(reqPDU): errorIndex = errorIndex + 1 # Search next OID to report nextIdx = bisect.bisect(mibInstr, oid) if nextIdx == len(mibInstr): # Out of MIB varBinds.append((oid, val)) pendingErrors.append( (pMod.apiPDU.setEndOfMibError, errorIndex) ) else: # Report value if OID is found varBinds.append( (mibInstr[nextIdx].name, mibInstr[nextIdx](msgVer)) ) elif reqPDU.isSameTypeWith(pMod.GetRequestPDU()): for oid, val in pMod.apiPDU.getVarBinds(reqPDU): if oid in mibInstrIdx: varBinds.append((oid, mibInstrIdx[oid](msgVer))) else: # No such instance varBinds.append((oid, val)) pendingErrors.append( (pMod.apiPDU.setNoSuchInstanceError, errorIndex) ) break else: # Report unsupported request type pMod.apiPDU.setErrorStatus(rspPDU, 'genErr') pMod.apiPDU.setVarBinds(rspPDU, varBinds) # Commit possible error indices to response PDU for f, i in pendingErrors: f(rspPDU, i) transportDispatcher.sendMessage( encoder.encode(rspMsg), transportDomain, transportAddress ) return wholeMsg transportDispatcher = AsyncoreDispatcher() transportDispatcher.registerRecvCbFun(cbFun) # UDP/IPv4 transportDispatcher.registerTransport( udp.DOMAIN_NAME, udp.UdpSocketTransport().openServerMode(('localhost', 161)) ) # UDP/IPv6 transportDispatcher.registerTransport( udp6.DOMAIN_NAME, udp6.Udp6SocketTransport().openServerMode(('::1', 161)) ) transportDispatcher.jobStarted(1) try: # Dispatcher will never finish as job#1 never reaches zero transportDispatcher.runDispatcher() except: transportDispatcher.closeDispatcher() raise