diff options
author | elie <elie> | 2013-09-24 14:01:45 +0000 |
---|---|---|
committer | elie <elie> | 2013-09-24 14:01:45 +0000 |
commit | b1bec1a424016203fca876322fd2a9e4d0c54d4c (patch) | |
tree | d13d567d998b39d80a3c635873fa8473fc7d7d3d /pysnmp | |
parent | 33e16b87e6236eadd6752490e1351678bdee7293 (diff) | |
download | pysnmp-b1bec1a424016203fca876322fd2a9e4d0c54d4c.tar.gz |
sometimes missing inet_ntop()/inet_pton() reimplemented in pure struct
Diffstat (limited to 'pysnmp')
-rw-r--r-- | pysnmp/smi/mibs/TRANSPORT-ADDRESS-MIB.py | 172 |
1 files changed, 105 insertions, 67 deletions
diff --git a/pysnmp/smi/mibs/TRANSPORT-ADDRESS-MIB.py b/pysnmp/smi/mibs/TRANSPORT-ADDRESS-MIB.py index 6f4e6bb..9e96625 100644 --- a/pysnmp/smi/mibs/TRANSPORT-ADDRESS-MIB.py +++ b/pysnmp/smi/mibs/TRANSPORT-ADDRESS-MIB.py @@ -2,79 +2,117 @@ # by libsmi2pysnmp-0.1.3 at Tue Apr 3 16:58:37 2012, # Python version sys.version_info(major=2, minor=7, micro=2, releaselevel='final', serial=0) -from pysnmp.error import PySnmpError -from socket import AF_INET, error, has_ipv6 +from pyasn1.compat.octets import int2oct, oct2int +from pysnmp import error +import socket -try: - from socket import AF_INET6 -except ImportError: - has_ipv6 = False +has_ipv6 = socket.has_ipv6 and hasattr(socket, 'AF_INET6') -try: - from socket import inet_ntop, inet_pton -except ImportError: +if hasattr(socket, 'inet_ntop') and hasattr(socket, 'inet_pton'): + inet_ntop = socket.inet_ntop + inet_pton = socket.inet_pton +else: import sys + if sys.platform != "win32": from socket import inet_ntoa, inet_aton inet_ntop = lambda x,y: inet_ntoa(y) inet_pton = lambda x,y: inet_aton(y) has_ipv6 = False elif has_ipv6: - try: - import ctypes - except ImportError: - raise PySnmpError('Need ctypes module to handle IPv6 addresses') - - class sockaddr(ctypes.Structure): - _fields_ = [("sa_family", ctypes.c_short), - ("__pad1", ctypes.c_ushort), - ("ipv4_addr", ctypes.c_byte * 4), - ("ipv6_addr", ctypes.c_byte * 16), - ("__pad2", ctypes.c_ulong)] - - WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA - WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA - + import struct # The case of old Python at old Windows + def inet_pton(address_family, ip_string): - addr = sockaddr() - addr.sa_family = address_family - addr_size = ctypes.c_int(ctypes.sizeof(addr)) - - if WSAStringToAddressA(ip_string, address_family, None, ctypes.byref(addr), ctypes.byref(addr_size)) != 0: - raise error(ctypes.FormatError()) - - if address_family == AF_INET: - return ctypes.string_at(addr.ipv4_addr, 4) - if address_family == AF_INET6: - return ctypes.string_at(addr.ipv6_addr, 16) - - raise error('unknown address family') - + if address_family == socket.AF_INET: + return inet_aton(ip_string) + elif address_family != socket.AF_INET6: + raise socket.error( + 'Unknown address family %s' % (address_family,) + ) + + groups = ip_string.split(":") + spaces = groups.count('') + + if '.' in groups[-1]: + groups[-1:] = [ "%x" % (x) for x in struct.unpack("!HH", inet_aton(groups[-1])) ] + + if spaces == 1: + idx = groups.index('') + groups[idx:idx+1] = ['0'] * (8 - len(groups) + 1) + elif spaces == 2: + zeros = ['0'] * (8 - len(groups) + 2) + if ip_string.startswith('::'): + groups[:2] = zeros + elif ip_string.endswith('::'): + groups[-2:] = zeros + else: + raise socket.error( + 'Invalid IPv6 address: "%s"' % (ip_string,) + ) + elif spaces == 3: + if ip_string != '::': + raise socket.error( + 'Invalid IPv6 address: "%s"' % (ip_string,) + ) + return '\x00' * 16; + elif spaces > 3: + raise socket.error( + 'Invalid IPv6 address: "%s"' % (ip_string,) + ) + + groups = [ t for t in [ int(t, 16) for t in groups ] if t & 0xFFFF == t ] + + if len(groups) != 8: + raise socket.error( + 'Invalid IPv6 address: "%s"' % (ip_string,) + ) + + return struct.pack('!8H', *groups) + def inet_ntop(address_family, packed_ip): - addr = sockaddr() - addr.sa_family = address_family - addr_size = ctypes.c_int(ctypes.sizeof(addr)) - ip_string = ctypes.create_string_buffer(128) - ip_string_size = ctypes.c_int(ctypes.sizeof(addr)) - - if address_family == AF_INET: - if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr): - raise error('packed IP wrong length for inet_ntoa') - ctypes.memmove(addr.ipv4_addr, packed_ip, 4) - elif address_family == AF_INET6: - if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr): - raise error('packed IP wrong length for inet_ntoa') - ctypes.memmove(addr.ipv6_addr, packed_ip, 16) - else: - raise error('unknown address family') - - if WSAAddressToStringA(ctypes.byref(addr), addr_size, None, ip_string, ctypes.byref(ip_string_size)) != 0: - raise error(ctypes.FormatError()) - - return ip_string[:ip_string_size.value-1] + if address_family == socket.AF_INET: + return inet_ntop(packed_ip) + elif address_family != socket.AF_INET6: + raise socket.error( + 'Unknown address family %s' % (address_family,) + ) -from pyasn1.compat.octets import int2oct, oct2int -from pysnmp import error + if len(packed_ip) != 16: + raise socket.error( + 'incorrect address length: %s' % len(packed_ip) + ) + + groups = list(struct.unpack('!8H', packed_ip)) + + cur_base = best_base = cur_len = best_len = -1 + + for idx in range(8): + if groups[idx]: + if cur_base != -1: + if best_base == -1 or cur_len > best_len: + best_base, best_len = cur_base, cur_len + cur_base = -1 + else: + if cur_base == -1: + cur_base, cur_len = idx, 1 + else: + cur_len += 1 + + if cur_base != -1: + if best_base == -1 or cur_len > best_len: + best_base, best_len = cur_base, cur_len + + if best_base != -1 and best_len > 1: + groups[best_base:best_base + best_len] = [':'] + + if groups[0] == ':': + groups.insert(0, ':') + if groups[-1] == ':': + groups.append(':') + + f = lambda x: x != ':' and '%x' % x or '' + + return ':'.join([f(x) for x in groups]) # Imports @@ -101,7 +139,7 @@ class TransportAddressIPv4(TextualConvention, OctetString): def prettyIn(self, value): if isinstance(value, tuple): # Wild hack -- need to implement TextualConvention.prettyIn - value = inet_pton(AF_INET, value[0]) + \ + value = inet_pton(socket.AF_INET, value[0]) + \ int2oct((value[1] >> 8) & 0xff) + \ int2oct(value[1] & 0xff) return OctetString.prettyIn(self, value) @@ -111,7 +149,7 @@ class TransportAddressIPv4(TextualConvention, OctetString): if not hasattr(self, '__tuple_value'): v = self.asOctets() self.__tuple_value = ( - inet_ntop(AF_INET, v[:4]), + inet_ntop(socket.AF_INET, v[:4]), oct2int(v[4]) << 8 | oct2int(v[5]), ) return self.__tuple_value[i] @@ -130,7 +168,7 @@ class TransportAddressIPv6(TextualConvention, OctetString): if not has_ipv6: raise error.PySnmpError('IPv6 not supported by platform') if isinstance(value, tuple): - value = inet_pton(AF_INET6, value[0]) + \ + value = inet_pton(socket.AF_INET6, value[0]) + \ int2oct((value[1] >> 8) & 0xff) + \ int2oct(value[1] & 0xff) return OctetString.prettyIn(self, value) @@ -142,10 +180,10 @@ class TransportAddressIPv6(TextualConvention, OctetString): raise error.PySnmpError('IPv6 not supported by platform') v = self.asOctets() self.__tuple_value = ( - inet_ntop(AF_INET6, v[:16]), + inet_ntop(socket.AF_INET6, v[:16]), oct2int(v[16]) << 8 | oct2int(v[17]), - 0, - 0) + 0, # flowinfo + 0) # scopeid return self.__tuple_value[i] class TransportAddressIPv6z(TextualConvention, OctetString): |