summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES2
-rw-r--r--pysnmp/smi/mibs/TRANSPORT-ADDRESS-MIB.py172
-rw-r--r--setup.py11
3 files changed, 107 insertions, 78 deletions
diff --git a/CHANGES b/CHANGES
index 7123930..2f2097a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -106,6 +106,8 @@ Revision 4.2.5rc2
supporting UTF-8 initializers.
- Fix to v1/v2c message processing module which used to refer to a
bogus stateReference in some cases what causes SNMP engine crashes.
+- Fix to IPv6 transport to zero ZoneID, FlowID and ScopeID components
+ sometimes coming along with incoming packet.
- Asyncsock sockets now configured with SO_REUSEADDR option to fix possible
Windows error 10048.
- Gracefully handle malformed SnmpEngineID at USM coming from SNMPv3 header.
diff --git a/pysnmp/smi/mibs/TRANSPORT-ADDRESS-MIB.py b/pysnmp/smi/mibs/TRANSPORT-ADDRESS-MIB.py
index 6f4e6bb..9e96625 100644
--- a/pysnmp/smi/mibs/TRANSPORT-ADDRESS-MIB.py
+++ b/pysnmp/smi/mibs/TRANSPORT-ADDRESS-MIB.py
@@ -2,79 +2,117 @@
# by libsmi2pysnmp-0.1.3 at Tue Apr 3 16:58:37 2012,
# Python version sys.version_info(major=2, minor=7, micro=2, releaselevel='final', serial=0)
-from pysnmp.error import PySnmpError
-from socket import AF_INET, error, has_ipv6
+from pyasn1.compat.octets import int2oct, oct2int
+from pysnmp import error
+import socket
-try:
- from socket import AF_INET6
-except ImportError:
- has_ipv6 = False
+has_ipv6 = socket.has_ipv6 and hasattr(socket, 'AF_INET6')
-try:
- from socket import inet_ntop, inet_pton
-except ImportError:
+if hasattr(socket, 'inet_ntop') and hasattr(socket, 'inet_pton'):
+ inet_ntop = socket.inet_ntop
+ inet_pton = socket.inet_pton
+else:
import sys
+
if sys.platform != "win32":
from socket import inet_ntoa, inet_aton
inet_ntop = lambda x,y: inet_ntoa(y)
inet_pton = lambda x,y: inet_aton(y)
has_ipv6 = False
elif has_ipv6:
- try:
- import ctypes
- except ImportError:
- raise PySnmpError('Need ctypes module to handle IPv6 addresses')
-
- class sockaddr(ctypes.Structure):
- _fields_ = [("sa_family", ctypes.c_short),
- ("__pad1", ctypes.c_ushort),
- ("ipv4_addr", ctypes.c_byte * 4),
- ("ipv6_addr", ctypes.c_byte * 16),
- ("__pad2", ctypes.c_ulong)]
-
- WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
- WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
-
+ import struct # The case of old Python at old Windows
+
def inet_pton(address_family, ip_string):
- addr = sockaddr()
- addr.sa_family = address_family
- addr_size = ctypes.c_int(ctypes.sizeof(addr))
-
- if WSAStringToAddressA(ip_string, address_family, None, ctypes.byref(addr), ctypes.byref(addr_size)) != 0:
- raise error(ctypes.FormatError())
-
- if address_family == AF_INET:
- return ctypes.string_at(addr.ipv4_addr, 4)
- if address_family == AF_INET6:
- return ctypes.string_at(addr.ipv6_addr, 16)
-
- raise error('unknown address family')
-
+ if address_family == socket.AF_INET:
+ return inet_aton(ip_string)
+ elif address_family != socket.AF_INET6:
+ raise socket.error(
+ 'Unknown address family %s' % (address_family,)
+ )
+
+ groups = ip_string.split(":")
+ spaces = groups.count('')
+
+ if '.' in groups[-1]:
+ groups[-1:] = [ "%x" % (x) for x in struct.unpack("!HH", inet_aton(groups[-1])) ]
+
+ if spaces == 1:
+ idx = groups.index('')
+ groups[idx:idx+1] = ['0'] * (8 - len(groups) + 1)
+ elif spaces == 2:
+ zeros = ['0'] * (8 - len(groups) + 2)
+ if ip_string.startswith('::'):
+ groups[:2] = zeros
+ elif ip_string.endswith('::'):
+ groups[-2:] = zeros
+ else:
+ raise socket.error(
+ 'Invalid IPv6 address: "%s"' % (ip_string,)
+ )
+ elif spaces == 3:
+ if ip_string != '::':
+ raise socket.error(
+ 'Invalid IPv6 address: "%s"' % (ip_string,)
+ )
+ return '\x00' * 16;
+ elif spaces > 3:
+ raise socket.error(
+ 'Invalid IPv6 address: "%s"' % (ip_string,)
+ )
+
+ groups = [ t for t in [ int(t, 16) for t in groups ] if t & 0xFFFF == t ]
+
+ if len(groups) != 8:
+ raise socket.error(
+ 'Invalid IPv6 address: "%s"' % (ip_string,)
+ )
+
+ return struct.pack('!8H', *groups)
+
def inet_ntop(address_family, packed_ip):
- addr = sockaddr()
- addr.sa_family = address_family
- addr_size = ctypes.c_int(ctypes.sizeof(addr))
- ip_string = ctypes.create_string_buffer(128)
- ip_string_size = ctypes.c_int(ctypes.sizeof(addr))
-
- if address_family == AF_INET:
- if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
- raise error('packed IP wrong length for inet_ntoa')
- ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
- elif address_family == AF_INET6:
- if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
- raise error('packed IP wrong length for inet_ntoa')
- ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
- else:
- raise error('unknown address family')
-
- if WSAAddressToStringA(ctypes.byref(addr), addr_size, None, ip_string, ctypes.byref(ip_string_size)) != 0:
- raise error(ctypes.FormatError())
-
- return ip_string[:ip_string_size.value-1]
+ if address_family == socket.AF_INET:
+ return inet_ntop(packed_ip)
+ elif address_family != socket.AF_INET6:
+ raise socket.error(
+ 'Unknown address family %s' % (address_family,)
+ )
-from pyasn1.compat.octets import int2oct, oct2int
-from pysnmp import error
+ if len(packed_ip) != 16:
+ raise socket.error(
+ 'incorrect address length: %s' % len(packed_ip)
+ )
+
+ groups = list(struct.unpack('!8H', packed_ip))
+
+ cur_base = best_base = cur_len = best_len = -1
+
+ for idx in range(8):
+ if groups[idx]:
+ if cur_base != -1:
+ if best_base == -1 or cur_len > best_len:
+ best_base, best_len = cur_base, cur_len
+ cur_base = -1
+ else:
+ if cur_base == -1:
+ cur_base, cur_len = idx, 1
+ else:
+ cur_len += 1
+
+ if cur_base != -1:
+ if best_base == -1 or cur_len > best_len:
+ best_base, best_len = cur_base, cur_len
+
+ if best_base != -1 and best_len > 1:
+ groups[best_base:best_base + best_len] = [':']
+
+ if groups[0] == ':':
+ groups.insert(0, ':')
+ if groups[-1] == ':':
+ groups.append(':')
+
+ f = lambda x: x != ':' and '%x' % x or ''
+
+ return ':'.join([f(x) for x in groups])
# Imports
@@ -101,7 +139,7 @@ class TransportAddressIPv4(TextualConvention, OctetString):
def prettyIn(self, value):
if isinstance(value, tuple):
# Wild hack -- need to implement TextualConvention.prettyIn
- value = inet_pton(AF_INET, value[0]) + \
+ value = inet_pton(socket.AF_INET, value[0]) + \
int2oct((value[1] >> 8) & 0xff) + \
int2oct(value[1] & 0xff)
return OctetString.prettyIn(self, value)
@@ -111,7 +149,7 @@ class TransportAddressIPv4(TextualConvention, OctetString):
if not hasattr(self, '__tuple_value'):
v = self.asOctets()
self.__tuple_value = (
- inet_ntop(AF_INET, v[:4]),
+ inet_ntop(socket.AF_INET, v[:4]),
oct2int(v[4]) << 8 | oct2int(v[5]),
)
return self.__tuple_value[i]
@@ -130,7 +168,7 @@ class TransportAddressIPv6(TextualConvention, OctetString):
if not has_ipv6:
raise error.PySnmpError('IPv6 not supported by platform')
if isinstance(value, tuple):
- value = inet_pton(AF_INET6, value[0]) + \
+ value = inet_pton(socket.AF_INET6, value[0]) + \
int2oct((value[1] >> 8) & 0xff) + \
int2oct(value[1] & 0xff)
return OctetString.prettyIn(self, value)
@@ -142,10 +180,10 @@ class TransportAddressIPv6(TextualConvention, OctetString):
raise error.PySnmpError('IPv6 not supported by platform')
v = self.asOctets()
self.__tuple_value = (
- inet_ntop(AF_INET6, v[:16]),
+ inet_ntop(socket.AF_INET6, v[:16]),
oct2int(v[16]) << 8 | oct2int(v[17]),
- 0,
- 0)
+ 0, # flowinfo
+ 0) # scopeid
return self.__tuple_value[i]
class TransportAddressIPv6z(TextualConvention, OctetString):
diff --git a/setup.py b/setup.py
index 82bae33..a6a1081 100644
--- a/setup.py
+++ b/setup.py
@@ -52,8 +52,6 @@ try:
}
if sys.platform.lower()[:3] != 'win':
params['install_requires'].append('pycrypto>=2.4.1')
- elif sys.version_info[:2] < (2, 5):
- params['install_requires'].append('ctypes')
except ImportError:
for arg in sys.argv:
@@ -66,15 +64,6 @@ except ImportError:
params['requires'] = [ 'pyasn1(>=0.1.2)' ]
if sys.platform.lower()[:3] != 'win':
params['requires'].append('pycrypto(>=2.4.1)')
- elif sys.platform.lower()[:3] == 'win':
- try:
- import ctypes
- except ImportError:
- sys.stderr.write("""WARNING! WARNING! WARNING!
-Handling IPv6 addresses requires the ctypes module. Please install it on
-your system if you need PySNMP supporting IPv6 addressing:
-http://downloads.sourceforge.net/project/ctypes/ctypes/1.0.2/ctypes-1.0.2.tar.gz
-""")
if sys.platform.lower()[:3] == 'win':
try: