summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormmcardle <mark.mcardle@sohonet.com>2018-07-10 14:45:32 +0100
committermmcardle <mark.mcardle@sohonet.com>2018-07-10 15:23:30 +0100
commit47fb18c41b4851ba6071f0215e96e222b8ccef29 (patch)
tree424c38feb86558a61beb6b502f9608b0cd5011fb
parentc2c5af603f8ae25be052a20b02dc109b0f8f014a (diff)
downloadpython-swiftclient-47fb18c41b4851ba6071f0215e96e222b8ccef29.tar.gz
Add ability to generate a temporary URL with an
IP range restriction Change-Id: I4734599886e4f4a563162390d0ff3bb1ef639db4
-rwxr-xr-xswiftclient/shell.py11
-rw-r--r--swiftclient/utils.py25
-rw-r--r--tests/unit/test_shell.py30
-rw-r--r--tests/unit/test_utils.py48
4 files changed, 105 insertions, 9 deletions
diff --git a/swiftclient/shell.py b/swiftclient/shell.py
index e91a16f..74a47b7 100755
--- a/swiftclient/shell.py
+++ b/swiftclient/shell.py
@@ -1325,6 +1325,8 @@ Optional arguments:
generated.
--iso8601 If present, the generated temporary URL will contain an
ISO 8601 UTC timestamp instead of a Unix timestamp.
+ --ip-range If present, the temporary URL will be restricted to the
+ given ip or ip range.
'''.strip('\n')
@@ -1348,6 +1350,12 @@ def st_tempurl(parser, args, thread_manager):
help=("If present, the temporary URL will contain an ISO 8601 UTC "
"timestamp instead of a Unix timestamp."),
)
+ parser.add_argument(
+ '--ip-range', action='store',
+ default=None,
+ help=("If present, the temporary URL will be restricted to the "
+ "given ip or ip range."),
+ )
(options, args) = parse_args(parser, args)
args = args[1:]
@@ -1367,7 +1375,8 @@ def st_tempurl(parser, args, thread_manager):
path = generate_temp_url(parsed.path, timestamp, key, method,
absolute=options['absolute_expiry'],
iso8601=options['iso8601'],
- prefix=options['prefix_based'])
+ prefix=options['prefix_based'],
+ ip_range=options['ip_range'])
except ValueError as err:
thread_manager.error(err)
return
diff --git a/swiftclient/utils.py b/swiftclient/utils.py
index 8afcde9..5c17c61 100644
--- a/swiftclient/utils.py
+++ b/swiftclient/utils.py
@@ -69,7 +69,7 @@ def prt_bytes(num_bytes, human_flag):
def generate_temp_url(path, seconds, key, method, absolute=False,
- prefix=False, iso8601=False):
+ prefix=False, iso8601=False, ip_range=None):
"""Generates a temporary URL that gives unauthenticated access to the
Swift object.
@@ -92,6 +92,8 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
:param prefix: if True then a prefix-based temporary URL will be generated.
:param iso8601: if True, a URL containing an ISO 8601 UTC timestamp
instead of a UNIX timestamp will be created.
+ :param ip_range: if a valid ip range, restricts the temporary URL to the
+ range of ips.
:raises ValueError: if timestamp or path is not in valid format.
:return: the path portion of a temporary URL
"""
@@ -155,8 +157,21 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
expiration = int(time.time() + timestamp)
else:
expiration = timestamp
- hmac_body = u'\n'.join([method.upper(), str(expiration),
- ('prefix:' if prefix else '') + path_for_body])
+
+ hmac_parts = [method.upper(), str(expiration),
+ ('prefix:' if prefix else '') + path_for_body]
+
+ if ip_range:
+ if isinstance(ip_range, six.binary_type):
+ try:
+ ip_range = ip_range.decode('utf-8')
+ except UnicodeDecodeError:
+ raise ValueError(
+ 'ip_range must be representable as UTF-8'
+ )
+ hmac_parts.insert(0, "ip=%s" % ip_range)
+
+ hmac_body = u'\n'.join(hmac_parts)
# Encode to UTF-8 for py3 compatibility
if not isinstance(key, six.binary_type):
@@ -169,6 +184,10 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
temp_url = u'{path}?temp_url_sig={sig}&temp_url_expires={exp}'.format(
path=path_for_body, sig=sig, exp=expiration)
+
+ if ip_range:
+ temp_url += u'&temp_url_ip_range={}'.format(ip_range)
+
if prefix:
temp_url += u'&temp_url_prefix={}'.format(parts[4])
# Have return type match path from caller
diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py
index 3db48a4..8c995e5 100644
--- a/tests/unit/test_shell.py
+++ b/tests/unit/test_shell.py
@@ -1667,7 +1667,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=False,
- iso8601=False, prefix=False)
+ iso8601=False, prefix=False, ip_range=None)
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_temp_url_prefix_based(self, temp_url):
@@ -1676,7 +1676,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/', "60", 'secret_key', 'GET', absolute=False,
- iso8601=False, prefix=True)
+ iso8601=False, prefix=True, ip_range=None)
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_temp_url_iso8601_in(self, temp_url):
@@ -1688,7 +1688,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/', d, 'secret_key', 'GET', absolute=False,
- iso8601=False, prefix=False)
+ iso8601=False, prefix=False, ip_range=None)
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_temp_url_iso8601_out(self, temp_url):
@@ -1697,7 +1697,7 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/', "60", 'secret_key', 'GET', absolute=False,
- iso8601=True, prefix=False)
+ iso8601=True, prefix=False, ip_range=None)
@mock.patch('swiftclient.shell.generate_temp_url', return_value='')
def test_absolute_expiry_temp_url(self, temp_url):
@@ -1706,7 +1706,16 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
temp_url.assert_called_with(
'/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=True,
- iso8601=False, prefix=False)
+ iso8601=False, prefix=False, ip_range=None)
+
+ @mock.patch('swiftclient.shell.generate_temp_url', return_value='')
+ def test_temp_url_with_ip_range(self, temp_url):
+ argv = ["", "tempurl", "GET", "60", "/v1/AUTH_account/c/o",
+ "secret_key", "--ip-range", "1.2.3.4"]
+ swiftclient.shell.main(argv)
+ temp_url.assert_called_with(
+ '/v1/AUTH_account/c/o', "60", 'secret_key', 'GET', absolute=False,
+ iso8601=False, prefix=False, ip_range='1.2.3.4')
def test_temp_url_output(self):
argv = ["", "tempurl", "GET", "60", "/v1/a/c/o",
@@ -1769,6 +1778,17 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
self.assertEqual(expected, output.out)
+ argv = ["", "tempurl", "GET", "60", "/v1/a/c/o",
+ "secret_key", "--absolute", "--ip-range", "1.2.3.4"]
+ with CaptureOutput(suppress_systemexit=True) as output:
+ swiftclient.shell.main(argv)
+ sig = "6a6ec8efa4be53904ecba8d055d841e24a937c98"
+ expected = (
+ "/v1/a/c/o?temp_url_sig=%s&temp_url_expires=60"
+ "&temp_url_ip_range=1.2.3.4\n" % sig
+ )
+ self.assertEqual(expected, output.out)
+
def test_temp_url_error_output(self):
expected = 'path must be full path to an object e.g. /v1/a/c/o\n'
for bad_path in ('/v1/a/c', 'v1/a/c/o', '/v1/a/c/', '/v1/a//o',
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index adead00..e54b90c 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -152,6 +152,54 @@ class TestTempURL(unittest.TestCase):
self.assertIsInstance(url, type(self.url))
@mock.patch('hmac.HMAC')
+ @mock.patch('time.time', return_value=1400000000)
+ def test_generate_temp_url_ip_range(self, time_mock, hmac_mock):
+ hmac_mock().hexdigest.return_value = 'temp_url_signature'
+ ip_ranges = [
+ '1.2.3.4', '1.2.3.4/24', '2001:db8::',
+ b'1.2.3.4', b'1.2.3.4/24', b'2001:db8::',
+ ]
+ path = '/v1/AUTH_account/c/o/'
+ expected_url = path + ('?temp_url_sig=temp_url_signature'
+ '&temp_url_expires=1400003600'
+ '&temp_url_ip_range=')
+ for ip_range in ip_ranges:
+ hmac_mock.reset_mock()
+ url = u.generate_temp_url(path, self.seconds,
+ self.key, self.method,
+ ip_range=ip_range)
+ key = self.key
+ if not isinstance(key, six.binary_type):
+ key = key.encode('utf-8')
+
+ if isinstance(ip_range, six.binary_type):
+ ip_range_expected_url = (
+ expected_url + ip_range.decode('utf-8')
+ )
+ expected_body = '\n'.join([
+ 'ip=' + ip_range.decode('utf-8'),
+ self.method,
+ '1400003600',
+ path,
+ ]).encode('utf-8')
+ else:
+ ip_range_expected_url = expected_url + ip_range
+ expected_body = '\n'.join([
+ 'ip=' + ip_range,
+ self.method,
+ '1400003600',
+ path,
+ ]).encode('utf-8')
+
+ self.assertEqual(url, ip_range_expected_url)
+
+ self.assertEqual(hmac_mock.mock_calls, [
+ mock.call(key, expected_body, sha1),
+ mock.call().hexdigest(),
+ ])
+ self.assertIsInstance(url, type(path))
+
+ @mock.patch('hmac.HMAC')
def test_generate_temp_url_iso8601_argument(self, hmac_mock):
hmac_mock().hexdigest.return_value = 'temp_url_signature'
url = u.generate_temp_url(self.url, '2014-05-13T17:53:20Z',