diff options
author | Ilya Etingof <etingof@gmail.com> | 2019-02-26 08:56:24 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-26 08:56:24 +0100 |
commit | 3f2f132a9fdf7a48ec6131d5498145dded3cfcad (patch) | |
tree | 63e6170b35f6b392bf2e3d3feb6996b886e4d36f /pysnmp/carrier | |
parent | 2ad26f8bfef0e39b3789d9e6d4fcbf76820c9867 (diff) | |
download | pysnmp-git-3f2f132a9fdf7a48ec6131d5498145dded3cfcad.tar.gz |
PEP-8 long lines and dunders (#245)
This patch massively reformats the whole codebase mainly wrapping
long lines and eliminating dundered private attributes.
Diffstat (limited to 'pysnmp/carrier')
-rw-r--r-- | pysnmp/carrier/asyncio/base.py | 2 | ||||
-rw-r--r-- | pysnmp/carrier/asyncio/dgram/base.py | 46 | ||||
-rw-r--r-- | pysnmp/carrier/asyncio/dispatch.py | 26 | ||||
-rw-r--r-- | pysnmp/carrier/asyncore/base.py | 38 | ||||
-rw-r--r-- | pysnmp/carrier/asyncore/dgram/base.py | 93 | ||||
-rw-r--r-- | pysnmp/carrier/asyncore/dgram/udp6.py | 13 | ||||
-rw-r--r-- | pysnmp/carrier/asyncore/dgram/unix.py | 11 | ||||
-rw-r--r-- | pysnmp/carrier/asyncore/dispatch.py | 21 | ||||
-rw-r--r-- | pysnmp/carrier/base.py | 54 | ||||
-rw-r--r-- | pysnmp/carrier/sockmsg.py | 10 | ||||
-rw-r--r-- | pysnmp/carrier/twisted/dgram/base.py | 20 | ||||
-rw-r--r-- | pysnmp/carrier/twisted/dgram/udp.py | 12 | ||||
-rw-r--r-- | pysnmp/carrier/twisted/dgram/unix.py | 10 | ||||
-rw-r--r-- | pysnmp/carrier/twisted/dispatch.py | 22 |
14 files changed, 270 insertions, 108 deletions
diff --git a/pysnmp/carrier/asyncio/base.py b/pysnmp/carrier/asyncio/base.py index 4ee21ead..65be62ed 100644 --- a/pysnmp/carrier/asyncio/base.py +++ b/pysnmp/carrier/asyncio/base.py @@ -35,5 +35,5 @@ from pysnmp.carrier.base import AbstractTransport class AbstractAsyncioTransport(AbstractTransport): - PROTO_TRANSPORT_DISPATCHER = AsyncioDispatcher """Base Asyncio Transport, to be used with AsyncioDispatcher""" + PROTO_TRANSPORT_DISPATCHER = AsyncioDispatcher diff --git a/pysnmp/carrier/asyncio/dgram/base.py b/pysnmp/carrier/asyncio/dgram/base.py index ae5ab3a7..be91a983 100644 --- a/pysnmp/carrier/asyncio/dgram/base.py +++ b/pysnmp/carrier/asyncio/dgram/base.py @@ -57,27 +57,38 @@ class DgramAsyncioProtocol(asyncio.DatagramProtocol, AbstractAsyncioTransport): def __init__(self, sock=None, sockMap=None, loop=None): self._writeQ = [] self._lport = None + if loop is None: loop = asyncio.get_event_loop() + self.loop = loop def datagram_received(self, datagram, transportAddress): if self._cbFun is None: raise error.CarrierError('Unable to call cbFun') + else: self.loop.call_soon(self._cbFun, self, transportAddress, datagram) def connection_made(self, transport): self.transport = transport + debug.logger & debug.FLAG_IO and debug.logger('connection_made: invoked') + while self._writeQ: outgoingMessage, transportAddress = self._writeQ.pop(0) - debug.logger & debug.FLAG_IO and debug.logger('connection_made: transportAddress %r outgoingMessage %s' % - (transportAddress, debug.hexdump(outgoingMessage))) + + debug.logger & debug.FLAG_IO and debug.logger( + 'connection_made: transportAddress %r outgoingMessage %s' % ( + transportAddress, debug.hexdump(outgoingMessage))) + try: - self.transport.sendto(outgoingMessage, self.normalizeAddress(transportAddress)) + self.transport.sendto( + outgoingMessage, self.normalizeAddress(transportAddress)) + except Exception: - raise error.CarrierError(';'.join(traceback.format_exception(*sys.exc_info()))) + raise error.CarrierError( + ';'.join(traceback.format_exception(*sys.exc_info()))) def connection_lost(self, exc): debug.logger & debug.FLAG_IO and debug.logger('connection_lost: invoked') @@ -89,14 +100,17 @@ class DgramAsyncioProtocol(asyncio.DatagramProtocol, AbstractAsyncioTransport): c = self.loop.create_datagram_endpoint( lambda: self, local_addr=iface, family=self.SOCK_FAMILY ) + # Avoid deprecation warning for asyncio.async() if IS_PYTHON_344_PLUS: self._lport = asyncio.ensure_future(c) + else: # pragma: no cover self._lport = getattr(asyncio, 'async')(c) except Exception: raise error.CarrierError(';'.join(traceback.format_exception(*sys.exc_info()))) + return self def openServerMode(self, iface): @@ -104,36 +118,48 @@ class DgramAsyncioProtocol(asyncio.DatagramProtocol, AbstractAsyncioTransport): c = self.loop.create_datagram_endpoint( lambda: self, local_addr=iface, family=self.SOCK_FAMILY ) + # Avoid deprecation warning for asyncio.async() if IS_PYTHON_344_PLUS: self._lport = asyncio.ensure_future(c) + else: # pragma: no cover self._lport = getattr(asyncio, 'async')(c) + except Exception: raise error.CarrierError(';'.join(traceback.format_exception(*sys.exc_info()))) + return self def closeTransport(self): if self._lport is not None: self._lport.cancel() + if self.transport is not None: self.transport.close() + AbstractAsyncioTransport.closeTransport(self) def sendMessage(self, outgoingMessage, transportAddress): - debug.logger & debug.FLAG_IO and debug.logger('sendMessage: %s transportAddress %r outgoingMessage %s' % ( - (self.transport is None and "queuing" or "sending"), - transportAddress, debug.hexdump(outgoingMessage) - )) + debug.logger & debug.FLAG_IO and debug.logger( + 'sendMessage: %s transportAddress %r outgoingMessage ' + '%s' % (self.transport is None and "queuing" or "sending", + transportAddress, debug.hexdump(outgoingMessage))) + if self.transport is None: self._writeQ.append((outgoingMessage, transportAddress)) + else: try: - self.transport.sendto(outgoingMessage, self.normalizeAddress(transportAddress)) + self.transport.sendto( + outgoingMessage, self.normalizeAddress(transportAddress)) + except Exception: - raise error.CarrierError(';'.join(traceback.format_exception(*sys.exc_info()))) + raise error.CarrierError( + ';'.join(traceback.format_exception(*sys.exc_info()))) def normalizeAddress(self, transportAddress): if not isinstance(transportAddress, self.ADDRESS_TYPE): transportAddress = self.ADDRESS_TYPE(transportAddress) + return transportAddress diff --git a/pysnmp/carrier/asyncio/dispatch.py b/pysnmp/carrier/asyncio/dispatch.py index cb9f2020..2fd7d86e 100644 --- a/pysnmp/carrier/asyncio/dispatch.py +++ b/pysnmp/carrier/asyncio/dispatch.py @@ -34,24 +34,29 @@ import platform import sys import traceback -from pysnmp.carrier.base import AbstractTransportDispatcher -from pysnmp.error import PySnmpError - try: import asyncio + except ImportError: import trollius as asyncio +from pysnmp.carrier.base import AbstractTransportDispatcher +from pysnmp.error import PySnmpError + IS_PYTHON_344_PLUS = platform.python_version_tuple() >= ('3', '4', '4') + class AsyncioDispatcher(AbstractTransportDispatcher): """AsyncioDispatcher based on asyncio event loop""" def __init__(self, *args, **kwargs): AbstractTransportDispatcher.__init__(self) + self.__transportCount = 0 + if 'timeout' in kwargs: self.setTimerResolution(kwargs['timeout']) + self.loopingcall = None self.loop = kwargs.pop('loop', asyncio.get_event_loop()) @@ -65,25 +70,32 @@ class AsyncioDispatcher(AbstractTransportDispatcher): if not self.loop.is_running(): try: self.loop.run_forever() + except KeyboardInterrupt: raise + except Exception: - raise PySnmpError(';'.join(traceback.format_exception(*sys.exc_info()))) + raise PySnmpError( + ';'.join(traceback.format_exception(*sys.exc_info()))) def registerTransport(self, tDomain, transport): if self.loopingcall is None and self.getTimerResolution() > 0: # Avoid deprecation warning for asyncio.async() if IS_PYTHON_344_PLUS: - self.loopingcall = asyncio.ensure_future(self.handle_timeout()) - else: # pragma: no cover - self.loopingcall = getattr(asyncio, 'async')(self.handle_timeout()) + self.loopingcall = asyncio.ensure_future(self.handle_timeout()) + + else: + self.loopingcall = getattr(asyncio, 'async')(self.handle_timeout()) + AbstractTransportDispatcher.registerTransport( self, tDomain, transport ) + self.__transportCount += 1 def unregisterTransport(self, tDomain): t = AbstractTransportDispatcher.getTransport(self, tDomain) + if t is not None: AbstractTransportDispatcher.unregisterTransport(self, tDomain) self.__transportCount -= 1 diff --git a/pysnmp/carrier/asyncore/base.py b/pysnmp/carrier/asyncore/base.py index f7b1af9e..3fa13290 100644 --- a/pysnmp/carrier/asyncore/base.py +++ b/pysnmp/carrier/asyncore/base.py @@ -24,17 +24,21 @@ class AbstractSocketTransport(asyncore.dispatcher, AbstractTransport): # noinspection PyUnusedLocal def __init__(self, sock=None, sockMap=None): asyncore.dispatcher.__init__(self) + if sock is None: if self.SOCK_FAMILY is None: raise error.CarrierError( 'Address family %s not supported' % self.__class__.__name__ ) + if self.SOCK_TYPE is None: raise error.CarrierError( 'Socket type %s not supported' % self.__class__.__name__ ) + try: sock = socket.socket(self.SOCK_FAMILY, self.SOCK_TYPE) + except socket.error as exc: raise error.CarrierError('socket() failed: %s' % exc) @@ -43,9 +47,16 @@ class AbstractSocketTransport(asyncore.dispatcher, AbstractTransport): bsize = sock.getsockopt(socket.SOL_SOCKET, b) if bsize < self.BUFFER_SIZE: sock.setsockopt(socket.SOL_SOCKET, b, self.BUFFER_SIZE) - debug.logger & debug.FLAG_IO and debug.logger('%s: socket %d buffer size increased from %d to %d for buffer %d' % (self.__class__.__name__, sock.fileno(), bsize, self.BUFFER_SIZE, b)) + debug.logger & debug.FLAG_IO and debug.logger( + '%s: socket %d buffer size increased from %d to %d ' + 'for buffer %d' % ( + self.__class__.__name__, sock.fileno(), + bsize, self.BUFFER_SIZE, b)) + except Exception as exc: - debug.logger & debug.FLAG_IO and debug.logger('%s: socket buffer size option mangling failure for buffer: %s' % (self.__class__.__name__, exc)) + debug.logger & debug.FLAG_IO and debug.logger( + '%s: socket buffer size option mangling failure for buffer: ' + '%s' % (self.__class__.__name__, exc)) # The socket map is managed by the AsyncoreDispatcher on # which this transport is registered. Here we just prepare @@ -54,6 +65,7 @@ class AbstractSocketTransport(asyncore.dispatcher, AbstractTransport): sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) + self.set_socket(sock) def __hash__(self): @@ -61,16 +73,6 @@ class AbstractSocketTransport(asyncore.dispatcher, AbstractTransport): # The following two methods are part of base class so here we overwrite # them to separate socket management from dispatcher registration tasks. - # These two are just for dispatcher registration. - def add_channel(self, map=None): - if map is not None: - map[self._fileno] = self - self.connected = True - - def del_channel(self, map=None): - if map is not None and self._fileno in map: - del map[self._fileno] - self.connected = False def registerSocket(self, sockMap=None): self.add_channel(sockMap) @@ -84,6 +86,18 @@ class AbstractSocketTransport(asyncore.dispatcher, AbstractTransport): # asyncore API + # The first two methods are just for dispatcher registration + + def add_channel(self, map=None): + if map is not None: + map[self._fileno] = self + self.connected = True + + def del_channel(self, map=None): + if map is not None and self._fileno in map: + del map[self._fileno] + self.connected = False + def handle_close(self): raise error.CarrierError('Transport unexpectedly closed') diff --git a/pysnmp/carrier/asyncore/dgram/base.py b/pysnmp/carrier/asyncore/dgram/base.py index e456da20..06453096 100644 --- a/pysnmp/carrier/asyncore/dgram/base.py +++ b/pysnmp/carrier/asyncore/dgram/base.py @@ -47,16 +47,20 @@ class DgramSocketTransport(AbstractSocketTransport): if iface is not None: try: self.socket.bind(iface) + except socket.error as exc: - raise error.CarrierError( - 'bind() for %s failed: %s' % (iface is None and "<all local>" or iface, exc)) + raise error.CarrierError('bind() for %s failed: %s' % ( + iface is None and "<all local>" or iface, exc)) + return self def openServerMode(self, iface): try: self.socket.bind(iface) + except socket.error as exc: raise error.CarrierError('bind() for %s failed: %s' % (iface, exc)) + return self def enableBroadcast(self, flag=1): @@ -64,30 +68,47 @@ class DgramSocketTransport(AbstractSocketTransport): self.socket.setsockopt( socket.SOL_SOCKET, socket.SO_BROADCAST, flag ) + except socket.error as exc: - raise error.CarrierError('setsockopt() for SO_BROADCAST failed: %s' % exc) - debug.logger & debug.FLAG_IO and debug.logger('enableBroadcast: %s option SO_BROADCAST on socket %s' % (flag and "enabled" or "disabled", self.socket.fileno())) + raise error.CarrierError('setsockopt() for SO_BROADCAST ' + 'failed: %s' % exc) + + debug.logger & debug.FLAG_IO and debug.logger( + 'enableBroadcast: %s option SO_BROADCAST on ' + 'socket %s' % (flag and "enabled" or "disabled", self.socket.fileno())) + return self def enablePktInfo(self, flag=1): if (not hasattr(self.socket, 'sendmsg') or not hasattr(self.socket, 'recvmsg')): - raise error.CarrierError('sendmsg()/recvmsg() interface is not supported by this OS and/or Python version') + raise error.CarrierError( + 'sendmsg()/recvmsg() interface is not supported by this OS ' + 'and/or Python version') try: if self.socket.family == socket.AF_INET: self.socket.setsockopt(socket.SOL_IP, socket.IP_PKTINFO, flag) if self.socket.family == socket.AF_INET6: - self.socket.setsockopt(socket.SOL_IPV6, socket.IPV6_RECVPKTINFO, flag) + self.socket.setsockopt( + socket.SOL_IPV6, socket.IPV6_RECVPKTINFO, flag) except socket.error as exc: - raise error.CarrierError('setsockopt() for %s failed: %s' % (self.socket.family == socket.AF_INET6 and "IPV6_RECVPKTINFO" or "IP_PKTINFO", exc)) + raise error.CarrierError( + 'setsockopt() for %s failed: ' + '%s' % (self.socket.family == socket.AF_INET6 and + "IPV6_RECVPKTINFO" or "IP_PKTINFO", exc)) self._sendto = sockmsg.getSendTo(self.ADDRESS_TYPE) self._recvfrom = sockmsg.getRecvFrom(self.ADDRESS_TYPE) - debug.logger & debug.FLAG_IO and debug.logger('enablePktInfo: %s option %s on socket %s' % (self.socket.family == socket.AF_INET6 and "IPV6_RECVPKTINFO" or "IP_PKTINFO", flag and "enabled" or "disabled", self.socket.fileno())) + debug.logger & debug.FLAG_IO and debug.logger( + 'enablePktInfo: %s option %s on socket ' + '%s' % (self.socket.family == socket.AF_INET6 and "IPV6_RECVPKTINFO" + or "IP_PKTINFO", flag and "enabled" or "disabled", + self.socket.fileno())) + return self def enableTransparent(self, flag=1): @@ -96,25 +117,34 @@ class DgramSocketTransport(AbstractSocketTransport): self.socket.setsockopt( socket.SOL_IP, socket.IP_TRANSPARENT, flag ) + if self.socket.family == socket.AF_INET6: self.socket.setsockopt( socket.SOL_IPV6, socket.IPV6_TRANSPARENT, flag ) except socket.error as exc: - raise error.CarrierError('setsockopt() for IP_TRANSPARENT failed: %s' % exc) + raise error.CarrierError('setsockopt() for IP_TRANSPARENT ' + 'failed: %s' % exc) except OSError: - raise error.CarrierError('IP_TRANSPARENT socket option requires superuser priveleges') + raise error.CarrierError('IP_TRANSPARENT socket option requires ' + 'superuser privileges') + + debug.logger & debug.FLAG_IO and debug.logger( + 'enableTransparent: %s option IP_TRANSPARENT on socket ' + '%s' % (flag and "enabled" or "disabled", self.socket.fileno())) - debug.logger & debug.FLAG_IO and debug.logger('enableTransparent: %s option IP_TRANSPARENT on socket %s' % (flag and "enabled" or "disabled", self.socket.fileno())) return self def sendMessage(self, outgoingMessage, transportAddress): self.__outQueue.append( (outgoingMessage, self.normalizeAddress(transportAddress)) ) - debug.logger & debug.FLAG_IO and debug.logger('sendMessage: outgoingMessage queued (%d octets) %s' % (len(outgoingMessage), debug.hexdump(outgoingMessage))) + + debug.logger & debug.FLAG_IO and debug.logger( + 'sendMessage: outgoingMessage queued (%d octets) %s' % ( + len(outgoingMessage), debug.hexdump(outgoingMessage))) def normalizeAddress(self, transportAddress): if not isinstance(transportAddress, self.ADDRESS_TYPE): @@ -134,6 +164,7 @@ class DgramSocketTransport(AbstractSocketTransport): return '0.0.0.0', 0 # asyncore API + def handle_connect(self): pass @@ -142,40 +173,58 @@ class DgramSocketTransport(AbstractSocketTransport): def handle_write(self): outgoingMessage, transportAddress = self.__outQueue.pop(0) - debug.logger & debug.FLAG_IO and debug.logger('handle_write: transportAddress %r -> %r outgoingMessage (%d octets) %s' % (transportAddress.getLocalAddress(), transportAddress, len(outgoingMessage), debug.hexdump(outgoingMessage))) + + debug.logger & debug.FLAG_IO and debug.logger( + 'handle_write: transportAddress %r -> %r outgoingMessage (%d ' + 'octets) %s' % (transportAddress.getLocalAddress(), transportAddress, + len(outgoingMessage), debug.hexdump(outgoingMessage))) + if not transportAddress: - debug.logger & debug.FLAG_IO and debug.logger('handle_write: missing dst address, loosing outgoing msg') + debug.logger & debug.FLAG_IO and debug.logger( + 'handle_write: missing dst address, loosing outgoing msg') return + try: - self._sendto( - self.socket, outgoingMessage, transportAddress - ) + self._sendto(self.socket, outgoingMessage, transportAddress) + except socket.error as exc: if exc.args[0] in SOCK_ERRORS: - debug.logger & debug.FLAG_IO and debug.logger('handle_write: ignoring socket error %s' % exc) + debug.logger & debug.FLAG_IO and debug.logger( + 'handle_write: ignoring socket error %s' % exc) + else: - raise error.CarrierError('sendto() failed for %s: %s' % (transportAddress, exc)) + raise error.CarrierError( + 'sendto() failed for %s: %s' % (transportAddress, exc)) def readable(self): - return 1 + return True def handle_read(self): try: incomingMessage, transportAddress = self._recvfrom(self.socket, 65535) + transportAddress = self.normalizeAddress(transportAddress) + debug.logger & debug.FLAG_IO and debug.logger( - 'handle_read: transportAddress %r -> %r incomingMessage (%d octets) %s' % (transportAddress, transportAddress.getLocalAddress(), len(incomingMessage), debug.hexdump(incomingMessage))) + 'handle_read: transportAddress %r -> %r incomingMessage (%d ' + 'octets) %s' % (transportAddress, transportAddress.getLocalAddress(), + len(incomingMessage), debug.hexdump(incomingMessage))) + if not incomingMessage: self.handle_close() return + else: self._cbFun(self, transportAddress, incomingMessage) return + except socket.error as exc: if exc.args[0] in SOCK_ERRORS: - debug.logger & debug.FLAG_IO and debug.logger('handle_read: known socket error %s' % exc) + debug.logger & debug.FLAG_IO and debug.logger( + 'handle_read: known socket error %s' % exc) SOCK_ERRORS[exc.args[0]] and self.handle_close() return + else: raise error.CarrierError('recvfrom() failed: %s' % exc) diff --git a/pysnmp/carrier/asyncore/dgram/udp6.py b/pysnmp/carrier/asyncore/dgram/udp6.py index 75d348d6..3207c6ff 100644 --- a/pysnmp/carrier/asyncore/dgram/udp6.py +++ b/pysnmp/carrier/asyncore/dgram/udp6.py @@ -22,17 +22,18 @@ class Udp6SocketTransport(DgramSocketTransport): def normalizeAddress(self, transportAddress): if '%' in transportAddress[0]: # strip zone ID - ta = self.ADDRESS_TYPE((transportAddress[0].split('%')[0], - transportAddress[1], - 0, # flowinfo - 0)) # scopeid + ta = self.ADDRESS_TYPE( + (transportAddress[0].split('%')[0], transportAddress[1], 0, 0)) + else: - ta = self.ADDRESS_TYPE((transportAddress[0], - transportAddress[1], 0, 0)) + ta = self.ADDRESS_TYPE( + (transportAddress[0], transportAddress[1], 0, 0)) if (isinstance(transportAddress, self.ADDRESS_TYPE) and transportAddress.getLocalAddress()): + return ta.setLocalAddress(transportAddress.getLocalAddress()) + else: return ta.setLocalAddress(self.getLocalAddress()) diff --git a/pysnmp/carrier/asyncore/dgram/unix.py b/pysnmp/carrier/asyncore/dgram/unix.py index 0f68f962..b5233844 100644 --- a/pysnmp/carrier/asyncore/dgram/unix.py +++ b/pysnmp/carrier/asyncore/dgram/unix.py @@ -9,6 +9,7 @@ import random try: from socket import AF_UNIX + except ImportError: AF_UNIX = None @@ -33,14 +34,20 @@ class UnixSocketTransport(DgramSocketTransport): if iface is None: # UNIX domain sockets must be explicitly bound iface = '' + while len(iface) < 8: iface += chr(random.randrange(65, 91)) iface += chr(random.randrange(97, 123)) - iface = os.path.sep + 'tmp' + os.path.sep + 'pysnmp' + iface + + iface = os.path.join(os.path.sep, 'tmp', 'pysnmp', iface) + if os.path.exists(iface): os.remove(iface) + DgramSocketTransport.openClientMode(self, iface) + self._iface = iface + return self def openServerMode(self, iface): @@ -50,8 +57,10 @@ class UnixSocketTransport(DgramSocketTransport): def closeTransport(self): DgramSocketTransport.closeTransport(self) + try: os.remove(self._iface) + except OSError: pass diff --git a/pysnmp/carrier/asyncore/dispatch.py b/pysnmp/carrier/asyncore/dispatch.py index 897fdd0c..3f448471 100644 --- a/pysnmp/carrier/asyncore/dispatch.py +++ b/pysnmp/carrier/asyncore/dispatch.py @@ -16,7 +16,8 @@ from pysnmp.error import PySnmpError class AsyncoreDispatcher(AbstractTransportDispatcher): def __init__(self): - self.__sockMap = {} # use own map for MT safety + # use own map for MT safety + self.__sockMap = {} AbstractTransportDispatcher.__init__(self) def getSocketMap(self): @@ -25,13 +26,14 @@ class AsyncoreDispatcher(AbstractTransportDispatcher): def setSocketMap(self, sockMap=socket_map): self.__sockMap = sockMap - def registerTransport(self, tDomain, t): - AbstractTransportDispatcher.registerTransport(self, tDomain, t) - t.registerSocket(self.__sockMap) + def registerTransport(self, transportDomain, transport): + AbstractTransportDispatcher.registerTransport( + self, transportDomain, transport) + transport.registerSocket(self.__sockMap) - def unregisterTransport(self, tDomain): - self.getTransport(tDomain).unregisterSocket(self.__sockMap) - AbstractTransportDispatcher.unregisterTransport(self, tDomain) + def unregisterTransport(self, transportDomain): + self.getTransport(transportDomain).unregisterSocket(self.__sockMap) + AbstractTransportDispatcher.unregisterTransport(self, transportDomain) def transportsAreWorking(self): for transport in self.__sockMap.values(): @@ -43,9 +45,12 @@ class AsyncoreDispatcher(AbstractTransportDispatcher): try: loop(timeout or self.getTimerResolution(), use_poll=True, map=self.__sockMap, count=1) + except KeyboardInterrupt: raise except Exception: - raise PySnmpError('poll error: %s' % ';'.join(format_exception(*exc_info()))) + raise PySnmpError( + 'poll error: %s' % ';'.join(format_exception(*exc_info()))) + self.handleTimerTick(time()) diff --git a/pysnmp/carrier/base.py b/pysnmp/carrier/base.py index 12dfc86b..518f304b 100644 --- a/pysnmp/carrier/base.py +++ b/pysnmp/carrier/base.py @@ -53,6 +53,7 @@ class AbstractTransportDispatcher(object): def _cbFun(self, incomingTransport, transportAddress, incomingMessage): if incomingTransport in self.__transportDomainMap: transportDomain = self.__transportDomainMap[incomingTransport] + else: raise error.CarrierError( 'Unregistered transport %s' % (incomingTransport,) @@ -62,6 +63,7 @@ class AbstractTransportDispatcher(object): recvId = self.__routingCbFun( transportDomain, transportAddress, incomingMessage ) + else: recvId = None @@ -69,6 +71,7 @@ class AbstractTransportDispatcher(object): self.__recvCallables[recvId]( self, transportDomain, transportAddress, incomingMessage ) + else: raise error.CarrierError( 'No callback for "%r" found - loosing incoming event' % (recvId,) @@ -81,6 +84,7 @@ class AbstractTransportDispatcher(object): raise error.CarrierError( 'Data routing callback already registered' ) + self.__routingCbFun = routingCbFun def unregisterRoutingCbFun(self): @@ -92,6 +96,7 @@ class AbstractTransportDispatcher(object): raise error.CarrierError( 'Receive callback %r already registered' % (recvId is None and '<default>' or recvId,) ) + self.__recvCallables[recvId] = recvCb def unregisterRecvCbFun(self, recvId=None): @@ -101,38 +106,44 @@ class AbstractTransportDispatcher(object): def registerTimerCbFun(self, timerCbFun, tickInterval=None): if not tickInterval: tickInterval = self.__timerResolution + self.__timerCallables.append(TimerCallable(timerCbFun, tickInterval)) def unregisterTimerCbFun(self, timerCbFun=None): if timerCbFun: self.__timerCallables.remove(timerCbFun) + else: self.__timerCallables = [] - def registerTransport(self, tDomain, transport): - if tDomain in self.__transports: + def registerTransport(self, transportDomain, transport): + if transportDomain in self.__transports: raise error.CarrierError( - 'Transport %s already registered' % (tDomain,) + 'Transport %s already registered' % (transportDomain,) ) + transport.registerCbFun(self._cbFun) - self.__transports[tDomain] = transport - self.__transportDomainMap[transport] = tDomain - def unregisterTransport(self, tDomain): - if tDomain not in self.__transports: + self.__transports[transportDomain] = transport + self.__transportDomainMap[transport] = transportDomain + + def unregisterTransport(self, transportDomain): + if transportDomain not in self.__transports: raise error.CarrierError( - 'Transport %s not registered' % (tDomain,) + 'Transport %s not registered' % (transportDomain,) ) - self.__transports[tDomain].unregisterCbFun() - del self.__transportDomainMap[self.__transports[tDomain]] - del self.__transports[tDomain] + + self.__transports[transportDomain].unregisterCbFun() + + del self.__transportDomainMap[self.__transports[transportDomain]] + del self.__transports[transportDomain] def getTransport(self, transportDomain): if transportDomain in self.__transports: return self.__transports[transportDomain] + raise error.CarrierError( - 'Transport %s not registered' % (transportDomain,) - ) + 'Transport %s not registered' % (transportDomain,)) def sendMessage(self, outgoingMessage, transportDomain, transportAddress): @@ -140,10 +151,10 @@ class AbstractTransportDispatcher(object): self.__transports[transportDomain].sendMessage( outgoingMessage, transportAddress ) + else: - raise error.CarrierError( - 'No suitable transport domain for %s' % (transportDomain,) - ) + raise error.CarrierError('No suitable transport domain for ' + '%s' % (transportDomain,)) def getTimerResolution(self): return self.__timerResolution @@ -151,6 +162,7 @@ class AbstractTransportDispatcher(object): def setTimerResolution(self, timerResolution): if timerResolution < 0.01 or timerResolution > 10: raise error.CarrierError('Impossible timer resolution') + self.__timerResolution = timerResolution self.__timerDelta = timerResolution * 0.05 @@ -173,6 +185,7 @@ class AbstractTransportDispatcher(object): def jobStarted(self, jobId, count=1): if jobId in self.__jobs: self.__jobs[jobId] += count + else: self.__jobs[jobId] = count @@ -191,6 +204,7 @@ class AbstractTransportDispatcher(object): for tDomain in list(self.__transports): self.__transports[tDomain].closeTransport() self.unregisterTransport(tDomain) + self.__transports.clear() self.unregisterRecvCbFun() self.unregisterTimerCbFun() @@ -207,7 +221,8 @@ class AbstractTransportAddress(object): return self._localAddress def clone(self, localAddress=None): - return self.__class__(self).setLocalAddress(localAddress is None and self.getLocalAddress() or localAddress) + return self.__class__(self).setLocalAddress( + localAddress is None and self.getLocalAddress() or localAddress) class AbstractTransport(object): @@ -221,9 +236,8 @@ class AbstractTransport(object): def registerCbFun(self, cbFun): if self._cbFun: - raise error.CarrierError( - 'Callback function %s already registered at %s' % (self._cbFun, self) - ) + raise error.CarrierError('Callback function %s already registered ' + 'at %s' % (self._cbFun, self)) self._cbFun = cbFun def unregisterCbFun(self): diff --git a/pysnmp/carrier/sockmsg.py b/pysnmp/carrier/sockmsg.py index e36bc85a..6249b79d 100644 --- a/pysnmp/carrier/sockmsg.py +++ b/pysnmp/carrier/sockmsg.py @@ -21,11 +21,13 @@ from pysnmp import debug if sys.version_info[:2] < (3, 3): def getRecvFrom(addressType): - raise error.CarrierError('sendmsg()/recvmsg() interface is not supported by this OS and/or Python version') + raise error.CarrierError('sendmsg()/recvmsg() interface is not ' + 'supported by this OS and/or Python version') def getSendTo(addressType): - raise error.CarrierError('sendmsg()/recvmsg() interface is not supported by this OS and/or Python version') + raise error.CarrierError('sendmsg()/recvmsg() interface is not ' + 'supported by this OS and/or Python version') else: import ctypes @@ -82,13 +84,13 @@ else: if anc[0] == socket.SOL_IP and anc[1] == socket.IP_PKTINFO: addr = in_pktinfo.from_buffer_copy(anc[2]) addr = ipaddress.IPv4Address(memoryview(addr.ipi_addr).tobytes()) - _to = (str(addr), s.getsockname()[1]) + _to = str(addr), s.getsockname()[1] break elif anc[0] == socket.SOL_IPV6 and anc[1] == socket.IPV6_PKTINFO: addr = in6_pktinfo.from_buffer_copy(anc[2]) addr = ipaddress.ip_address(memoryview(addr.ipi6_addr).tobytes()) - _to = (str(addr), s.getsockname()[1]) + _to = str(addr), s.getsockname()[1] break debug.logger & debug.FLAG_IO and debug.logger( diff --git a/pysnmp/carrier/twisted/dgram/base.py b/pysnmp/carrier/twisted/dgram/base.py index c752e580..eddd5a76 100644 --- a/pysnmp/carrier/twisted/dgram/base.py +++ b/pysnmp/carrier/twisted/dgram/base.py @@ -18,8 +18,9 @@ class DgramTwistedTransport(DatagramProtocol, AbstractTwistedTransport): # Twisted Datagram API def datagramReceived(self, datagram, transportAddress): - if self._cbFun is None: + if not self._cbFun: raise error.CarrierError('Unable to call cbFun') + else: # Callback fun is called through callLater() in attempt # to make Twisted timed calls work under high load. @@ -27,11 +28,16 @@ class DgramTwistedTransport(DatagramProtocol, AbstractTwistedTransport): def startProtocol(self): debug.logger & debug.FLAG_IO and debug.logger('startProtocol: invoked') + while self._writeQ: outgoingMessage, transportAddress = self._writeQ.pop(0) - debug.logger & debug.FLAG_IO and debug.logger('startProtocol: transportAddress %r outgoingMessage %s' % (transportAddress, debug.hexdump(outgoingMessage))) + debug.logger & debug.FLAG_IO and debug.logger( + 'startProtocol: transportAddress %r outgoingMessage ' + '%s' % (transportAddress, debug.hexdump(outgoingMessage))) + try: self.transport.write(outgoingMessage, transportAddress) + except Exception as exc: raise error.CarrierError('Twisted exception: %s' % exc) @@ -39,11 +45,17 @@ class DgramTwistedTransport(DatagramProtocol, AbstractTwistedTransport): debug.logger & debug.FLAG_IO and debug.logger('stopProtocol: invoked') def sendMessage(self, outgoingMessage, transportAddress): - debug.logger & debug.FLAG_IO and debug.logger('startProtocol: %s transportAddress %r outgoingMessage %s' % ((self.transport is None and "queuing" or "sending"), transportAddress, debug.hexdump(outgoingMessage))) - if self.transport is None: + debug.logger & debug.FLAG_IO and debug.logger( + 'startProtocol: %s transportAddress %r outgoingMessage ' + '%s' % ((self.transport is None and "queuing" or "sending"), + transportAddress, debug.hexdump(outgoingMessage))) + + if not self.transport: self._writeQ.append((outgoingMessage, transportAddress)) + else: try: self.transport.write(outgoingMessage, transportAddress) + except Exception as exc: raise error.CarrierError('Twisted exception: %s' % exc) diff --git a/pysnmp/carrier/twisted/dgram/udp.py b/pysnmp/carrier/twisted/dgram/udp.py index 10cf946f..2057c97d 100644 --- a/pysnmp/carrier/twisted/dgram/udp.py +++ b/pysnmp/carrier/twisted/dgram/udp.py @@ -26,24 +26,30 @@ class UdpTwistedTransport(DgramTwistedTransport): def openClientMode(self, iface=None): if iface is None: iface = ('', 0) + try: self._lport = reactor.listenUDP(iface[1], self, iface[0]) + except Exception as exc: raise error.CarrierError(exc) + return self def openServerMode(self, iface): try: self._lport = reactor.listenUDP(iface[1], self, iface[0]) + except Exception as exc: raise error.CarrierError(exc) + return self def closeTransport(self): if self._lport is not None: - d = self._lport.stopListening() - if d: - d.addCallback(lambda x: None) + deferred = self._lport.stopListening() + if deferred: + deferred.addCallback(lambda x: None) + DgramTwistedTransport.closeTransport(self) diff --git a/pysnmp/carrier/twisted/dgram/unix.py b/pysnmp/carrier/twisted/dgram/unix.py index 6ce3cde2..2d5aa195 100644 --- a/pysnmp/carrier/twisted/dgram/unix.py +++ b/pysnmp/carrier/twisted/dgram/unix.py @@ -26,13 +26,16 @@ class UnixTwistedTransport(DgramTwistedTransport): def openClientMode(self, iface=''): try: self._lport = reactor.connectUNIXDatagram(iface, self) + except Exception as exc: raise error.CarrierError(exc) + return self def openServerMode(self, iface): try: self._lport = reactor.listenUNIXDatagram(iface, self) + except Exception as exc: raise error.CarrierError(exc) @@ -40,9 +43,10 @@ class UnixTwistedTransport(DgramTwistedTransport): def closeTransport(self): if self._lport is not None: - d = self._lport.stopListening() - if d: - d.addCallback(lambda x: None) + deferred = self._lport.stopListening() + if deferred: + deferred.addCallback(lambda x: None) + DgramTwistedTransport.closeTransport(self) diff --git a/pysnmp/carrier/twisted/dispatch.py b/pysnmp/carrier/twisted/dispatch.py index 77f5c168..ace0f6a4 100644 --- a/pysnmp/carrier/twisted/dispatch.py +++ b/pysnmp/carrier/twisted/dispatch.py @@ -28,9 +28,12 @@ class TwistedDispatcher(AbstractTransportDispatcher): def __init__(self, *args, **kwargs): AbstractTransportDispatcher.__init__(self) + self.__transportCount = 0 + if 'timeout' in kwargs: self.setTimerResolution(kwargs['timeout']) + self.loopingcall = task.LoopingCall( lambda self=self: self.handleTimerTick(time.time()) ) @@ -44,22 +47,27 @@ class TwistedDispatcher(AbstractTransportDispatcher): raise except Exception: - raise PySnmpError('reactor error: %s' % ';'.join(traceback.format_exception(*sys.exc_info()))) + raise PySnmpError('reactor error: %s' % ';'.join( + traceback.format_exception(*sys.exc_info()))) # jobstarted/jobfinished might be okay as-is - def registerTransport(self, tDomain, transport): + def registerTransport(self, transportDomain, transport): if not self.loopingcall.running and self.getTimerResolution() > 0: self.loopingcall.start(self.getTimerResolution(), now=False) + AbstractTransportDispatcher.registerTransport( - self, tDomain, transport + self, transportDomain, transport ) + self.__transportCount += 1 - def unregisterTransport(self, tDomain): - t = AbstractTransportDispatcher.getTransport(self, tDomain) - if t is not None: - AbstractTransportDispatcher.unregisterTransport(self, tDomain) + def unregisterTransport(self, transportDomain): + transport = AbstractTransportDispatcher.getTransport( + self, transportDomain) + if transport: + AbstractTransportDispatcher.unregisterTransport( + self, transportDomain) self.__transportCount -= 1 # The last transport has been removed, stop the timeout |