summaryrefslogtreecommitdiff
path: root/paste/util/ip4.py
diff options
context:
space:
mode:
Diffstat (limited to 'paste/util/ip4.py')
-rw-r--r--paste/util/ip4.py274
1 files changed, 274 insertions, 0 deletions
diff --git a/paste/util/ip4.py b/paste/util/ip4.py
new file mode 100644
index 0000000..9ce17b8
--- /dev/null
+++ b/paste/util/ip4.py
@@ -0,0 +1,274 @@
+# -*- coding: iso-8859-15 -*-
+"""IP4 address range set implementation.
+
+Implements an IPv4-range type.
+
+Copyright (C) 2006, Heiko Wundram.
+Released under the MIT-license.
+"""
+
+# Version information
+# -------------------
+
+__author__ = "Heiko Wundram <me@modelnine.org>"
+__version__ = "0.2"
+__revision__ = "3"
+__date__ = "2006-01-20"
+
+
+# Imports
+# -------
+
+from paste.util import intset
+import socket
+import six
+
+
+# IP4Range class
+# --------------
+
+class IP4Range(intset.IntSet):
+ """IP4 address range class with efficient storage of address ranges.
+ Supports all set operations."""
+
+ _MINIP4 = 0
+ _MAXIP4 = (1<<32) - 1
+ _UNITYTRANS = "".join([chr(n) for n in range(256)])
+ _IPREMOVE = "0123456789."
+
+ def __init__(self,*args):
+ """Initialize an ip4range class. The constructor accepts an unlimited
+ number of arguments that may either be tuples in the form (start,stop),
+ integers, longs or strings, where start and stop in a tuple may
+ also be of the form integer, long or string.
+
+ Passing an integer or long means passing an IPv4-address that's already
+ been converted to integer notation, whereas passing a string specifies
+ an address where this conversion still has to be done. A string
+ address may be in the following formats:
+
+ - 1.2.3.4 - a plain address, interpreted as a single address
+ - 1.2.3 - a set of addresses, interpreted as 1.2.3.0-1.2.3.255
+ - localhost - hostname to look up, interpreted as single address
+ - 1.2.3<->5 - a set of addresses, interpreted as 1.2.3.0-1.2.5.255
+ - 1.2.0.0/16 - a set of addresses, interpreted as 1.2.0.0-1.2.255.255
+
+ Only the first three notations are valid if you use a string address in
+ a tuple, whereby notation 2 is interpreted as 1.2.3.0 if specified as
+ lower bound and 1.2.3.255 if specified as upper bound, not as a range
+ of addresses.
+
+ Specifying a range is done with the <-> operator. This is necessary
+ because '-' might be present in a hostname. '<->' shouldn't be, ever.
+ """
+
+ # Special case copy constructor.
+ if len(args) == 1 and isinstance(args[0],IP4Range):
+ super(IP4Range,self).__init__(args[0])
+ return
+
+ # Convert arguments to tuple syntax.
+ args = list(args)
+ for i in range(len(args)):
+ argval = args[i]
+ if isinstance(argval,str):
+ if "<->" in argval:
+ # Type 4 address.
+ args[i] = self._parseRange(*argval.split("<->",1))
+ continue
+ elif "/" in argval:
+ # Type 5 address.
+ args[i] = self._parseMask(*argval.split("/",1))
+ else:
+ # Type 1, 2 or 3.
+ args[i] = self._parseAddrRange(argval)
+ elif isinstance(argval,tuple):
+ if len(tuple) != 2:
+ raise ValueError("Tuple is of invalid length.")
+ addr1, addr2 = argval
+ if isinstance(addr1,str):
+ addr1 = self._parseAddrRange(addr1)[0]
+ elif not isinstance(addr1, six.integer_types):
+ raise TypeError("Invalid argument.")
+ if isinstance(addr2,str):
+ addr2 = self._parseAddrRange(addr2)[1]
+ elif not isinstance(addr2, six.integer_types):
+ raise TypeError("Invalid argument.")
+ args[i] = (addr1,addr2)
+ elif not isinstance(argval, six.integer_types):
+ raise TypeError("Invalid argument.")
+
+ # Initialize the integer set.
+ super(IP4Range,self).__init__(min=self._MINIP4,max=self._MAXIP4,*args)
+
+ # Parsing functions
+ # -----------------
+
+ def _parseRange(self,addr1,addr2):
+ naddr1, naddr1len = _parseAddr(addr1)
+ naddr2, naddr2len = _parseAddr(addr2)
+ if naddr2len < naddr1len:
+ naddr2 += naddr1&(((1<<((naddr1len-naddr2len)*8))-1)<<
+ (naddr2len*8))
+ naddr2len = naddr1len
+ elif naddr2len > naddr1len:
+ raise ValueError("Range has more dots than address.")
+ naddr1 <<= (4-naddr1len)*8
+ naddr2 <<= (4-naddr2len)*8
+ naddr2 += (1<<((4-naddr2len)*8))-1
+ return (naddr1,naddr2)
+
+ def _parseMask(self,addr,mask):
+ naddr, naddrlen = _parseAddr(addr)
+ naddr <<= (4-naddrlen)*8
+ try:
+ if not mask:
+ masklen = 0
+ else:
+ masklen = int(mask)
+ if not 0 <= masklen <= 32:
+ raise ValueError
+ except ValueError:
+ try:
+ mask = _parseAddr(mask,False)
+ except ValueError:
+ raise ValueError("Mask isn't parseable.")
+ remaining = 0
+ masklen = 0
+ if not mask:
+ masklen = 0
+ else:
+ while not (mask&1):
+ remaining += 1
+ while (mask&1):
+ mask >>= 1
+ masklen += 1
+ if remaining+masklen != 32:
+ raise ValueError("Mask isn't a proper host mask.")
+ naddr1 = naddr & (((1<<masklen)-1)<<(32-masklen))
+ naddr2 = naddr1 + (1<<(32-masklen)) - 1
+ return (naddr1,naddr2)
+
+ def _parseAddrRange(self,addr):
+ naddr, naddrlen = _parseAddr(addr)
+ naddr1 = naddr<<((4-naddrlen)*8)
+ naddr2 = ( (naddr<<((4-naddrlen)*8)) +
+ (1<<((4-naddrlen)*8)) - 1 )
+ return (naddr1,naddr2)
+
+ # Utility functions
+ # -----------------
+
+ def _int2ip(self,num):
+ rv = []
+ for i in range(4):
+ rv.append(str(num&255))
+ num >>= 8
+ return ".".join(reversed(rv))
+
+ # Iterating
+ # ---------
+
+ def iteraddresses(self):
+ """Returns an iterator which iterates over ips in this iprange. An
+ IP is returned in string form (e.g. '1.2.3.4')."""
+
+ for v in super(IP4Range,self).__iter__():
+ yield self._int2ip(v)
+
+ def iterranges(self):
+ """Returns an iterator which iterates over ip-ip ranges which build
+ this iprange if combined. An ip-ip pair is returned in string form
+ (e.g. '1.2.3.4-2.3.4.5')."""
+
+ for r in self._ranges:
+ if r[1]-r[0] == 1:
+ yield self._int2ip(r[0])
+ else:
+ yield '%s-%s' % (self._int2ip(r[0]),self._int2ip(r[1]-1))
+
+ def itermasks(self):
+ """Returns an iterator which iterates over ip/mask pairs which build
+ this iprange if combined. An IP/Mask pair is returned in string form
+ (e.g. '1.2.3.0/24')."""
+
+ for r in self._ranges:
+ for v in self._itermasks(r):
+ yield v
+
+ def _itermasks(self,r):
+ ranges = [r]
+ while ranges:
+ cur = ranges.pop()
+ curmask = 0
+ while True:
+ curmasklen = 1<<(32-curmask)
+ start = (cur[0]+curmasklen-1)&(((1<<curmask)-1)<<(32-curmask))
+ if start >= cur[0] and start+curmasklen <= cur[1]:
+ break
+ else:
+ curmask += 1
+ yield "%s/%s" % (self._int2ip(start),curmask)
+ if cur[0] < start:
+ ranges.append((cur[0],start))
+ if cur[1] > start+curmasklen:
+ ranges.append((start+curmasklen,cur[1]))
+
+ __iter__ = iteraddresses
+
+ # Printing
+ # --------
+
+ def __repr__(self):
+ """Returns a string which can be used to reconstruct this iprange."""
+
+ rv = []
+ for start, stop in self._ranges:
+ if stop-start == 1:
+ rv.append("%r" % (self._int2ip(start),))
+ else:
+ rv.append("(%r,%r)" % (self._int2ip(start),
+ self._int2ip(stop-1)))
+ return "%s(%s)" % (self.__class__.__name__,",".join(rv))
+
+def _parseAddr(addr,lookup=True):
+ if lookup and any(ch not in IP4Range._IPREMOVE for ch in addr):
+ try:
+ addr = socket.gethostbyname(addr)
+ except socket.error:
+ raise ValueError("Invalid Hostname as argument.")
+ naddr = 0
+ for naddrpos, part in enumerate(addr.split(".")):
+ if naddrpos >= 4:
+ raise ValueError("Address contains more than four parts.")
+ try:
+ if not part:
+ part = 0
+ else:
+ part = int(part)
+ if not 0 <= part < 256:
+ raise ValueError
+ except ValueError:
+ raise ValueError("Address part out of range.")
+ naddr <<= 8
+ naddr += part
+ return naddr, naddrpos+1
+
+def ip2int(addr, lookup=True):
+ return _parseAddr(addr, lookup=lookup)[0]
+
+if __name__ == "__main__":
+ # Little test script.
+ x = IP4Range("172.22.162.250/24")
+ y = IP4Range("172.22.162.250","172.22.163.250","172.22.163.253<->255")
+ print(x)
+ for val in x.itermasks():
+ print(val)
+ for val in y.itermasks():
+ print(val)
+ for val in (x|y).itermasks():
+ print(val)
+ for val in (x^y).iterranges():
+ print(val)
+ for val in x:
+ print(val)