summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDoug Hellmann <doug@doughellmann.com>2015-01-05 16:07:24 -0500
committerDoug Hellmann <doug@doughellmann.com>2015-01-06 17:07:36 -0500
commitca76fdcb52a5149f2b266bfb28c446f652c49128 (patch)
tree45b76495364cec84888fc5715ba8e3667bdc2bc9
parent44f36e35ec47d9ad430676c41749ccd5c6eadad4 (diff)
downloadoslo-utils-ca76fdcb52a5149f2b266bfb28c446f652c49128.tar.gz
Move files out of the namespace package
Move the public API out of oslo.utils to oslo_utils. Retain the ability to import from the old namespace package for backwards compatibility for this release cycle. bp/drop-namespace-packages Change-Id: Ic6dd62097399bf75e3d11b4d8a6400971069c415
-rw-r--r--doc/source/api/encodeutils.rst2
-rw-r--r--doc/source/api/excutils.rst2
-rw-r--r--doc/source/api/importutils.rst2
-rw-r--r--doc/source/api/netutils.rst2
-rw-r--r--doc/source/api/strutils.rst2
-rw-r--r--doc/source/api/timeutils.rst2
-rw-r--r--doc/source/api/units.rst2
-rw-r--r--doc/source/usage.rst2
-rw-r--r--oslo/utils/__init__.py26
-rw-r--r--oslo/utils/encodeutils.py84
-rw-r--r--oslo/utils/excutils.py102
-rw-r--r--oslo/utils/importutils.py62
-rw-r--r--oslo/utils/netutils.py277
-rw-r--r--oslo/utils/reflection.py197
-rw-r--r--oslo/utils/strutils.py249
-rw-r--r--oslo/utils/timeutils.py199
-rw-r--r--oslo/utils/units.py27
-rw-r--r--oslo/utils/uuidutils.py34
-rw-r--r--oslo_utils/__init__.py0
-rw-r--r--oslo_utils/_i18n.py (renamed from oslo/utils/_i18n.py)0
-rw-r--r--oslo_utils/encodeutils.py95
-rw-r--r--oslo_utils/excutils.py113
-rw-r--r--oslo_utils/importutils.py73
-rw-r--r--oslo_utils/netutils.py286
-rw-r--r--oslo_utils/reflection.py208
-rw-r--r--oslo_utils/strutils.py260
-rw-r--r--oslo_utils/tests/__init__.py (renamed from tests/test_utils.py)15
-rw-r--r--oslo_utils/tests/base.py55
-rw-r--r--oslo_utils/tests/fake/__init__.py (renamed from tests/fake/__init__.py)0
-rw-r--r--oslo_utils/tests/test_excutils.py196
-rw-r--r--oslo_utils/tests/test_importutils.py121
-rw-r--r--oslo_utils/tests/test_netutils.py224
-rw-r--r--oslo_utils/tests/test_reflection.py279
-rw-r--r--oslo_utils/tests/test_strutils.py594
-rw-r--r--oslo_utils/tests/test_timeutils.py340
-rw-r--r--oslo_utils/tests/test_uuidutils.py54
-rw-r--r--oslo_utils/tests/tests_encodeutils.py105
-rw-r--r--oslo_utils/timeutils.py210
-rw-r--r--oslo_utils/units.py38
-rw-r--r--oslo_utils/uuidutils.py45
-rw-r--r--setup.cfg1
-rw-r--r--tests/test_importutils.py27
-rw-r--r--tests/test_netutils.py22
-rw-r--r--tests/test_warning.py61
-rw-r--r--tox.ini2
45 files changed, 3420 insertions, 1277 deletions
diff --git a/doc/source/api/encodeutils.rst b/doc/source/api/encodeutils.rst
index 82ca596..35f339c 100644
--- a/doc/source/api/encodeutils.rst
+++ b/doc/source/api/encodeutils.rst
@@ -2,5 +2,5 @@
encodeutils
=============
-.. automodule:: oslo.utils.encodeutils
+.. automodule:: oslo_utils.encodeutils
:members:
diff --git a/doc/source/api/excutils.rst b/doc/source/api/excutils.rst
index d39be38..f5a67a0 100644
--- a/doc/source/api/excutils.rst
+++ b/doc/source/api/excutils.rst
@@ -2,5 +2,5 @@
excutils
==========
-.. automodule:: oslo.utils.excutils
+.. automodule:: oslo_utils.excutils
:members:
diff --git a/doc/source/api/importutils.rst b/doc/source/api/importutils.rst
index 0864d92..a8f7558 100644
--- a/doc/source/api/importutils.rst
+++ b/doc/source/api/importutils.rst
@@ -2,5 +2,5 @@
importutils
=============
-.. automodule:: oslo.utils.importutils
+.. automodule:: oslo_utils.importutils
:members:
diff --git a/doc/source/api/netutils.rst b/doc/source/api/netutils.rst
index bb50442..0fa3156 100644
--- a/doc/source/api/netutils.rst
+++ b/doc/source/api/netutils.rst
@@ -2,5 +2,5 @@
netutils
==========
-.. automodule:: oslo.utils.netutils
+.. automodule:: oslo_utils.netutils
:members:
diff --git a/doc/source/api/strutils.rst b/doc/source/api/strutils.rst
index bd2531b..fe8dabf 100644
--- a/doc/source/api/strutils.rst
+++ b/doc/source/api/strutils.rst
@@ -2,5 +2,5 @@
strutils
==========
-.. automodule:: oslo.utils.strutils
+.. automodule:: oslo_utils.strutils
:members:
diff --git a/doc/source/api/timeutils.rst b/doc/source/api/timeutils.rst
index d1292b3..ee1f1e2 100644
--- a/doc/source/api/timeutils.rst
+++ b/doc/source/api/timeutils.rst
@@ -2,5 +2,5 @@
timeutils
===========
-.. automodule:: oslo.utils.timeutils
+.. automodule:: oslo_utils.timeutils
:members:
diff --git a/doc/source/api/units.rst b/doc/source/api/units.rst
index edd1bed..5a09d3b 100644
--- a/doc/source/api/units.rst
+++ b/doc/source/api/units.rst
@@ -2,5 +2,5 @@
units
=======
-.. automodule:: oslo.utils.units
+.. automodule:: oslo_utils.units
:members:
diff --git a/doc/source/usage.rst b/doc/source/usage.rst
index 9e83f6b..6a79dbc 100644
--- a/doc/source/usage.rst
+++ b/doc/source/usage.rst
@@ -5,6 +5,6 @@
To use oslo.utils in a project, import the individual module you
need. For example::
- from oslo.utils import strutils
+ from oslo_utils import strutils
slug = strutils.to_slug('input value')
diff --git a/oslo/utils/__init__.py b/oslo/utils/__init__.py
index e69de29..73e54f3 100644
--- a/oslo/utils/__init__.py
+++ b/oslo/utils/__init__.py
@@ -0,0 +1,26 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import warnings
+
+
+def deprecated():
+ new_name = __name__.replace('.', '_')
+ warnings.warn(
+ ('The oslo namespace package is deprecated. Please use %s instead.' %
+ new_name),
+ DeprecationWarning,
+ stacklevel=3,
+ )
+
+
+deprecated()
diff --git a/oslo/utils/encodeutils.py b/oslo/utils/encodeutils.py
index 14bd717..0de2476 100644
--- a/oslo/utils/encodeutils.py
+++ b/oslo/utils/encodeutils.py
@@ -1,6 +1,3 @@
-# Copyright 2014 Red Hat, Inc.
-# All Rights Reserved.
-#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -13,83 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
-import sys
-
-import six
-
-
-def safe_decode(text, incoming=None, errors='strict'):
- """Decodes incoming text/bytes string using `incoming` if they're not
- already unicode.
-
- :param incoming: Text's current encoding
- :param errors: Errors handling policy. See here for valid
- values http://docs.python.org/2/library/codecs.html
- :returns: text or a unicode `incoming` encoded
- representation of it.
- :raises TypeError: If text is not an instance of str
- """
- if not isinstance(text, (six.string_types, six.binary_type)):
- raise TypeError("%s can't be decoded" % type(text))
-
- if isinstance(text, six.text_type):
- return text
-
- if not incoming:
- incoming = (sys.stdin.encoding or
- sys.getdefaultencoding())
-
- try:
- return text.decode(incoming, errors)
- except UnicodeDecodeError:
- # Note(flaper87) If we get here, it means that
- # sys.stdin.encoding / sys.getdefaultencoding
- # didn't return a suitable encoding to decode
- # text. This happens mostly when global LANG
- # var is not set correctly and there's no
- # default encoding. In this case, most likely
- # python will use ASCII or ANSI encoders as
- # default encodings but they won't be capable
- # of decoding non-ASCII characters.
- #
- # Also, UTF-8 is being used since it's an ASCII
- # extension.
- return text.decode('utf-8', errors)
-
-
-def safe_encode(text, incoming=None,
- encoding='utf-8', errors='strict'):
- """Encodes incoming text/bytes string using `encoding`.
-
- If incoming is not specified, text is expected to be encoded with
- current python's default encoding. (`sys.getdefaultencoding`)
-
- :param incoming: Text's current encoding
- :param encoding: Expected encoding for text (Default UTF-8)
- :param errors: Errors handling policy. See here for valid
- values http://docs.python.org/2/library/codecs.html
- :returns: text or a bytestring `encoding` encoded
- representation of it.
- :raises TypeError: If text is not an instance of str
- """
- if not isinstance(text, (six.string_types, six.binary_type)):
- raise TypeError("%s can't be encoded" % type(text))
-
- if not incoming:
- incoming = (sys.stdin.encoding or
- sys.getdefaultencoding())
-
- # Avoid case issues in comparisons
- if hasattr(incoming, 'lower'):
- incoming = incoming.lower()
- if hasattr(encoding, 'lower'):
- encoding = encoding.lower()
-
- if isinstance(text, six.text_type):
- return text.encode(encoding, errors)
- elif text and encoding != incoming:
- # Decode text before encoding it with `encoding`
- text = safe_decode(text, incoming, errors)
- return text.encode(encoding, errors)
- else:
- return text
+from oslo_utils.encodeutils import * # noqa
diff --git a/oslo/utils/excutils.py b/oslo/utils/excutils.py
index 732492e..333c955 100644
--- a/oslo/utils/excutils.py
+++ b/oslo/utils/excutils.py
@@ -1,6 +1,3 @@
-# Copyright 2011 OpenStack Foundation.
-# Copyright 2012, Red Hat, Inc.
-#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -13,101 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Exception related utilities.
-"""
-
-import logging
-import sys
-import time
-import traceback
-
-import six
-
-from oslo.utils._i18n import _LE
-
-
-class save_and_reraise_exception(object):
- """Save current exception, run some code and then re-raise.
-
- In some cases the exception context can be cleared, resulting in None
- being attempted to be re-raised after an exception handler is run. This
- can happen when eventlet switches greenthreads or when running an
- exception handler, code raises and catches an exception. In both
- cases the exception context will be cleared.
-
- To work around this, we save the exception state, run handler code, and
- then re-raise the original exception. If another exception occurs, the
- saved exception is logged and the new exception is re-raised.
-
- In some cases the caller may not want to re-raise the exception, and
- for those circumstances this context provides a reraise flag that
- can be used to suppress the exception. For example::
-
- except Exception:
- with save_and_reraise_exception() as ctxt:
- decide_if_need_reraise()
- if not should_be_reraised:
- ctxt.reraise = False
-
- If another exception occurs and reraise flag is False,
- the saved exception will not be logged.
-
- If the caller wants to raise new exception during exception handling
- he/she sets reraise to False initially with an ability to set it back to
- True if needed::
-
- except Exception:
- with save_and_reraise_exception(reraise=False) as ctxt:
- [if statements to determine whether to raise a new exception]
- # Not raising a new exception, so reraise
- ctxt.reraise = True
- """
- def __init__(self, reraise=True):
- self.reraise = reraise
-
- def __enter__(self):
- self.type_, self.value, self.tb, = sys.exc_info()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- if exc_type is not None:
- if self.reraise:
- logging.error(_LE('Original exception being dropped: %s'),
- traceback.format_exception(self.type_,
- self.value,
- self.tb))
- return False
- if self.reraise:
- six.reraise(self.type_, self.value, self.tb)
-
-
-def forever_retry_uncaught_exceptions(infunc):
- def inner_func(*args, **kwargs):
- last_log_time = 0
- last_exc_message = None
- exc_count = 0
- while True:
- try:
- return infunc(*args, **kwargs)
- except Exception as exc:
- this_exc_message = six.u(str(exc))
- if this_exc_message == last_exc_message:
- exc_count += 1
- else:
- exc_count = 1
- # Do not log any more frequently than once a minute unless
- # the exception message changes
- cur_time = int(time.time())
- if (cur_time - last_log_time > 60 or
- this_exc_message != last_exc_message):
- logging.exception(
- _LE('Unexpected exception occurred %d time(s)... '
- 'retrying.') % exc_count)
- last_log_time = cur_time
- last_exc_message = this_exc_message
- exc_count = 0
- # This should be a very rare event. In case it isn't, do
- # a sleep.
- time.sleep(1)
- return inner_func
+from oslo_utils.excutils import * # noqa
diff --git a/oslo/utils/importutils.py b/oslo/utils/importutils.py
index 043f817..9097dc4 100644
--- a/oslo/utils/importutils.py
+++ b/oslo/utils/importutils.py
@@ -1,6 +1,3 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -13,61 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Import related utilities and helper functions.
-"""
-
-import sys
-import traceback
-
-
-def import_class(import_str):
- """Returns a class from a string including module and class."""
- mod_str, _sep, class_str = import_str.rpartition('.')
- __import__(mod_str)
- try:
- return getattr(sys.modules[mod_str], class_str)
- except AttributeError:
- raise ImportError('Class %s cannot be found (%s)' %
- (class_str,
- traceback.format_exception(*sys.exc_info())))
-
-
-def import_object(import_str, *args, **kwargs):
- """Import a class and return an instance of it."""
- return import_class(import_str)(*args, **kwargs)
-
-
-def import_object_ns(name_space, import_str, *args, **kwargs):
- """Tries to import object from default namespace.
-
- Imports a class and return an instance of it, first by trying
- to find the class in a default namespace, then failing back to
- a full path if not found in the default namespace.
- """
- import_value = "%s.%s" % (name_space, import_str)
- try:
- return import_class(import_value)(*args, **kwargs)
- except ImportError:
- return import_class(import_str)(*args, **kwargs)
-
-
-def import_module(import_str):
- """Import a module."""
- __import__(import_str)
- return sys.modules[import_str]
-
-
-def import_versioned_module(version, submodule=None):
- module = 'oslo.v%s' % version
- if submodule:
- module = '.'.join((module, submodule))
- return import_module(module)
-
-
-def try_import(import_str, default=None):
- """Try to import a module and if it fails return default."""
- try:
- return import_module(import_str)
- except ImportError:
- return default
+from oslo_utils.importutils import * # noqa
diff --git a/oslo/utils/netutils.py b/oslo/utils/netutils.py
index 6bc9acf..701224d 100644
--- a/oslo/utils/netutils.py
+++ b/oslo/utils/netutils.py
@@ -1,6 +1,3 @@
-# Copyright 2012 OpenStack Foundation.
-# All Rights Reserved.
-#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -13,274 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Network-related utilities and helper functions.
-"""
-
-import logging
-import socket
-
-import netaddr
-import netifaces
-from six.moves.urllib import parse
-
-from oslo.utils._i18n import _LI
-from oslo.utils._i18n import _LW
-
-LOG = logging.getLogger(__name__)
-
-
-def parse_host_port(address, default_port=None):
- """Interpret a string as a host:port pair.
-
- An IPv6 address MUST be escaped if accompanied by a port,
- because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
- means both [2001:db8:85a3::8a2e:370:7334] and
- [2001:db8:85a3::8a2e:370]:7334.
-
- >>> parse_host_port('server01:80')
- ('server01', 80)
- >>> parse_host_port('server01')
- ('server01', None)
- >>> parse_host_port('server01', default_port=1234)
- ('server01', 1234)
- >>> parse_host_port('[::1]:80')
- ('::1', 80)
- >>> parse_host_port('[::1]')
- ('::1', None)
- >>> parse_host_port('[::1]', default_port=1234)
- ('::1', 1234)
- >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
- ('2001:db8:85a3::8a2e:370:7334', 1234)
- >>> parse_host_port(None)
- (None, None)
- """
- if not address:
- return (None, None)
-
- if address[0] == '[':
- # Escaped ipv6
- _host, _port = address[1:].split(']')
- host = _host
- if ':' in _port:
- port = _port.split(':')[1]
- else:
- port = default_port
- else:
- if address.count(':') == 1:
- host, port = address.split(':')
- else:
- # 0 means ipv4, >1 means ipv6.
- # We prohibit unescaped ipv6 addresses with port.
- host = address
- port = default_port
-
- return (host, None if port is None else int(port))
-
-
-def is_valid_ipv4(address):
- """Verify that address represents a valid IPv4 address.
-
- :param address: Value to verify
- :type address: string
- :returns: bool
- """
- try:
- return netaddr.valid_ipv4(address)
- except Exception:
- return False
-
-
-def is_valid_ipv6(address):
- """Verify that address represents a valid IPv6 address.
-
- :param address: Value to verify
- :type address: string
- :returns: bool
- """
- try:
- return netaddr.valid_ipv6(address)
- except Exception:
- return False
-
-
-def is_valid_ip(address):
- """Verify that address represents a valid IP address.
-
- :param address: Value to verify
- :type address: string
- :returns: bool
- """
- return is_valid_ipv4(address) or is_valid_ipv6(address)
-
-
-def is_valid_port(port):
- """Verify that port represents a valid port number."""
- try:
- val = int(port)
- except (ValueError, TypeError):
- return False
-
- return (val > 0 and val <= 65535)
-
-
-def get_my_ipv4():
- """Returns the actual ipv4 of the local machine.
-
- This code figures out what source address would be used if some traffic
- were to be sent out to some well known address on the Internet. In this
- case, IP from RFC5737 is used, but the specific address does not
- matter much. No traffic is actually sent.
- """
- try:
- csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- csock.connect(('192.0.2.0', 80))
- (addr, port) = csock.getsockname()
- csock.close()
- return addr
- except socket.error:
- return _get_my_ipv4_address()
-
-
-def _get_my_ipv4_address():
- """Figure out the best ipv4
- """
- LOCALHOST = '127.0.0.1'
- gtw = netifaces.gateways()
- try:
- interface = gtw['default'][netifaces.AF_INET][1]
- except (KeyError, IndexError):
- LOG.info(_LI('Could not determine default network interface, '
- 'using 127.0.0.1 for IPv4 address'))
- try:
- return netifaces.ifaddresses(interface)[netifaces.AF_INET][0]['addr']
- except (KeyError, IndexError):
- LOG.info(_LI('Could not determine IPv4 address for interface %s, '
- 'using 127.0.0.1'),
- interface)
- except Exception as e:
- LOG.info(_LI('Could not determine IPv4 address for '
- 'interface %(interface)s: %(error)s'),
- {'interface': interface, 'error': e})
- return LOCALHOST
-
-
-class _ModifiedSplitResult(parse.SplitResult):
- """Split results class for urlsplit."""
-
- # NOTE(dims): The functions below are needed for Python 2.6.x.
- # We can remove these when we drop support for 2.6.x.
- @property
- def hostname(self):
- netloc = self.netloc.split('@', 1)[-1]
- host, port = parse_host_port(netloc)
- return host
-
- @property
- def port(self):
- netloc = self.netloc.split('@', 1)[-1]
- host, port = parse_host_port(netloc)
- return port
-
- def params(self, collapse=True):
- """Extracts the query parameters from the split urls components.
-
- This method will provide back as a dictionary the query parameter
- names and values that were provided in the url.
-
- :param collapse: Boolean, turn on or off collapsing of query values
- with the same name. Since a url can contain the same query parameter
- name with different values it may or may not be useful for users to
- care that this has happened. This parameter when True uses the
- last value that was given for a given name, while if False it will
- retain all values provided by associating the query parameter name with
- a list of values instead of a single (non-list) value.
- """
- if self.query:
- if collapse:
- return dict(parse.parse_qsl(self.query))
- else:
- params = {}
- for (key, value) in parse.parse_qsl(self.query):
- if key in params:
- if isinstance(params[key], list):
- params[key].append(value)
- else:
- params[key] = [params[key], value]
- else:
- params[key] = value
- return params
- else:
- return {}
-
-
-def urlsplit(url, scheme='', allow_fragments=True):
- """Parse a URL using urlparse.urlsplit(), splitting query and fragments.
- This function papers over Python issue9374_ when needed.
-
- .. _issue9374: http://bugs.python.org/issue9374
-
- The parameters are the same as urlparse.urlsplit.
- """
- scheme, netloc, path, query, fragment = parse.urlsplit(
- url, scheme, allow_fragments)
- if allow_fragments and '#' in path:
- path, fragment = path.split('#', 1)
- if '?' in path:
- path, query = path.split('?', 1)
- return _ModifiedSplitResult(scheme, netloc,
- path, query, fragment)
-
-
-def set_tcp_keepalive(sock, tcp_keepalive=True,
- tcp_keepidle=None,
- tcp_keepalive_interval=None,
- tcp_keepalive_count=None):
- """Set values for tcp keepalive parameters
-
- This function configures tcp keepalive parameters if users wish to do
- so.
-
- :param tcp_keepalive: Boolean, turn on or off tcp_keepalive. If users are
- not sure, this should be True, and default values will be used.
-
- :param tcp_keepidle: time to wait before starting to send keepalive probes
- :param tcp_keepalive_interval: time between successive probes, once the
- initial wait time is over
- :param tcp_keepalive_count: number of probes to send before the connection
- is killed
- """
-
- # NOTE(praneshp): Despite keepalive being a tcp concept, the level is
- # still SOL_SOCKET. This is a quirk.
- if isinstance(tcp_keepalive, bool):
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, tcp_keepalive)
- else:
- raise TypeError("tcp_keepalive must be a boolean")
-
- if not tcp_keepalive:
- return
-
- # These options aren't available in the OS X version of eventlet,
- # Idle + Count * Interval effectively gives you the total timeout.
- if tcp_keepidle is not None:
- if hasattr(socket, 'TCP_KEEPIDLE'):
- sock.setsockopt(socket.IPPROTO_TCP,
- socket.TCP_KEEPIDLE,
- tcp_keepidle)
- else:
- LOG.warning(_LW('tcp_keepidle not available on your system'))
- if tcp_keepalive_interval is not None:
- if hasattr(socket, 'TCP_KEEPINTVL'):
- sock.setsockopt(socket.IPPROTO_TCP,
- socket.TCP_KEEPINTVL,
- tcp_keepalive_interval)
- else:
- LOG.warning(_LW('tcp_keepintvl not available on your system'))
- if tcp_keepalive_count is not None:
- if hasattr(socket, 'TCP_KEEPCNT'):
- sock.setsockopt(socket.IPPROTO_TCP,
- socket.TCP_KEEPCNT,
- tcp_keepalive_count)
- else:
- LOG.warning(_LW('tcp_keepcnt not available on your system'))
+from oslo_utils.netutils import * # noqa
+# NOTE(dhellmann): Needed for taskflow.
+from oslo_utils.netutils import _ModifiedSplitResult # noqa
diff --git a/oslo/utils/reflection.py b/oslo/utils/reflection.py
index b964cf3..f8b5c16 100644
--- a/oslo/utils/reflection.py
+++ b/oslo/utils/reflection.py
@@ -1,7 +1,3 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
-#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -14,195 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
-import inspect
-import types
-
-import six
-
-try:
- _TYPE_TYPE = types.TypeType
-except AttributeError:
- _TYPE_TYPE = type
-
-# See: https://docs.python.org/2/library/__builtin__.html#module-__builtin__
-# and see https://docs.python.org/2/reference/executionmodel.html (and likely
-# others)...
-_BUILTIN_MODULES = ('builtins', '__builtin__', '__builtins__', 'exceptions')
-
-
-def _get_members(obj, exclude_hidden):
- """Yields the members of an object, filtering by hidden/not hidden."""
- for (name, value) in inspect.getmembers(obj):
- if name.startswith("_") and exclude_hidden:
- continue
- yield (name, value)
-
-
-def get_member_names(obj, exclude_hidden=True):
- """Get all the member names for a object."""
- return [name for (name, _obj) in _get_members(obj, exclude_hidden)]
-
-
-def get_class_name(obj, fully_qualified=True):
- """Get class name for object.
-
- If object is a type, fully qualified name of the type is returned.
- Else, fully qualified name of the type of the object is returned.
- For builtin types, just name is returned.
- """
- if not isinstance(obj, six.class_types):
- obj = type(obj)
- try:
- built_in = obj.__module__ in _BUILTIN_MODULES
- except AttributeError:
- pass
- else:
- if built_in:
- try:
- return obj.__qualname__
- except AttributeError:
- return obj.__name__
- pieces = []
- try:
- pieces.append(obj.__qualname__)
- except AttributeError:
- pieces.append(obj.__name__)
- if fully_qualified:
- try:
- pieces.insert(0, obj.__module__)
- except AttributeError:
- pass
- return '.'.join(pieces)
-
-
-def get_all_class_names(obj, up_to=object):
- """Get class names of object parent classes.
-
- Iterate over all class names object is instance or subclass of,
- in order of method resolution (mro). If up_to parameter is provided,
- only name of classes that are sublcasses to that class are returned.
- """
- if not isinstance(obj, six.class_types):
- obj = type(obj)
- for cls in obj.mro():
- if issubclass(cls, up_to):
- yield get_class_name(cls)
-
-
-def get_callable_name(function):
- """Generate a name from callable.
-
- Tries to do the best to guess fully qualified callable name.
- """
- method_self = get_method_self(function)
- if method_self is not None:
- # This is a bound method.
- if isinstance(method_self, six.class_types):
- # This is a bound class method.
- im_class = method_self
- else:
- im_class = type(method_self)
- try:
- parts = (im_class.__module__, function.__qualname__)
- except AttributeError:
- parts = (im_class.__module__, im_class.__name__, function.__name__)
- elif inspect.ismethod(function) or inspect.isfunction(function):
- # This could be a function, a static method, a unbound method...
- try:
- parts = (function.__module__, function.__qualname__)
- except AttributeError:
- if hasattr(function, 'im_class'):
- # This is a unbound method, which exists only in python 2.x
- im_class = function.im_class
- parts = (im_class.__module__,
- im_class.__name__, function.__name__)
- else:
- parts = (function.__module__, function.__name__)
- else:
- im_class = type(function)
- if im_class is _TYPE_TYPE:
- im_class = function
- try:
- parts = (im_class.__module__, im_class.__qualname__)
- except AttributeError:
- parts = (im_class.__module__, im_class.__name__)
- return '.'.join(parts)
-
-
-def get_method_self(method):
- if not inspect.ismethod(method):
- return None
- try:
- return six.get_method_self(method)
- except AttributeError:
- return None
-
-
-def is_same_callback(callback1, callback2, strict=True):
- """Returns if the two callbacks are the same."""
- if callback1 is callback2:
- # This happens when plain methods are given (or static/non-bound
- # methods).
- return True
- if callback1 == callback2:
- if not strict:
- return True
- # Two bound methods are equal if functions themselves are equal and
- # objects they are applied to are equal. This means that a bound
- # method could be the same bound method on another object if the
- # objects have __eq__ methods that return true (when in fact it is a
- # different bound method). Python u so crazy!
- try:
- self1 = six.get_method_self(callback1)
- self2 = six.get_method_self(callback2)
- return self1 is self2
- except AttributeError:
- pass
- return False
-
-
-def is_bound_method(method):
- """Returns if the given method is bound to an object."""
- return bool(get_method_self(method))
-
-
-def is_subclass(obj, cls):
- """Returns if the object is class and it is subclass of a given class."""
- return inspect.isclass(obj) and issubclass(obj, cls)
-
-
-def _get_arg_spec(function):
- if isinstance(function, _TYPE_TYPE):
- bound = True
- function = function.__init__
- elif isinstance(function, (types.FunctionType, types.MethodType)):
- bound = is_bound_method(function)
- function = getattr(function, '__wrapped__', function)
- else:
- function = function.__call__
- bound = is_bound_method(function)
- return inspect.getargspec(function), bound
-
-
-def get_callable_args(function, required_only=False):
- """Get names of callable arguments.
-
- Special arguments (like *args and **kwargs) are not included into
- output.
-
- If required_only is True, optional arguments (with default values)
- are not included into output.
- """
- argspec, bound = _get_arg_spec(function)
- f_args = argspec.args
- if required_only and argspec.defaults:
- f_args = f_args[:-len(argspec.defaults)]
- if bound:
- f_args = f_args[1:]
- return f_args
-
-
-def accepts_kwargs(function):
- """Returns True if function accepts kwargs."""
- argspec, _bound = _get_arg_spec(function)
- return bool(argspec.keywords)
+from oslo_utils.reflection import * # noqa
diff --git a/oslo/utils/strutils.py b/oslo/utils/strutils.py
index a82a4db..989e088 100644
--- a/oslo/utils/strutils.py
+++ b/oslo/utils/strutils.py
@@ -1,6 +1,3 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -13,248 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
-System-level utilities and helper functions.
-"""
-
-import math
-import re
-import unicodedata
-
-import six
-
-from oslo.utils._i18n import _
-from oslo.utils import encodeutils
-
-
-UNIT_PREFIX_EXPONENT = {
- 'k': 1,
- 'K': 1,
- 'Ki': 1,
- 'M': 2,
- 'Mi': 2,
- 'G': 3,
- 'Gi': 3,
- 'T': 4,
- 'Ti': 4,
-}
-UNIT_SYSTEM_INFO = {
- 'IEC': (1024, re.compile(r'(^[-+]?\d*\.?\d+)([KMGT]i?)?(b|bit|B)$')),
- 'SI': (1000, re.compile(r'(^[-+]?\d*\.?\d+)([kMGT])?(b|bit|B)$')),
-}
-
-TRUE_STRINGS = ('1', 't', 'true', 'on', 'y', 'yes')
-FALSE_STRINGS = ('0', 'f', 'false', 'off', 'n', 'no')
-
-SLUGIFY_STRIP_RE = re.compile(r"[^\w\s-]")
-SLUGIFY_HYPHENATE_RE = re.compile(r"[-\s]+")
-
-
-# NOTE(flaper87): The following globals are used by `mask_password`
-_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password',
- 'auth_token', 'new_pass', 'auth_password', 'secret_uuid']
-
-# NOTE(ldbragst): Let's build a list of regex objects using the list of
-# _SANITIZE_KEYS we already have. This way, we only have to add the new key
-# to the list of _SANITIZE_KEYS and we can generate regular expressions
-# for XML and JSON automatically.
-_SANITIZE_PATTERNS_2 = []
-_SANITIZE_PATTERNS_1 = []
-
-# NOTE(amrith): Some regular expressions have only one parameter, some
-# have two parameters. Use different lists of patterns here.
-_FORMAT_PATTERNS_1 = [r'(%(key)s\s*[=]\s*)[^\s^\'^\"]+']
-_FORMAT_PATTERNS_2 = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
- r'(%(key)s\s+[\"\']).*?([\"\'])',
- r'([-]{2}%(key)s\s+)[^\'^\"^=^\s]+([\s]*)',
- r'(<%(key)s>).*?(</%(key)s>)',
- r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
- r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])',
- r'([\'"].*?%(key)s[\'"]\s*,\s*\'--?[A-z]+\'\s*,\s*u?'
- '[\'"]).*?([\'"])',
- r'(%(key)s\s*--?[A-z]+\s*)\S+(\s*)']
-
-for key in _SANITIZE_KEYS:
- for pattern in _FORMAT_PATTERNS_2:
- reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
- _SANITIZE_PATTERNS_2.append(reg_ex)
-
- for pattern in _FORMAT_PATTERNS_1:
- reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
- _SANITIZE_PATTERNS_1.append(reg_ex)
-
-
-def int_from_bool_as_string(subject):
- """Interpret a string as a boolean and return either 1 or 0.
-
- Any string value in:
-
- ('True', 'true', 'On', 'on', '1')
-
- is interpreted as a boolean True.
-
- Useful for JSON-decoded stuff and config file parsing
- """
- return int(bool_from_string(subject))
-
-
-def bool_from_string(subject, strict=False, default=False):
- """Interpret a string as a boolean.
-
- A case-insensitive match is performed such that strings matching 't',
- 'true', 'on', 'y', 'yes', or '1' are considered True and, when
- `strict=False`, anything else returns the value specified by 'default'.
-
- Useful for JSON-decoded stuff and config file parsing.
-
- If `strict=True`, unrecognized values, including None, will raise a
- ValueError which is useful when parsing values passed in from an API call.
- Strings yielding False are 'f', 'false', 'off', 'n', 'no', or '0'.
- """
- if not isinstance(subject, six.string_types):
- subject = six.text_type(subject)
-
- lowered = subject.strip().lower()
-
- if lowered in TRUE_STRINGS:
- return True
- elif lowered in FALSE_STRINGS:
- return False
- elif strict:
- acceptable = ', '.join(
- "'%s'" % s for s in sorted(TRUE_STRINGS + FALSE_STRINGS))
- msg = _("Unrecognized value '%(val)s', acceptable values are:"
- " %(acceptable)s") % {'val': subject,
- 'acceptable': acceptable}
- raise ValueError(msg)
- else:
- return default
-
-
-def string_to_bytes(text, unit_system='IEC', return_int=False):
- """Converts a string into an float representation of bytes.
-
- The units supported for IEC ::
-
- Kb(it), Kib(it), Mb(it), Mib(it), Gb(it), Gib(it), Tb(it), Tib(it)
- KB, KiB, MB, MiB, GB, GiB, TB, TiB
-
- The units supported for SI ::
-
- kb(it), Mb(it), Gb(it), Tb(it)
- kB, MB, GB, TB
-
- Note that the SI unit system does not support capital letter 'K'
-
- :param text: String input for bytes size conversion.
- :param unit_system: Unit system for byte size conversion.
- :param return_int: If True, returns integer representation of text
- in bytes. (default: decimal)
- :returns: Numerical representation of text in bytes.
- :raises ValueError: If text has an invalid value.
-
- """
- try:
- base, reg_ex = UNIT_SYSTEM_INFO[unit_system]
- except KeyError:
- msg = _('Invalid unit system: "%s"') % unit_system
- raise ValueError(msg)
- match = reg_ex.match(text)
- if match:
- magnitude = float(match.group(1))
- unit_prefix = match.group(2)
- if match.group(3) in ['b', 'bit']:
- magnitude /= 8
- else:
- msg = _('Invalid string format: %s') % text
- raise ValueError(msg)
- if not unit_prefix:
- res = magnitude
- else:
- res = magnitude * pow(base, UNIT_PREFIX_EXPONENT[unit_prefix])
- if return_int:
- return int(math.ceil(res))
- return res
-
-
-def to_slug(value, incoming=None, errors="strict"):
- """Normalize string.
-
- Convert to lowercase, remove non-word characters, and convert spaces
- to hyphens.
-
- Inspired by Django's `slugify` filter.
-
- :param value: Text to slugify
- :param incoming: Text's current encoding
- :param errors: Errors handling policy. See here for valid
- values http://docs.python.org/2/library/codecs.html
- :returns: slugified unicode representation of `value`
- :raises TypeError: If text is not an instance of str
- """
- value = encodeutils.safe_decode(value, incoming, errors)
- # NOTE(aababilov): no need to use safe_(encode|decode) here:
- # encodings are always "ascii", error handling is always "ignore"
- # and types are always known (first: unicode; second: str)
- value = unicodedata.normalize("NFKD", value).encode(
- "ascii", "ignore").decode("ascii")
- value = SLUGIFY_STRIP_RE.sub("", value).strip().lower()
- return SLUGIFY_HYPHENATE_RE.sub("-", value)
-
-
-def mask_password(message, secret="***"):
- """Replace password with 'secret' in message.
-
- :param message: The string which includes security information.
- :param secret: value with which to replace passwords.
- :returns: The unicode value of message with the password fields masked.
-
- For example:
-
- >>> mask_password("'adminPass' : 'aaaaa'")
- "'adminPass' : '***'"
- >>> mask_password("'admin_pass' : 'aaaaa'")
- "'admin_pass' : '***'"
- >>> mask_password('"password" : "aaaaa"')
- '"password" : "***"'
- >>> mask_password("'original_password' : 'aaaaa'")
- "'original_password' : '***'"
- >>> mask_password("u'original_password' : u'aaaaa'")
- "u'original_password' : u'***'"
- """
-
- try:
- message = six.text_type(message)
- except UnicodeDecodeError:
- # NOTE(jecarey): Temporary fix to handle cases where message is a
- # byte string. A better solution will be provided in Kilo.
- pass
-
- # NOTE(ldbragst): Check to see if anything in message contains any key
- # specified in _SANITIZE_KEYS, if not then just return the message since
- # we don't have to mask any passwords.
- if not any(key in message for key in _SANITIZE_KEYS):
- return message
-
- substitute = r'\g<1>' + secret + r'\g<2>'
- for pattern in _SANITIZE_PATTERNS_2:
- message = re.sub(pattern, substitute, message)
-
- substitute = r'\g<1>' + secret
- for pattern in _SANITIZE_PATTERNS_1:
- message = re.sub(pattern, substitute, message)
-
- return message
-
-
-def is_int_like(val):
- """Check if a value looks like an integer with base 10.
-
- :param val: Value to verify
- :type val: string
- :returns: bool
- """
- try:
- return six.text_type(int(val)) == six.text_type(val)
- except (TypeError, ValueError):
- return False
+from oslo_utils.strutils import * # noqa
diff --git a/oslo/utils/timeutils.py b/oslo/utils/timeutils.py
index c48da95..65b856a 100644
--- a/oslo/utils/timeutils.py
+++ b/oslo/utils/timeutils.py
@@ -1,6 +1,3 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -13,198 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Time related utilities and helper functions.
-"""
-
-import calendar
-import datetime
-import time
-
-import iso8601
-import six
-
-
-# ISO 8601 extended time format with microseconds
-_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
-_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
-PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
-
-
-def isotime(at=None, subsecond=False):
- """Stringify time in ISO 8601 format."""
- if not at:
- at = utcnow()
- st = at.strftime(_ISO8601_TIME_FORMAT
- if not subsecond
- else _ISO8601_TIME_FORMAT_SUBSECOND)
- tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
- st += ('Z' if tz == 'UTC' else tz)
- return st
-
-
-def parse_isotime(timestr):
- """Parse time from ISO 8601 format."""
- try:
- return iso8601.parse_date(timestr)
- except iso8601.ParseError as e:
- raise ValueError(six.text_type(e))
- except TypeError as e:
- raise ValueError(six.text_type(e))
-
-
-def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
- """Returns formatted utcnow."""
- if not at:
- at = utcnow()
- return at.strftime(fmt)
-
-
-def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
- """Turn a formatted time back into a datetime."""
- return datetime.datetime.strptime(timestr, fmt)
-
-
-def normalize_time(timestamp):
- """Normalize time in arbitrary timezone to UTC naive object."""
- offset = timestamp.utcoffset()
- if offset is None:
- return timestamp
- return timestamp.replace(tzinfo=None) - offset
-
-
-def is_older_than(before, seconds):
- """Return True if before is older than seconds."""
- if isinstance(before, six.string_types):
- before = parse_strtime(before).replace(tzinfo=None)
- else:
- before = before.replace(tzinfo=None)
-
- return utcnow() - before > datetime.timedelta(seconds=seconds)
-
-
-def is_newer_than(after, seconds):
- """Return True if after is newer than seconds."""
- if isinstance(after, six.string_types):
- after = parse_strtime(after).replace(tzinfo=None)
- else:
- after = after.replace(tzinfo=None)
-
- return after - utcnow() > datetime.timedelta(seconds=seconds)
-
-
-def utcnow_ts():
- """Timestamp version of our utcnow function."""
- if utcnow.override_time is None:
- # NOTE(kgriffs): This is several times faster
- # than going through calendar.timegm(...)
- return int(time.time())
-
- return calendar.timegm(utcnow().timetuple())
-
-
-def utcnow():
- """Overridable version of utils.utcnow."""
- if utcnow.override_time:
- try:
- return utcnow.override_time.pop(0)
- except AttributeError:
- return utcnow.override_time
- return datetime.datetime.utcnow()
-
-
-def iso8601_from_timestamp(timestamp):
- """Returns an iso8601 formatted date from timestamp."""
- return isotime(datetime.datetime.utcfromtimestamp(timestamp))
-
-
-utcnow.override_time = None
-
-
-def set_time_override(override_time=None):
- """Overrides utils.utcnow.
-
- Make it return a constant time or a list thereof, one at a time.
-
- :param override_time: datetime instance or list thereof. If not
- given, defaults to the current UTC time.
- """
- utcnow.override_time = override_time or datetime.datetime.utcnow()
-
-
-def advance_time_delta(timedelta):
- """Advance overridden time using a datetime.timedelta."""
- assert utcnow.override_time is not None
- try:
- for dt in utcnow.override_time:
- dt += timedelta
- except TypeError:
- utcnow.override_time += timedelta
-
-
-def advance_time_seconds(seconds):
- """Advance overridden time by seconds."""
- advance_time_delta(datetime.timedelta(0, seconds))
-
-
-def clear_time_override():
- """Remove the overridden time."""
- utcnow.override_time = None
-
-
-def marshall_now(now=None):
- """Make an rpc-safe datetime with microseconds.
-
- Note: tzinfo is stripped, but not required for relative times.
- """
- if not now:
- now = utcnow()
- return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
- minute=now.minute, second=now.second,
- microsecond=now.microsecond)
-
-
-def unmarshall_time(tyme):
- """Unmarshall a datetime dict."""
- return datetime.datetime(day=tyme['day'],
- month=tyme['month'],
- year=tyme['year'],
- hour=tyme['hour'],
- minute=tyme['minute'],
- second=tyme['second'],
- microsecond=tyme['microsecond'])
-
-
-def delta_seconds(before, after):
- """Return the difference between two timing objects.
-
- Compute the difference in seconds between two date, time, or
- datetime objects (as a float, to microsecond resolution).
- """
- delta = after - before
- return total_seconds(delta)
-
-
-def total_seconds(delta):
- """Return the total seconds of datetime.timedelta object.
-
- Compute total seconds of datetime.timedelta, datetime.timedelta
- doesn't have method total_seconds in Python2.6, calculate it manually.
- """
- try:
- return delta.total_seconds()
- except AttributeError:
- return ((delta.days * 24 * 3600) + delta.seconds +
- float(delta.microseconds) / (10 ** 6))
-
-
-def is_soon(dt, window):
- """Determines if time is going to happen in the next window seconds.
-
- :param dt: the time
- :param window: minimum seconds to remain to consider the time not soon
-
- :return: True if expiration is within the given duration
- """
- soon = (utcnow() + datetime.timedelta(seconds=window))
- return normalize_time(dt) <= soon
+from oslo_utils.timeutils import * # noqa
diff --git a/oslo/utils/units.py b/oslo/utils/units.py
index 4817da5..741e740 100644
--- a/oslo/utils/units.py
+++ b/oslo/utils/units.py
@@ -1,6 +1,3 @@
-# Copyright 2013 IBM Corp
-# All Rights Reserved.
-#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -13,26 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Unit constants
-"""
-
-# Binary unit constants.
-Ki = 1024
-Mi = 1024 ** 2
-Gi = 1024 ** 3
-Ti = 1024 ** 4
-Pi = 1024 ** 5
-Ei = 1024 ** 6
-Zi = 1024 ** 7
-Yi = 1024 ** 8
-
-# Decimal unit constants.
-k = 1000
-M = 1000 ** 2
-G = 1000 ** 3
-T = 1000 ** 4
-P = 1000 ** 5
-E = 1000 ** 6
-Z = 1000 ** 7
-Y = 1000 ** 8
+from oslo_utils.units import * # noqa
diff --git a/oslo/utils/uuidutils.py b/oslo/utils/uuidutils.py
index 62b0b5f..4f98a84 100644
--- a/oslo/utils/uuidutils.py
+++ b/oslo/utils/uuidutils.py
@@ -1,6 +1,3 @@
-# Copyright (c) 2012 Intel Corporation.
-# All Rights Reserved.
-#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -13,33 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""
-UUID related utilities and helper functions.
-"""
-
-import uuid
-
-
-def generate_uuid():
- return str(uuid.uuid4())
-
-
-def _format_uuid_string(string):
- return (string.replace('urn:', '')
- .replace('uuid:', '')
- .strip('{}')
- .replace('-', '')
- .lower())
-
-
-def is_uuid_like(val):
- """Returns validation of a value as a UUID.
-
- :param val: Value to verify
- :type val: string
- :returns: bool
- """
- try:
- return str(uuid.UUID(val)).replace('-', '') == _format_uuid_string(val)
- except (TypeError, ValueError, AttributeError):
- return False
+from oslo_utils.uuidutils import * # noqa
diff --git a/oslo_utils/__init__.py b/oslo_utils/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oslo_utils/__init__.py
diff --git a/oslo/utils/_i18n.py b/oslo_utils/_i18n.py
index 6045a10..6045a10 100644
--- a/oslo/utils/_i18n.py
+++ b/oslo_utils/_i18n.py
diff --git a/oslo_utils/encodeutils.py b/oslo_utils/encodeutils.py
new file mode 100644
index 0000000..14bd717
--- /dev/null
+++ b/oslo_utils/encodeutils.py
@@ -0,0 +1,95 @@
+# Copyright 2014 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import sys
+
+import six
+
+
+def safe_decode(text, incoming=None, errors='strict'):
+ """Decodes incoming text/bytes string using `incoming` if they're not
+ already unicode.
+
+ :param incoming: Text's current encoding
+ :param errors: Errors handling policy. See here for valid
+ values http://docs.python.org/2/library/codecs.html
+ :returns: text or a unicode `incoming` encoded
+ representation of it.
+ :raises TypeError: If text is not an instance of str
+ """
+ if not isinstance(text, (six.string_types, six.binary_type)):
+ raise TypeError("%s can't be decoded" % type(text))
+
+ if isinstance(text, six.text_type):
+ return text
+
+ if not incoming:
+ incoming = (sys.stdin.encoding or
+ sys.getdefaultencoding())
+
+ try:
+ return text.decode(incoming, errors)
+ except UnicodeDecodeError:
+ # Note(flaper87) If we get here, it means that
+ # sys.stdin.encoding / sys.getdefaultencoding
+ # didn't return a suitable encoding to decode
+ # text. This happens mostly when global LANG
+ # var is not set correctly and there's no
+ # default encoding. In this case, most likely
+ # python will use ASCII or ANSI encoders as
+ # default encodings but they won't be capable
+ # of decoding non-ASCII characters.
+ #
+ # Also, UTF-8 is being used since it's an ASCII
+ # extension.
+ return text.decode('utf-8', errors)
+
+
+def safe_encode(text, incoming=None,
+ encoding='utf-8', errors='strict'):
+ """Encodes incoming text/bytes string using `encoding`.
+
+ If incoming is not specified, text is expected to be encoded with
+ current python's default encoding. (`sys.getdefaultencoding`)
+
+ :param incoming: Text's current encoding
+ :param encoding: Expected encoding for text (Default UTF-8)
+ :param errors: Errors handling policy. See here for valid
+ values http://docs.python.org/2/library/codecs.html
+ :returns: text or a bytestring `encoding` encoded
+ representation of it.
+ :raises TypeError: If text is not an instance of str
+ """
+ if not isinstance(text, (six.string_types, six.binary_type)):
+ raise TypeError("%s can't be encoded" % type(text))
+
+ if not incoming:
+ incoming = (sys.stdin.encoding or
+ sys.getdefaultencoding())
+
+ # Avoid case issues in comparisons
+ if hasattr(incoming, 'lower'):
+ incoming = incoming.lower()
+ if hasattr(encoding, 'lower'):
+ encoding = encoding.lower()
+
+ if isinstance(text, six.text_type):
+ return text.encode(encoding, errors)
+ elif text and encoding != incoming:
+ # Decode text before encoding it with `encoding`
+ text = safe_decode(text, incoming, errors)
+ return text.encode(encoding, errors)
+ else:
+ return text
diff --git a/oslo_utils/excutils.py b/oslo_utils/excutils.py
new file mode 100644
index 0000000..c5839d6
--- /dev/null
+++ b/oslo_utils/excutils.py
@@ -0,0 +1,113 @@
+# Copyright 2011 OpenStack Foundation.
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Exception related utilities.
+"""
+
+import logging
+import sys
+import time
+import traceback
+
+import six
+
+from oslo_utils._i18n import _LE
+
+
+class save_and_reraise_exception(object):
+ """Save current exception, run some code and then re-raise.
+
+ In some cases the exception context can be cleared, resulting in None
+ being attempted to be re-raised after an exception handler is run. This
+ can happen when eventlet switches greenthreads or when running an
+ exception handler, code raises and catches an exception. In both
+ cases the exception context will be cleared.
+
+ To work around this, we save the exception state, run handler code, and
+ then re-raise the original exception. If another exception occurs, the
+ saved exception is logged and the new exception is re-raised.
+
+ In some cases the caller may not want to re-raise the exception, and
+ for those circumstances this context provides a reraise flag that
+ can be used to suppress the exception. For example::
+
+ except Exception:
+ with save_and_reraise_exception() as ctxt:
+ decide_if_need_reraise()
+ if not should_be_reraised:
+ ctxt.reraise = False
+
+ If another exception occurs and reraise flag is False,
+ the saved exception will not be logged.
+
+ If the caller wants to raise new exception during exception handling
+ he/she sets reraise to False initially with an ability to set it back to
+ True if needed::
+
+ except Exception:
+ with save_and_reraise_exception(reraise=False) as ctxt:
+ [if statements to determine whether to raise a new exception]
+ # Not raising a new exception, so reraise
+ ctxt.reraise = True
+ """
+ def __init__(self, reraise=True):
+ self.reraise = reraise
+
+ def __enter__(self):
+ self.type_, self.value, self.tb, = sys.exc_info()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_type is not None:
+ if self.reraise:
+ logging.error(_LE('Original exception being dropped: %s'),
+ traceback.format_exception(self.type_,
+ self.value,
+ self.tb))
+ return False
+ if self.reraise:
+ six.reraise(self.type_, self.value, self.tb)
+
+
+def forever_retry_uncaught_exceptions(infunc):
+ def inner_func(*args, **kwargs):
+ last_log_time = 0
+ last_exc_message = None
+ exc_count = 0
+ while True:
+ try:
+ return infunc(*args, **kwargs)
+ except Exception as exc:
+ this_exc_message = six.u(str(exc))
+ if this_exc_message == last_exc_message:
+ exc_count += 1
+ else:
+ exc_count = 1
+ # Do not log any more frequently than once a minute unless
+ # the exception message changes
+ cur_time = int(time.time())
+ if (cur_time - last_log_time > 60 or
+ this_exc_message != last_exc_message):
+ logging.exception(
+ _LE('Unexpected exception occurred %d time(s)... '
+ 'retrying.') % exc_count)
+ last_log_time = cur_time
+ last_exc_message = this_exc_message
+ exc_count = 0
+ # This should be a very rare event. In case it isn't, do
+ # a sleep.
+ time.sleep(1)
+ return inner_func
diff --git a/oslo_utils/importutils.py b/oslo_utils/importutils.py
new file mode 100644
index 0000000..043f817
--- /dev/null
+++ b/oslo_utils/importutils.py
@@ -0,0 +1,73 @@
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Import related utilities and helper functions.
+"""
+
+import sys
+import traceback
+
+
+def import_class(import_str):
+ """Returns a class from a string including module and class."""
+ mod_str, _sep, class_str = import_str.rpartition('.')
+ __import__(mod_str)
+ try:
+ return getattr(sys.modules[mod_str], class_str)
+ except AttributeError:
+ raise ImportError('Class %s cannot be found (%s)' %
+ (class_str,
+ traceback.format_exception(*sys.exc_info())))
+
+
+def import_object(import_str, *args, **kwargs):
+ """Import a class and return an instance of it."""
+ return import_class(import_str)(*args, **kwargs)
+
+
+def import_object_ns(name_space, import_str, *args, **kwargs):
+ """Tries to import object from default namespace.
+
+ Imports a class and return an instance of it, first by trying
+ to find the class in a default namespace, then failing back to
+ a full path if not found in the default namespace.
+ """
+ import_value = "%s.%s" % (name_space, import_str)
+ try:
+ return import_class(import_value)(*args, **kwargs)
+ except ImportError:
+ return import_class(import_str)(*args, **kwargs)
+
+
+def import_module(import_str):
+ """Import a module."""
+ __import__(import_str)
+ return sys.modules[import_str]
+
+
+def import_versioned_module(version, submodule=None):
+ module = 'oslo.v%s' % version
+ if submodule:
+ module = '.'.join((module, submodule))
+ return import_module(module)
+
+
+def try_import(import_str, default=None):
+ """Try to import a module and if it fails return default."""
+ try:
+ return import_module(import_str)
+ except ImportError:
+ return default
diff --git a/oslo_utils/netutils.py b/oslo_utils/netutils.py
new file mode 100644
index 0000000..7849ca6
--- /dev/null
+++ b/oslo_utils/netutils.py
@@ -0,0 +1,286 @@
+# Copyright 2012 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Network-related utilities and helper functions.
+"""
+
+import logging
+import socket
+
+import netaddr
+import netifaces
+from six.moves.urllib import parse
+
+from oslo_utils._i18n import _LI
+from oslo_utils._i18n import _LW
+
+LOG = logging.getLogger(__name__)
+
+
+def parse_host_port(address, default_port=None):
+ """Interpret a string as a host:port pair.
+
+ An IPv6 address MUST be escaped if accompanied by a port,
+ because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
+ means both [2001:db8:85a3::8a2e:370:7334] and
+ [2001:db8:85a3::8a2e:370]:7334.
+
+ >>> parse_host_port('server01:80')
+ ('server01', 80)
+ >>> parse_host_port('server01')
+ ('server01', None)
+ >>> parse_host_port('server01', default_port=1234)
+ ('server01', 1234)
+ >>> parse_host_port('[::1]:80')
+ ('::1', 80)
+ >>> parse_host_port('[::1]')
+ ('::1', None)
+ >>> parse_host_port('[::1]', default_port=1234)
+ ('::1', 1234)
+ >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
+ ('2001:db8:85a3::8a2e:370:7334', 1234)
+ >>> parse_host_port(None)
+ (None, None)
+ """
+ if not address:
+ return (None, None)
+
+ if address[0] == '[':
+ # Escaped ipv6
+ _host, _port = address[1:].split(']')
+ host = _host
+ if ':' in _port:
+ port = _port.split(':')[1]
+ else:
+ port = default_port
+ else:
+ if address.count(':') == 1:
+ host, port = address.split(':')
+ else:
+ # 0 means ipv4, >1 means ipv6.
+ # We prohibit unescaped ipv6 addresses with port.
+ host = address
+ port = default_port
+
+ return (host, None if port is None else int(port))
+
+
+def is_valid_ipv4(address):
+ """Verify that address represents a valid IPv4 address.
+
+ :param address: Value to verify
+ :type address: string
+ :returns: bool
+ """
+ try:
+ return netaddr.valid_ipv4(address)
+ except Exception:
+ return False
+
+
+def is_valid_ipv6(address):
+ """Verify that address represents a valid IPv6 address.
+
+ :param address: Value to verify
+ :type address: string
+ :returns: bool
+ """
+ try:
+ return netaddr.valid_ipv6(address)
+ except Exception:
+ return False
+
+
+def is_valid_ip(address):
+ """Verify that address represents a valid IP address.
+
+ :param address: Value to verify
+ :type address: string
+ :returns: bool
+ """
+ return is_valid_ipv4(address) or is_valid_ipv6(address)
+
+
+def is_valid_port(port):
+ """Verify that port represents a valid port number."""
+ try:
+ val = int(port)
+ except (ValueError, TypeError):
+ return False
+
+ return (val > 0 and val <= 65535)
+
+
+def get_my_ipv4():
+ """Returns the actual ipv4 of the local machine.
+
+ This code figures out what source address would be used if some traffic
+ were to be sent out to some well known address on the Internet. In this
+ case, IP from RFC5737 is used, but the specific address does not
+ matter much. No traffic is actually sent.
+ """
+ try:
+ csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ csock.connect(('192.0.2.0', 80))
+ (addr, port) = csock.getsockname()
+ csock.close()
+ return addr
+ except socket.error:
+ return _get_my_ipv4_address()
+
+
+def _get_my_ipv4_address():
+ """Figure out the best ipv4
+ """
+ LOCALHOST = '127.0.0.1'
+ gtw = netifaces.gateways()
+ try:
+ interface = gtw['default'][netifaces.AF_INET][1]
+ except (KeyError, IndexError):
+ LOG.info(_LI('Could not determine default network interface, '
+ 'using 127.0.0.1 for IPv4 address'))
+ try:
+ return netifaces.ifaddresses(interface)[netifaces.AF_INET][0]['addr']
+ except (KeyError, IndexError):
+ LOG.info(_LI('Could not determine IPv4 address for interface %s, '
+ 'using 127.0.0.1'),
+ interface)
+ except Exception as e:
+ LOG.info(_LI('Could not determine IPv4 address for '
+ 'interface %(interface)s: %(error)s'),
+ {'interface': interface, 'error': e})
+ return LOCALHOST
+
+
+class _ModifiedSplitResult(parse.SplitResult):
+ """Split results class for urlsplit."""
+
+ # NOTE(dims): The functions below are needed for Python 2.6.x.
+ # We can remove these when we drop support for 2.6.x.
+ @property
+ def hostname(self):
+ netloc = self.netloc.split('@', 1)[-1]
+ host, port = parse_host_port(netloc)
+ return host
+
+ @property
+ def port(self):
+ netloc = self.netloc.split('@', 1)[-1]
+ host, port = parse_host_port(netloc)
+ return port
+
+ def params(self, collapse=True):
+ """Extracts the query parameters from the split urls components.
+
+ This method will provide back as a dictionary the query parameter
+ names and values that were provided in the url.
+
+ :param collapse: Boolean, turn on or off collapsing of query values
+ with the same name. Since a url can contain the same query parameter
+ name with different values it may or may not be useful for users to
+ care that this has happened. This parameter when True uses the
+ last value that was given for a given name, while if False it will
+ retain all values provided by associating the query parameter name with
+ a list of values instead of a single (non-list) value.
+ """
+ if self.query:
+ if collapse:
+ return dict(parse.parse_qsl(self.query))
+ else:
+ params = {}
+ for (key, value) in parse.parse_qsl(self.query):
+ if key in params:
+ if isinstance(params[key], list):
+ params[key].append(value)
+ else:
+ params[key] = [params[key], value]
+ else:
+ params[key] = value
+ return params
+ else:
+ return {}
+
+
+def urlsplit(url, scheme='', allow_fragments=True):
+ """Parse a URL using urlparse.urlsplit(), splitting query and fragments.
+ This function papers over Python issue9374_ when needed.
+
+ .. _issue9374: http://bugs.python.org/issue9374
+
+ The parameters are the same as urlparse.urlsplit.
+ """
+ scheme, netloc, path, query, fragment = parse.urlsplit(
+ url, scheme, allow_fragments)
+ if allow_fragments and '#' in path:
+ path, fragment = path.split('#', 1)
+ if '?' in path:
+ path, query = path.split('?', 1)
+ return _ModifiedSplitResult(scheme, netloc,
+ path, query, fragment)
+
+
+def set_tcp_keepalive(sock, tcp_keepalive=True,
+ tcp_keepidle=None,
+ tcp_keepalive_interval=None,
+ tcp_keepalive_count=None):
+ """Set values for tcp keepalive parameters
+
+ This function configures tcp keepalive parameters if users wish to do
+ so.
+
+ :param tcp_keepalive: Boolean, turn on or off tcp_keepalive. If users are
+ not sure, this should be True, and default values will be used.
+
+ :param tcp_keepidle: time to wait before starting to send keepalive probes
+ :param tcp_keepalive_interval: time between successive probes, once the
+ initial wait time is over
+ :param tcp_keepalive_count: number of probes to send before the connection
+ is killed
+ """
+
+ # NOTE(praneshp): Despite keepalive being a tcp concept, the level is
+ # still SOL_SOCKET. This is a quirk.
+ if isinstance(tcp_keepalive, bool):
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, tcp_keepalive)
+ else:
+ raise TypeError("tcp_keepalive must be a boolean")
+
+ if not tcp_keepalive:
+ return
+
+ # These options aren't available in the OS X version of eventlet,
+ # Idle + Count * Interval effectively gives you the total timeout.
+ if tcp_keepidle is not None:
+ if hasattr(socket, 'TCP_KEEPIDLE'):
+ sock.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPIDLE,
+ tcp_keepidle)
+ else:
+ LOG.warning(_LW('tcp_keepidle not available on your system'))
+ if tcp_keepalive_interval is not None:
+ if hasattr(socket, 'TCP_KEEPINTVL'):
+ sock.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPINTVL,
+ tcp_keepalive_interval)
+ else:
+ LOG.warning(_LW('tcp_keepintvl not available on your system'))
+ if tcp_keepalive_count is not None:
+ if hasattr(socket, 'TCP_KEEPCNT'):
+ sock.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPCNT,
+ tcp_keepalive_count)
+ else:
+ LOG.warning(_LW('tcp_keepcnt not available on your system'))
diff --git a/oslo_utils/reflection.py b/oslo_utils/reflection.py
new file mode 100644
index 0000000..b964cf3
--- /dev/null
+++ b/oslo_utils/reflection.py
@@ -0,0 +1,208 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import inspect
+import types
+
+import six
+
+try:
+ _TYPE_TYPE = types.TypeType
+except AttributeError:
+ _TYPE_TYPE = type
+
+# See: https://docs.python.org/2/library/__builtin__.html#module-__builtin__
+# and see https://docs.python.org/2/reference/executionmodel.html (and likely
+# others)...
+_BUILTIN_MODULES = ('builtins', '__builtin__', '__builtins__', 'exceptions')
+
+
+def _get_members(obj, exclude_hidden):
+ """Yields the members of an object, filtering by hidden/not hidden."""
+ for (name, value) in inspect.getmembers(obj):
+ if name.startswith("_") and exclude_hidden:
+ continue
+ yield (name, value)
+
+
+def get_member_names(obj, exclude_hidden=True):
+ """Get all the member names for a object."""
+ return [name for (name, _obj) in _get_members(obj, exclude_hidden)]
+
+
+def get_class_name(obj, fully_qualified=True):
+ """Get class name for object.
+
+ If object is a type, fully qualified name of the type is returned.
+ Else, fully qualified name of the type of the object is returned.
+ For builtin types, just name is returned.
+ """
+ if not isinstance(obj, six.class_types):
+ obj = type(obj)
+ try:
+ built_in = obj.__module__ in _BUILTIN_MODULES
+ except AttributeError:
+ pass
+ else:
+ if built_in:
+ try:
+ return obj.__qualname__
+ except AttributeError:
+ return obj.__name__
+ pieces = []
+ try:
+ pieces.append(obj.__qualname__)
+ except AttributeError:
+ pieces.append(obj.__name__)
+ if fully_qualified:
+ try:
+ pieces.insert(0, obj.__module__)
+ except AttributeError:
+ pass
+ return '.'.join(pieces)
+
+
+def get_all_class_names(obj, up_to=object):
+ """Get class names of object parent classes.
+
+ Iterate over all class names object is instance or subclass of,
+ in order of method resolution (mro). If up_to parameter is provided,
+ only name of classes that are sublcasses to that class are returned.
+ """
+ if not isinstance(obj, six.class_types):
+ obj = type(obj)
+ for cls in obj.mro():
+ if issubclass(cls, up_to):
+ yield get_class_name(cls)
+
+
+def get_callable_name(function):
+ """Generate a name from callable.
+
+ Tries to do the best to guess fully qualified callable name.
+ """
+ method_self = get_method_self(function)
+ if method_self is not None:
+ # This is a bound method.
+ if isinstance(method_self, six.class_types):
+ # This is a bound class method.
+ im_class = method_self
+ else:
+ im_class = type(method_self)
+ try:
+ parts = (im_class.__module__, function.__qualname__)
+ except AttributeError:
+ parts = (im_class.__module__, im_class.__name__, function.__name__)
+ elif inspect.ismethod(function) or inspect.isfunction(function):
+ # This could be a function, a static method, a unbound method...
+ try:
+ parts = (function.__module__, function.__qualname__)
+ except AttributeError:
+ if hasattr(function, 'im_class'):
+ # This is a unbound method, which exists only in python 2.x
+ im_class = function.im_class
+ parts = (im_class.__module__,
+ im_class.__name__, function.__name__)
+ else:
+ parts = (function.__module__, function.__name__)
+ else:
+ im_class = type(function)
+ if im_class is _TYPE_TYPE:
+ im_class = function
+ try:
+ parts = (im_class.__module__, im_class.__qualname__)
+ except AttributeError:
+ parts = (im_class.__module__, im_class.__name__)
+ return '.'.join(parts)
+
+
+def get_method_self(method):
+ if not inspect.ismethod(method):
+ return None
+ try:
+ return six.get_method_self(method)
+ except AttributeError:
+ return None
+
+
+def is_same_callback(callback1, callback2, strict=True):
+ """Returns if the two callbacks are the same."""
+ if callback1 is callback2:
+ # This happens when plain methods are given (or static/non-bound
+ # methods).
+ return True
+ if callback1 == callback2:
+ if not strict:
+ return True
+ # Two bound methods are equal if functions themselves are equal and
+ # objects they are applied to are equal. This means that a bound
+ # method could be the same bound method on another object if the
+ # objects have __eq__ methods that return true (when in fact it is a
+ # different bound method). Python u so crazy!
+ try:
+ self1 = six.get_method_self(callback1)
+ self2 = six.get_method_self(callback2)
+ return self1 is self2
+ except AttributeError:
+ pass
+ return False
+
+
+def is_bound_method(method):
+ """Returns if the given method is bound to an object."""
+ return bool(get_method_self(method))
+
+
+def is_subclass(obj, cls):
+ """Returns if the object is class and it is subclass of a given class."""
+ return inspect.isclass(obj) and issubclass(obj, cls)
+
+
+def _get_arg_spec(function):
+ if isinstance(function, _TYPE_TYPE):
+ bound = True
+ function = function.__init__
+ elif isinstance(function, (types.FunctionType, types.MethodType)):
+ bound = is_bound_method(function)
+ function = getattr(function, '__wrapped__', function)
+ else:
+ function = function.__call__
+ bound = is_bound_method(function)
+ return inspect.getargspec(function), bound
+
+
+def get_callable_args(function, required_only=False):
+ """Get names of callable arguments.
+
+ Special arguments (like *args and **kwargs) are not included into
+ output.
+
+ If required_only is True, optional arguments (with default values)
+ are not included into output.
+ """
+ argspec, bound = _get_arg_spec(function)
+ f_args = argspec.args
+ if required_only and argspec.defaults:
+ f_args = f_args[:-len(argspec.defaults)]
+ if bound:
+ f_args = f_args[1:]
+ return f_args
+
+
+def accepts_kwargs(function):
+ """Returns True if function accepts kwargs."""
+ argspec, _bound = _get_arg_spec(function)
+ return bool(argspec.keywords)
diff --git a/oslo_utils/strutils.py b/oslo_utils/strutils.py
new file mode 100644
index 0000000..782903d
--- /dev/null
+++ b/oslo_utils/strutils.py
@@ -0,0 +1,260 @@
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+System-level utilities and helper functions.
+"""
+
+import math
+import re
+import unicodedata
+
+import six
+
+from oslo_utils._i18n import _
+from oslo_utils import encodeutils
+
+
+UNIT_PREFIX_EXPONENT = {
+ 'k': 1,
+ 'K': 1,
+ 'Ki': 1,
+ 'M': 2,
+ 'Mi': 2,
+ 'G': 3,
+ 'Gi': 3,
+ 'T': 4,
+ 'Ti': 4,
+}
+UNIT_SYSTEM_INFO = {
+ 'IEC': (1024, re.compile(r'(^[-+]?\d*\.?\d+)([KMGT]i?)?(b|bit|B)$')),
+ 'SI': (1000, re.compile(r'(^[-+]?\d*\.?\d+)([kMGT])?(b|bit|B)$')),
+}
+
+TRUE_STRINGS = ('1', 't', 'true', 'on', 'y', 'yes')
+FALSE_STRINGS = ('0', 'f', 'false', 'off', 'n', 'no')
+
+SLUGIFY_STRIP_RE = re.compile(r"[^\w\s-]")
+SLUGIFY_HYPHENATE_RE = re.compile(r"[-\s]+")
+
+
+# NOTE(flaper87): The following globals are used by `mask_password`
+_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password',
+ 'auth_token', 'new_pass', 'auth_password', 'secret_uuid']
+
+# NOTE(ldbragst): Let's build a list of regex objects using the list of
+# _SANITIZE_KEYS we already have. This way, we only have to add the new key
+# to the list of _SANITIZE_KEYS and we can generate regular expressions
+# for XML and JSON automatically.
+_SANITIZE_PATTERNS_2 = []
+_SANITIZE_PATTERNS_1 = []
+
+# NOTE(amrith): Some regular expressions have only one parameter, some
+# have two parameters. Use different lists of patterns here.
+_FORMAT_PATTERNS_1 = [r'(%(key)s\s*[=]\s*)[^\s^\'^\"]+']
+_FORMAT_PATTERNS_2 = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
+ r'(%(key)s\s+[\"\']).*?([\"\'])',
+ r'([-]{2}%(key)s\s+)[^\'^\"^=^\s]+([\s]*)',
+ r'(<%(key)s>).*?(</%(key)s>)',
+ r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
+ r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])',
+ r'([\'"].*?%(key)s[\'"]\s*,\s*\'--?[A-z]+\'\s*,\s*u?'
+ '[\'"]).*?([\'"])',
+ r'(%(key)s\s*--?[A-z]+\s*)\S+(\s*)']
+
+for key in _SANITIZE_KEYS:
+ for pattern in _FORMAT_PATTERNS_2:
+ reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
+ _SANITIZE_PATTERNS_2.append(reg_ex)
+
+ for pattern in _FORMAT_PATTERNS_1:
+ reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
+ _SANITIZE_PATTERNS_1.append(reg_ex)
+
+
+def int_from_bool_as_string(subject):
+ """Interpret a string as a boolean and return either 1 or 0.
+
+ Any string value in:
+
+ ('True', 'true', 'On', 'on', '1')
+
+ is interpreted as a boolean True.
+
+ Useful for JSON-decoded stuff and config file parsing
+ """
+ return int(bool_from_string(subject))
+
+
+def bool_from_string(subject, strict=False, default=False):
+ """Interpret a string as a boolean.
+
+ A case-insensitive match is performed such that strings matching 't',
+ 'true', 'on', 'y', 'yes', or '1' are considered True and, when
+ `strict=False`, anything else returns the value specified by 'default'.
+
+ Useful for JSON-decoded stuff and config file parsing.
+
+ If `strict=True`, unrecognized values, including None, will raise a
+ ValueError which is useful when parsing values passed in from an API call.
+ Strings yielding False are 'f', 'false', 'off', 'n', 'no', or '0'.
+ """
+ if not isinstance(subject, six.string_types):
+ subject = six.text_type(subject)
+
+ lowered = subject.strip().lower()
+
+ if lowered in TRUE_STRINGS:
+ return True
+ elif lowered in FALSE_STRINGS:
+ return False
+ elif strict:
+ acceptable = ', '.join(
+ "'%s'" % s for s in sorted(TRUE_STRINGS + FALSE_STRINGS))
+ msg = _("Unrecognized value '%(val)s', acceptable values are:"
+ " %(acceptable)s") % {'val': subject,
+ 'acceptable': acceptable}
+ raise ValueError(msg)
+ else:
+ return default
+
+
+def string_to_bytes(text, unit_system='IEC', return_int=False):
+ """Converts a string into an float representation of bytes.
+
+ The units supported for IEC ::
+
+ Kb(it), Kib(it), Mb(it), Mib(it), Gb(it), Gib(it), Tb(it), Tib(it)
+ KB, KiB, MB, MiB, GB, GiB, TB, TiB
+
+ The units supported for SI ::
+
+ kb(it), Mb(it), Gb(it), Tb(it)
+ kB, MB, GB, TB
+
+ Note that the SI unit system does not support capital letter 'K'
+
+ :param text: String input for bytes size conversion.
+ :param unit_system: Unit system for byte size conversion.
+ :param return_int: If True, returns integer representation of text
+ in bytes. (default: decimal)
+ :returns: Numerical representation of text in bytes.
+ :raises ValueError: If text has an invalid value.
+
+ """
+ try:
+ base, reg_ex = UNIT_SYSTEM_INFO[unit_system]
+ except KeyError:
+ msg = _('Invalid unit system: "%s"') % unit_system
+ raise ValueError(msg)
+ match = reg_ex.match(text)
+ if match:
+ magnitude = float(match.group(1))
+ unit_prefix = match.group(2)
+ if match.group(3) in ['b', 'bit']:
+ magnitude /= 8
+ else:
+ msg = _('Invalid string format: %s') % text
+ raise ValueError(msg)
+ if not unit_prefix:
+ res = magnitude
+ else:
+ res = magnitude * pow(base, UNIT_PREFIX_EXPONENT[unit_prefix])
+ if return_int:
+ return int(math.ceil(res))
+ return res
+
+
+def to_slug(value, incoming=None, errors="strict"):
+ """Normalize string.
+
+ Convert to lowercase, remove non-word characters, and convert spaces
+ to hyphens.
+
+ Inspired by Django's `slugify` filter.
+
+ :param value: Text to slugify
+ :param incoming: Text's current encoding
+ :param errors: Errors handling policy. See here for valid
+ values http://docs.python.org/2/library/codecs.html
+ :returns: slugified unicode representation of `value`
+ :raises TypeError: If text is not an instance of str
+ """
+ value = encodeutils.safe_decode(value, incoming, errors)
+ # NOTE(aababilov): no need to use safe_(encode|decode) here:
+ # encodings are always "ascii", error handling is always "ignore"
+ # and types are always known (first: unicode; second: str)
+ value = unicodedata.normalize("NFKD", value).encode(
+ "ascii", "ignore").decode("ascii")
+ value = SLUGIFY_STRIP_RE.sub("", value).strip().lower()
+ return SLUGIFY_HYPHENATE_RE.sub("-", value)
+
+
+def mask_password(message, secret="***"):
+ """Replace password with 'secret' in message.
+
+ :param message: The string which includes security information.
+ :param secret: value with which to replace passwords.
+ :returns: The unicode value of message with the password fields masked.
+
+ For example:
+
+ >>> mask_password("'adminPass' : 'aaaaa'")
+ "'adminPass' : '***'"
+ >>> mask_password("'admin_pass' : 'aaaaa'")
+ "'admin_pass' : '***'"
+ >>> mask_password('"password" : "aaaaa"')
+ '"password" : "***"'
+ >>> mask_password("'original_password' : 'aaaaa'")
+ "'original_password' : '***'"
+ >>> mask_password("u'original_password' : u'aaaaa'")
+ "u'original_password' : u'***'"
+ """
+
+ try:
+ message = six.text_type(message)
+ except UnicodeDecodeError:
+ # NOTE(jecarey): Temporary fix to handle cases where message is a
+ # byte string. A better solution will be provided in Kilo.
+ pass
+
+ # NOTE(ldbragst): Check to see if anything in message contains any key
+ # specified in _SANITIZE_KEYS, if not then just return the message since
+ # we don't have to mask any passwords.
+ if not any(key in message for key in _SANITIZE_KEYS):
+ return message
+
+ substitute = r'\g<1>' + secret + r'\g<2>'
+ for pattern in _SANITIZE_PATTERNS_2:
+ message = re.sub(pattern, substitute, message)
+
+ substitute = r'\g<1>' + secret
+ for pattern in _SANITIZE_PATTERNS_1:
+ message = re.sub(pattern, substitute, message)
+
+ return message
+
+
+def is_int_like(val):
+ """Check if a value looks like an integer with base 10.
+
+ :param val: Value to verify
+ :type val: string
+ :returns: bool
+ """
+ try:
+ return six.text_type(int(val)) == six.text_type(val)
+ except (TypeError, ValueError):
+ return False
diff --git a/tests/test_utils.py b/oslo_utils/tests/__init__.py
index 19c7d9c..19f5e72 100644
--- a/tests/test_utils.py
+++ b/oslo_utils/tests/__init__.py
@@ -11,18 +11,3 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
-"""
-test_utils
-----------------------------------
-
-Tests for `utils` module.
-"""
-
-from . import base
-
-
-class TestUtils(base.TestCase):
-
- def test_something(self):
- pass
diff --git a/oslo_utils/tests/base.py b/oslo_utils/tests/base.py
new file mode 100644
index 0000000..a3069ed
--- /dev/null
+++ b/oslo_utils/tests/base.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2010-2011 OpenStack Foundation
+# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import fixtures
+import testtools
+
+_TRUE_VALUES = ('true', '1', 'yes')
+
+# FIXME(dhellmann) Update this to use oslo.test library
+
+
+class TestCase(testtools.TestCase):
+
+ """Test case base class for all unit tests."""
+
+ def setUp(self):
+ """Run before each test method to initialize test environment."""
+
+ super(TestCase, self).setUp()
+ test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
+ try:
+ test_timeout = int(test_timeout)
+ except ValueError:
+ # If timeout value is invalid do not set a timeout.
+ test_timeout = 0
+ if test_timeout > 0:
+ self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
+
+ self.useFixture(fixtures.NestedTempfile())
+ self.useFixture(fixtures.TempHomeDir())
+
+ if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
+ stdout = self.useFixture(fixtures.StringStream('stdout')).stream
+ self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
+ if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
+ stderr = self.useFixture(fixtures.StringStream('stderr')).stream
+ self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
+
+ self.log_fixture = self.useFixture(fixtures.FakeLogger())
diff --git a/tests/fake/__init__.py b/oslo_utils/tests/fake/__init__.py
index 06cc944..06cc944 100644
--- a/tests/fake/__init__.py
+++ b/oslo_utils/tests/fake/__init__.py
diff --git a/oslo_utils/tests/test_excutils.py b/oslo_utils/tests/test_excutils.py
new file mode 100644
index 0000000..92bf39c
--- /dev/null
+++ b/oslo_utils/tests/test_excutils.py
@@ -0,0 +1,196 @@
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import time
+
+import mock
+from oslotest import base as test_base
+from oslotest import moxstubout
+
+from oslo_utils import excutils
+
+
+mox = moxstubout.mox
+
+
+class SaveAndReraiseTest(test_base.BaseTestCase):
+
+ def test_save_and_reraise_exception(self):
+ e = None
+ msg = 'foo'
+ try:
+ try:
+ raise Exception(msg)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ pass
+ except Exception as _e:
+ e = _e
+
+ self.assertEqual(str(e), msg)
+
+ def test_save_and_reraise_exception_dropped(self):
+ e = None
+ msg = 'second exception'
+ with mock.patch('logging.error') as log:
+ try:
+ try:
+ raise Exception('dropped')
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ raise Exception(msg)
+ except Exception as _e:
+ e = _e
+
+ self.assertEqual(str(e), msg)
+ self.assertTrue(log.called)
+
+ def test_save_and_reraise_exception_no_reraise(self):
+ """Test that suppressing the reraise works."""
+ try:
+ raise Exception('foo')
+ except Exception:
+ with excutils.save_and_reraise_exception() as ctxt:
+ ctxt.reraise = False
+
+ def test_save_and_reraise_exception_dropped_no_reraise(self):
+ e = None
+ msg = 'second exception'
+ with mock.patch('logging.error') as log:
+ try:
+ try:
+ raise Exception('dropped')
+ except Exception:
+ with excutils.save_and_reraise_exception(reraise=False):
+ raise Exception(msg)
+ except Exception as _e:
+ e = _e
+
+ self.assertEqual(str(e), msg)
+ self.assertFalse(log.called)
+
+
+class ForeverRetryUncaughtExceptionsTest(test_base.BaseTestCase):
+
+ def setUp(self):
+ super(ForeverRetryUncaughtExceptionsTest, self).setUp()
+ moxfixture = self.useFixture(moxstubout.MoxStubout())
+ self.mox = moxfixture.mox
+ self.stubs = moxfixture.stubs
+
+ @excutils.forever_retry_uncaught_exceptions
+ def exception_generator(self):
+ exc = self.exception_to_raise()
+ while exc is not None:
+ raise exc
+ exc = self.exception_to_raise()
+
+ def exception_to_raise(self):
+ return None
+
+ def my_time_sleep(self, arg):
+ pass
+
+ def exc_retrier_common_start(self):
+ self.stubs.Set(time, 'sleep', self.my_time_sleep)
+ self.mox.StubOutWithMock(logging, 'exception')
+ self.mox.StubOutWithMock(time, 'time',
+ use_mock_anything=True)
+ self.mox.StubOutWithMock(self, 'exception_to_raise')
+
+ def exc_retrier_sequence(self, exc_id=None, timestamp=None,
+ exc_count=None):
+ self.exception_to_raise().AndReturn(
+ Exception('unexpected %d' % exc_id))
+ time.time().AndReturn(timestamp)
+ if exc_count != 0:
+ logging.exception(mox.In(
+ 'Unexpected exception occurred %d time(s)' % exc_count))
+
+ def exc_retrier_common_end(self):
+ self.exception_to_raise().AndReturn(None)
+ self.mox.ReplayAll()
+ self.exception_generator()
+ self.addCleanup(self.stubs.UnsetAll)
+
+ def test_exc_retrier_1exc_gives_1log(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_10exc_1min_gives_1log(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # By design, the following exception don't get logged because they
+ # are within the same minute.
+ for i in range(2, 11):
+ self.exc_retrier_sequence(exc_id=1, timestamp=i, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_2exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_sequence(exc_id=1, timestamp=65, exc_count=1)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_same_10exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ self.exc_retrier_sequence(exc_id=1, timestamp=12, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=23, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=34, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=45, exc_count=0)
+ # The previous 4 exceptions are counted here
+ self.exc_retrier_sequence(exc_id=1, timestamp=106, exc_count=5)
+ # Again, the following are not logged due to being within
+ # the same minute
+ self.exc_retrier_sequence(exc_id=1, timestamp=117, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=128, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=139, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=150, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_1min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # By design, this second 'unexpected 1' exception is not counted. This
+ # is likely a rare thing and is a sacrifice for code simplicity.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ self.exc_retrier_sequence(exc_id=2, timestamp=20, exc_count=1)
+ # Again, trailing exceptions within a minute are not counted.
+ self.exc_retrier_sequence(exc_id=2, timestamp=30, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_2min_gives_2logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # Again, this second exception of the same type is not counted
+ # for the sake of code simplicity.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ # The difference between this and the previous case is the log
+ # is also triggered by more than a minute expiring.
+ self.exc_retrier_sequence(exc_id=2, timestamp=100, exc_count=1)
+ self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=0)
+ self.exc_retrier_common_end()
+
+ def test_exc_retrier_mixed_4exc_2min_gives_3logs(self):
+ self.exc_retrier_common_start()
+ self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
+ # This time the second 'unexpected 1' exception is counted due
+ # to the same exception occurring same when the minute expires.
+ self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
+ self.exc_retrier_sequence(exc_id=1, timestamp=100, exc_count=2)
+ self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=1)
+ self.exc_retrier_common_end()
diff --git a/oslo_utils/tests/test_importutils.py b/oslo_utils/tests/test_importutils.py
new file mode 100644
index 0000000..bbdff04
--- /dev/null
+++ b/oslo_utils/tests/test_importutils.py
@@ -0,0 +1,121 @@
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import sys
+
+from oslotest import base as test_base
+
+from oslo_utils import importutils
+
+
+class ImportUtilsTest(test_base.BaseTestCase):
+
+ # NOTE(jkoelker) There has GOT to be a way to test this. But mocking
+ # __import__ is the devil. Right now we just make
+ # sure we can import something from the stdlib
+ def test_import_class(self):
+ dt = importutils.import_class('datetime.datetime')
+ self.assertEqual(sys.modules['datetime'].datetime, dt)
+
+ def test_import_bad_class(self):
+ self.assertRaises(ImportError, importutils.import_class,
+ 'lol.u_mad.brah')
+
+ def test_import_module(self):
+ dt = importutils.import_module('datetime')
+ self.assertEqual(sys.modules['datetime'], dt)
+
+ def test_import_object_optional_arg_not_present(self):
+ obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver')
+ self.assertEqual(obj.__class__.__name__, 'FakeDriver')
+
+ def test_import_object_optional_arg_present(self):
+ obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver',
+ first_arg=False)
+ self.assertEqual(obj.__class__.__name__, 'FakeDriver')
+
+ def test_import_object_required_arg_not_present(self):
+ # arg 1 isn't optional here
+ self.assertRaises(TypeError, importutils.import_object,
+ 'oslo_utils.tests.fake.FakeDriver2')
+
+ def test_import_object_required_arg_present(self):
+ obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver2',
+ first_arg=False)
+ self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
+
+ # namespace tests
+ def test_import_object_ns_optional_arg_not_present(self):
+ obj = importutils.import_object_ns('oslo_utils',
+ 'tests.fake.FakeDriver')
+ self.assertEqual(obj.__class__.__name__, 'FakeDriver')
+
+ def test_import_object_ns_optional_arg_present(self):
+ obj = importutils.import_object_ns('oslo_utils',
+ 'tests.fake.FakeDriver',
+ first_arg=False)
+ self.assertEqual(obj.__class__.__name__, 'FakeDriver')
+
+ def test_import_object_ns_required_arg_not_present(self):
+ # arg 1 isn't optional here
+ self.assertRaises(TypeError, importutils.import_object_ns,
+ 'oslo_utils', 'tests.fake.FakeDriver2')
+
+ def test_import_object_ns_required_arg_present(self):
+ obj = importutils.import_object_ns('oslo_utils',
+ 'tests.fake.FakeDriver2',
+ first_arg=False)
+ self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
+
+ # namespace tests
+ def test_import_object_ns_full_optional_arg_not_present(self):
+ obj = importutils.import_object_ns('tests2',
+ 'oslo_utils.tests.fake.FakeDriver')
+ self.assertEqual(obj.__class__.__name__, 'FakeDriver')
+
+ def test_import_object_ns_full_optional_arg_present(self):
+ obj = importutils.import_object_ns('tests2',
+ 'oslo_utils.tests.fake.FakeDriver',
+ first_arg=False)
+ self.assertEqual(obj.__class__.__name__, 'FakeDriver')
+
+ def test_import_object_ns_full_required_arg_not_present(self):
+ # arg 1 isn't optional here
+ self.assertRaises(TypeError, importutils.import_object_ns,
+ 'tests2', 'oslo_utils.tests.fake.FakeDriver2')
+
+ def test_import_object_ns_full_required_arg_present(self):
+ obj = importutils.import_object_ns('tests2',
+ 'oslo_utils.tests.fake.FakeDriver2',
+ first_arg=False)
+ self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
+
+ def test_import_object(self):
+ dt = importutils.import_object('datetime.time')
+ self.assertTrue(isinstance(dt, sys.modules['datetime'].time))
+
+ def test_import_object_with_args(self):
+ dt = importutils.import_object('datetime.datetime', 2012, 4, 5)
+ self.assertTrue(isinstance(dt, sys.modules['datetime'].datetime))
+ self.assertEqual(dt, datetime.datetime(2012, 4, 5))
+
+ def test_try_import(self):
+ dt = importutils.try_import('datetime')
+ self.assertEqual(sys.modules['datetime'], dt)
+
+ def test_try_import_returns_default(self):
+ foo = importutils.try_import('foo.bar')
+ self.assertIsNone(foo)
diff --git a/oslo_utils/tests/test_netutils.py b/oslo_utils/tests/test_netutils.py
new file mode 100644
index 0000000..790104f
--- /dev/null
+++ b/oslo_utils/tests/test_netutils.py
@@ -0,0 +1,224 @@
+# Copyright 2012 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import socket
+
+import mock
+from mock import patch
+import netifaces
+from oslotest import base as test_base
+
+from oslo_utils import netutils
+
+
+class NetworkUtilsTest(test_base.BaseTestCase):
+
+ def test_no_host(self):
+ result = netutils.urlsplit('http://')
+ self.assertEqual('', result.netloc)
+ self.assertEqual(None, result.port)
+ self.assertEqual(None, result.hostname)
+ self.assertEqual('http', result.scheme)
+
+ def test_parse_host_port(self):
+ self.assertEqual(('server01', 80),
+ netutils.parse_host_port('server01:80'))
+ self.assertEqual(('server01', None),
+ netutils.parse_host_port('server01'))
+ self.assertEqual(('server01', 1234),
+ netutils.parse_host_port('server01',
+ default_port=1234))
+ self.assertEqual(('::1', 80),
+ netutils.parse_host_port('[::1]:80'))
+ self.assertEqual(('::1', None),
+ netutils.parse_host_port('[::1]'))
+ self.assertEqual(('::1', 1234),
+ netutils.parse_host_port('[::1]',
+ default_port=1234))
+ self.assertEqual(('2001:db8:85a3::8a2e:370:7334', 1234),
+ netutils.parse_host_port(
+ '2001:db8:85a3::8a2e:370:7334',
+ default_port=1234))
+
+ def test_urlsplit(self):
+ result = netutils.urlsplit('rpc://myhost?someparam#somefragment')
+ self.assertEqual(result.scheme, 'rpc')
+ self.assertEqual(result.netloc, 'myhost')
+ self.assertEqual(result.path, '')
+ self.assertEqual(result.query, 'someparam')
+ self.assertEqual(result.fragment, 'somefragment')
+
+ result = netutils.urlsplit(
+ 'rpc://myhost/mypath?someparam#somefragment',
+ allow_fragments=False)
+ self.assertEqual(result.scheme, 'rpc')
+ self.assertEqual(result.netloc, 'myhost')
+ self.assertEqual(result.path, '/mypath')
+ self.assertEqual(result.query, 'someparam#somefragment')
+ self.assertEqual(result.fragment, '')
+
+ result = netutils.urlsplit(
+ 'rpc://user:pass@myhost/mypath?someparam#somefragment',
+ allow_fragments=False)
+ self.assertEqual(result.scheme, 'rpc')
+ self.assertEqual(result.netloc, 'user:pass@myhost')
+ self.assertEqual(result.path, '/mypath')
+ self.assertEqual(result.query, 'someparam#somefragment')
+ self.assertEqual(result.fragment, '')
+
+ def test_urlsplit_ipv6(self):
+ ipv6_url = 'http://[::1]:443/v2.0/'
+ result = netutils.urlsplit(ipv6_url)
+ self.assertEqual(result.scheme, 'http')
+ self.assertEqual(result.netloc, '[::1]:443')
+ self.assertEqual(result.path, '/v2.0/')
+ self.assertEqual(result.hostname, '::1')
+ self.assertEqual(result.port, 443)
+
+ ipv6_url = 'http://user:pass@[::1]/v2.0/'
+ result = netutils.urlsplit(ipv6_url)
+ self.assertEqual(result.scheme, 'http')
+ self.assertEqual(result.netloc, 'user:pass@[::1]')
+ self.assertEqual(result.path, '/v2.0/')
+ self.assertEqual(result.hostname, '::1')
+ self.assertEqual(result.port, None)
+
+ ipv6_url = 'https://[2001:db8:85a3::8a2e:370:7334]:1234/v2.0/xy?ab#12'
+ result = netutils.urlsplit(ipv6_url)
+ self.assertEqual(result.scheme, 'https')
+ self.assertEqual(result.netloc, '[2001:db8:85a3::8a2e:370:7334]:1234')
+ self.assertEqual(result.path, '/v2.0/xy')
+ self.assertEqual(result.hostname, '2001:db8:85a3::8a2e:370:7334')
+ self.assertEqual(result.port, 1234)
+ self.assertEqual(result.query, 'ab')
+ self.assertEqual(result.fragment, '12')
+
+ def test_urlsplit_params(self):
+ test_url = "http://localhost/?a=b&c=d"
+ result = netutils.urlsplit(test_url)
+ self.assertEqual({'a': 'b', 'c': 'd'}, result.params())
+ self.assertEqual({'a': 'b', 'c': 'd'}, result.params(collapse=False))
+
+ test_url = "http://localhost/?a=b&a=c&a=d"
+ result = netutils.urlsplit(test_url)
+ self.assertEqual({'a': 'd'}, result.params())
+ self.assertEqual({'a': ['b', 'c', 'd']}, result.params(collapse=False))
+
+ test_url = "http://localhost"
+ result = netutils.urlsplit(test_url)
+ self.assertEqual({}, result.params())
+
+ test_url = "http://localhost?"
+ result = netutils.urlsplit(test_url)
+ self.assertEqual({}, result.params())
+
+ def test_set_tcp_keepalive(self):
+ mock_sock = mock.Mock()
+ netutils.set_tcp_keepalive(mock_sock, True, 100, 10, 5)
+ calls = [
+ mock.call.setsockopt(socket.SOL_SOCKET,
+ socket.SO_KEEPALIVE, True),
+ ]
+ if hasattr(socket, 'TCP_KEEPIDLE'):
+ calls += [
+ mock.call.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPIDLE, 100)
+ ]
+ if hasattr(socket, 'TCP_KEEPINTVL'):
+ calls += [
+ mock.call.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPINTVL, 10),
+ ]
+ if hasattr(socket, 'TCP_KEEPCNT'):
+ calls += [
+ mock.call.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPCNT, 5)
+ ]
+ mock_sock.assert_has_calls(calls)
+
+ mock_sock.reset_mock()
+ netutils.set_tcp_keepalive(mock_sock, False)
+ self.assertEqual(1, len(mock_sock.mock_calls))
+
+ def test_is_valid_ipv4(self):
+ self.assertTrue(netutils.is_valid_ipv4('42.42.42.42'))
+
+ self.assertFalse(netutils.is_valid_ipv4('-1.11.11.11'))
+
+ self.assertFalse(netutils.is_valid_ipv4(''))
+
+ def test_is_valid_ipv6(self):
+ self.assertTrue(netutils.is_valid_ipv6('::1'))
+
+ self.assertFalse(netutils.is_valid_ipv6(
+ '1fff::a88:85a3::172.31.128.1'))
+
+ self.assertFalse(netutils.is_valid_ipv6(''))
+
+ def test_is_valid_ip(self):
+ self.assertTrue(netutils.is_valid_ip('127.0.0.1'))
+
+ self.assertTrue(netutils.is_valid_ip('2001:db8::ff00:42:8329'))
+
+ self.assertFalse(netutils.is_valid_ip('256.0.0.0'))
+
+ self.assertFalse(netutils.is_valid_ip('::1.2.3.'))
+
+ self.assertFalse(netutils.is_valid_ip(''))
+
+ def test_valid_port(self):
+ valid_inputs = [1, '1', 2, '3', '5', 8, 13, 21,
+ '80', '3246', '65535']
+ for input_str in valid_inputs:
+ self.assertTrue(netutils.is_valid_port(input_str))
+
+ def test_valid_port_fail(self):
+ invalid_inputs = ['-32768', '0', 0, '65536', 528491, '528491',
+ '528.491', 'thirty-seven', None]
+ for input_str in invalid_inputs:
+ self.assertFalse(netutils.is_valid_port(input_str))
+
+ def test_get_my_ip(self):
+ sock_attrs = {
+ 'return_value.getsockname.return_value': ['1.2.3.4', '']}
+ with mock.patch('socket.socket', **sock_attrs):
+ addr = netutils.get_my_ipv4()
+ self.assertEqual(addr, '1.2.3.4')
+
+ @mock.patch('socket.socket')
+ @mock.patch('oslo_utils.netutils._get_my_ipv4_address')
+ def test_get_my_ip_socket_error(self, ip, mock_socket):
+ mock_socket.side_effect = socket.error
+ ip.return_value = '1.2.3.4'
+ addr = netutils.get_my_ipv4()
+ self.assertEqual(addr, '1.2.3.4')
+
+ @mock.patch('netifaces.gateways')
+ @mock.patch('netifaces.ifaddresses')
+ def test_get_my_ipv4_address_with_default_route(
+ self, ifaddr, gateways):
+ with patch.dict(netifaces.__dict__, {'AF_INET': '0'}):
+ ifaddr.return_value = {'0': [{'addr': '172.18.204.1'}]}
+ addr = netutils._get_my_ipv4_address()
+ self.assertEqual('172.18.204.1', addr)
+
+ @mock.patch('netifaces.gateways')
+ @mock.patch('netifaces.ifaddresses')
+ def test_get_my_ipv4_address_without_default_route(
+ self, ifaddr, gateways):
+ with patch.dict(netifaces.__dict__, {'AF_INET': '0'}):
+ ifaddr.return_value = {}
+ addr = netutils._get_my_ipv4_address()
+ self.assertEqual('127.0.0.1', addr)
diff --git a/oslo_utils/tests/test_reflection.py b/oslo_utils/tests/test_reflection.py
new file mode 100644
index 0000000..d66f438
--- /dev/null
+++ b/oslo_utils/tests/test_reflection.py
@@ -0,0 +1,279 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslotest import base as test_base
+import six
+import testtools
+
+from oslo_utils import reflection
+
+
+if six.PY3:
+ RUNTIME_ERROR_CLASSES = ['RuntimeError', 'Exception',
+ 'BaseException', 'object']
+else:
+ RUNTIME_ERROR_CLASSES = ['RuntimeError', 'StandardError', 'Exception',
+ 'BaseException', 'object']
+
+
+def dummy_decorator(f):
+
+ @six.wraps(f)
+ def wrapper(*args, **kwargs):
+ return f(*args, **kwargs)
+
+ return wrapper
+
+
+def mere_function(a, b):
+ pass
+
+
+def function_with_defs(a, b, optional=None):
+ pass
+
+
+def function_with_kwargs(a, b, **kwargs):
+ pass
+
+
+class Class(object):
+
+ def method(self, c, d):
+ pass
+
+ @staticmethod
+ def static_method(e, f):
+ pass
+
+ @classmethod
+ def class_method(cls, g, h):
+ pass
+
+
+class CallableClass(object):
+ def __call__(self, i, j):
+ pass
+
+
+class ClassWithInit(object):
+ def __init__(self, k, l):
+ pass
+
+
+class CallbackEqualityTest(test_base.BaseTestCase):
+ def test_different_simple_callbacks(self):
+
+ def a():
+ pass
+
+ def b():
+ pass
+
+ self.assertFalse(reflection.is_same_callback(a, b))
+
+ def test_static_instance_callbacks(self):
+
+ class A(object):
+
+ @staticmethod
+ def b(a, b, c):
+ pass
+
+ a = A()
+ b = A()
+
+ self.assertTrue(reflection.is_same_callback(a.b, b.b))
+
+ def test_different_instance_callbacks(self):
+
+ class A(object):
+ def b(self):
+ pass
+
+ def __eq__(self, other):
+ return True
+
+ b = A()
+ c = A()
+
+ self.assertFalse(reflection.is_same_callback(b.b, c.b))
+ self.assertTrue(reflection.is_same_callback(b.b, c.b, strict=False))
+
+
+class GetCallableNameTest(test_base.BaseTestCase):
+
+ def test_mere_function(self):
+ name = reflection.get_callable_name(mere_function)
+ self.assertEqual('.'.join((__name__, 'mere_function')), name)
+
+ def test_method(self):
+ name = reflection.get_callable_name(Class.method)
+ self.assertEqual('.'.join((__name__, 'Class', 'method')), name)
+
+ def test_instance_method(self):
+ name = reflection.get_callable_name(Class().method)
+ self.assertEqual('.'.join((__name__, 'Class', 'method')), name)
+
+ def test_static_method(self):
+ name = reflection.get_callable_name(Class.static_method)
+ if six.PY3:
+ self.assertEqual('.'.join((__name__, 'Class', 'static_method')),
+ name)
+ else:
+ # NOTE(imelnikov): static method are just functions, class name
+ # is not recorded anywhere in them.
+ self.assertEqual('.'.join((__name__, 'static_method')), name)
+
+ def test_class_method(self):
+ name = reflection.get_callable_name(Class.class_method)
+ self.assertEqual('.'.join((__name__, 'Class', 'class_method')), name)
+
+ def test_constructor(self):
+ name = reflection.get_callable_name(Class)
+ self.assertEqual('.'.join((__name__, 'Class')), name)
+
+ def test_callable_class(self):
+ name = reflection.get_callable_name(CallableClass())
+ self.assertEqual('.'.join((__name__, 'CallableClass')), name)
+
+ def test_callable_class_call(self):
+ name = reflection.get_callable_name(CallableClass().__call__)
+ self.assertEqual('.'.join((__name__, 'CallableClass',
+ '__call__')), name)
+
+
+# These extended/special case tests only work on python 3, due to python 2
+# being broken/incorrect with regard to these special cases...
+@testtools.skipIf(not six.PY3, 'python 3.x is not currently available')
+class GetCallableNameTestExtended(test_base.BaseTestCase):
+ # Tests items in http://legacy.python.org/dev/peps/pep-3155/
+
+ class InnerCallableClass(object):
+ def __call__(self):
+ pass
+
+ def test_inner_callable_class(self):
+ obj = self.InnerCallableClass()
+ name = reflection.get_callable_name(obj.__call__)
+ expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
+ 'InnerCallableClass', '__call__'))
+ self.assertEqual(expected_name, name)
+
+ def test_inner_callable_function(self):
+ def a():
+
+ def b():
+ pass
+
+ return b
+
+ name = reflection.get_callable_name(a())
+ expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
+ 'test_inner_callable_function', '<locals>',
+ 'a', '<locals>', 'b'))
+ self.assertEqual(expected_name, name)
+
+ def test_inner_class(self):
+ obj = self.InnerCallableClass()
+ name = reflection.get_callable_name(obj)
+ expected_name = '.'.join((__name__,
+ 'GetCallableNameTestExtended',
+ 'InnerCallableClass'))
+ self.assertEqual(expected_name, name)
+
+
+class GetCallableArgsTest(test_base.BaseTestCase):
+
+ def test_mere_function(self):
+ result = reflection.get_callable_args(mere_function)
+ self.assertEqual(['a', 'b'], result)
+
+ def test_function_with_defaults(self):
+ result = reflection.get_callable_args(function_with_defs)
+ self.assertEqual(['a', 'b', 'optional'], result)
+
+ def test_required_only(self):
+ result = reflection.get_callable_args(function_with_defs,
+ required_only=True)
+ self.assertEqual(['a', 'b'], result)
+
+ def test_method(self):
+ result = reflection.get_callable_args(Class.method)
+ self.assertEqual(['self', 'c', 'd'], result)
+
+ def test_instance_method(self):
+ result = reflection.get_callable_args(Class().method)
+ self.assertEqual(['c', 'd'], result)
+
+ def test_class_method(self):
+ result = reflection.get_callable_args(Class.class_method)
+ self.assertEqual(['g', 'h'], result)
+
+ def test_class_constructor(self):
+ result = reflection.get_callable_args(ClassWithInit)
+ self.assertEqual(['k', 'l'], result)
+
+ def test_class_with_call(self):
+ result = reflection.get_callable_args(CallableClass())
+ self.assertEqual(['i', 'j'], result)
+
+ def test_decorators_work(self):
+ @dummy_decorator
+ def special_fun(x, y):
+ pass
+ result = reflection.get_callable_args(special_fun)
+ self.assertEqual(['x', 'y'], result)
+
+
+class AcceptsKwargsTest(test_base.BaseTestCase):
+
+ def test_no_kwargs(self):
+ self.assertEqual(False, reflection.accepts_kwargs(mere_function))
+
+ def test_with_kwargs(self):
+ self.assertEqual(True, reflection.accepts_kwargs(function_with_kwargs))
+
+
+class GetClassNameTest(test_base.BaseTestCase):
+
+ def test_std_exception(self):
+ name = reflection.get_class_name(RuntimeError)
+ self.assertEqual('RuntimeError', name)
+
+ def test_class(self):
+ name = reflection.get_class_name(Class)
+ self.assertEqual('.'.join((__name__, 'Class')), name)
+
+ def test_instance(self):
+ name = reflection.get_class_name(Class())
+ self.assertEqual('.'.join((__name__, 'Class')), name)
+
+ def test_int(self):
+ name = reflection.get_class_name(42)
+ self.assertEqual('int', name)
+
+
+class GetAllClassNamesTest(test_base.BaseTestCase):
+
+ def test_std_class(self):
+ names = list(reflection.get_all_class_names(RuntimeError))
+ self.assertEqual(RUNTIME_ERROR_CLASSES, names)
+
+ def test_std_class_up_to(self):
+ names = list(reflection.get_all_class_names(RuntimeError,
+ up_to=Exception))
+ self.assertEqual(RUNTIME_ERROR_CLASSES[:-2], names)
diff --git a/oslo_utils/tests/test_strutils.py b/oslo_utils/tests/test_strutils.py
new file mode 100644
index 0000000..89734c3
--- /dev/null
+++ b/oslo_utils/tests/test_strutils.py
@@ -0,0 +1,594 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import math
+
+import mock
+from oslotest import base as test_base
+import six
+import testscenarios
+
+from oslo_utils import strutils
+from oslo_utils import units
+
+load_tests = testscenarios.load_tests_apply_scenarios
+
+
+class StrUtilsTest(test_base.BaseTestCase):
+
+ def test_bool_bool_from_string(self):
+ self.assertTrue(strutils.bool_from_string(True))
+ self.assertFalse(strutils.bool_from_string(False))
+
+ def test_bool_bool_from_string_default(self):
+ self.assertTrue(strutils.bool_from_string('', default=True))
+ self.assertFalse(strutils.bool_from_string('wibble', default=False))
+
+ def _test_bool_from_string(self, c):
+ self.assertTrue(strutils.bool_from_string(c('true')))
+ self.assertTrue(strutils.bool_from_string(c('TRUE')))
+ self.assertTrue(strutils.bool_from_string(c('on')))
+ self.assertTrue(strutils.bool_from_string(c('On')))
+ self.assertTrue(strutils.bool_from_string(c('yes')))
+ self.assertTrue(strutils.bool_from_string(c('YES')))
+ self.assertTrue(strutils.bool_from_string(c('yEs')))
+ self.assertTrue(strutils.bool_from_string(c('1')))
+ self.assertTrue(strutils.bool_from_string(c('T')))
+ self.assertTrue(strutils.bool_from_string(c('t')))
+ self.assertTrue(strutils.bool_from_string(c('Y')))
+ self.assertTrue(strutils.bool_from_string(c('y')))
+
+ self.assertFalse(strutils.bool_from_string(c('false')))
+ self.assertFalse(strutils.bool_from_string(c('FALSE')))
+ self.assertFalse(strutils.bool_from_string(c('off')))
+ self.assertFalse(strutils.bool_from_string(c('OFF')))
+ self.assertFalse(strutils.bool_from_string(c('no')))
+ self.assertFalse(strutils.bool_from_string(c('0')))
+ self.assertFalse(strutils.bool_from_string(c('42')))
+ self.assertFalse(strutils.bool_from_string(c(
+ 'This should not be True')))
+ self.assertFalse(strutils.bool_from_string(c('F')))
+ self.assertFalse(strutils.bool_from_string(c('f')))
+ self.assertFalse(strutils.bool_from_string(c('N')))
+ self.assertFalse(strutils.bool_from_string(c('n')))
+
+ # Whitespace should be stripped
+ self.assertTrue(strutils.bool_from_string(c(' 1 ')))
+ self.assertTrue(strutils.bool_from_string(c(' true ')))
+ self.assertFalse(strutils.bool_from_string(c(' 0 ')))
+ self.assertFalse(strutils.bool_from_string(c(' false ')))
+
+ def test_bool_from_string(self):
+ self._test_bool_from_string(lambda s: s)
+
+ def test_unicode_bool_from_string(self):
+ self._test_bool_from_string(six.text_type)
+ self.assertFalse(strutils.bool_from_string(u'使用', strict=False))
+
+ exc = self.assertRaises(ValueError, strutils.bool_from_string,
+ u'使用', strict=True)
+ expected_msg = (u"Unrecognized value '使用', acceptable values are:"
+ u" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
+ u" 't', 'true', 'y', 'yes'")
+ self.assertEqual(expected_msg, six.text_type(exc))
+
+ def test_other_bool_from_string(self):
+ self.assertFalse(strutils.bool_from_string(None))
+ self.assertFalse(strutils.bool_from_string(mock.Mock()))
+
+ def test_int_bool_from_string(self):
+ self.assertTrue(strutils.bool_from_string(1))
+
+ self.assertFalse(strutils.bool_from_string(-1))
+ self.assertFalse(strutils.bool_from_string(0))
+ self.assertFalse(strutils.bool_from_string(2))
+
+ def test_strict_bool_from_string(self):
+ # None isn't allowed in strict mode
+ exc = self.assertRaises(ValueError, strutils.bool_from_string, None,
+ strict=True)
+ expected_msg = ("Unrecognized value 'None', acceptable values are:"
+ " '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
+ " 't', 'true', 'y', 'yes'")
+ self.assertEqual(expected_msg, str(exc))
+
+ # Unrecognized strings aren't allowed
+ self.assertFalse(strutils.bool_from_string('Other', strict=False))
+ exc = self.assertRaises(ValueError, strutils.bool_from_string, 'Other',
+ strict=True)
+ expected_msg = ("Unrecognized value 'Other', acceptable values are:"
+ " '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
+ " 't', 'true', 'y', 'yes'")
+ self.assertEqual(expected_msg, str(exc))
+
+ # Unrecognized numbers aren't allowed
+ exc = self.assertRaises(ValueError, strutils.bool_from_string, 2,
+ strict=True)
+ expected_msg = ("Unrecognized value '2', acceptable values are:"
+ " '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
+ " 't', 'true', 'y', 'yes'")
+ self.assertEqual(expected_msg, str(exc))
+
+ # False-like values are allowed
+ self.assertFalse(strutils.bool_from_string('f', strict=True))
+ self.assertFalse(strutils.bool_from_string('false', strict=True))
+ self.assertFalse(strutils.bool_from_string('off', strict=True))
+ self.assertFalse(strutils.bool_from_string('n', strict=True))
+ self.assertFalse(strutils.bool_from_string('no', strict=True))
+ self.assertFalse(strutils.bool_from_string('0', strict=True))
+
+ self.assertTrue(strutils.bool_from_string('1', strict=True))
+
+ # Avoid font-similarity issues (one looks like lowercase-el, zero like
+ # oh, etc...)
+ for char in ('O', 'o', 'L', 'l', 'I', 'i'):
+ self.assertRaises(ValueError, strutils.bool_from_string, char,
+ strict=True)
+
+ def test_int_from_bool_as_string(self):
+ self.assertEqual(1, strutils.int_from_bool_as_string(True))
+ self.assertEqual(0, strutils.int_from_bool_as_string(False))
+
+ def test_slugify(self):
+ to_slug = strutils.to_slug
+ self.assertRaises(TypeError, to_slug, True)
+ self.assertEqual(six.u("hello"), to_slug("hello"))
+ self.assertEqual(six.u("two-words"), to_slug("Two Words"))
+ self.assertEqual(six.u("ma-any-spa-ce-es"),
+ to_slug("Ma-any\t spa--ce- es"))
+ self.assertEqual(six.u("excamation"), to_slug("exc!amation!"))
+ self.assertEqual(six.u("ampserand"), to_slug("&ampser$and"))
+ self.assertEqual(six.u("ju5tnum8er"), to_slug("ju5tnum8er"))
+ self.assertEqual(six.u("strip-"), to_slug(" strip - "))
+ self.assertEqual(six.u("perche"), to_slug(six.b("perch\xc3\xa9")))
+ self.assertEqual(six.u("strange"),
+ to_slug("\x80strange", errors="ignore"))
+
+
+class StringToBytesTest(test_base.BaseTestCase):
+
+ _unit_system = [
+ ('si', dict(unit_system='SI')),
+ ('iec', dict(unit_system='IEC')),
+ ('invalid_unit_system', dict(unit_system='KKK', assert_error=True)),
+ ]
+
+ _sign = [
+ ('no_sign', dict(sign='')),
+ ('positive', dict(sign='+')),
+ ('negative', dict(sign='-')),
+ ('invalid_sign', dict(sign='~', assert_error=True)),
+ ]
+
+ _magnitude = [
+ ('integer', dict(magnitude='79')),
+ ('decimal', dict(magnitude='7.9')),
+ ('decimal_point_start', dict(magnitude='.9')),
+ ('decimal_point_end', dict(magnitude='79.', assert_error=True)),
+ ('invalid_literal', dict(magnitude='7.9.9', assert_error=True)),
+ ('garbage_value', dict(magnitude='asdf', assert_error=True)),
+ ]
+
+ _unit_prefix = [
+ ('no_unit_prefix', dict(unit_prefix='')),
+ ('k', dict(unit_prefix='k')),
+ ('K', dict(unit_prefix='K')),
+ ('M', dict(unit_prefix='M')),
+ ('G', dict(unit_prefix='G')),
+ ('T', dict(unit_prefix='T')),
+ ('Ki', dict(unit_prefix='Ki')),
+ ('Mi', dict(unit_prefix='Mi')),
+ ('Gi', dict(unit_prefix='Gi')),
+ ('Ti', dict(unit_prefix='Ti')),
+ ('invalid_unit_prefix', dict(unit_prefix='B', assert_error=True)),
+ ]
+
+ _unit_suffix = [
+ ('b', dict(unit_suffix='b')),
+ ('bit', dict(unit_suffix='bit')),
+ ('B', dict(unit_suffix='B')),
+ ('invalid_unit_suffix', dict(unit_suffix='Kg', assert_error=True)),
+ ]
+
+ _return_int = [
+ ('return_dec', dict(return_int=False)),
+ ('return_int', dict(return_int=True)),
+ ]
+
+ @classmethod
+ def generate_scenarios(cls):
+ cls.scenarios = testscenarios.multiply_scenarios(cls._unit_system,
+ cls._sign,
+ cls._magnitude,
+ cls._unit_prefix,
+ cls._unit_suffix,
+ cls._return_int)
+
+ def test_string_to_bytes(self):
+
+ def _get_quantity(sign, magnitude, unit_suffix):
+ res = float('%s%s' % (sign, magnitude))
+ if unit_suffix in ['b', 'bit']:
+ res /= 8
+ return res
+
+ def _get_constant(unit_prefix, unit_system):
+ if not unit_prefix:
+ return 1
+ elif unit_system == 'SI':
+ res = getattr(units, unit_prefix)
+ elif unit_system == 'IEC':
+ if unit_prefix.endswith('i'):
+ res = getattr(units, unit_prefix)
+ else:
+ res = getattr(units, '%si' % unit_prefix)
+ return res
+
+ text = ''.join([self.sign, self.magnitude, self.unit_prefix,
+ self.unit_suffix])
+ err_si = self.unit_system == 'SI' and (self.unit_prefix == 'K' or
+ self.unit_prefix.endswith('i'))
+ err_iec = self.unit_system == 'IEC' and self.unit_prefix == 'k'
+ if getattr(self, 'assert_error', False) or err_si or err_iec:
+ self.assertRaises(ValueError, strutils.string_to_bytes,
+ text, unit_system=self.unit_system,
+ return_int=self.return_int)
+ return
+ quantity = _get_quantity(self.sign, self.magnitude, self.unit_suffix)
+ constant = _get_constant(self.unit_prefix, self.unit_system)
+ expected = quantity * constant
+ actual = strutils.string_to_bytes(text, unit_system=self.unit_system,
+ return_int=self.return_int)
+ if self.return_int:
+ self.assertEqual(actual, int(math.ceil(expected)))
+ else:
+ self.assertAlmostEqual(actual, expected)
+
+StringToBytesTest.generate_scenarios()
+
+
+class MaskPasswordTestCase(test_base.BaseTestCase):
+
+ def test_json(self):
+ # Test 'adminPass' w/o spaces
+ payload = """{'adminPass':'mypassword'}"""
+ expected = """{'adminPass':'***'}"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'adminPass' with spaces
+ payload = """{ 'adminPass' : 'mypassword' }"""
+ expected = """{ 'adminPass' : '***' }"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_pass' w/o spaces
+ payload = """{'admin_pass':'mypassword'}"""
+ expected = """{'admin_pass':'***'}"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_pass' with spaces
+ payload = """{ 'admin_pass' : 'mypassword' }"""
+ expected = """{ 'admin_pass' : '***' }"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_password' w/o spaces
+ payload = """{'admin_password':'mypassword'}"""
+ expected = """{'admin_password':'***'}"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_password' with spaces
+ payload = """{ 'admin_password' : 'mypassword' }"""
+ expected = """{ 'admin_password' : '***' }"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'password' w/o spaces
+ payload = """{'password':'mypassword'}"""
+ expected = """{'password':'***'}"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'password' with spaces
+ payload = """{ 'password' : 'mypassword' }"""
+ expected = """{ 'password' : '***' }"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'auth_password' w/o spaces
+ payload = """{'auth_password':'mypassword'}"""
+ expected = """{'auth_password':'***'}"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'auth_password' with spaces
+ payload = """{ 'auth_password' : 'mypassword' }"""
+ expected = """{ 'auth_password' : '***' }"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'secret_uuid' w/o spaces
+ payload = """{'secret_uuid':'myuuid'}"""
+ expected = """{'secret_uuid':'***'}"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'secret_uuid' with spaces
+ payload = """{ 'secret_uuid' : 'myuuid' }"""
+ expected = """{ 'secret_uuid' : '***' }"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ def test_xml(self):
+ # Test 'adminPass' w/o spaces
+ payload = """<adminPass>mypassword</adminPass>"""
+ expected = """<adminPass>***</adminPass>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'adminPass' with spaces
+ payload = """<adminPass>
+ mypassword
+ </adminPass>"""
+ expected = """<adminPass>***</adminPass>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_pass' w/o spaces
+ payload = """<admin_pass>mypassword</admin_pass>"""
+ expected = """<admin_pass>***</admin_pass>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_pass' with spaces
+ payload = """<admin_pass>
+ mypassword
+ </admin_pass>"""
+ expected = """<admin_pass>***</admin_pass>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_password' w/o spaces
+ payload = """<admin_password>mypassword</admin_password>"""
+ expected = """<admin_password>***</admin_password>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_password' with spaces
+ payload = """<admin_password>
+ mypassword
+ </admin_password>"""
+ expected = """<admin_password>***</admin_password>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'password' w/o spaces
+ payload = """<password>mypassword</password>"""
+ expected = """<password>***</password>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'password' with spaces
+ payload = """<password>
+ mypassword
+ </password>"""
+ expected = """<password>***</password>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ def test_xml_attribute(self):
+ # Test 'adminPass' w/o spaces
+ payload = """adminPass='mypassword'"""
+ expected = """adminPass='***'"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'adminPass' with spaces
+ payload = """adminPass = 'mypassword'"""
+ expected = """adminPass = '***'"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'adminPass' with double quotes
+ payload = """adminPass = "mypassword\""""
+ expected = """adminPass = "***\""""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_pass' w/o spaces
+ payload = """admin_pass='mypassword'"""
+ expected = """admin_pass='***'"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_pass' with spaces
+ payload = """admin_pass = 'mypassword'"""
+ expected = """admin_pass = '***'"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_pass' with double quotes
+ payload = """admin_pass = "mypassword\""""
+ expected = """admin_pass = "***\""""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_password' w/o spaces
+ payload = """admin_password='mypassword'"""
+ expected = """admin_password='***'"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_password' with spaces
+ payload = """admin_password = 'mypassword'"""
+ expected = """admin_password = '***'"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'admin_password' with double quotes
+ payload = """admin_password = "mypassword\""""
+ expected = """admin_password = "***\""""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'password' w/o spaces
+ payload = """password='mypassword'"""
+ expected = """password='***'"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'password' with spaces
+ payload = """password = 'mypassword'"""
+ expected = """password = '***'"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ # Test 'password' with double quotes
+ payload = """password = "mypassword\""""
+ expected = """password = "***\""""
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ def test_json_message(self):
+ payload = """body: {"changePassword": {"adminPass": "1234567"}}"""
+ expected = """body: {"changePassword": {"adminPass": "***"}}"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ payload = """body: {"rescue": {"admin_pass": "1234567"}}"""
+ expected = """body: {"rescue": {"admin_pass": "***"}}"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ payload = """body: {"rescue": {"admin_password": "1234567"}}"""
+ expected = """body: {"rescue": {"admin_password": "***"}}"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ payload = """body: {"rescue": {"password": "1234567"}}"""
+ expected = """body: {"rescue": {"password": "***"}}"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ def test_xml_message(self):
+ payload = """<?xml version="1.0" encoding="UTF-8"?>
+<rebuild
+ xmlns="http://docs.openstack.org/compute/api/v1.1"
+ name="foobar"
+ imageRef="http://openstack.example.com/v1.1/32278/images/70a599e0-31e7"
+ accessIPv4="1.2.3.4"
+ accessIPv6="fe80::100"
+ adminPass="seekr3t">
+ <metadata>
+ <meta key="My Server Name">Apache1</meta>
+ </metadata>
+</rebuild>"""
+ expected = """<?xml version="1.0" encoding="UTF-8"?>
+<rebuild
+ xmlns="http://docs.openstack.org/compute/api/v1.1"
+ name="foobar"
+ imageRef="http://openstack.example.com/v1.1/32278/images/70a599e0-31e7"
+ accessIPv4="1.2.3.4"
+ accessIPv6="fe80::100"
+ adminPass="***">
+ <metadata>
+ <meta key="My Server Name">Apache1</meta>
+ </metadata>
+</rebuild>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ payload = """<?xml version="1.0" encoding="UTF-8"?>
+<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
+ admin_pass="MySecretPass"/>"""
+ expected = """<?xml version="1.0" encoding="UTF-8"?>
+<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
+ admin_pass="***"/>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ payload = """<?xml version="1.0" encoding="UTF-8"?>
+<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
+ admin_password="MySecretPass"/>"""
+ expected = """<?xml version="1.0" encoding="UTF-8"?>
+<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
+ admin_password="***"/>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+ payload = """<?xml version="1.0" encoding="UTF-8"?>
+<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
+ password="MySecretPass"/>"""
+ expected = """<?xml version="1.0" encoding="UTF-8"?>
+<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
+ password="***"/>"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ def test_mask_password(self):
+ payload = "test = 'password' : 'aaaaaa'"
+ expected = "test = 'password' : '111'"
+ self.assertEqual(expected,
+ strutils.mask_password(payload, secret='111'))
+
+ payload = 'mysqld --password "aaaaaa"'
+ expected = 'mysqld --password "****"'
+ self.assertEqual(expected,
+ strutils.mask_password(payload, secret='****'))
+
+ payload = 'mysqld --password aaaaaa'
+ expected = 'mysqld --password ???'
+ self.assertEqual(expected,
+ strutils.mask_password(payload, secret='???'))
+
+ payload = 'mysqld --password = "aaaaaa"'
+ expected = 'mysqld --password = "****"'
+ self.assertEqual(expected,
+ strutils.mask_password(payload, secret='****'))
+
+ payload = "mysqld --password = 'aaaaaa'"
+ expected = "mysqld --password = '****'"
+ self.assertEqual(expected,
+ strutils.mask_password(payload, secret='****'))
+
+ payload = "mysqld --password = aaaaaa"
+ expected = "mysqld --password = ****"
+ self.assertEqual(expected,
+ strutils.mask_password(payload, secret='****'))
+
+ payload = "test = password = aaaaaa"
+ expected = "test = password = 111"
+ self.assertEqual(expected,
+ strutils.mask_password(payload, secret='111'))
+
+ payload = "test = password= aaaaaa"
+ expected = "test = password= 111"
+ self.assertEqual(expected,
+ strutils.mask_password(payload, secret='111'))
+
+ payload = "test = password =aaaaaa"
+ expected = "test = password =111"
+ self.assertEqual(expected,
+ strutils.mask_password(payload, secret='111'))
+
+ payload = "test = password=aaaaaa"
+ expected = "test = password=111"
+ self.assertEqual(expected,
+ strutils.mask_password(payload, secret='111'))
+
+ payload = 'test = "original_password" : "aaaaaaaaa"'
+ expected = 'test = "original_password" : "***"'
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ payload = 'test = "param1" : "value"'
+ expected = 'test = "param1" : "value"'
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ payload = """{'adminPass':'mypassword'}"""
+ payload = six.text_type(payload)
+ expected = """{'adminPass':'***'}"""
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ payload = ("test = 'node.session.auth.password','-v','mypassword',"
+ "'nomask'")
+ expected = ("test = 'node.session.auth.password','-v','***',"
+ "'nomask'")
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ payload = ("test = 'node.session.auth.password', '--password', "
+ "'mypassword', 'nomask'")
+ expected = ("test = 'node.session.auth.password', '--password', "
+ "'***', 'nomask'")
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ payload = ("test = 'node.session.auth.password', '--password', "
+ "'mypassword'")
+ expected = ("test = 'node.session.auth.password', '--password', "
+ "'***'")
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ payload = "test = node.session.auth.password -v mypassword nomask"
+ expected = "test = node.session.auth.password -v *** nomask"
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ payload = ("test = node.session.auth.password --password mypassword "
+ "nomask")
+ expected = ("test = node.session.auth.password --password *** "
+ "nomask")
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ payload = ("test = node.session.auth.password --password mypassword")
+ expected = ("test = node.session.auth.password --password ***")
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+ payload = "test = cmd --password my\xe9\x80\x80pass"
+ expected = ("test = cmd --password ***")
+ self.assertEqual(expected, strutils.mask_password(payload))
+
+
+class IsIntLikeTestCase(test_base.BaseTestCase):
+ def test_is_int_like_true(self):
+ self.assertTrue(strutils.is_int_like(1))
+ self.assertTrue(strutils.is_int_like("1"))
+ self.assertTrue(strutils.is_int_like("514"))
+ self.assertTrue(strutils.is_int_like("0"))
+
+ def test_is_int_like_false(self):
+ self.assertFalse(strutils.is_int_like(1.1))
+ self.assertFalse(strutils.is_int_like("1.1"))
+ self.assertFalse(strutils.is_int_like("1.1.1"))
+ self.assertFalse(strutils.is_int_like(None))
+ self.assertFalse(strutils.is_int_like("0."))
+ self.assertFalse(strutils.is_int_like("aaaaaa"))
+ self.assertFalse(strutils.is_int_like("...."))
+ self.assertFalse(strutils.is_int_like("1g"))
+ self.assertFalse(
+ strutils.is_int_like("0cc3346e-9fef-4445-abe6-5d2b2690ec64"))
+ self.assertFalse(strutils.is_int_like("a1"))
+ # NOTE(viktors): 12e3 - is a float number
+ self.assertFalse(strutils.is_int_like("12e3"))
+ # NOTE(viktors): Check integer numbers with base not 10
+ self.assertFalse(strutils.is_int_like("0o51"))
+ self.assertFalse(strutils.is_int_like("0xDEADBEEF"))
diff --git a/oslo_utils/tests/test_timeutils.py b/oslo_utils/tests/test_timeutils.py
new file mode 100644
index 0000000..4e24adb
--- /dev/null
+++ b/oslo_utils/tests/test_timeutils.py
@@ -0,0 +1,340 @@
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import calendar
+import datetime
+import time
+
+import iso8601
+import mock
+from oslotest import base as test_base
+from testtools import matchers
+
+from oslo_utils import timeutils
+
+
+class TimeUtilsTest(test_base.BaseTestCase):
+
+ def setUp(self):
+ super(TimeUtilsTest, self).setUp()
+ self.skynet_self_aware_time_str = '1997-08-29T06:14:00Z'
+ self.skynet_self_aware_time_ms_str = '1997-08-29T06:14:00.000123Z'
+ self.skynet_self_aware_time = datetime.datetime(1997, 8, 29, 6, 14, 0)
+ self.skynet_self_aware_ms_time = datetime.datetime(
+ 1997, 8, 29, 6, 14, 0, 123)
+ self.one_minute_before = datetime.datetime(1997, 8, 29, 6, 13, 0)
+ self.one_minute_after = datetime.datetime(1997, 8, 29, 6, 15, 0)
+ self.skynet_self_aware_time_perfect_str = '1997-08-29T06:14:00.000000'
+ self.skynet_self_aware_time_perfect = datetime.datetime(1997, 8, 29,
+ 6, 14, 0)
+ self.addCleanup(timeutils.clear_time_override)
+
+ def test_isotime(self):
+ with mock.patch('datetime.datetime') as datetime_mock:
+ datetime_mock.utcnow.return_value = self.skynet_self_aware_time
+ dt = timeutils.isotime()
+ self.assertEqual(dt, self.skynet_self_aware_time_str)
+
+ def test_isotimei_micro_second_precision(self):
+ with mock.patch('datetime.datetime') as datetime_mock:
+ datetime_mock.utcnow.return_value = self.skynet_self_aware_ms_time
+ dt = timeutils.isotime(subsecond=True)
+ self.assertEqual(dt, self.skynet_self_aware_time_ms_str)
+
+ def test_parse_isotime(self):
+ expect = timeutils.parse_isotime(self.skynet_self_aware_time_str)
+ skynet_self_aware_time_utc = self.skynet_self_aware_time.replace(
+ tzinfo=iso8601.iso8601.UTC)
+ self.assertEqual(skynet_self_aware_time_utc, expect)
+
+ def test_parse_isotime_micro_second_precision(self):
+ expect = timeutils.parse_isotime(self.skynet_self_aware_time_ms_str)
+ skynet_self_aware_time_ms_utc = self.skynet_self_aware_ms_time.replace(
+ tzinfo=iso8601.iso8601.UTC)
+ self.assertEqual(skynet_self_aware_time_ms_utc, expect)
+
+ def test_strtime(self):
+ expect = timeutils.strtime(self.skynet_self_aware_time_perfect)
+ self.assertEqual(self.skynet_self_aware_time_perfect_str, expect)
+
+ def test_parse_strtime(self):
+ perfect_time_format = self.skynet_self_aware_time_perfect_str
+ expect = timeutils.parse_strtime(perfect_time_format)
+ self.assertEqual(self.skynet_self_aware_time_perfect, expect)
+
+ def test_strtime_and_back(self):
+ orig_t = datetime.datetime(1997, 8, 29, 6, 14, 0)
+ s = timeutils.strtime(orig_t)
+ t = timeutils.parse_strtime(s)
+ self.assertEqual(orig_t, t)
+
+ def _test_is_older_than(self, fn):
+ strptime = datetime.datetime.strptime
+ with mock.patch('datetime.datetime') as datetime_mock:
+ datetime_mock.utcnow.return_value = self.skynet_self_aware_time
+ datetime_mock.strptime = strptime
+ expect_true = timeutils.is_older_than(fn(self.one_minute_before),
+ 59)
+ self.assertTrue(expect_true)
+ expect_false = timeutils.is_older_than(fn(self.one_minute_before),
+ 60)
+ self.assertFalse(expect_false)
+ expect_false = timeutils.is_older_than(fn(self.one_minute_before),
+ 61)
+ self.assertFalse(expect_false)
+
+ def test_is_older_than_datetime(self):
+ self._test_is_older_than(lambda x: x)
+
+ def test_is_older_than_str(self):
+ self._test_is_older_than(timeutils.strtime)
+
+ def test_is_older_than_aware(self):
+ """Tests sending is_older_than an 'aware' datetime."""
+ self._test_is_older_than(lambda x: x.replace(
+ tzinfo=iso8601.iso8601.UTC))
+
+ def _test_is_newer_than(self, fn):
+ strptime = datetime.datetime.strptime
+ with mock.patch('datetime.datetime') as datetime_mock:
+ datetime_mock.utcnow.return_value = self.skynet_self_aware_time
+ datetime_mock.strptime = strptime
+ expect_true = timeutils.is_newer_than(fn(self.one_minute_after),
+ 59)
+ self.assertTrue(expect_true)
+ expect_false = timeutils.is_newer_than(fn(self.one_minute_after),
+ 60)
+ self.assertFalse(expect_false)
+ expect_false = timeutils.is_newer_than(fn(self.one_minute_after),
+ 61)
+ self.assertFalse(expect_false)
+
+ def test_is_newer_than_datetime(self):
+ self._test_is_newer_than(lambda x: x)
+
+ def test_is_newer_than_str(self):
+ self._test_is_newer_than(timeutils.strtime)
+
+ def test_is_newer_than_aware(self):
+ """Tests sending is_newer_than an 'aware' datetime."""
+ self._test_is_newer_than(lambda x: x.replace(
+ tzinfo=iso8601.iso8601.UTC))
+
+ def test_set_time_override_using_default(self):
+ now = timeutils.utcnow_ts()
+
+ # NOTE(kgriffs): Normally it's bad form to sleep in a unit test,
+ # but this is the only way to test that set_time_override defaults
+ # to setting the override to the current time.
+ time.sleep(1)
+
+ timeutils.set_time_override()
+ overriden_now = timeutils.utcnow_ts()
+ self.assertThat(now, matchers.LessThan(overriden_now))
+
+ def test_utcnow_ts(self):
+ skynet_self_aware_ts = 872835240
+ skynet_dt = datetime.datetime.utcfromtimestamp(skynet_self_aware_ts)
+ self.assertEqual(self.skynet_self_aware_time, skynet_dt)
+
+ # NOTE(kgriffs): timeutils.utcnow_ts() uses time.time()
+ # IFF time override is not set.
+ with mock.patch('time.time') as time_mock:
+ time_mock.return_value = skynet_self_aware_ts
+ ts = timeutils.utcnow_ts()
+ self.assertEqual(ts, skynet_self_aware_ts)
+
+ timeutils.set_time_override(skynet_dt)
+ ts = timeutils.utcnow_ts()
+ self.assertEqual(ts, skynet_self_aware_ts)
+
+ def test_utcnow(self):
+ timeutils.set_time_override(mock.sentinel.utcnow)
+ self.assertEqual(timeutils.utcnow(), mock.sentinel.utcnow)
+
+ timeutils.clear_time_override()
+ self.assertFalse(timeutils.utcnow() == mock.sentinel.utcnow)
+
+ self.assertTrue(timeutils.utcnow())
+
+ def test_advance_time_delta(self):
+ timeutils.set_time_override(self.one_minute_before)
+ timeutils.advance_time_delta(datetime.timedelta(seconds=60))
+ self.assertEqual(timeutils.utcnow(), self.skynet_self_aware_time)
+
+ def test_advance_time_seconds(self):
+ timeutils.set_time_override(self.one_minute_before)
+ timeutils.advance_time_seconds(60)
+ self.assertEqual(timeutils.utcnow(), self.skynet_self_aware_time)
+
+ def test_marshall_time(self):
+ now = timeutils.utcnow()
+ binary = timeutils.marshall_now(now)
+ backagain = timeutils.unmarshall_time(binary)
+ self.assertEqual(now, backagain)
+
+ def test_delta_seconds(self):
+ before = timeutils.utcnow()
+ after = before + datetime.timedelta(days=7, seconds=59,
+ microseconds=123456)
+ self.assertAlmostEquals(604859.123456,
+ timeutils.delta_seconds(before, after))
+
+ def test_total_seconds(self):
+ delta = datetime.timedelta(days=1, hours=2, minutes=3, seconds=4.5)
+ self.assertAlmostEquals(93784.5,
+ timeutils.total_seconds(delta))
+
+ def test_iso8601_from_timestamp(self):
+ utcnow = timeutils.utcnow()
+ iso = timeutils.isotime(utcnow)
+ ts = calendar.timegm(utcnow.timetuple())
+ self.assertEqual(iso, timeutils.iso8601_from_timestamp(ts))
+
+ def test_is_soon(self):
+ expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
+ self.assertFalse(timeutils.is_soon(expires, 120))
+ self.assertTrue(timeutils.is_soon(expires, 300))
+ self.assertTrue(timeutils.is_soon(expires, 600))
+
+ with mock.patch('datetime.datetime') as datetime_mock:
+ datetime_mock.utcnow.return_value = self.skynet_self_aware_time
+ expires = timeutils.utcnow()
+ self.assertTrue(timeutils.is_soon(expires, 0))
+
+
+class TestIso8601Time(test_base.BaseTestCase):
+
+ def _instaneous(self, timestamp, yr, mon, day, hr, minute, sec, micro):
+ self.assertEqual(timestamp.year, yr)
+ self.assertEqual(timestamp.month, mon)
+ self.assertEqual(timestamp.day, day)
+ self.assertEqual(timestamp.hour, hr)
+ self.assertEqual(timestamp.minute, minute)
+ self.assertEqual(timestamp.second, sec)
+ self.assertEqual(timestamp.microsecond, micro)
+
+ def _do_test(self, time_str, yr, mon, day, hr, minute, sec, micro, shift):
+ DAY_SECONDS = 24 * 60 * 60
+ timestamp = timeutils.parse_isotime(time_str)
+ self._instaneous(timestamp, yr, mon, day, hr, minute, sec, micro)
+ offset = timestamp.tzinfo.utcoffset(None)
+ self.assertEqual(offset.seconds + offset.days * DAY_SECONDS, shift)
+
+ def test_zulu(self):
+ time_str = '2012-02-14T20:53:07Z'
+ self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, 0)
+
+ def test_zulu_micros(self):
+ time_str = '2012-02-14T20:53:07.123Z'
+ self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 123000, 0)
+
+ def test_offset_east(self):
+ time_str = '2012-02-14T20:53:07+04:30'
+ offset = 4.5 * 60 * 60
+ self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, offset)
+
+ def test_offset_east_micros(self):
+ time_str = '2012-02-14T20:53:07.42+04:30'
+ offset = 4.5 * 60 * 60
+ self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 420000, offset)
+
+ def test_offset_west(self):
+ time_str = '2012-02-14T20:53:07-05:30'
+ offset = -5.5 * 60 * 60
+ self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, offset)
+
+ def test_offset_west_micros(self):
+ time_str = '2012-02-14T20:53:07.654321-05:30'
+ offset = -5.5 * 60 * 60
+ self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 654321, offset)
+
+ def test_compare(self):
+ zulu = timeutils.parse_isotime('2012-02-14T20:53:07')
+ east = timeutils.parse_isotime('2012-02-14T20:53:07-01:00')
+ west = timeutils.parse_isotime('2012-02-14T20:53:07+01:00')
+ self.assertTrue(east > west)
+ self.assertTrue(east > zulu)
+ self.assertTrue(zulu > west)
+
+ def test_compare_micros(self):
+ zulu = timeutils.parse_isotime('2012-02-14T20:53:07.6544')
+ east = timeutils.parse_isotime('2012-02-14T19:53:07.654321-01:00')
+ west = timeutils.parse_isotime('2012-02-14T21:53:07.655+01:00')
+ self.assertTrue(east < west)
+ self.assertTrue(east < zulu)
+ self.assertTrue(zulu < west)
+
+ def test_zulu_roundtrip(self):
+ time_str = '2012-02-14T20:53:07Z'
+ zulu = timeutils.parse_isotime(time_str)
+ self.assertEqual(zulu.tzinfo, iso8601.iso8601.UTC)
+ self.assertEqual(timeutils.isotime(zulu), time_str)
+
+ def test_east_roundtrip(self):
+ time_str = '2012-02-14T20:53:07-07:00'
+ east = timeutils.parse_isotime(time_str)
+ self.assertEqual(east.tzinfo.tzname(None), '-07:00')
+ self.assertEqual(timeutils.isotime(east), time_str)
+
+ def test_west_roundtrip(self):
+ time_str = '2012-02-14T20:53:07+11:30'
+ west = timeutils.parse_isotime(time_str)
+ self.assertEqual(west.tzinfo.tzname(None), '+11:30')
+ self.assertEqual(timeutils.isotime(west), time_str)
+
+ def test_now_roundtrip(self):
+ time_str = timeutils.isotime()
+ now = timeutils.parse_isotime(time_str)
+ self.assertEqual(now.tzinfo, iso8601.iso8601.UTC)
+ self.assertEqual(timeutils.isotime(now), time_str)
+
+ def test_zulu_normalize(self):
+ time_str = '2012-02-14T20:53:07Z'
+ zulu = timeutils.parse_isotime(time_str)
+ normed = timeutils.normalize_time(zulu)
+ self._instaneous(normed, 2012, 2, 14, 20, 53, 7, 0)
+
+ def test_east_normalize(self):
+ time_str = '2012-02-14T20:53:07-07:00'
+ east = timeutils.parse_isotime(time_str)
+ normed = timeutils.normalize_time(east)
+ self._instaneous(normed, 2012, 2, 15, 3, 53, 7, 0)
+
+ def test_west_normalize(self):
+ time_str = '2012-02-14T20:53:07+21:00'
+ west = timeutils.parse_isotime(time_str)
+ normed = timeutils.normalize_time(west)
+ self._instaneous(normed, 2012, 2, 13, 23, 53, 7, 0)
+
+ def test_normalize_aware_to_naive(self):
+ dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
+ time_str = '2011-02-14T20:53:07+21:00'
+ aware = timeutils.parse_isotime(time_str)
+ naive = timeutils.normalize_time(aware)
+ self.assertTrue(naive < dt)
+
+ def test_normalize_zulu_aware_to_naive(self):
+ dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
+ time_str = '2011-02-14T19:53:07Z'
+ aware = timeutils.parse_isotime(time_str)
+ naive = timeutils.normalize_time(aware)
+ self.assertTrue(naive < dt)
+
+ def test_normalize_naive(self):
+ dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
+ dtn = datetime.datetime(2011, 2, 14, 19, 53, 7)
+ naive = timeutils.normalize_time(dtn)
+ self.assertTrue(naive < dt)
diff --git a/oslo_utils/tests/test_uuidutils.py b/oslo_utils/tests/test_uuidutils.py
new file mode 100644
index 0000000..95ede43
--- /dev/null
+++ b/oslo_utils/tests/test_uuidutils.py
@@ -0,0 +1,54 @@
+# Copyright (c) 2012 Intel Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from oslotest import base as test_base
+
+from oslo_utils import uuidutils
+
+
+class UUIDUtilsTest(test_base.BaseTestCase):
+
+ def test_generate_uuid(self):
+ uuid_string = uuidutils.generate_uuid()
+ self.assertTrue(isinstance(uuid_string, str))
+ self.assertEqual(len(uuid_string), 36)
+ # make sure there are 4 dashes
+ self.assertEqual(len(uuid_string.replace('-', '')), 32)
+
+ def test_is_uuid_like(self):
+ self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4())))
+ self.assertTrue(uuidutils.is_uuid_like(
+ '{12345678-1234-5678-1234-567812345678}'))
+ self.assertTrue(uuidutils.is_uuid_like(
+ '12345678123456781234567812345678'))
+ self.assertTrue(uuidutils.is_uuid_like(
+ 'urn:uuid:12345678-1234-5678-1234-567812345678'))
+ self.assertTrue(uuidutils.is_uuid_like(
+ 'urn:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb'))
+ self.assertTrue(uuidutils.is_uuid_like(
+ 'uuid:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb'))
+ self.assertTrue(uuidutils.is_uuid_like(
+ '{}---bbb---aaa--aaa--aaa-----aaa---aaa--bbb-bbb---bbb-bbb-bb-{}'))
+
+ def test_is_uuid_like_insensitive(self):
+ self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4()).upper()))
+
+ def test_id_is_uuid_like(self):
+ self.assertFalse(uuidutils.is_uuid_like(1234567))
+
+ def test_name_is_uuid_like(self):
+ self.assertFalse(uuidutils.is_uuid_like('zhongyueluo'))
diff --git a/oslo_utils/tests/tests_encodeutils.py b/oslo_utils/tests/tests_encodeutils.py
new file mode 100644
index 0000000..6af4bfb
--- /dev/null
+++ b/oslo_utils/tests/tests_encodeutils.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2014 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+from oslotest import base as test_base
+import six
+
+from oslo_utils import encodeutils
+
+
+class EncodeUtilsTest(test_base.BaseTestCase):
+
+ def test_safe_decode(self):
+ safe_decode = encodeutils.safe_decode
+ self.assertRaises(TypeError, safe_decode, True)
+ self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b("ni\xc3\xb1o"),
+ incoming="utf-8"))
+ if six.PY2:
+ # In Python 3, bytes.decode() doesn't support anymore
+ # bytes => bytes encodings like base64
+ self.assertEqual(six.u("test"), safe_decode("dGVzdA==",
+ incoming='base64'))
+
+ self.assertEqual(six.u("strange"), safe_decode(six.b('\x80strange'),
+ errors='ignore'))
+
+ self.assertEqual(six.u('\xc0'), safe_decode(six.b('\xc0'),
+ incoming='iso-8859-1'))
+
+ # Forcing incoming to ascii so it falls back to utf-8
+ self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b('ni\xc3\xb1o'),
+ incoming='ascii'))
+
+ self.assertEqual(six.u('foo'), safe_decode(b'foo'))
+
+ def test_safe_encode_none_instead_of_text(self):
+ self.assertRaises(TypeError, encodeutils.safe_encode, None)
+
+ def test_safe_encode_bool_instead_of_text(self):
+ self.assertRaises(TypeError, encodeutils.safe_encode, True)
+
+ def test_safe_encode_int_instead_of_text(self):
+ self.assertRaises(TypeError, encodeutils.safe_encode, 1)
+
+ def test_safe_encode_list_instead_of_text(self):
+ self.assertRaises(TypeError, encodeutils.safe_encode, [])
+
+ def test_safe_encode_dict_instead_of_text(self):
+ self.assertRaises(TypeError, encodeutils.safe_encode, {})
+
+ def test_safe_encode_tuple_instead_of_text(self):
+ self.assertRaises(TypeError, encodeutils.safe_encode, ('foo', 'bar', ))
+
+ def test_safe_encode_py2(self):
+ if six.PY2:
+ # In Python 3, str.encode() doesn't support anymore
+ # text => text encodings like base64
+ self.assertEqual(
+ six.b("dGVzdA==\n"),
+ encodeutils.safe_encode("test", encoding='base64'),
+ )
+ else:
+ self.skipTest("Requires py2.x")
+
+ def test_safe_encode_force_incoming_utf8_to_ascii(self):
+ # Forcing incoming to ascii so it falls back to utf-8
+ self.assertEqual(
+ six.b('ni\xc3\xb1o'),
+ encodeutils.safe_encode(six.b('ni\xc3\xb1o'), incoming='ascii'),
+ )
+
+ def test_safe_encode_same_encoding_different_cases(self):
+ with mock.patch.object(encodeutils, 'safe_decode', mock.Mock()):
+ utf8 = encodeutils.safe_encode(
+ six.u('foo\xf1bar'), encoding='utf-8')
+ self.assertEqual(
+ encodeutils.safe_encode(utf8, 'UTF-8', 'utf-8'),
+ encodeutils.safe_encode(utf8, 'utf-8', 'UTF-8'),
+ )
+ self.assertEqual(
+ encodeutils.safe_encode(utf8, 'UTF-8', 'utf-8'),
+ encodeutils.safe_encode(utf8, 'utf-8', 'utf-8'),
+ )
+ encodeutils.safe_decode.assert_has_calls([])
+
+ def test_safe_encode_different_encodings(self):
+ text = six.u('foo\xc3\xb1bar')
+ result = encodeutils.safe_encode(
+ text=text, incoming='utf-8', encoding='iso-8859-1')
+ self.assertNotEqual(text, result)
+ self.assertNotEqual(six.b("foo\xf1bar"), result)
diff --git a/oslo_utils/timeutils.py b/oslo_utils/timeutils.py
new file mode 100644
index 0000000..c48da95
--- /dev/null
+++ b/oslo_utils/timeutils.py
@@ -0,0 +1,210 @@
+# Copyright 2011 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Time related utilities and helper functions.
+"""
+
+import calendar
+import datetime
+import time
+
+import iso8601
+import six
+
+
+# ISO 8601 extended time format with microseconds
+_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
+_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
+PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
+
+
+def isotime(at=None, subsecond=False):
+ """Stringify time in ISO 8601 format."""
+ if not at:
+ at = utcnow()
+ st = at.strftime(_ISO8601_TIME_FORMAT
+ if not subsecond
+ else _ISO8601_TIME_FORMAT_SUBSECOND)
+ tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
+ st += ('Z' if tz == 'UTC' else tz)
+ return st
+
+
+def parse_isotime(timestr):
+ """Parse time from ISO 8601 format."""
+ try:
+ return iso8601.parse_date(timestr)
+ except iso8601.ParseError as e:
+ raise ValueError(six.text_type(e))
+ except TypeError as e:
+ raise ValueError(six.text_type(e))
+
+
+def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
+ """Returns formatted utcnow."""
+ if not at:
+ at = utcnow()
+ return at.strftime(fmt)
+
+
+def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
+ """Turn a formatted time back into a datetime."""
+ return datetime.datetime.strptime(timestr, fmt)
+
+
+def normalize_time(timestamp):
+ """Normalize time in arbitrary timezone to UTC naive object."""
+ offset = timestamp.utcoffset()
+ if offset is None:
+ return timestamp
+ return timestamp.replace(tzinfo=None) - offset
+
+
+def is_older_than(before, seconds):
+ """Return True if before is older than seconds."""
+ if isinstance(before, six.string_types):
+ before = parse_strtime(before).replace(tzinfo=None)
+ else:
+ before = before.replace(tzinfo=None)
+
+ return utcnow() - before > datetime.timedelta(seconds=seconds)
+
+
+def is_newer_than(after, seconds):
+ """Return True if after is newer than seconds."""
+ if isinstance(after, six.string_types):
+ after = parse_strtime(after).replace(tzinfo=None)
+ else:
+ after = after.replace(tzinfo=None)
+
+ return after - utcnow() > datetime.timedelta(seconds=seconds)
+
+
+def utcnow_ts():
+ """Timestamp version of our utcnow function."""
+ if utcnow.override_time is None:
+ # NOTE(kgriffs): This is several times faster
+ # than going through calendar.timegm(...)
+ return int(time.time())
+
+ return calendar.timegm(utcnow().timetuple())
+
+
+def utcnow():
+ """Overridable version of utils.utcnow."""
+ if utcnow.override_time:
+ try:
+ return utcnow.override_time.pop(0)
+ except AttributeError:
+ return utcnow.override_time
+ return datetime.datetime.utcnow()
+
+
+def iso8601_from_timestamp(timestamp):
+ """Returns an iso8601 formatted date from timestamp."""
+ return isotime(datetime.datetime.utcfromtimestamp(timestamp))
+
+
+utcnow.override_time = None
+
+
+def set_time_override(override_time=None):
+ """Overrides utils.utcnow.
+
+ Make it return a constant time or a list thereof, one at a time.
+
+ :param override_time: datetime instance or list thereof. If not
+ given, defaults to the current UTC time.
+ """
+ utcnow.override_time = override_time or datetime.datetime.utcnow()
+
+
+def advance_time_delta(timedelta):
+ """Advance overridden time using a datetime.timedelta."""
+ assert utcnow.override_time is not None
+ try:
+ for dt in utcnow.override_time:
+ dt += timedelta
+ except TypeError:
+ utcnow.override_time += timedelta
+
+
+def advance_time_seconds(seconds):
+ """Advance overridden time by seconds."""
+ advance_time_delta(datetime.timedelta(0, seconds))
+
+
+def clear_time_override():
+ """Remove the overridden time."""
+ utcnow.override_time = None
+
+
+def marshall_now(now=None):
+ """Make an rpc-safe datetime with microseconds.
+
+ Note: tzinfo is stripped, but not required for relative times.
+ """
+ if not now:
+ now = utcnow()
+ return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
+ minute=now.minute, second=now.second,
+ microsecond=now.microsecond)
+
+
+def unmarshall_time(tyme):
+ """Unmarshall a datetime dict."""
+ return datetime.datetime(day=tyme['day'],
+ month=tyme['month'],
+ year=tyme['year'],
+ hour=tyme['hour'],
+ minute=tyme['minute'],
+ second=tyme['second'],
+ microsecond=tyme['microsecond'])
+
+
+def delta_seconds(before, after):
+ """Return the difference between two timing objects.
+
+ Compute the difference in seconds between two date, time, or
+ datetime objects (as a float, to microsecond resolution).
+ """
+ delta = after - before
+ return total_seconds(delta)
+
+
+def total_seconds(delta):
+ """Return the total seconds of datetime.timedelta object.
+
+ Compute total seconds of datetime.timedelta, datetime.timedelta
+ doesn't have method total_seconds in Python2.6, calculate it manually.
+ """
+ try:
+ return delta.total_seconds()
+ except AttributeError:
+ return ((delta.days * 24 * 3600) + delta.seconds +
+ float(delta.microseconds) / (10 ** 6))
+
+
+def is_soon(dt, window):
+ """Determines if time is going to happen in the next window seconds.
+
+ :param dt: the time
+ :param window: minimum seconds to remain to consider the time not soon
+
+ :return: True if expiration is within the given duration
+ """
+ soon = (utcnow() + datetime.timedelta(seconds=window))
+ return normalize_time(dt) <= soon
diff --git a/oslo_utils/units.py b/oslo_utils/units.py
new file mode 100644
index 0000000..4817da5
--- /dev/null
+++ b/oslo_utils/units.py
@@ -0,0 +1,38 @@
+# Copyright 2013 IBM Corp
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Unit constants
+"""
+
+# Binary unit constants.
+Ki = 1024
+Mi = 1024 ** 2
+Gi = 1024 ** 3
+Ti = 1024 ** 4
+Pi = 1024 ** 5
+Ei = 1024 ** 6
+Zi = 1024 ** 7
+Yi = 1024 ** 8
+
+# Decimal unit constants.
+k = 1000
+M = 1000 ** 2
+G = 1000 ** 3
+T = 1000 ** 4
+P = 1000 ** 5
+E = 1000 ** 6
+Z = 1000 ** 7
+Y = 1000 ** 8
diff --git a/oslo_utils/uuidutils.py b/oslo_utils/uuidutils.py
new file mode 100644
index 0000000..62b0b5f
--- /dev/null
+++ b/oslo_utils/uuidutils.py
@@ -0,0 +1,45 @@
+# Copyright (c) 2012 Intel Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+UUID related utilities and helper functions.
+"""
+
+import uuid
+
+
+def generate_uuid():
+ return str(uuid.uuid4())
+
+
+def _format_uuid_string(string):
+ return (string.replace('urn:', '')
+ .replace('uuid:', '')
+ .strip('{}')
+ .replace('-', '')
+ .lower())
+
+
+def is_uuid_like(val):
+ """Returns validation of a value as a UUID.
+
+ :param val: Value to verify
+ :type val: string
+ :returns: bool
+ """
+ try:
+ return str(uuid.UUID(val)).replace('-', '') == _format_uuid_string(val)
+ except (TypeError, ValueError, AttributeError):
+ return False
diff --git a/setup.cfg b/setup.cfg
index debfb97..0b58de7 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -22,6 +22,7 @@ classifier =
[files]
packages =
oslo
+ oslo_utils
namespace_packages =
oslo
diff --git a/tests/test_importutils.py b/tests/test_importutils.py
index 579bf76..c82b8fa 100644
--- a/tests/test_importutils.py
+++ b/tests/test_importutils.py
@@ -39,64 +39,67 @@ class ImportUtilsTest(test_base.BaseTestCase):
self.assertEqual(sys.modules['datetime'], dt)
def test_import_object_optional_arg_not_present(self):
- obj = importutils.import_object('tests.fake.FakeDriver')
+ obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver')
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_optional_arg_present(self):
- obj = importutils.import_object('tests.fake.FakeDriver',
+ obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_required_arg_not_present(self):
# arg 1 isn't optional here
self.assertRaises(TypeError, importutils.import_object,
- 'tests.fake.FakeDriver2')
+ 'oslo_utils.tests.fake.FakeDriver2')
def test_import_object_required_arg_present(self):
- obj = importutils.import_object('tests.fake.FakeDriver2',
+ obj = importutils.import_object('oslo_utils.tests.fake.FakeDriver2',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
# namespace tests
def test_import_object_ns_optional_arg_not_present(self):
- obj = importutils.import_object_ns('tests', 'fake.FakeDriver')
+ obj = importutils.import_object_ns('oslo_utils',
+ 'tests.fake.FakeDriver')
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_optional_arg_present(self):
- obj = importutils.import_object_ns('tests', 'fake.FakeDriver',
+ obj = importutils.import_object_ns('oslo_utils',
+ 'tests.fake.FakeDriver',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_required_arg_not_present(self):
# arg 1 isn't optional here
self.assertRaises(TypeError, importutils.import_object_ns,
- 'tests', 'fake.FakeDriver2')
+ 'oslo_utils', 'tests.fake.FakeDriver2')
def test_import_object_ns_required_arg_present(self):
- obj = importutils.import_object_ns('tests', 'fake.FakeDriver2',
+ obj = importutils.import_object_ns('oslo_utils',
+ 'tests.fake.FakeDriver2',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
# namespace tests
def test_import_object_ns_full_optional_arg_not_present(self):
obj = importutils.import_object_ns('tests2',
- 'tests.fake.FakeDriver')
+ 'oslo_utils.tests.fake.FakeDriver')
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_full_optional_arg_present(self):
obj = importutils.import_object_ns('tests2',
- 'tests.fake.FakeDriver',
+ 'oslo_utils.tests.fake.FakeDriver',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_full_required_arg_not_present(self):
# arg 1 isn't optional here
self.assertRaises(TypeError, importutils.import_object_ns,
- 'tests2', 'tests.fake.FakeDriver2')
+ 'tests2', 'oslo_utils.tests.fake.FakeDriver2')
def test_import_object_ns_full_required_arg_present(self):
obj = importutils.import_object_ns('tests2',
- 'tests.fake.FakeDriver2',
+ 'oslo_utils.tests.fake.FakeDriver2',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
diff --git a/tests/test_netutils.py b/tests/test_netutils.py
index 3ed240d..0aa59cf 100644
--- a/tests/test_netutils.py
+++ b/tests/test_netutils.py
@@ -16,8 +16,6 @@
import socket
import mock
-from mock import patch
-import netifaces
from oslotest import base as test_base
from oslo.utils import netutils
@@ -198,27 +196,9 @@ class NetworkUtilsTest(test_base.BaseTestCase):
self.assertEqual(addr, '1.2.3.4')
@mock.patch('socket.socket')
- @mock.patch('oslo.utils.netutils._get_my_ipv4_address')
+ @mock.patch('oslo_utils.netutils._get_my_ipv4_address')
def test_get_my_ip_socket_error(self, ip, mock_socket):
mock_socket.side_effect = socket.error
ip.return_value = '1.2.3.4'
addr = netutils.get_my_ipv4()
self.assertEqual(addr, '1.2.3.4')
-
- @mock.patch('netifaces.gateways')
- @mock.patch('netifaces.ifaddresses')
- def test_get_my_ipv4_address_with_default_route(
- self, ifaddr, gateways):
- with patch.dict(netifaces.__dict__, {'AF_INET': '0'}):
- ifaddr.return_value = {'0': [{'addr': '172.18.204.1'}]}
- addr = netutils._get_my_ipv4_address()
- self.assertEqual('172.18.204.1', addr)
-
- @mock.patch('netifaces.gateways')
- @mock.patch('netifaces.ifaddresses')
- def test_get_my_ipv4_address_without_default_route(
- self, ifaddr, gateways):
- with patch.dict(netifaces.__dict__, {'AF_INET': '0'}):
- ifaddr.return_value = {}
- addr = netutils._get_my_ipv4_address()
- self.assertEqual('127.0.0.1', addr)
diff --git a/tests/test_warning.py b/tests/test_warning.py
new file mode 100644
index 0000000..306d93d
--- /dev/null
+++ b/tests/test_warning.py
@@ -0,0 +1,61 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import imp
+import os
+import warnings
+
+import mock
+from oslotest import base as test_base
+import six
+
+
+class DeprecationWarningTest(test_base.BaseTestCase):
+
+ @mock.patch('warnings.warn')
+ def test_warning(self, mock_warn):
+ import oslo.utils
+ imp.reload(oslo.utils)
+ self.assertTrue(mock_warn.called)
+ args = mock_warn.call_args
+ self.assertIn('oslo_utils', args[0][0])
+ self.assertIn('deprecated', args[0][0])
+ self.assertTrue(issubclass(args[0][1], DeprecationWarning))
+
+ def test_real_warning(self):
+ with warnings.catch_warnings(record=True) as warning_msgs:
+ warnings.resetwarnings()
+ warnings.simplefilter('always', DeprecationWarning)
+ import oslo.utils
+
+ # Use a separate function to get the stack level correct
+ # so we know the message points back to this file. This
+ # corresponds to an import or reload, which isn't working
+ # inside the test under Python 3.3. That may be due to a
+ # difference in the import implementation not triggering
+ # warnings properly when the module is reloaded, or
+ # because the warnings module is mostly implemented in C
+ # and something isn't cleanly resetting the global state
+ # used to track whether a warning needs to be
+ # emitted. Whatever the cause, we definitely see the
+ # warnings.warn() being invoked on a reload (see the test
+ # above) and warnings are reported on the console when we
+ # run the tests. A simpler test script run outside of
+ # testr does correctly report the warnings.
+ def foo():
+ oslo.utils.deprecated()
+
+ foo()
+ self.assertEqual(1, len(warning_msgs))
+ msg = warning_msgs[0]
+ self.assertIn('oslo_utils', six.text_type(msg.message))
+ self.assertEqual('test_warning.py', os.path.basename(msg.filename))
diff --git a/tox.ini b/tox.ini
index 5669b16..1baed4c 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,6 @@
[tox]
minversion = 1.6
-envlist = py26,py27,py33,py34,pypy,pep8
+envlist = py33,py34,py26,py27,pep8
# NOTE(dhellmann): We cannot set skipdist=True
# for oslo libraries because of the namespace package.
#skipsdist = True