summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkaiyou <pierre@jaury.eu>2017-06-05 17:51:37 +0200
committerkaiyou <pierre@jaury.eu>2017-06-05 18:21:21 +0200
commit5dd91cd4aaa2e7cd8dde1dd316d53cab25ef9b78 (patch)
treef8094d5a29463f9f20b41e45d7d4bdd229d29b56
parentdc2b24dcdd4ed7c8f6874ee5dd95716761ab6c93 (diff)
downloaddocker-py-5dd91cd4aaa2e7cd8dde1dd316d53cab25ef9b78.tar.gz
Rewrite the split_port function using re
In the case of a defined format with specific parts, a regular expression with named capturing bits make reasoning about the parts simpler than imlementing a parser from scratch. Signed-off-by: kaiyou <pierre@jaury.eu>
-rw-r--r--docker/utils/ports.py107
1 files changed, 40 insertions, 67 deletions
diff --git a/docker/utils/ports.py b/docker/utils/ports.py
index 3708958..57332de 100644
--- a/docker/utils/ports.py
+++ b/docker/utils/ports.py
@@ -1,3 +1,16 @@
+import re
+
+PORT_SPEC = re.compile(
+ "^" # Match full string
+ "(" # External part
+ "((?P<host>[a-fA-F\d.:]+):)?" # Address
+ "(?P<ext>[\d]*)(-(?P<ext_end>[\d]+))?:" # External range
+ ")?"
+ "(?P<int>[\d]+)(-(?P<int_end>[\d]+))?" # Internal range
+ "(?P<proto>/(udp|tcp))?" # Protocol
+ "$" # Match full string
+)
+
def add_port_mapping(port_bindings, internal_port, external):
if internal_port in port_bindings:
@@ -24,81 +37,41 @@ def build_port_bindings(ports):
return port_bindings
-def to_port_range(port, randomly_available_port=False):
- if not port:
- return None
-
- protocol = ""
- if "/" in port:
- parts = port.split("/")
- if len(parts) != 2:
- _raise_invalid_port(port)
-
- port, protocol = parts
- protocol = "/" + protocol
-
- if randomly_available_port:
- return ["%s%s" % (port, protocol)]
-
- parts = str(port).split('-')
-
- if len(parts) == 1:
- return ["%s%s" % (port, protocol)]
-
- if len(parts) == 2:
- full_port_range = range(int(parts[0]), int(parts[1]) + 1)
- return ["%s%s" % (p, protocol) for p in full_port_range]
-
- raise ValueError('Invalid port range "%s", should be '
- 'port or startport-endport' % port)
-
-
def _raise_invalid_port(port):
raise ValueError('Invalid port "%s", should be '
'[[remote_ip:]remote_port[-remote_port]:]'
'port[/protocol]' % port)
-def split_port(port):
- parts = str(port).split(':')
-
- if not 1 <= len(parts) <= 3:
- _raise_invalid_port(port)
-
- if len(parts) == 1:
- internal_port, = parts
- if not internal_port:
- _raise_invalid_port(port)
- return to_port_range(internal_port), None
- if len(parts) == 2:
- external_port, internal_port = parts
-
- internal_range = to_port_range(internal_port)
- if internal_range is None:
- _raise_invalid_port(port)
-
- external_range = to_port_range(external_port, len(internal_range) == 1)
- if external_range is None:
- _raise_invalid_port(port)
-
- if len(internal_range) != len(external_range):
- raise ValueError('Port ranges don\'t match in length')
-
- return internal_range, external_range
+def port_range(start, end, proto, randomly_available_port=False):
+ if not start:
+ return start
+ if not end:
+ return [start + proto]
+ if randomly_available_port:
+ return ['{}-{}'.format(start, end) + proto]
+ return [str(port) + proto for port in range(int(start), int(end) + 1)]
- external_ip, external_port, internal_port = parts
- if not internal_port:
+def split_port(port):
+ match = PORT_SPEC.match(port)
+ if match is None:
_raise_invalid_port(port)
+ parts = match.groupdict()
- internal_range = to_port_range(internal_port)
- external_range = to_port_range(external_port, len(internal_range) == 1)
-
- if not external_range:
- external_range = [None] * len(internal_range)
-
- if len(internal_range) != len(external_range):
- raise ValueError('Port ranges don\'t match in length')
+ host = parts['host']
+ proto = parts['proto'] or ''
+ internal = port_range(parts['int'], parts['int_end'], proto)
+ external = port_range(
+ parts['ext'], parts['ext_end'], '', len(internal) == 1)
- return internal_range, [(external_ip, ex_port or None)
- for ex_port in external_range]
+ if host is None:
+ if external is not None and len(internal) != len(external):
+ raise ValueError('Port ranges don\'t match in length')
+ return internal, external
+ else:
+ if not external:
+ external = [None] * len(internal)
+ elif len(internal) != len(external):
+ raise ValueError('Port ranges don\'t match in length')
+ return internal, [(host, ext_port) for ext_port in external]