diff options
author | Davanum Srinivas <dims@linux.vnet.ibm.com> | 2014-08-25 21:24:50 -0400 |
---|---|---|
committer | Davanum Srinivas <dims@linux.vnet.ibm.com> | 2014-08-25 21:24:50 -0400 |
commit | 96b532f77634382a529e1c9d4dd72fb293937c1a (patch) | |
tree | 2ad34fb5e1f9d3845a7f8dbb95595711b6c531c3 | |
parent | 81e3337d77394ae20807e734c39dfce985268db8 (diff) | |
download | oslo-log-96b532f77634382a529e1c9d4dd72fb293937c1a.tar.gz |
Get py27 amd pep8 to work
63 files changed, 9935 insertions, 50 deletions
diff --git a/.testr.conf b/.testr.conf index 19721fc..a9fa8bf 100644 --- a/.testr.conf +++ b/.testr.conf @@ -2,6 +2,6 @@ test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \ - ${PYTHON:-python} -m subunit.run discover -t ./ ./tests $LISTOPT $IDOPTION + ${PYTHON:-python} -m subunit.run discover -t ./ ./tests/ $LISTOPT $IDOPTION test_id_option=--load-list $IDFILE -test_list_option=--list
\ No newline at end of file +test_list_option=--list diff --git a/openstack-common.conf b/openstack-common.conf index 4ec5177..0b190f7 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,5 +1,7 @@ [DEFAULT] +modules=importutils,gettextutils,jsonutils,fileutils,log_handler,notifier,fixture.config + # The list of modules to copy from oslo-incubator.git script = tools/run_cross_tests.sh diff --git a/oslo/log/fixture/__init__.py b/oslo/log/fixture/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/oslo/log/fixture/__init__.py diff --git a/oslo/log/log.py b/oslo/log/log.py index b3db8e5..7198938 100644 --- a/oslo/log/log.py +++ b/oslo/log/log.py @@ -43,13 +43,10 @@ from six import moves _PY26 = sys.version_info[0:2] == (2, 6) -from openstack.common.gettextutils import _ -from openstack.common import importutils -from openstack.common import jsonutils -from openstack.common import local -# NOTE(flaper87): Pls, remove when graduating this module -# from the incubator. -from openstack.common.strutils import mask_password # noqa +from oslo.log import local +from oslo.log.openstack.common.gettextutils import _ +from oslo.log.openstack.common import importutils +from oslo.log.openstack.common import jsonutils _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" @@ -503,7 +500,7 @@ def _setup_logging_from_conf(project, version): if CONF.publish_errors: try: handler = importutils.import_object( - "openstack.common.log_handler.PublishErrorsHandler", + "oslo.log.openstack.common.log_handler.PublishErrorsHandler", logging.ERROR) except ImportError: handler = importutils.import_object( diff --git a/oslo/log/openstack/__init__.py b/oslo/log/openstack/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/oslo/log/openstack/__init__.py diff --git a/oslo/log/openstack/common/__init__.py b/oslo/log/openstack/common/__init__.py new file mode 100644 index 0000000..d1223ea --- /dev/null +++ b/oslo/log/openstack/common/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + + +six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox')) diff --git a/oslo/log/openstack/common/context.py b/oslo/log/openstack/common/context.py new file mode 100644 index 0000000..b612db7 --- /dev/null +++ b/oslo/log/openstack/common/context.py @@ -0,0 +1,126 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Simple class that stores security context information in the web request. + +Projects should subclass this class if they wish to enhance the request +context or provide additional information in their specific WSGI pipeline. +""" + +import itertools +import uuid + + +def generate_request_id(): + return b'req-' + str(uuid.uuid4()).encode('ascii') + + +class RequestContext(object): + + """Helper class to represent useful information about a request context. + + Stores information about the security context under which the user + accesses the system, as well as additional request information. + """ + + user_idt_format = '{user} {tenant} {domain} {user_domain} {p_domain}' + + def __init__(self, auth_token=None, user=None, tenant=None, domain=None, + user_domain=None, project_domain=None, is_admin=False, + read_only=False, show_deleted=False, request_id=None, + instance_uuid=None): + self.auth_token = auth_token + self.user = user + self.tenant = tenant + self.domain = domain + self.user_domain = user_domain + self.project_domain = project_domain + self.is_admin = is_admin + self.read_only = read_only + self.show_deleted = show_deleted + self.instance_uuid = instance_uuid + if not request_id: + request_id = generate_request_id() + self.request_id = request_id + + def to_dict(self): + user_idt = ( + self.user_idt_format.format(user=self.user or '-', + tenant=self.tenant or '-', + domain=self.domain or '-', + user_domain=self.user_domain or '-', + p_domain=self.project_domain or '-')) + + return {'user': self.user, + 'tenant': self.tenant, + 'domain': self.domain, + 'user_domain': self.user_domain, + 'project_domain': self.project_domain, + 'is_admin': self.is_admin, + 'read_only': self.read_only, + 'show_deleted': self.show_deleted, + 'auth_token': self.auth_token, + 'request_id': self.request_id, + 'instance_uuid': self.instance_uuid, + 'user_identity': user_idt} + + @classmethod + def from_dict(cls, ctx): + return cls( + auth_token=ctx.get("auth_token"), + user=ctx.get("user"), + tenant=ctx.get("tenant"), + domain=ctx.get("domain"), + user_domain=ctx.get("user_domain"), + project_domain=ctx.get("project_domain"), + is_admin=ctx.get("is_admin", False), + read_only=ctx.get("read_only", False), + show_deleted=ctx.get("show_deleted", False), + request_id=ctx.get("request_id"), + instance_uuid=ctx.get("instance_uuid")) + + +def get_admin_context(show_deleted=False): + context = RequestContext(None, + tenant=None, + is_admin=True, + show_deleted=show_deleted) + return context + + +def get_context_from_function_and_args(function, args, kwargs): + """Find an arg of type RequestContext and return it. + + This is useful in a couple of decorators where we don't + know much about the function we're wrapping. + """ + + for arg in itertools.chain(kwargs.values(), args): + if isinstance(arg, RequestContext): + return arg + + return None + + +def is_user_context(context): + """Indicates if the request context is a normal user.""" + if not context: + return False + if context.is_admin: + return False + if not context.user_id or not context.project_id: + return False + return True diff --git a/oslo/log/openstack/common/eventlet_backdoor.py b/oslo/log/openstack/common/eventlet_backdoor.py new file mode 100644 index 0000000..c9e220f --- /dev/null +++ b/oslo/log/openstack/common/eventlet_backdoor.py @@ -0,0 +1,145 @@ +# Copyright (c) 2012 OpenStack Foundation. +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import print_function + +import errno +import gc +import os +import pprint +import socket +import sys +import traceback + +import eventlet +import eventlet.backdoor +import greenlet +from oslo.config import cfg + +from oslo.log.openstack.common.gettextutils import _LI +from oslo.log.openstack.common import log as logging + +help_for_backdoor_port = ( + "Acceptable values are 0, <port>, and <start>:<end>, where 0 results " + "in listening on a random tcp port number; <port> results in listening " + "on the specified port number (and not enabling backdoor if that port " + "is in use); and <start>:<end> results in listening on the smallest " + "unused port number within the specified range of port numbers. The " + "chosen port is displayed in the service's log file.") +eventlet_backdoor_opts = [ + cfg.StrOpt('backdoor_port', + help="Enable eventlet backdoor. %s" % help_for_backdoor_port) +] + +CONF = cfg.CONF +CONF.register_opts(eventlet_backdoor_opts) +LOG = logging.getLogger(__name__) + + +class EventletBackdoorConfigValueError(Exception): + def __init__(self, port_range, help_msg, ex): + msg = ('Invalid backdoor_port configuration %(range)s: %(ex)s. ' + '%(help)s' % + {'range': port_range, 'ex': ex, 'help': help_msg}) + super(EventletBackdoorConfigValueError, self).__init__(msg) + self.port_range = port_range + + +def _dont_use_this(): + print("Don't use this, just disconnect instead") + + +def _find_objects(t): + return [o for o in gc.get_objects() if isinstance(o, t)] + + +def _print_greenthreads(): + for i, gt in enumerate(_find_objects(greenlet.greenlet)): + print(i, gt) + traceback.print_stack(gt.gr_frame) + print() + + +def _print_nativethreads(): + for threadId, stack in sys._current_frames().items(): + print(threadId) + traceback.print_stack(stack) + print() + + +def _parse_port_range(port_range): + if ':' not in port_range: + start, end = port_range, port_range + else: + start, end = port_range.split(':', 1) + try: + start, end = int(start), int(end) + if end < start: + raise ValueError + return start, end + except ValueError as ex: + raise EventletBackdoorConfigValueError(port_range, ex, + help_for_backdoor_port) + + +def _listen(host, start_port, end_port, listen_func): + try_port = start_port + while True: + try: + return listen_func((host, try_port)) + except socket.error as exc: + if (exc.errno != errno.EADDRINUSE or + try_port >= end_port): + raise + try_port += 1 + + +def initialize_if_enabled(): + backdoor_locals = { + 'exit': _dont_use_this, # So we don't exit the entire process + 'quit': _dont_use_this, # So we don't exit the entire process + 'fo': _find_objects, + 'pgt': _print_greenthreads, + 'pnt': _print_nativethreads, + } + + if CONF.backdoor_port is None: + return None + + start_port, end_port = _parse_port_range(str(CONF.backdoor_port)) + + # NOTE(johannes): The standard sys.displayhook will print the value of + # the last expression and set it to __builtin__._, which overwrites + # the __builtin__._ that gettext sets. Let's switch to using pprint + # since it won't interact poorly with gettext, and it's easier to + # read the output too. + def displayhook(val): + if val is not None: + pprint.pprint(val) + sys.displayhook = displayhook + + sock = _listen('localhost', start_port, end_port, eventlet.listen) + + # In the case of backdoor port being zero, a port number is assigned by + # listen(). In any case, pull the port number out here. + port = sock.getsockname()[1] + LOG.info( + _LI('Eventlet backdoor listening on %(port)s for process %(pid)d') % + {'port': port, 'pid': os.getpid()} + ) + eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, + locals=backdoor_locals) + return port diff --git a/oslo/log/openstack/common/excutils.py b/oslo/log/openstack/common/excutils.py new file mode 100644 index 0000000..87c9251 --- /dev/null +++ b/oslo/log/openstack/common/excutils.py @@ -0,0 +1,113 @@ +# Copyright 2011 OpenStack Foundation. +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Exception related utilities. +""" + +import logging +import sys +import time +import traceback + +import six + +from oslo.log.openstack.common.gettextutils import _LE + + +class save_and_reraise_exception(object): + """Save current exception, run some code and then re-raise. + + In some cases the exception context can be cleared, resulting in None + being attempted to be re-raised after an exception handler is run. This + can happen when eventlet switches greenthreads or when running an + exception handler, code raises and catches an exception. In both + cases the exception context will be cleared. + + To work around this, we save the exception state, run handler code, and + then re-raise the original exception. If another exception occurs, the + saved exception is logged and the new exception is re-raised. + + In some cases the caller may not want to re-raise the exception, and + for those circumstances this context provides a reraise flag that + can be used to suppress the exception. For example:: + + except Exception: + with save_and_reraise_exception() as ctxt: + decide_if_need_reraise() + if not should_be_reraised: + ctxt.reraise = False + + If another exception occurs and reraise flag is False, + the saved exception will not be logged. + + If the caller wants to raise new exception during exception handling + he/she sets reraise to False initially with an ability to set it back to + True if needed:: + + except Exception: + with save_and_reraise_exception(reraise=False) as ctxt: + [if statements to determine whether to raise a new exception] + # Not raising a new exception, so reraise + ctxt.reraise = True + """ + def __init__(self, reraise=True): + self.reraise = reraise + + def __enter__(self): + self.type_, self.value, self.tb, = sys.exc_info() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + if self.reraise: + logging.error(_LE('Original exception being dropped: %s'), + traceback.format_exception(self.type_, + self.value, + self.tb)) + return False + if self.reraise: + six.reraise(self.type_, self.value, self.tb) + + +def forever_retry_uncaught_exceptions(infunc): + def inner_func(*args, **kwargs): + last_log_time = 0 + last_exc_message = None + exc_count = 0 + while True: + try: + return infunc(*args, **kwargs) + except Exception as exc: + this_exc_message = six.u(str(exc)) + if this_exc_message == last_exc_message: + exc_count += 1 + else: + exc_count = 1 + # Do not log any more frequently than once a minute unless + # the exception message changes + cur_time = int(time.time()) + if (cur_time - last_log_time > 60 or + this_exc_message != last_exc_message): + logging.exception( + _LE('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) + last_log_time = cur_time + last_exc_message = this_exc_message + exc_count = 0 + # This should be a very rare event. In case it isn't, do + # a sleep. + time.sleep(1) + return inner_func diff --git a/oslo/log/openstack/common/fileutils.py b/oslo/log/openstack/common/fileutils.py new file mode 100644 index 0000000..cb63233 --- /dev/null +++ b/oslo/log/openstack/common/fileutils.py @@ -0,0 +1,146 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import errno +import os +import tempfile + +from oslo.log.openstack.common import excutils +from oslo.log.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + +_FILE_CACHE = {} + + +def ensure_tree(path): + """Create a directory (and any ancestor directories required) + + :param path: Directory to create + """ + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST: + if not os.path.isdir(path): + raise + else: + raise + + +def read_cached_file(filename, force_reload=False): + """Read from a file if it has been modified. + + :param force_reload: Whether to reload the file. + :returns: A tuple with a boolean specifying if the data is fresh + or not. + """ + global _FILE_CACHE + + if force_reload: + delete_cached_file(filename) + + reloaded = False + mtime = os.path.getmtime(filename) + cache_info = _FILE_CACHE.setdefault(filename, {}) + + if not cache_info or mtime > cache_info.get('mtime', 0): + LOG.debug("Reloading cached file %s" % filename) + with open(filename) as fap: + cache_info['data'] = fap.read() + cache_info['mtime'] = mtime + reloaded = True + return (reloaded, cache_info['data']) + + +def delete_cached_file(filename): + """Delete cached file if present. + + :param filename: filename to delete + """ + global _FILE_CACHE + + if filename in _FILE_CACHE: + del _FILE_CACHE[filename] + + +def delete_if_exists(path, remove=os.unlink): + """Delete a file, but ignore file not found error. + + :param path: File to delete + :param remove: Optional function to remove passed path + """ + + try: + remove(path) + except OSError as e: + if e.errno != errno.ENOENT: + raise + + +@contextlib.contextmanager +def remove_path_on_error(path, remove=delete_if_exists): + """Protect code that wants to operate on PATH atomically. + Any exception will cause PATH to be removed. + + :param path: File to work with + :param remove: Optional function to remove passed path + """ + + try: + yield + except Exception: + with excutils.save_and_reraise_exception(): + remove(path) + + +def file_open(*args, **kwargs): + """Open file + + see built-in open() documentation for more details + + Note: The reason this is kept in a separate module is to easily + be able to provide a stub module that doesn't alter system + state at all (for unit tests) + """ + return open(*args, **kwargs) + + +def write_to_tempfile(content, path=None, suffix='', prefix='tmp'): + """Create temporary file or use existing file. + + This util is needed for creating temporary file with + specified content, suffix and prefix. If path is not None, + it will be used for writing content. If the path doesn't + exist it'll be created. + + :param content: content for temporary file. + :param path: same as parameter 'dir' for mkstemp + :param suffix: same as parameter 'suffix' for mkstemp + :param prefix: same as parameter 'prefix' for mkstemp + + For example: it can be used in database tests for creating + configuration files. + """ + if path: + ensure_tree(path) + + (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix) + try: + os.write(fd, content) + finally: + os.close(fd) + return path diff --git a/oslo/log/openstack/common/fixture/__init__.py b/oslo/log/openstack/common/fixture/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/oslo/log/openstack/common/fixture/__init__.py diff --git a/oslo/log/openstack/common/fixture/config.py b/oslo/log/openstack/common/fixture/config.py new file mode 100644 index 0000000..9489b85 --- /dev/null +++ b/oslo/log/openstack/common/fixture/config.py @@ -0,0 +1,85 @@ +# +# Copyright 2013 Mirantis, Inc. +# Copyright 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import fixtures +from oslo.config import cfg +import six + + +class Config(fixtures.Fixture): + """Allows overriding configuration settings for the test. + + `conf` will be reset on cleanup. + + """ + + def __init__(self, conf=cfg.CONF): + self.conf = conf + + def setUp(self): + super(Config, self).setUp() + # NOTE(morganfainberg): unregister must be added to cleanup before + # reset is because cleanup works in reverse order of registered items, + # and a reset must occur before unregistering options can occur. + self.addCleanup(self._unregister_config_opts) + self.addCleanup(self.conf.reset) + self._registered_config_opts = {} + + def config(self, **kw): + """Override configuration values. + + The keyword arguments are the names of configuration options to + override and their values. + + If a `group` argument is supplied, the overrides are applied to + the specified configuration option group, otherwise the overrides + are applied to the ``default`` group. + + """ + + group = kw.pop('group', None) + for k, v in six.iteritems(kw): + self.conf.set_override(k, v, group) + + def _unregister_config_opts(self): + for group in self._registered_config_opts: + self.conf.unregister_opts(self._registered_config_opts[group], + group=group) + + def register_opt(self, opt, group=None): + """Register a single option for the test run. + + Options registered in this manner will automatically be unregistered + during cleanup. + + If a `group` argument is supplied, it will register the new option + to that group, otherwise the option is registered to the ``default`` + group. + """ + self.conf.register_opt(opt, group=group) + self._registered_config_opts.setdefault(group, set()).add(opt) + + def register_opts(self, opts, group=None): + """Register multiple options for the test run. + + This works in the same manner as register_opt() but takes a list of + options as the first argument. All arguments will be registered to the + same group if the ``group`` argument is supplied, otherwise all options + will be registered to the ``default`` group. + """ + for opt in opts: + self.register_opt(opt, group=group) diff --git a/oslo/log/openstack/common/gettextutils.py b/oslo/log/openstack/common/gettextutils.py new file mode 100644 index 0000000..8811dfe --- /dev/null +++ b/oslo/log/openstack/common/gettextutils.py @@ -0,0 +1,479 @@ +# Copyright 2012 Red Hat, Inc. +# Copyright 2013 IBM Corp. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +gettext for openstack-common modules. + +Usual usage in an openstack.common module: + + from oslo.log.openstack.common.gettextutils import _ +""" + +import copy +import gettext +import locale +from logging import handlers +import os + +from babel import localedata +import six + +_AVAILABLE_LANGUAGES = {} + +# FIXME(dhellmann): Remove this when moving to oslo.i18n. +USE_LAZY = False + + +class TranslatorFactory(object): + """Create translator functions + """ + + def __init__(self, domain, localedir=None): + """Establish a set of translation functions for the domain. + + :param domain: Name of translation domain, + specifying a message catalog. + :type domain: str + :param lazy: Delays translation until a message is emitted. + Defaults to False. + :type lazy: Boolean + :param localedir: Directory with translation catalogs. + :type localedir: str + """ + self.domain = domain + if localedir is None: + localedir = os.environ.get(domain.upper() + '_LOCALEDIR') + self.localedir = localedir + + def _make_translation_func(self, domain=None): + """Return a new translation function ready for use. + + Takes into account whether or not lazy translation is being + done. + + The domain can be specified to override the default from the + factory, but the localedir from the factory is always used + because we assume the log-level translation catalogs are + installed in the same directory as the main application + catalog. + + """ + if domain is None: + domain = self.domain + t = gettext.translation(domain, + localedir=self.localedir, + fallback=True) + # Use the appropriate method of the translation object based + # on the python version. + m = t.gettext if six.PY3 else t.ugettext + + def f(msg): + """oslo.i18n.gettextutils translation function.""" + if USE_LAZY: + return Message(msg, domain=domain) + return m(msg) + return f + + @property + def primary(self): + "The default translation function." + return self._make_translation_func() + + def _make_log_translation_func(self, level): + return self._make_translation_func(self.domain + '-log-' + level) + + @property + def log_info(self): + "Translate info-level log messages." + return self._make_log_translation_func('info') + + @property + def log_warning(self): + "Translate warning-level log messages." + return self._make_log_translation_func('warning') + + @property + def log_error(self): + "Translate error-level log messages." + return self._make_log_translation_func('error') + + @property + def log_critical(self): + "Translate critical-level log messages." + return self._make_log_translation_func('critical') + + +# NOTE(dhellmann): When this module moves out of the incubator into +# oslo.i18n, these global variables can be moved to an integration +# module within each application. + +# Create the global translation functions. +_translators = TranslatorFactory('oslo.log') + +# The primary translation function using the well-known name "_" +_ = _translators.primary + +# Translators for log levels. +# +# The abbreviated names are meant to reflect the usual use of a short +# name like '_'. The "L" is for "log" and the other letter comes from +# the level. +_LI = _translators.log_info +_LW = _translators.log_warning +_LE = _translators.log_error +_LC = _translators.log_critical + +# NOTE(dhellmann): End of globals that will move to the application's +# integration module. + + +def enable_lazy(): + """Convenience function for configuring _() to use lazy gettext + + Call this at the start of execution to enable the gettextutils._ + function to use lazy gettext functionality. This is useful if + your project is importing _ directly instead of using the + gettextutils.install() way of importing the _ function. + """ + global USE_LAZY + USE_LAZY = True + + +def install(domain): + """Install a _() function using the given translation domain. + + Given a translation domain, install a _() function using gettext's + install() function. + + The main difference from gettext.install() is that we allow + overriding the default localedir (e.g. /usr/share/locale) using + a translation-domain-specific environment variable (e.g. + NOVA_LOCALEDIR). + + Note that to enable lazy translation, enable_lazy must be + called. + + :param domain: the translation domain + """ + from six import moves + tf = TranslatorFactory(domain) + moves.builtins.__dict__['_'] = tf.primary + + +class Message(six.text_type): + """A Message object is a unicode object that can be translated. + + Translation of Message is done explicitly using the translate() method. + For all non-translation intents and purposes, a Message is simply unicode, + and can be treated as such. + """ + + def __new__(cls, msgid, msgtext=None, params=None, + domain='oslo.log', *args): + """Create a new Message object. + + In order for translation to work gettext requires a message ID, this + msgid will be used as the base unicode text. It is also possible + for the msgid and the base unicode text to be different by passing + the msgtext parameter. + """ + # If the base msgtext is not given, we use the default translation + # of the msgid (which is in English) just in case the system locale is + # not English, so that the base text will be in that locale by default. + if not msgtext: + msgtext = Message._translate_msgid(msgid, domain) + # We want to initialize the parent unicode with the actual object that + # would have been plain unicode if 'Message' was not enabled. + msg = super(Message, cls).__new__(cls, msgtext) + msg.msgid = msgid + msg.domain = domain + msg.params = params + return msg + + def translate(self, desired_locale=None): + """Translate this message to the desired locale. + + :param desired_locale: The desired locale to translate the message to, + if no locale is provided the message will be + translated to the system's default locale. + + :returns: the translated message in unicode + """ + + translated_message = Message._translate_msgid(self.msgid, + self.domain, + desired_locale) + if self.params is None: + # No need for more translation + return translated_message + + # This Message object may have been formatted with one or more + # Message objects as substitution arguments, given either as a single + # argument, part of a tuple, or as one or more values in a dictionary. + # When translating this Message we need to translate those Messages too + translated_params = _translate_args(self.params, desired_locale) + + translated_message = translated_message % translated_params + + return translated_message + + @staticmethod + def _translate_msgid(msgid, domain, desired_locale=None): + if not desired_locale: + system_locale = locale.getdefaultlocale() + # If the system locale is not available to the runtime use English + if not system_locale[0]: + desired_locale = 'en_US' + else: + desired_locale = system_locale[0] + + locale_dir = os.environ.get(domain.upper() + '_LOCALEDIR') + lang = gettext.translation(domain, + localedir=locale_dir, + languages=[desired_locale], + fallback=True) + if six.PY3: + translator = lang.gettext + else: + translator = lang.ugettext + + translated_message = translator(msgid) + return translated_message + + def __mod__(self, other): + # When we mod a Message we want the actual operation to be performed + # by the parent class (i.e. unicode()), the only thing we do here is + # save the original msgid and the parameters in case of a translation + params = self._sanitize_mod_params(other) + unicode_mod = super(Message, self).__mod__(params) + modded = Message(self.msgid, + msgtext=unicode_mod, + params=params, + domain=self.domain) + return modded + + def _sanitize_mod_params(self, other): + """Sanitize the object being modded with this Message. + + - Add support for modding 'None' so translation supports it + - Trim the modded object, which can be a large dictionary, to only + those keys that would actually be used in a translation + - Snapshot the object being modded, in case the message is + translated, it will be used as it was when the Message was created + """ + if other is None: + params = (other,) + elif isinstance(other, dict): + # Merge the dictionaries + # Copy each item in case one does not support deep copy. + params = {} + if isinstance(self.params, dict): + for key, val in self.params.items(): + params[key] = self._copy_param(val) + for key, val in other.items(): + params[key] = self._copy_param(val) + else: + params = self._copy_param(other) + return params + + def _copy_param(self, param): + try: + return copy.deepcopy(param) + except Exception: + # Fallback to casting to unicode this will handle the + # python code-like objects that can't be deep-copied + return six.text_type(param) + + def __add__(self, other): + msg = _('Message objects do not support addition.') + raise TypeError(msg) + + def __radd__(self, other): + return self.__add__(other) + + if six.PY2: + def __str__(self): + # NOTE(luisg): Logging in python 2.6 tries to str() log records, + # and it expects specifically a UnicodeError in order to proceed. + msg = _('Message objects do not support str() because they may ' + 'contain non-ascii characters. ' + 'Please use unicode() or translate() instead.') + raise UnicodeError(msg) + + +def get_available_languages(domain): + """Lists the available languages for the given translation domain. + + :param domain: the domain to get languages for + """ + if domain in _AVAILABLE_LANGUAGES: + return copy.copy(_AVAILABLE_LANGUAGES[domain]) + + localedir = '%s_LOCALEDIR' % domain.upper() + find = lambda x: gettext.find(domain, + localedir=os.environ.get(localedir), + languages=[x]) + + # NOTE(mrodden): en_US should always be available (and first in case + # order matters) since our in-line message strings are en_US + language_list = ['en_US'] + # NOTE(luisg): Babel <1.0 used a function called list(), which was + # renamed to locale_identifiers() in >=1.0, the requirements master list + # requires >=0.9.6, uncapped, so defensively work with both. We can remove + # this check when the master list updates to >=1.0, and update all projects + list_identifiers = (getattr(localedata, 'list', None) or + getattr(localedata, 'locale_identifiers')) + locale_identifiers = list_identifiers() + + for i in locale_identifiers: + if find(i) is not None: + language_list.append(i) + + # NOTE(luisg): Babel>=1.0,<1.3 has a bug where some OpenStack supported + # locales (e.g. 'zh_CN', and 'zh_TW') aren't supported even though they + # are perfectly legitimate locales: + # https://github.com/mitsuhiko/babel/issues/37 + # In Babel 1.3 they fixed the bug and they support these locales, but + # they are still not explicitly "listed" by locale_identifiers(). + # That is why we add the locales here explicitly if necessary so that + # they are listed as supported. + aliases = {'zh': 'zh_CN', + 'zh_Hant_HK': 'zh_HK', + 'zh_Hant': 'zh_TW', + 'fil': 'tl_PH'} + for (locale_, alias) in six.iteritems(aliases): + if locale_ in language_list and alias not in language_list: + language_list.append(alias) + + _AVAILABLE_LANGUAGES[domain] = language_list + return copy.copy(language_list) + + +def translate(obj, desired_locale=None): + """Gets the translated unicode representation of the given object. + + If the object is not translatable it is returned as-is. + If the locale is None the object is translated to the system locale. + + :param obj: the object to translate + :param desired_locale: the locale to translate the message to, if None the + default system locale will be used + :returns: the translated object in unicode, or the original object if + it could not be translated + """ + message = obj + if not isinstance(message, Message): + # If the object to translate is not already translatable, + # let's first get its unicode representation + message = six.text_type(obj) + if isinstance(message, Message): + # Even after unicoding() we still need to check if we are + # running with translatable unicode before translating + return message.translate(desired_locale) + return obj + + +def _translate_args(args, desired_locale=None): + """Translates all the translatable elements of the given arguments object. + + This method is used for translating the translatable values in method + arguments which include values of tuples or dictionaries. + If the object is not a tuple or a dictionary the object itself is + translated if it is translatable. + + If the locale is None the object is translated to the system locale. + + :param args: the args to translate + :param desired_locale: the locale to translate the args to, if None the + default system locale will be used + :returns: a new args object with the translated contents of the original + """ + if isinstance(args, tuple): + return tuple(translate(v, desired_locale) for v in args) + if isinstance(args, dict): + translated_dict = {} + for (k, v) in six.iteritems(args): + translated_v = translate(v, desired_locale) + translated_dict[k] = translated_v + return translated_dict + return translate(args, desired_locale) + + +class TranslationHandler(handlers.MemoryHandler): + """Handler that translates records before logging them. + + The TranslationHandler takes a locale and a target logging.Handler object + to forward LogRecord objects to after translating them. This handler + depends on Message objects being logged, instead of regular strings. + + The handler can be configured declaratively in the logging.conf as follows: + + [handlers] + keys = translatedlog, translator + + [handler_translatedlog] + class = handlers.WatchedFileHandler + args = ('/var/log/api-localized.log',) + formatter = context + + [handler_translator] + class = openstack.common.log.TranslationHandler + target = translatedlog + args = ('zh_CN',) + + If the specified locale is not available in the system, the handler will + log in the default locale. + """ + + def __init__(self, locale=None, target=None): + """Initialize a TranslationHandler + + :param locale: locale to use for translating messages + :param target: logging.Handler object to forward + LogRecord objects to after translation + """ + # NOTE(luisg): In order to allow this handler to be a wrapper for + # other handlers, such as a FileHandler, and still be able to + # configure it using logging.conf, this handler has to extend + # MemoryHandler because only the MemoryHandlers' logging.conf + # parsing is implemented such that it accepts a target handler. + handlers.MemoryHandler.__init__(self, capacity=0, target=target) + self.locale = locale + + def setFormatter(self, fmt): + self.target.setFormatter(fmt) + + def emit(self, record): + # We save the message from the original record to restore it + # after translation, so other handlers are not affected by this + original_msg = record.msg + original_args = record.args + + try: + self._translate_and_log_record(record) + finally: + record.msg = original_msg + record.args = original_args + + def _translate_and_log_record(self, record): + record.msg = translate(record.msg, self.locale) + + # In addition to translating the message, we also need to translate + # arguments that were passed to the log method that were not part + # of the main message e.g., log.info(_('Some message %s'), this_one)) + record.args = _translate_args(record.args, self.locale) + + self.target.emit(record) diff --git a/oslo/log/openstack/common/importutils.py b/oslo/log/openstack/common/importutils.py new file mode 100644 index 0000000..8242e2b --- /dev/null +++ b/oslo/log/openstack/common/importutils.py @@ -0,0 +1,73 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Import related utilities and helper functions. +""" + +import sys +import traceback + + +def import_class(import_str): + """Returns a class from a string including module and class.""" + mod_str, _sep, class_str = import_str.rpartition('.') + __import__(mod_str) + try: + return getattr(sys.modules[mod_str], class_str) + except AttributeError: + raise ImportError('Class %s cannot be found (%s)' % + (class_str, + traceback.format_exception(*sys.exc_info()))) + + +def import_object(import_str, *args, **kwargs): + """Import a class and return an instance of it.""" + return import_class(import_str)(*args, **kwargs) + + +def import_object_ns(name_space, import_str, *args, **kwargs): + """Tries to import object from default namespace. + + Imports a class and return an instance of it, first by trying + to find the class in a default namespace, then failing back to + a full path if not found in the default namespace. + """ + import_value = "%s.%s" % (name_space, import_str) + try: + return import_class(import_value)(*args, **kwargs) + except ImportError: + return import_class(import_str)(*args, **kwargs) + + +def import_module(import_str): + """Import a module.""" + __import__(import_str) + return sys.modules[import_str] + + +def import_versioned_module(version, submodule=None): + module = 'oslo.log.v%s' % version + if submodule: + module = '.'.join((module, submodule)) + return import_module(module) + + +def try_import(import_str, default=None): + """Try to import a module and if it fails return default.""" + try: + return import_module(import_str) + except ImportError: + return default diff --git a/oslo/log/openstack/common/jsonutils.py b/oslo/log/openstack/common/jsonutils.py new file mode 100644 index 0000000..90e397d --- /dev/null +++ b/oslo/log/openstack/common/jsonutils.py @@ -0,0 +1,196 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +''' +JSON related utilities. + +This module provides a few things: + + 1) A handy function for getting an object down to something that can be + JSON serialized. See to_primitive(). + + 2) Wrappers around loads() and dumps(). The dumps() wrapper will + automatically use to_primitive() for you if needed. + + 3) This sets up anyjson to use the loads() and dumps() wrappers if anyjson + is available. +''' + + +import codecs +import datetime +import functools +import inspect +import itertools +import sys + +is_simplejson = False +if sys.version_info < (2, 7): + # On Python <= 2.6, json module is not C boosted, so try to use + # simplejson module if available + try: + import simplejson as json + is_simplejson = True + except ImportError: + import json +else: + import json + +import six +import six.moves.xmlrpc_client as xmlrpclib + +from oslo.log.openstack.common import gettextutils +from oslo.log.openstack.common import importutils +from oslo.log.openstack.common import strutils +from oslo.log.openstack.common import timeutils + +netaddr = importutils.try_import("netaddr") + +_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod, + inspect.isfunction, inspect.isgeneratorfunction, + inspect.isgenerator, inspect.istraceback, inspect.isframe, + inspect.iscode, inspect.isbuiltin, inspect.isroutine, + inspect.isabstract] + +_simple_types = (six.string_types + six.integer_types + + (type(None), bool, float)) + + +def to_primitive(value, convert_instances=False, convert_datetime=True, + level=0, max_depth=3): + """Convert a complex object into primitives. + + Handy for JSON serialization. We can optionally handle instances, + but since this is a recursive function, we could have cyclical + data structures. + + To handle cyclical data structures we could track the actual objects + visited in a set, but not all objects are hashable. Instead we just + track the depth of the object inspections and don't go too deep. + + Therefore, convert_instances=True is lossy ... be aware. + + """ + # handle obvious types first - order of basic types determined by running + # full tests on nova project, resulting in the following counts: + # 572754 <type 'NoneType'> + # 460353 <type 'int'> + # 379632 <type 'unicode'> + # 274610 <type 'str'> + # 199918 <type 'dict'> + # 114200 <type 'datetime.datetime'> + # 51817 <type 'bool'> + # 26164 <type 'list'> + # 6491 <type 'float'> + # 283 <type 'tuple'> + # 19 <type 'long'> + if isinstance(value, _simple_types): + return value + + if isinstance(value, datetime.datetime): + if convert_datetime: + return timeutils.strtime(value) + else: + return value + + # value of itertools.count doesn't get caught by nasty_type_tests + # and results in infinite loop when list(value) is called. + if type(value) == itertools.count: + return six.text_type(value) + + # FIXME(vish): Workaround for LP bug 852095. Without this workaround, + # tests that raise an exception in a mocked method that + # has a @wrap_exception with a notifier will fail. If + # we up the dependency to 0.5.4 (when it is released) we + # can remove this workaround. + if getattr(value, '__module__', None) == 'mox': + return 'mock' + + if level > max_depth: + return '?' + + # The try block may not be necessary after the class check above, + # but just in case ... + try: + recursive = functools.partial(to_primitive, + convert_instances=convert_instances, + convert_datetime=convert_datetime, + level=level, + max_depth=max_depth) + if isinstance(value, dict): + return dict((k, recursive(v)) for k, v in six.iteritems(value)) + elif isinstance(value, (list, tuple)): + return [recursive(lv) for lv in value] + + # It's not clear why xmlrpclib created their own DateTime type, but + # for our purposes, make it a datetime type which is explicitly + # handled + if isinstance(value, xmlrpclib.DateTime): + value = datetime.datetime(*tuple(value.timetuple())[:6]) + + if convert_datetime and isinstance(value, datetime.datetime): + return timeutils.strtime(value) + elif isinstance(value, gettextutils.Message): + return value.data + elif hasattr(value, 'iteritems'): + return recursive(dict(value.iteritems()), level=level + 1) + elif hasattr(value, '__iter__'): + return recursive(list(value)) + elif convert_instances and hasattr(value, '__dict__'): + # Likely an instance of something. Watch for cycles. + # Ignore class member vars. + return recursive(value.__dict__, level=level + 1) + elif netaddr and isinstance(value, netaddr.IPAddress): + return six.text_type(value) + else: + if any(test(value) for test in _nasty_type_tests): + return six.text_type(value) + return value + except TypeError: + # Class objects are tricky since they may define something like + # __iter__ defined but it isn't callable as list(). + return six.text_type(value) + + +def dumps(value, default=to_primitive, **kwargs): + if is_simplejson: + kwargs['namedtuple_as_object'] = False + return json.dumps(value, default=default, **kwargs) + + +def dump(obj, fp, *args, **kwargs): + if is_simplejson: + kwargs['namedtuple_as_object'] = False + return json.dump(obj, fp, *args, **kwargs) + + +def loads(s, encoding='utf-8', **kwargs): + return json.loads(strutils.safe_decode(s, encoding), **kwargs) + + +def load(fp, encoding='utf-8', **kwargs): + return json.load(codecs.getreader(encoding)(fp), **kwargs) + + +try: + import anyjson +except ImportError: + pass +else: + anyjson._modules.append((__name__, 'dumps', TypeError, + 'loads', ValueError, 'load')) + anyjson.force_implementation(__name__) diff --git a/oslo/log/openstack/common/local.py b/oslo/log/openstack/common/local.py new file mode 100644 index 0000000..0819d5b --- /dev/null +++ b/oslo/log/openstack/common/local.py @@ -0,0 +1,45 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Local storage of variables using weak references""" + +import threading +import weakref + + +class WeakLocal(threading.local): + def __getattribute__(self, attr): + rval = super(WeakLocal, self).__getattribute__(attr) + if rval: + # NOTE(mikal): this bit is confusing. What is stored is a weak + # reference, not the value itself. We therefore need to lookup + # the weak reference and return the inner value here. + rval = rval() + return rval + + def __setattr__(self, attr, value): + value = weakref.ref(value) + return super(WeakLocal, self).__setattr__(attr, value) + + +# NOTE(mikal): the name "store" should be deprecated in the future +store = WeakLocal() + +# A "weak" store uses weak references and allows an object to fall out of scope +# when it falls out of scope in the code that uses the thread local storage. A +# "strong" store will hold a reference to the object so that it never falls out +# of scope. +weak_store = WeakLocal() +strong_store = threading.local() diff --git a/oslo/log/openstack/common/log.py b/oslo/log/openstack/common/log.py new file mode 100644 index 0000000..07a1f61 --- /dev/null +++ b/oslo/log/openstack/common/log.py @@ -0,0 +1,713 @@ +# Copyright 2011 OpenStack Foundation. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""OpenStack logging handler. + +This module adds to logging functionality by adding the option to specify +a context object when calling the various log methods. If the context object +is not specified, default formatting is used. Additionally, an instance uuid +may be passed as part of the log message, which is intended to make it easier +for admins to find messages related to a specific instance. + +It also allows setting of formatting information through conf. + +""" + +import inspect +import itertools +import logging +import logging.config +import logging.handlers +import os +import socket +import sys +import traceback + +from oslo.config import cfg +import six +from six import moves + +_PY26 = sys.version_info[0:2] == (2, 6) + +from oslo.log.openstack.common.gettextutils import _ +from oslo.log.openstack.common import importutils +from oslo.log.openstack.common import jsonutils +from oslo.log.openstack.common import local +# NOTE(flaper87): Pls, remove when graduating this module +# from the incubator. +from oslo.log.openstack.common.strutils import mask_password # noqa + + +_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" + + +common_cli_opts = [ + cfg.BoolOpt('debug', + short='d', + default=False, + help='Print debugging output (set logging level to ' + 'DEBUG instead of default WARNING level).'), + cfg.BoolOpt('verbose', + short='v', + default=False, + help='Print more verbose output (set logging level to ' + 'INFO instead of default WARNING level).'), +] + +logging_cli_opts = [ + cfg.StrOpt('log-config-append', + metavar='PATH', + deprecated_name='log-config', + help='The name of a logging configuration file. This file ' + 'is appended to any existing logging configuration ' + 'files. For details about logging configuration files, ' + 'see the Python logging module documentation.'), + cfg.StrOpt('log-format', + metavar='FORMAT', + help='DEPRECATED. ' + 'A logging.Formatter log message format string which may ' + 'use any of the available logging.LogRecord attributes. ' + 'This option is deprecated. Please use ' + 'logging_context_format_string and ' + 'logging_default_format_string instead.'), + cfg.StrOpt('log-date-format', + default=_DEFAULT_LOG_DATE_FORMAT, + metavar='DATE_FORMAT', + help='Format string for %%(asctime)s in log records. ' + 'Default: %(default)s .'), + cfg.StrOpt('log-file', + metavar='PATH', + deprecated_name='logfile', + help='(Optional) Name of log file to output to. ' + 'If no default is set, logging will go to stdout.'), + cfg.StrOpt('log-dir', + deprecated_name='logdir', + help='(Optional) The base directory used for relative ' + '--log-file paths.'), + cfg.BoolOpt('use-syslog', + default=False, + help='Use syslog for logging. ' + 'Existing syslog format is DEPRECATED during I, ' + 'and will change in J to honor RFC5424.'), + cfg.BoolOpt('use-syslog-rfc-format', + # TODO(bogdando) remove or use True after existing + # syslog format deprecation in J + default=False, + help='(Optional) Enables or disables syslog rfc5424 format ' + 'for logging. If enabled, prefixes the MSG part of the ' + 'syslog message with APP-NAME (RFC5424). The ' + 'format without the APP-NAME is deprecated in I, ' + 'and will be removed in J.'), + cfg.StrOpt('syslog-log-facility', + default='LOG_USER', + help='Syslog facility to receive log lines.') +] + +generic_log_opts = [ + cfg.BoolOpt('use_stderr', + default=True, + help='Log output to standard error.') +] + +DEFAULT_LOG_LEVELS = ['amqp=WARN', 'amqplib=WARN', 'boto=WARN', + 'qpid=WARN', 'sqlalchemy=WARN', 'suds=INFO', + 'oslo.messaging=INFO', 'iso8601=WARN', + 'requests.packages.urllib3.connectionpool=WARN', + 'urllib3.connectionpool=WARN', 'websocket=WARN', + "keystonemiddleware=WARN", "routes.middleware=WARN", + "stevedore=WARN"] + +log_opts = [ + cfg.StrOpt('logging_context_format_string', + default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' + '%(name)s [%(request_id)s %(user_identity)s] ' + '%(instance)s%(message)s', + help='Format string to use for log messages with context.'), + cfg.StrOpt('logging_default_format_string', + default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' + '%(name)s [-] %(instance)s%(message)s', + help='Format string to use for log messages without context.'), + cfg.StrOpt('logging_debug_format_suffix', + default='%(funcName)s %(pathname)s:%(lineno)d', + help='Data to append to log format when level is DEBUG.'), + cfg.StrOpt('logging_exception_prefix', + default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s ' + '%(instance)s', + help='Prefix each line of exception output with this format.'), + cfg.ListOpt('default_log_levels', + default=DEFAULT_LOG_LEVELS, + help='List of logger=LEVEL pairs.'), + cfg.BoolOpt('publish_errors', + default=False, + help='Enables or disables publication of error events.'), + cfg.BoolOpt('fatal_deprecations', + default=False, + help='Enables or disables fatal status of deprecations.'), + + # NOTE(mikal): there are two options here because sometimes we are handed + # a full instance (and could include more information), and other times we + # are just handed a UUID for the instance. + cfg.StrOpt('instance_format', + default='[instance: %(uuid)s] ', + help='The format for an instance that is passed with the log ' + 'message.'), + cfg.StrOpt('instance_uuid_format', + default='[instance: %(uuid)s] ', + help='The format for an instance UUID that is passed with the ' + 'log message.'), +] + +CONF = cfg.CONF +CONF.register_cli_opts(common_cli_opts) +CONF.register_cli_opts(logging_cli_opts) +CONF.register_opts(generic_log_opts) +CONF.register_opts(log_opts) + +# our new audit level +# NOTE(jkoelker) Since we synthesized an audit level, make the logging +# module aware of it so it acts like other levels. +logging.AUDIT = logging.INFO + 1 +logging.addLevelName(logging.AUDIT, 'AUDIT') + + +try: + NullHandler = logging.NullHandler +except AttributeError: # NOTE(jkoelker) NullHandler added in Python 2.7 + class NullHandler(logging.Handler): + def handle(self, record): + pass + + def emit(self, record): + pass + + def createLock(self): + self.lock = None + + +def _dictify_context(context): + if context is None: + return None + if not isinstance(context, dict) and getattr(context, 'to_dict', None): + context = context.to_dict() + return context + + +def _get_binary_name(): + return os.path.basename(inspect.stack()[-1][1]) + + +def _get_log_file_path(binary=None): + logfile = CONF.log_file + logdir = CONF.log_dir + + if logfile and not logdir: + return logfile + + if logfile and logdir: + return os.path.join(logdir, logfile) + + if logdir: + binary = binary or _get_binary_name() + return '%s.log' % (os.path.join(logdir, binary),) + + return None + + +class BaseLoggerAdapter(logging.LoggerAdapter): + + def audit(self, msg, *args, **kwargs): + self.log(logging.AUDIT, msg, *args, **kwargs) + + def isEnabledFor(self, level): + if _PY26: + # This method was added in python 2.7 (and it does the exact + # same logic, so we need to do the exact same logic so that + # python 2.6 has this capability as well). + return self.logger.isEnabledFor(level) + else: + return super(BaseLoggerAdapter, self).isEnabledFor(level) + + +class LazyAdapter(BaseLoggerAdapter): + def __init__(self, name='unknown', version='unknown'): + self._logger = None + self.extra = {} + self.name = name + self.version = version + + @property + def logger(self): + if not self._logger: + self._logger = getLogger(self.name, self.version) + if six.PY3: + # In Python 3, the code fails because the 'manager' attribute + # cannot be found when using a LoggerAdapter as the + # underlying logger. Work around this issue. + self._logger.manager = self._logger.logger.manager + return self._logger + + +class ContextAdapter(BaseLoggerAdapter): + warn = logging.LoggerAdapter.warning + + def __init__(self, logger, project_name, version_string): + self.logger = logger + self.project = project_name + self.version = version_string + self._deprecated_messages_sent = dict() + + @property + def handlers(self): + return self.logger.handlers + + def deprecated(self, msg, *args, **kwargs): + """Call this method when a deprecated feature is used. + + If the system is configured for fatal deprecations then the message + is logged at the 'critical' level and :class:`DeprecatedConfig` will + be raised. + + Otherwise, the message will be logged (once) at the 'warn' level. + + :raises: :class:`DeprecatedConfig` if the system is configured for + fatal deprecations. + + """ + stdmsg = _("Deprecated: %s") % msg + if CONF.fatal_deprecations: + self.critical(stdmsg, *args, **kwargs) + raise DeprecatedConfig(msg=stdmsg) + + # Using a list because a tuple with dict can't be stored in a set. + sent_args = self._deprecated_messages_sent.setdefault(msg, list()) + + if args in sent_args: + # Already logged this message, so don't log it again. + return + + sent_args.append(args) + self.warn(stdmsg, *args, **kwargs) + + def process(self, msg, kwargs): + # NOTE(jecarey): If msg is not unicode, coerce it into unicode + # before it can get to the python logging and + # possibly cause string encoding trouble + if not isinstance(msg, six.text_type): + msg = six.text_type(msg) + + if 'extra' not in kwargs: + kwargs['extra'] = {} + extra = kwargs['extra'] + + context = kwargs.pop('context', None) + if not context: + context = getattr(local.store, 'context', None) + if context: + extra.update(_dictify_context(context)) + + instance = kwargs.pop('instance', None) + instance_uuid = (extra.get('instance_uuid') or + kwargs.pop('instance_uuid', None)) + instance_extra = '' + if instance: + instance_extra = CONF.instance_format % instance + elif instance_uuid: + instance_extra = (CONF.instance_uuid_format + % {'uuid': instance_uuid}) + extra['instance'] = instance_extra + + extra.setdefault('user_identity', kwargs.pop('user_identity', None)) + + extra['project'] = self.project + extra['version'] = self.version + extra['extra'] = extra.copy() + return msg, kwargs + + +class JSONFormatter(logging.Formatter): + def __init__(self, fmt=None, datefmt=None): + # NOTE(jkoelker) we ignore the fmt argument, but its still there + # since logging.config.fileConfig passes it. + self.datefmt = datefmt + + def formatException(self, ei, strip_newlines=True): + lines = traceback.format_exception(*ei) + if strip_newlines: + lines = [moves.filter( + lambda x: x, + line.rstrip().splitlines()) for line in lines] + lines = list(itertools.chain(*lines)) + return lines + + def format(self, record): + message = {'message': record.getMessage(), + 'asctime': self.formatTime(record, self.datefmt), + 'name': record.name, + 'msg': record.msg, + 'args': record.args, + 'levelname': record.levelname, + 'levelno': record.levelno, + 'pathname': record.pathname, + 'filename': record.filename, + 'module': record.module, + 'lineno': record.lineno, + 'funcname': record.funcName, + 'created': record.created, + 'msecs': record.msecs, + 'relative_created': record.relativeCreated, + 'thread': record.thread, + 'thread_name': record.threadName, + 'process_name': record.processName, + 'process': record.process, + 'traceback': None} + + if hasattr(record, 'extra'): + message['extra'] = record.extra + + if record.exc_info: + message['traceback'] = self.formatException(record.exc_info) + + return jsonutils.dumps(message) + + +def _create_logging_excepthook(product_name): + def logging_excepthook(exc_type, value, tb): + extra = {'exc_info': (exc_type, value, tb)} + getLogger(product_name).critical( + "".join(traceback.format_exception_only(exc_type, value)), + **extra) + return logging_excepthook + + +class LogConfigError(Exception): + + message = _('Error loading logging config %(log_config)s: %(err_msg)s') + + def __init__(self, log_config, err_msg): + self.log_config = log_config + self.err_msg = err_msg + + def __str__(self): + return self.message % dict(log_config=self.log_config, + err_msg=self.err_msg) + + +def _load_log_config(log_config_append): + try: + logging.config.fileConfig(log_config_append, + disable_existing_loggers=False) + except (moves.configparser.Error, KeyError) as exc: + raise LogConfigError(log_config_append, six.text_type(exc)) + + +def setup(product_name, version='unknown'): + """Setup logging.""" + if CONF.log_config_append: + _load_log_config(CONF.log_config_append) + else: + _setup_logging_from_conf(product_name, version) + sys.excepthook = _create_logging_excepthook(product_name) + + +def set_defaults(logging_context_format_string=None, + default_log_levels=None): + # Just in case the caller is not setting the + # default_log_level. This is insurance because + # we introduced the default_log_level parameter + # later in a backwards in-compatible change + if default_log_levels is not None: + cfg.set_defaults( + log_opts, + default_log_levels=default_log_levels) + if logging_context_format_string is not None: + cfg.set_defaults( + log_opts, + logging_context_format_string=logging_context_format_string) + + +def _find_facility_from_conf(): + facility_names = logging.handlers.SysLogHandler.facility_names + facility = getattr(logging.handlers.SysLogHandler, + CONF.syslog_log_facility, + None) + + if facility is None and CONF.syslog_log_facility in facility_names: + facility = facility_names.get(CONF.syslog_log_facility) + + if facility is None: + valid_facilities = facility_names.keys() + consts = ['LOG_AUTH', 'LOG_AUTHPRIV', 'LOG_CRON', 'LOG_DAEMON', + 'LOG_FTP', 'LOG_KERN', 'LOG_LPR', 'LOG_MAIL', 'LOG_NEWS', + 'LOG_AUTH', 'LOG_SYSLOG', 'LOG_USER', 'LOG_UUCP', + 'LOG_LOCAL0', 'LOG_LOCAL1', 'LOG_LOCAL2', 'LOG_LOCAL3', + 'LOG_LOCAL4', 'LOG_LOCAL5', 'LOG_LOCAL6', 'LOG_LOCAL7'] + valid_facilities.extend(consts) + raise TypeError(_('syslog facility must be one of: %s') % + ', '.join("'%s'" % fac + for fac in valid_facilities)) + + return facility + + +class RFCSysLogHandler(logging.handlers.SysLogHandler): + def __init__(self, *args, **kwargs): + self.binary_name = _get_binary_name() + # Do not use super() unless type(logging.handlers.SysLogHandler) + # is 'type' (Python 2.7). + # Use old style calls, if the type is 'classobj' (Python 2.6) + logging.handlers.SysLogHandler.__init__(self, *args, **kwargs) + + def format(self, record): + # Do not use super() unless type(logging.handlers.SysLogHandler) + # is 'type' (Python 2.7). + # Use old style calls, if the type is 'classobj' (Python 2.6) + msg = logging.handlers.SysLogHandler.format(self, record) + msg = self.binary_name + ' ' + msg + return msg + + +def _setup_logging_from_conf(project, version): + log_root = getLogger(None).logger + for handler in log_root.handlers: + log_root.removeHandler(handler) + + logpath = _get_log_file_path() + if logpath: + filelog = logging.handlers.WatchedFileHandler(logpath) + log_root.addHandler(filelog) + + if CONF.use_stderr: + streamlog = ColorHandler() + log_root.addHandler(streamlog) + + elif not logpath: + # pass sys.stdout as a positional argument + # python2.6 calls the argument strm, in 2.7 it's stream + streamlog = logging.StreamHandler(sys.stdout) + log_root.addHandler(streamlog) + + if CONF.publish_errors: + try: + handler = importutils.import_object( + "oslo.log.openstack.common.log_handler.PublishErrorsHandler", + logging.ERROR) + except ImportError: + handler = importutils.import_object( + "oslo.messaging.notify.log_handler.PublishErrorsHandler", + logging.ERROR) + log_root.addHandler(handler) + + datefmt = CONF.log_date_format + for handler in log_root.handlers: + # NOTE(alaski): CONF.log_format overrides everything currently. This + # should be deprecated in favor of context aware formatting. + if CONF.log_format: + handler.setFormatter(logging.Formatter(fmt=CONF.log_format, + datefmt=datefmt)) + log_root.info('Deprecated: log_format is now deprecated and will ' + 'be removed in the next release') + else: + handler.setFormatter(ContextFormatter(project=project, + version=version, + datefmt=datefmt)) + + if CONF.debug: + log_root.setLevel(logging.DEBUG) + elif CONF.verbose: + log_root.setLevel(logging.INFO) + else: + log_root.setLevel(logging.WARNING) + + for pair in CONF.default_log_levels: + mod, _sep, level_name = pair.partition('=') + logger = logging.getLogger(mod) + # NOTE(AAzza) in python2.6 Logger.setLevel doesn't convert string name + # to integer code. + if sys.version_info < (2, 7): + level = logging.getLevelName(level_name) + logger.setLevel(level) + else: + logger.setLevel(level_name) + + if CONF.use_syslog: + try: + facility = _find_facility_from_conf() + # TODO(bogdando) use the format provided by RFCSysLogHandler + # after existing syslog format deprecation in J + if CONF.use_syslog_rfc_format: + syslog = RFCSysLogHandler(facility=facility) + else: + syslog = logging.handlers.SysLogHandler(facility=facility) + log_root.addHandler(syslog) + except socket.error: + log_root.error('Unable to add syslog handler. Verify that syslog' + 'is running.') + + +_loggers = {} + + +def getLogger(name='unknown', version='unknown'): + if name not in _loggers: + _loggers[name] = ContextAdapter(logging.getLogger(name), + name, + version) + return _loggers[name] + + +def getLazyLogger(name='unknown', version='unknown'): + """Returns lazy logger. + + Creates a pass-through logger that does not create the real logger + until it is really needed and delegates all calls to the real logger + once it is created. + """ + return LazyAdapter(name, version) + + +class WritableLogger(object): + """A thin wrapper that responds to `write` and logs.""" + + def __init__(self, logger, level=logging.INFO): + self.logger = logger + self.level = level + + def write(self, msg): + self.logger.log(self.level, msg.rstrip()) + + +class ContextFormatter(logging.Formatter): + """A context.RequestContext aware formatter configured through flags. + + The flags used to set format strings are: logging_context_format_string + and logging_default_format_string. You can also specify + logging_debug_format_suffix to append extra formatting if the log level is + debug. + + For information about what variables are available for the formatter see: + http://docs.python.org/library/logging.html#formatter + + If available, uses the context value stored in TLS - local.store.context + + """ + + def __init__(self, *args, **kwargs): + """Initialize ContextFormatter instance + + Takes additional keyword arguments which can be used in the message + format string. + + :keyword project: project name + :type project: string + :keyword version: project version + :type version: string + + """ + + self.project = kwargs.pop('project', 'unknown') + self.version = kwargs.pop('version', 'unknown') + + logging.Formatter.__init__(self, *args, **kwargs) + + def format(self, record): + """Uses contextstring if request_id is set, otherwise default.""" + + # NOTE(jecarey): If msg is not unicode, coerce it into unicode + # before it can get to the python logging and + # possibly cause string encoding trouble + if not isinstance(record.msg, six.text_type): + record.msg = six.text_type(record.msg) + + # store project info + record.project = self.project + record.version = self.version + + # store request info + context = getattr(local.store, 'context', None) + if context: + d = _dictify_context(context) + for k, v in d.items(): + setattr(record, k, v) + + # NOTE(sdague): default the fancier formatting params + # to an empty string so we don't throw an exception if + # they get used + for key in ('instance', 'color', 'user_identity'): + if key not in record.__dict__: + record.__dict__[key] = '' + + if record.__dict__.get('request_id'): + fmt = CONF.logging_context_format_string + else: + fmt = CONF.logging_default_format_string + + if (record.levelno == logging.DEBUG and + CONF.logging_debug_format_suffix): + fmt += " " + CONF.logging_debug_format_suffix + + if sys.version_info < (3, 2): + self._fmt = fmt + else: + self._style = logging.PercentStyle(fmt) + self._fmt = self._style._fmt + # Cache this on the record, Logger will respect our formatted copy + if record.exc_info: + record.exc_text = self.formatException(record.exc_info, record) + return logging.Formatter.format(self, record) + + def formatException(self, exc_info, record=None): + """Format exception output with CONF.logging_exception_prefix.""" + if not record: + return logging.Formatter.formatException(self, exc_info) + + stringbuffer = moves.StringIO() + traceback.print_exception(exc_info[0], exc_info[1], exc_info[2], + None, stringbuffer) + lines = stringbuffer.getvalue().split('\n') + stringbuffer.close() + + if CONF.logging_exception_prefix.find('%(asctime)') != -1: + record.asctime = self.formatTime(record, self.datefmt) + + formatted_lines = [] + for line in lines: + pl = CONF.logging_exception_prefix % record.__dict__ + fl = '%s%s' % (pl, line) + formatted_lines.append(fl) + return '\n'.join(formatted_lines) + + +class ColorHandler(logging.StreamHandler): + LEVEL_COLORS = { + logging.DEBUG: '\033[00;32m', # GREEN + logging.INFO: '\033[00;36m', # CYAN + logging.AUDIT: '\033[01;36m', # BOLD CYAN + logging.WARN: '\033[01;33m', # BOLD YELLOW + logging.ERROR: '\033[01;31m', # BOLD RED + logging.CRITICAL: '\033[01;31m', # BOLD RED + } + + def format(self, record): + record.color = self.LEVEL_COLORS[record.levelno] + return logging.StreamHandler.format(self, record) + + +class DeprecatedConfig(Exception): + message = _("Fatal call to deprecated config: %(msg)s") + + def __init__(self, msg): + super(Exception, self).__init__(self.message % dict(msg=msg)) diff --git a/oslo/log/openstack/common/log_handler.py b/oslo/log/openstack/common/log_handler.py new file mode 100644 index 0000000..a7f3ae9 --- /dev/null +++ b/oslo/log/openstack/common/log_handler.py @@ -0,0 +1,30 @@ +# Copyright 2013 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo.config import cfg + +from oslo.log.openstack.common import notifier + + +class PublishErrorsHandler(logging.Handler): + def emit(self, record): + if ('openstack.common.notifier.log_notifier' in + cfg.CONF.notification_driver): + return + notifier.api.notify(None, 'error.publisher', + 'error_notification', + notifier.api.ERROR, + dict(error=record.getMessage())) diff --git a/oslo/log/openstack/common/loopingcall.py b/oslo/log/openstack/common/loopingcall.py new file mode 100644 index 0000000..a468fcc --- /dev/null +++ b/oslo/log/openstack/common/loopingcall.py @@ -0,0 +1,147 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sys +import time + +from eventlet import event +from eventlet import greenthread + +from oslo.log.openstack.common.gettextutils import _LE, _LW +from oslo.log.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + +# NOTE(zyluo): This lambda function was declared to avoid mocking collisions +# with time.time() called in the standard logging module +# during unittests. +_ts = lambda: time.time() + + +class LoopingCallDone(Exception): + """Exception to break out and stop a LoopingCallBase. + + The poll-function passed to LoopingCallBase can raise this exception to + break out of the loop normally. This is somewhat analogous to + StopIteration. + + An optional return-value can be included as the argument to the exception; + this return-value will be returned by LoopingCallBase.wait() + + """ + + def __init__(self, retvalue=True): + """:param retvalue: Value that LoopingCallBase.wait() should return.""" + self.retvalue = retvalue + + +class LoopingCallBase(object): + def __init__(self, f=None, *args, **kw): + self.args = args + self.kw = kw + self.f = f + self._running = False + self.done = None + + def stop(self): + self._running = False + + def wait(self): + return self.done.wait() + + +class FixedIntervalLoopingCall(LoopingCallBase): + """A fixed interval looping call.""" + + def start(self, interval, initial_delay=None): + self._running = True + done = event.Event() + + def _inner(): + if initial_delay: + greenthread.sleep(initial_delay) + + try: + while self._running: + start = _ts() + self.f(*self.args, **self.kw) + end = _ts() + if not self._running: + break + delay = end - start - interval + if delay > 0: + LOG.warn(_LW('task %(func_name)s run outlasted ' + 'interval by %(delay).2f sec'), + {'func_name': repr(self.f), 'delay': delay}) + greenthread.sleep(-delay if delay < 0 else 0) + except LoopingCallDone as e: + self.stop() + done.send(e.retvalue) + except Exception: + LOG.exception(_LE('in fixed duration looping call')) + done.send_exception(*sys.exc_info()) + return + else: + done.send(True) + + self.done = done + + greenthread.spawn_n(_inner) + return self.done + + +class DynamicLoopingCall(LoopingCallBase): + """A looping call which sleeps until the next known event. + + The function called should return how long to sleep for before being + called again. + """ + + def start(self, initial_delay=None, periodic_interval_max=None): + self._running = True + done = event.Event() + + def _inner(): + if initial_delay: + greenthread.sleep(initial_delay) + + try: + while self._running: + idle = self.f(*self.args, **self.kw) + if not self._running: + break + + if periodic_interval_max is not None: + idle = min(idle, periodic_interval_max) + LOG.debug('Dynamic looping call %(func_name)s sleeping ' + 'for %(idle).02f seconds', + {'func_name': repr(self.f), 'idle': idle}) + greenthread.sleep(idle) + except LoopingCallDone as e: + self.stop() + done.send(e.retvalue) + except Exception: + LOG.exception(_LE('in dynamic looping call')) + done.send_exception(*sys.exc_info()) + return + else: + done.send(True) + + self.done = done + + greenthread.spawn(_inner) + return self.done diff --git a/oslo/log/openstack/common/network_utils.py b/oslo/log/openstack/common/network_utils.py new file mode 100644 index 0000000..9cf4ab6 --- /dev/null +++ b/oslo/log/openstack/common/network_utils.py @@ -0,0 +1,163 @@ +# Copyright 2012 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Network-related utilities and helper functions. +""" + +import logging +import socket + +from six.moves.urllib import parse + +from oslo.log.openstack.common.gettextutils import _LW + +LOG = logging.getLogger(__name__) + + +def parse_host_port(address, default_port=None): + """Interpret a string as a host:port pair. + + An IPv6 address MUST be escaped if accompanied by a port, + because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334 + means both [2001:db8:85a3::8a2e:370:7334] and + [2001:db8:85a3::8a2e:370]:7334. + + >>> parse_host_port('server01:80') + ('server01', 80) + >>> parse_host_port('server01') + ('server01', None) + >>> parse_host_port('server01', default_port=1234) + ('server01', 1234) + >>> parse_host_port('[::1]:80') + ('::1', 80) + >>> parse_host_port('[::1]') + ('::1', None) + >>> parse_host_port('[::1]', default_port=1234) + ('::1', 1234) + >>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234) + ('2001:db8:85a3::8a2e:370:7334', 1234) + >>> parse_host_port(None) + (None, None) + """ + if not address: + return (None, None) + + if address[0] == '[': + # Escaped ipv6 + _host, _port = address[1:].split(']') + host = _host + if ':' in _port: + port = _port.split(':')[1] + else: + port = default_port + else: + if address.count(':') == 1: + host, port = address.split(':') + else: + # 0 means ipv4, >1 means ipv6. + # We prohibit unescaped ipv6 addresses with port. + host = address + port = default_port + + return (host, None if port is None else int(port)) + + +class ModifiedSplitResult(parse.SplitResult): + """Split results class for urlsplit.""" + + # NOTE(dims): The functions below are needed for Python 2.6.x. + # We can remove these when we drop support for 2.6.x. + @property + def hostname(self): + netloc = self.netloc.split('@', 1)[-1] + host, port = parse_host_port(netloc) + return host + + @property + def port(self): + netloc = self.netloc.split('@', 1)[-1] + host, port = parse_host_port(netloc) + return port + + +def urlsplit(url, scheme='', allow_fragments=True): + """Parse a URL using urlparse.urlsplit(), splitting query and fragments. + This function papers over Python issue9374 when needed. + + The parameters are the same as urlparse.urlsplit. + """ + scheme, netloc, path, query, fragment = parse.urlsplit( + url, scheme, allow_fragments) + if allow_fragments and '#' in path: + path, fragment = path.split('#', 1) + if '?' in path: + path, query = path.split('?', 1) + return ModifiedSplitResult(scheme, netloc, + path, query, fragment) + + +def set_tcp_keepalive(sock, tcp_keepalive=True, + tcp_keepidle=None, + tcp_keepalive_interval=None, + tcp_keepalive_count=None): + """Set values for tcp keepalive parameters + + This function configures tcp keepalive parameters if users wish to do + so. + + :param tcp_keepalive: Boolean, turn on or off tcp_keepalive. If users are + not sure, this should be True, and default values will be used. + + :param tcp_keepidle: time to wait before starting to send keepalive probes + :param tcp_keepalive_interval: time between successive probes, once the + initial wait time is over + :param tcp_keepalive_count: number of probes to send before the connection + is killed + """ + + # NOTE(praneshp): Despite keepalive being a tcp concept, the level is + # still SOL_SOCKET. This is a quirk. + if isinstance(tcp_keepalive, bool): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, tcp_keepalive) + else: + raise TypeError("tcp_keepalive must be a boolean") + + if not tcp_keepalive: + return + + # These options aren't available in the OS X version of eventlet, + # Idle + Count * Interval effectively gives you the total timeout. + if tcp_keepidle is not None: + if hasattr(socket, 'TCP_KEEPIDLE'): + sock.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPIDLE, + tcp_keepidle) + else: + LOG.warning(_LW('tcp_keepidle not available on your system')) + if tcp_keepalive_interval is not None: + if hasattr(socket, 'TCP_KEEPINTVL'): + sock.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPINTVL, + tcp_keepalive_interval) + else: + LOG.warning(_LW('tcp_keepintvl not available on your system')) + if tcp_keepalive_count is not None: + if hasattr(socket, 'TCP_KEEPCNT'): + sock.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPCNT, + tcp_keepalive_count) + else: + LOG.warning(_LW('tcp_keepknt not available on your system')) diff --git a/oslo/log/openstack/common/notifier/__init__.py b/oslo/log/openstack/common/notifier/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/oslo/log/openstack/common/notifier/__init__.py diff --git a/oslo/log/openstack/common/notifier/api.py b/oslo/log/openstack/common/notifier/api.py new file mode 100644 index 0000000..db815e6 --- /dev/null +++ b/oslo/log/openstack/common/notifier/api.py @@ -0,0 +1,172 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import socket +import uuid + +from oslo.config import cfg + +from oslo.log.openstack.common import context +from oslo.log.openstack.common.gettextutils import _, _LE +from oslo.log.openstack.common import importutils +from oslo.log.openstack.common import jsonutils +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common import timeutils + + +LOG = logging.getLogger(__name__) + +notifier_opts = [ + cfg.MultiStrOpt('notification_driver', + default=[], + help='Driver or drivers to handle sending notifications'), + cfg.StrOpt('default_notification_level', + default='INFO', + help='Default notification level for outgoing notifications'), + cfg.StrOpt('default_publisher_id', + help='Default publisher_id for outgoing notifications'), +] + +CONF = cfg.CONF +CONF.register_opts(notifier_opts) + +WARN = 'WARN' +INFO = 'INFO' +ERROR = 'ERROR' +CRITICAL = 'CRITICAL' +DEBUG = 'DEBUG' + +log_levels = (DEBUG, WARN, INFO, ERROR, CRITICAL) + + +class BadPriorityException(Exception): + pass + + +def notify_decorator(name, fn): + """Decorator for notify which is used from utils.monkey_patch(). + + :param name: name of the function + :param function: - object of the function + :returns: function -- decorated function + + """ + def wrapped_func(*args, **kwarg): + body = {} + body['args'] = [] + body['kwarg'] = {} + for arg in args: + body['args'].append(arg) + for key in kwarg: + body['kwarg'][key] = kwarg[key] + + ctxt = context.get_context_from_function_and_args(fn, args, kwarg) + notify(ctxt, + CONF.default_publisher_id or socket.gethostname(), + name, + CONF.default_notification_level, + body) + return fn(*args, **kwarg) + return wrapped_func + + +def publisher_id(service, host=None): + if not host: + try: + host = CONF.host + except AttributeError: + host = CONF.default_publisher_id or socket.gethostname() + return "%s.%s" % (service, host) + + +def notify(context, publisher_id, event_type, priority, payload): + """Sends a notification using the specified driver + + :param publisher_id: the source worker_type.host of the message + :param event_type: the literal type of event (ex. Instance Creation) + :param priority: patterned after the enumeration of Python logging + levels in the set (DEBUG, WARN, INFO, ERROR, CRITICAL) + :param payload: A python dictionary of attributes + + Outgoing message format includes the above parameters, and appends the + following: + + message_id + a UUID representing the id for this notification + + timestamp + the GMT timestamp the notification was sent at + + The composite message will be constructed as a dictionary of the above + attributes, which will then be sent via the transport mechanism defined + by the driver. + + Message example:: + + {'message_id': str(uuid.uuid4()), + 'publisher_id': 'compute.host1', + 'timestamp': timeutils.utcnow(), + 'priority': 'WARN', + 'event_type': 'compute.create_instance', + 'payload': {'instance_id': 12, ... }} + + """ + if priority not in log_levels: + raise BadPriorityException( + _('%s not in valid priorities') % priority) + + # Ensure everything is JSON serializable. + payload = jsonutils.to_primitive(payload, convert_instances=True) + + msg = dict(message_id=str(uuid.uuid4()), + publisher_id=publisher_id, + event_type=event_type, + priority=priority, + payload=payload, + timestamp=str(timeutils.utcnow())) + + for driver in _get_drivers(): + try: + driver.notify(context, msg) + except Exception as e: + LOG.exception(_LE("Problem '%(e)s' attempting to " + "send to notification system. " + "Payload=%(payload)s") + % dict(e=e, payload=payload)) + + +_drivers = None + + +def _get_drivers(): + """Instantiate, cache, and return drivers based on the CONF.""" + global _drivers + if _drivers is None: + _drivers = {} + for notification_driver in CONF.notification_driver: + try: + driver = importutils.import_module(notification_driver) + _drivers[notification_driver] = driver + except ImportError: + LOG.exception(_LE("Failed to load notifier %s. " + "These notifications will not be sent.") % + notification_driver) + return _drivers.values() + + +def _reset_drivers(): + """Used by unit tests to reset the drivers.""" + global _drivers + _drivers = None diff --git a/oslo/log/openstack/common/notifier/log_notifier.py b/oslo/log/openstack/common/notifier/log_notifier.py new file mode 100644 index 0000000..d3395d2 --- /dev/null +++ b/oslo/log/openstack/common/notifier/log_notifier.py @@ -0,0 +1,37 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.config import cfg + +from oslo.log.openstack.common import jsonutils +from oslo.log.openstack.common import log as logging + + +CONF = cfg.CONF + + +def notify(_context, message): + """Notifies the recipient of the desired event given the model. + + Log notifications using OpenStack's default logging system. + """ + + priority = message.get('priority', + CONF.default_notification_level) + priority = priority.lower() + logger = logging.getLogger( + 'oslo.log.openstack.common.notification.%s' % + message['event_type']) + getattr(logger, priority)(jsonutils.dumps(message)) diff --git a/oslo/log/openstack/common/notifier/no_op_notifier.py b/oslo/log/openstack/common/notifier/no_op_notifier.py new file mode 100644 index 0000000..13d946e --- /dev/null +++ b/oslo/log/openstack/common/notifier/no_op_notifier.py @@ -0,0 +1,19 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +def notify(_context, message): + """Notifies the recipient of the desired event given the model.""" + pass diff --git a/oslo/log/openstack/common/notifier/proxy.py b/oslo/log/openstack/common/notifier/proxy.py new file mode 100644 index 0000000..9e68f07 --- /dev/null +++ b/oslo/log/openstack/common/notifier/proxy.py @@ -0,0 +1,77 @@ +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A temporary helper which emulates oslo.messaging.Notifier. + +This helper method allows us to do the tedious porting to the new Notifier API +as a standalone commit so that the commit which switches us to oslo.messaging +is smaller and easier to review. This file will be removed as part of that +commit. +""" + +from oslo.config import cfg + +from oslo.log.openstack.common.notifier import api as notifier_api + +CONF = cfg.CONF + + +class Notifier(object): + + def __init__(self, publisher_id): + super(Notifier, self).__init__() + self.publisher_id = publisher_id + + _marker = object() + + def prepare(self, publisher_id=_marker): + ret = self.__class__(self.publisher_id) + if publisher_id is not self._marker: + ret.publisher_id = publisher_id + return ret + + def _notify(self, ctxt, event_type, payload, priority): + notifier_api.notify(ctxt, + self.publisher_id, + event_type, + priority, + payload) + + def audit(self, ctxt, event_type, payload): + # No audit in old notifier. + self._notify(ctxt, event_type, payload, 'INFO') + + def debug(self, ctxt, event_type, payload): + self._notify(ctxt, event_type, payload, 'DEBUG') + + def info(self, ctxt, event_type, payload): + self._notify(ctxt, event_type, payload, 'INFO') + + def warn(self, ctxt, event_type, payload): + self._notify(ctxt, event_type, payload, 'WARN') + + warning = warn + + def error(self, ctxt, event_type, payload): + self._notify(ctxt, event_type, payload, 'ERROR') + + def critical(self, ctxt, event_type, payload): + self._notify(ctxt, event_type, payload, 'CRITICAL') + + +def get_notifier(service=None, host=None, publisher_id=None): + if not publisher_id: + publisher_id = "%s.%s" % (service, host or CONF.host) + return Notifier(publisher_id) diff --git a/oslo/log/openstack/common/notifier/rpc_notifier.py b/oslo/log/openstack/common/notifier/rpc_notifier.py new file mode 100644 index 0000000..c3d9dba --- /dev/null +++ b/oslo/log/openstack/common/notifier/rpc_notifier.py @@ -0,0 +1,47 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.config import cfg + +from oslo.log.openstack.common import context as req_context +from oslo.log.openstack.common.gettextutils import _LE +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common import rpc + +LOG = logging.getLogger(__name__) + +notification_topic_opt = cfg.ListOpt( + 'notification_topics', default=['notifications', ], + help='AMQP topic used for OpenStack notifications') + +CONF = cfg.CONF +CONF.register_opt(notification_topic_opt) + + +def notify(context, message): + """Sends a notification via RPC.""" + if not context: + context = req_context.get_admin_context() + priority = message.get('priority', + CONF.default_notification_level) + priority = priority.lower() + for topic in CONF.notification_topics: + topic = '%s.%s' % (topic, priority) + try: + rpc.notify(context, topic, message) + except Exception: + LOG.exception(_LE("Could not send notification to %(topic)s. " + "Payload=%(message)s"), + {"topic": topic, "message": message}) diff --git a/oslo/log/openstack/common/notifier/rpc_notifier2.py b/oslo/log/openstack/common/notifier/rpc_notifier2.py new file mode 100644 index 0000000..7d9169a --- /dev/null +++ b/oslo/log/openstack/common/notifier/rpc_notifier2.py @@ -0,0 +1,53 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +'''messaging based notification driver, with message envelopes''' + +from oslo.config import cfg + +from oslo.log.openstack.common import context as req_context +from oslo.log.openstack.common.gettextutils import _LE +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common import rpc + +LOG = logging.getLogger(__name__) + +notification_topic_opt = cfg.ListOpt( + 'topics', default=['notifications', ], + help='AMQP topic(s) used for OpenStack notifications') + +opt_group = cfg.OptGroup(name='rpc_notifier2', + title='Options for rpc_notifier2') + +CONF = cfg.CONF +CONF.register_group(opt_group) +CONF.register_opt(notification_topic_opt, opt_group) + + +def notify(context, message): + """Sends a notification via RPC.""" + if not context: + context = req_context.get_admin_context() + priority = message.get('priority', + CONF.default_notification_level) + priority = priority.lower() + for topic in CONF.rpc_notifier2.topics: + topic = '%s.%s' % (topic, priority) + try: + rpc.notify(context, topic, message, envelope=True) + except Exception: + LOG.exception(_LE("Could not send notification to %(topic)s. " + "Payload=%(message)s"), + {"topic": topic, "message": message}) diff --git a/oslo/log/openstack/common/notifier/test_notifier.py b/oslo/log/openstack/common/notifier/test_notifier.py new file mode 100644 index 0000000..11fc21f --- /dev/null +++ b/oslo/log/openstack/common/notifier/test_notifier.py @@ -0,0 +1,21 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +NOTIFICATIONS = [] + + +def notify(_context, message): + """Test notifier, stores notifications in memory for unittests.""" + NOTIFICATIONS.append(message) diff --git a/oslo/log/openstack/common/rpc/__init__.py b/oslo/log/openstack/common/rpc/__init__.py new file mode 100644 index 0000000..e048412 --- /dev/null +++ b/oslo/log/openstack/common/rpc/__init__.py @@ -0,0 +1,275 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A remote procedure call (rpc) abstraction. + +For some wrappers that add message versioning to rpc, see: + rpc.dispatcher + rpc.proxy +""" + +from oslo.config import cfg + +from oslo.log.openstack.common import importutils +from oslo.log.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +rpc_opts = [ + cfg.StrOpt('rpc_backend', + default='%s.impl_kombu' % __package__, + help="The messaging module to use, defaults to kombu."), + cfg.IntOpt('rpc_thread_pool_size', + default=64, + help='Size of RPC thread pool'), + cfg.IntOpt('rpc_conn_pool_size', + default=30, + help='Size of RPC connection pool'), + cfg.IntOpt('rpc_response_timeout', + default=60, + help='Seconds to wait for a response from call or multicall'), + cfg.IntOpt('rpc_cast_timeout', + default=30, + help='Seconds to wait before a cast expires (TTL). ' + 'Only supported by impl_zmq.'), + cfg.ListOpt('allowed_rpc_exception_modules', + default=['nova.exception', + 'cinder.exception', + 'exceptions', + ], + help='Modules of exceptions that are permitted to be recreated' + ' upon receiving exception data from an rpc call.'), + cfg.BoolOpt('fake_rabbit', + default=False, + help='If passed, use a fake RabbitMQ provider'), + cfg.StrOpt('control_exchange', + default='openstack', + help='AMQP exchange to connect to if using RabbitMQ or Qpid'), +] + +CONF = cfg.CONF +CONF.register_opts(rpc_opts) + + +def set_defaults(control_exchange): + cfg.set_defaults(rpc_opts, + control_exchange=control_exchange) + + +def create_connection(new=True): + """Create a connection to the message bus used for rpc. + + For some example usage of creating a connection and some consumers on that + connection, see nova.service. + + :param new: Whether or not to create a new connection. A new connection + will be created by default. If new is False, the + implementation is free to return an existing connection from a + pool. + + :returns: An instance of openstack.common.rpc.common.Connection + """ + return _get_impl().create_connection(CONF, new=new) + + +def call(context, topic, msg, timeout=None): + """Invoke a remote method that returns something. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + :param timeout: int, number of seconds to use for a response timeout. + If set, this overrides the rpc_response_timeout option. + + :returns: A dict from the remote method. + + :raises: openstack.common.rpc.common.Timeout if a complete response + is not received before the timeout is reached. + """ + return _get_impl().call(CONF, context, topic, msg, timeout) + + +def cast(context, topic, msg): + """Invoke a remote method that does not return anything. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().cast(CONF, context, topic, msg) + + +def fanout_cast(context, topic, msg): + """Broadcast a remote method invocation with no return. + + This method will get invoked on all consumers that were set up with this + topic name and fanout=True. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=True. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().fanout_cast(CONF, context, topic, msg) + + +def multicall(context, topic, msg, timeout=None): + """Invoke a remote method and get back an iterator. + + In this case, the remote method will be returning multiple values in + separate messages, so the return values can be processed as the come in via + an iterator. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + :param timeout: int, number of seconds to use for a response timeout. + If set, this overrides the rpc_response_timeout option. + + :returns: An iterator. The iterator will yield a tuple (N, X) where N is + an index that starts at 0 and increases by one for each value + returned and X is the Nth value that was returned by the remote + method. + + :raises: openstack.common.rpc.common.Timeout if a complete response + is not received before the timeout is reached. + """ + return _get_impl().multicall(CONF, context, topic, msg, timeout) + + +def notify(context, topic, msg, envelope=False): + """Send notification event. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the notification to. + :param msg: This is a dict of content of event. + :param envelope: Set to True to enable message envelope for notifications. + + :returns: None + """ + return _get_impl().notify(cfg.CONF, context, topic, msg, envelope) + + +def cleanup(): + """Clean up resources in use by implementation. + + Clean up any resources that have been allocated by the RPC implementation. + This is typically open connections to a messaging service. This function + would get called before an application using this API exits to allow + connections to get torn down cleanly. + + :returns: None + """ + return _get_impl().cleanup() + + +def cast_to_server(context, server_params, topic, msg): + """Invoke a remote method that does not return anything. + + :param context: Information that identifies the user that has made this + request. + :param server_params: Connection information + :param topic: The topic to send the notification to. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().cast_to_server(CONF, context, server_params, topic, + msg) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Broadcast to a remote method invocation with no return. + + :param context: Information that identifies the user that has made this + request. + :param server_params: Connection information + :param topic: The topic to send the notification to. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().fanout_cast_to_server(CONF, context, server_params, + topic, msg) + + +def queue_get_for(context, topic, host): + """Get a queue name for a given topic + host. + + This function only works if this naming convention is followed on the + consumer side, as well. For example, in nova, every instance of the + nova-foo service calls create_consumer() for two topics: + + foo + foo.<host> + + Messages sent to the 'foo' topic are distributed to exactly one instance of + the nova-foo service. The services are chosen in a round-robin fashion. + Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on + <host>. + """ + return '%s.%s' % (topic, host) if host else topic + + +_RPCIMPL = None + + +def _get_impl(): + """Delay import of rpc_backend until configuration is loaded.""" + global _RPCIMPL + if _RPCIMPL is None: + try: + _RPCIMPL = importutils.import_module(CONF.rpc_backend) + except ImportError: + # For backwards compatibility with older nova config. + impl = CONF.rpc_backend.replace('nova.rpc', + 'nova.openstack.common.rpc') + _RPCIMPL = importutils.import_module(impl) + return _RPCIMPL diff --git a/oslo/log/openstack/common/rpc/amqp.py b/oslo/log/openstack/common/rpc/amqp.py new file mode 100644 index 0000000..9fbd7f0 --- /dev/null +++ b/oslo/log/openstack/common/rpc/amqp.py @@ -0,0 +1,637 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 - 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Shared code between AMQP based openstack.common.rpc implementations. + +The code in this module is shared between the rpc implementations based on +AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also +uses AMQP, but is deprecated and predates this code. +""" + +import collections +import inspect +import sys +import uuid + +from eventlet import greenpool +from eventlet import pools +from eventlet import queue +from eventlet import semaphore +from oslo.config import cfg +import six + + +from oslo.log.openstack.common import excutils +from oslo.log.openstack.common.gettextutils import _, _LE +from oslo.log.openstack.common import local +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common.rpc import common as rpc_common + + +amqp_opts = [ + cfg.BoolOpt('amqp_durable_queues', + default=False, + deprecated_name='rabbit_durable_queues', + deprecated_group='DEFAULT', + help='Use durable queues in amqp.'), + cfg.BoolOpt('amqp_auto_delete', + default=False, + help='Auto-delete queues in amqp.'), +] + +cfg.CONF.register_opts(amqp_opts) + +UNIQUE_ID = '_unique_id' +LOG = logging.getLogger(__name__) + + +class Pool(pools.Pool): + """Class that implements a Pool of Connections.""" + def __init__(self, conf, connection_cls, *args, **kwargs): + self.connection_cls = connection_cls + self.conf = conf + kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size) + kwargs.setdefault("order_as_stack", True) + super(Pool, self).__init__(*args, **kwargs) + self.reply_proxy = None + + # TODO(comstud): Timeout connections not used in a while + def create(self): + LOG.debug('Pool creating new connection') + return self.connection_cls(self.conf) + + def empty(self): + while self.free_items: + self.get().close() + # Force a new connection pool to be created. + # Note that this was added due to failing unit test cases. The issue + # is the above "while loop" gets all the cached connections from the + # pool and closes them, but never returns them to the pool, a pool + # leak. The unit tests hang waiting for an item to be returned to the + # pool. The unit tests get here via the tearDown() method. In the run + # time code, it gets here via cleanup() and only appears in service.py + # just before doing a sys.exit(), so cleanup() only happens once and + # the leakage is not a problem. + self.connection_cls.pool = None + + +_pool_create_sem = semaphore.Semaphore() + + +def get_connection_pool(conf, connection_cls): + with _pool_create_sem: + # Make sure only one thread tries to create the connection pool. + if not connection_cls.pool: + connection_cls.pool = Pool(conf, connection_cls) + return connection_cls.pool + + +class ConnectionContext(rpc_common.Connection): + """The class that is actually returned to the create_connection() caller. + + This is essentially a wrapper around Connection that supports 'with'. + It can also return a new Connection, or one from a pool. + + The function will also catch when an instance of this class is to be + deleted. With that we can return Connections to the pool on exceptions + and so forth without making the caller be responsible for catching them. + If possible the function makes sure to return a connection to the pool. + """ + + def __init__(self, conf, connection_pool, pooled=True, server_params=None): + """Create a new connection, or get one from the pool.""" + self.connection = None + self.conf = conf + self.connection_pool = connection_pool + if pooled: + self.connection = connection_pool.get() + else: + self.connection = connection_pool.connection_cls( + conf, + server_params=server_params) + self.pooled = pooled + + def __enter__(self): + """When with ConnectionContext() is used, return self.""" + return self + + def _done(self): + """If the connection came from a pool, clean it up and put it back. + If it did not come from a pool, close it. + """ + if self.connection: + if self.pooled: + # Reset the connection so it's ready for the next caller + # to grab from the pool + self.connection.reset() + self.connection_pool.put(self.connection) + else: + try: + self.connection.close() + except Exception: + pass + self.connection = None + + def __exit__(self, exc_type, exc_value, tb): + """End of 'with' statement. We're done here.""" + self._done() + + def __del__(self): + """Caller is done with this connection. Make sure we cleaned up.""" + self._done() + + def close(self): + """Caller is done with this connection.""" + self._done() + + def create_consumer(self, topic, proxy, fanout=False): + self.connection.create_consumer(topic, proxy, fanout) + + def create_worker(self, topic, proxy, pool_name): + self.connection.create_worker(topic, proxy, pool_name) + + def join_consumer_pool(self, callback, pool_name, topic, exchange_name, + ack_on_error=True): + self.connection.join_consumer_pool(callback, + pool_name, + topic, + exchange_name, + ack_on_error) + + def consume_in_thread(self): + return self.connection.consume_in_thread() + + def __getattr__(self, key): + """Proxy all other calls to the Connection instance.""" + if self.connection: + return getattr(self.connection, key) + else: + raise rpc_common.InvalidRPCConnectionReuse() + + +class ReplyProxy(ConnectionContext): + """Connection class for RPC replies / callbacks.""" + def __init__(self, conf, connection_pool): + self._call_waiters = {} + self._num_call_waiters = 0 + self._num_call_waiters_wrn_threshold = 10 + self._reply_q = 'reply_' + uuid.uuid4().hex + super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False) + self.declare_direct_consumer(self._reply_q, self._process_data) + self.consume_in_thread() + + def _process_data(self, message_data): + msg_id = message_data.pop('_msg_id', None) + waiter = self._call_waiters.get(msg_id) + if not waiter: + LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s' + ', message : %(data)s'), {'msg_id': msg_id, + 'data': message_data}) + LOG.warn(_('_call_waiters: %s') % self._call_waiters) + else: + waiter.put(message_data) + + def add_call_waiter(self, waiter, msg_id): + self._num_call_waiters += 1 + if self._num_call_waiters > self._num_call_waiters_wrn_threshold: + LOG.warn(_('Number of call waiters is greater than warning ' + 'threshold: %d. There could be a MulticallProxyWaiter ' + 'leak.') % self._num_call_waiters_wrn_threshold) + self._num_call_waiters_wrn_threshold *= 2 + self._call_waiters[msg_id] = waiter + + def del_call_waiter(self, msg_id): + self._num_call_waiters -= 1 + del self._call_waiters[msg_id] + + def get_reply_q(self): + return self._reply_q + + +def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, + failure=None, ending=False, log_failure=True): + """Sends a reply or an error on the channel signified by msg_id. + + Failure should be a sys.exc_info() tuple. + + """ + with ConnectionContext(conf, connection_pool) as conn: + if failure: + failure = rpc_common.serialize_remote_exception(failure, + log_failure) + + msg = {'result': reply, 'failure': failure} + if ending: + msg['ending'] = True + _add_unique_id(msg) + # If a reply_q exists, add the msg_id to the reply and pass the + # reply_q to direct_send() to use it as the response queue. + # Otherwise use the msg_id for backward compatibility. + if reply_q: + msg['_msg_id'] = msg_id + conn.direct_send(reply_q, rpc_common.serialize_msg(msg)) + else: + conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) + + +class RpcContext(rpc_common.CommonRpcContext): + """Context that supports replying to a rpc.call.""" + def __init__(self, **kwargs): + self.msg_id = kwargs.pop('msg_id', None) + self.reply_q = kwargs.pop('reply_q', None) + self.conf = kwargs.pop('conf') + super(RpcContext, self).__init__(**kwargs) + + def deepcopy(self): + values = self.to_dict() + values['conf'] = self.conf + values['msg_id'] = self.msg_id + values['reply_q'] = self.reply_q + return self.__class__(**values) + + def reply(self, reply=None, failure=None, ending=False, + connection_pool=None, log_failure=True): + if self.msg_id: + msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool, + reply, failure, ending, log_failure) + if ending: + self.msg_id = None + + +def unpack_context(conf, msg): + """Unpack context from msg.""" + context_dict = {} + for key in list(msg.keys()): + # NOTE(vish): Some versions of python don't like unicode keys + # in kwargs. + key = str(key) + if key.startswith('_context_'): + value = msg.pop(key) + context_dict[key[9:]] = value + context_dict['msg_id'] = msg.pop('_msg_id', None) + context_dict['reply_q'] = msg.pop('_reply_q', None) + context_dict['conf'] = conf + ctx = RpcContext.from_dict(context_dict) + rpc_common._safe_log(LOG.debug, 'unpacked context: %s', ctx.to_dict()) + return ctx + + +def pack_context(msg, context): + """Pack context into msg. + + Values for message keys need to be less than 255 chars, so we pull + context out into a bunch of separate keys. If we want to support + more arguments in rabbit messages, we may want to do the same + for args at some point. + + """ + if isinstance(context, dict): + context_d = dict([('_context_%s' % key, value) + for (key, value) in six.iteritems(context)]) + else: + context_d = dict([('_context_%s' % key, value) + for (key, value) in + six.iteritems(context.to_dict())]) + + msg.update(context_d) + + +class _MsgIdCache(object): + """This class checks any duplicate messages.""" + + # NOTE: This value is considered can be a configuration item, but + # it is not necessary to change its value in most cases, + # so let this value as static for now. + DUP_MSG_CHECK_SIZE = 16 + + def __init__(self, **kwargs): + self.prev_msgids = collections.deque([], + maxlen=self.DUP_MSG_CHECK_SIZE) + + def check_duplicate_message(self, message_data): + """AMQP consumers may read same message twice when exceptions occur + before ack is returned. This method prevents doing it. + """ + if UNIQUE_ID in message_data: + msg_id = message_data[UNIQUE_ID] + if msg_id not in self.prev_msgids: + self.prev_msgids.append(msg_id) + else: + raise rpc_common.DuplicateMessageError(msg_id=msg_id) + + +def _add_unique_id(msg): + """Add unique_id for checking duplicate messages.""" + unique_id = uuid.uuid4().hex + msg.update({UNIQUE_ID: unique_id}) + LOG.debug('UNIQUE_ID is %s.' % (unique_id)) + + +class _ThreadPoolWithWait(object): + """Base class for a delayed invocation manager. + + Used by the Connection class to start up green threads + to handle incoming messages. + """ + + def __init__(self, conf, connection_pool): + self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size) + self.connection_pool = connection_pool + self.conf = conf + + def wait(self): + """Wait for all callback threads to exit.""" + self.pool.waitall() + + +class CallbackWrapper(_ThreadPoolWithWait): + """Wraps a straight callback. + + Allows it to be invoked in a green thread. + """ + + def __init__(self, conf, callback, connection_pool, + wait_for_consumers=False): + """Initiates CallbackWrapper object. + + :param conf: cfg.CONF instance + :param callback: a callable (probably a function) + :param connection_pool: connection pool as returned by + get_connection_pool() + :param wait_for_consumers: wait for all green threads to + complete and raise the last + caught exception, if any. + + """ + super(CallbackWrapper, self).__init__( + conf=conf, + connection_pool=connection_pool, + ) + self.callback = callback + self.wait_for_consumers = wait_for_consumers + self.exc_info = None + + def _wrap(self, message_data, **kwargs): + """Wrap the callback invocation to catch exceptions. + """ + try: + self.callback(message_data, **kwargs) + except Exception: + self.exc_info = sys.exc_info() + + def __call__(self, message_data): + self.exc_info = None + self.pool.spawn_n(self._wrap, message_data) + + if self.wait_for_consumers: + self.pool.waitall() + if self.exc_info: + six.reraise(self.exc_info[1], None, self.exc_info[2]) + + +class ProxyCallback(_ThreadPoolWithWait): + """Calls methods on a proxy object based on method and args.""" + + def __init__(self, conf, proxy, connection_pool): + super(ProxyCallback, self).__init__( + conf=conf, + connection_pool=connection_pool, + ) + self.proxy = proxy + self.msg_id_cache = _MsgIdCache() + + def __call__(self, message_data): + """Consumer callback to call a method on a proxy object. + + Parses the message for validity and fires off a thread to call the + proxy object method. + + Message data should be a dictionary with two keys: + method: string representing the method to call + args: dictionary of arg: value + + Example: {'method': 'echo', 'args': {'value': 42}} + + """ + # It is important to clear the context here, because at this point + # the previous context is stored in local.store.context + if hasattr(local.store, 'context'): + del local.store.context + rpc_common._safe_log(LOG.debug, 'received %s', message_data) + self.msg_id_cache.check_duplicate_message(message_data) + ctxt = unpack_context(self.conf, message_data) + method = message_data.get('method') + args = message_data.get('args', {}) + version = message_data.get('version') + namespace = message_data.get('namespace') + if not method: + LOG.warn(_('no method for message: %s') % message_data) + ctxt.reply(_('No method for message: %s') % message_data, + connection_pool=self.connection_pool) + return + self.pool.spawn_n(self._process_data, ctxt, version, method, + namespace, args) + + def _process_data(self, ctxt, version, method, namespace, args): + """Process a message in a new thread. + + If the proxy object we have has a dispatch method + (see rpc.dispatcher.RpcDispatcher), pass it the version, + method, and args and let it dispatch as appropriate. If not, use + the old behavior of magically calling the specified method on the + proxy we have here. + """ + ctxt.update_store() + try: + rval = self.proxy.dispatch(ctxt, version, method, namespace, + **args) + # Check if the result was a generator + if inspect.isgenerator(rval): + for x in rval: + ctxt.reply(x, None, connection_pool=self.connection_pool) + else: + ctxt.reply(rval, None, connection_pool=self.connection_pool) + # This final None tells multicall that it is done. + ctxt.reply(ending=True, connection_pool=self.connection_pool) + except rpc_common.ClientException as e: + LOG.debug('Expected exception during message handling (%s)' % + e._exc_info[1]) + ctxt.reply(None, e._exc_info, + connection_pool=self.connection_pool, + log_failure=False) + except Exception: + # sys.exc_info() is deleted by LOG.exception(). + exc_info = sys.exc_info() + LOG.error(_LE('Exception during message handling'), + exc_info=exc_info) + ctxt.reply(None, exc_info, connection_pool=self.connection_pool) + + +class MulticallProxyWaiter(object): + def __init__(self, conf, msg_id, timeout, connection_pool): + self._msg_id = msg_id + self._timeout = timeout or conf.rpc_response_timeout + self._reply_proxy = connection_pool.reply_proxy + self._done = False + self._got_ending = False + self._conf = conf + self._dataqueue = queue.LightQueue() + # Add this caller to the reply proxy's call_waiters + self._reply_proxy.add_call_waiter(self, self._msg_id) + self.msg_id_cache = _MsgIdCache() + + def put(self, data): + self._dataqueue.put(data) + + def done(self): + if self._done: + return + self._done = True + # Remove this caller from reply proxy's call_waiters + self._reply_proxy.del_call_waiter(self._msg_id) + + def _process_data(self, data): + result = None + self.msg_id_cache.check_duplicate_message(data) + if data['failure']: + failure = data['failure'] + result = rpc_common.deserialize_remote_exception(self._conf, + failure) + elif data.get('ending', False): + self._got_ending = True + else: + result = data['result'] + return result + + def __iter__(self): + """Return a result until we get a reply with an 'ending' flag.""" + if self._done: + raise StopIteration + while True: + try: + data = self._dataqueue.get(timeout=self._timeout) + result = self._process_data(data) + except queue.Empty: + self.done() + raise rpc_common.Timeout() + except Exception: + with excutils.save_and_reraise_exception(): + self.done() + if self._got_ending: + self.done() + raise StopIteration + if isinstance(result, Exception): + self.done() + raise result + yield result + + +def create_connection(conf, new, connection_pool): + """Create a connection.""" + return ConnectionContext(conf, connection_pool, pooled=not new) + + +_reply_proxy_create_sem = semaphore.Semaphore() + + +def multicall(conf, context, topic, msg, timeout, connection_pool): + """Make a call that returns multiple times.""" + LOG.debug('Making synchronous call on %s ...', topic) + msg_id = uuid.uuid4().hex + msg.update({'_msg_id': msg_id}) + LOG.debug('MSG_ID is %s' % (msg_id)) + _add_unique_id(msg) + pack_context(msg, context) + + with _reply_proxy_create_sem: + if not connection_pool.reply_proxy: + connection_pool.reply_proxy = ReplyProxy(conf, connection_pool) + msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()}) + wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool) + with ConnectionContext(conf, connection_pool) as conn: + conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) + return wait_msg + + +def call(conf, context, topic, msg, timeout, connection_pool): + """Sends a message on a topic and wait for a response.""" + rv = multicall(conf, context, topic, msg, timeout, connection_pool) + # NOTE(vish): return the last result from the multicall + rv = list(rv) + if not rv: + return + return rv[-1] + + +def cast(conf, context, topic, msg, connection_pool): + """Sends a message on a topic without waiting for a response.""" + LOG.debug('Making asynchronous cast on %s...', topic) + _add_unique_id(msg) + pack_context(msg, context) + with ConnectionContext(conf, connection_pool) as conn: + conn.topic_send(topic, rpc_common.serialize_msg(msg)) + + +def fanout_cast(conf, context, topic, msg, connection_pool): + """Sends a message on a fanout exchange without waiting for a response.""" + LOG.debug('Making asynchronous fanout cast...') + _add_unique_id(msg) + pack_context(msg, context) + with ConnectionContext(conf, connection_pool) as conn: + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) + + +def cast_to_server(conf, context, server_params, topic, msg, connection_pool): + """Sends a message on a topic to a specific server.""" + _add_unique_id(msg) + pack_context(msg, context) + with ConnectionContext(conf, connection_pool, pooled=False, + server_params=server_params) as conn: + conn.topic_send(topic, rpc_common.serialize_msg(msg)) + + +def fanout_cast_to_server(conf, context, server_params, topic, msg, + connection_pool): + """Sends a message on a fanout exchange to a specific server.""" + _add_unique_id(msg) + pack_context(msg, context) + with ConnectionContext(conf, connection_pool, pooled=False, + server_params=server_params) as conn: + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) + + +def notify(conf, context, topic, msg, connection_pool, envelope): + """Sends a notification event on a topic.""" + LOG.debug('Sending %(event_type)s on %(topic)s', + dict(event_type=msg.get('event_type'), + topic=topic)) + _add_unique_id(msg) + pack_context(msg, context) + with ConnectionContext(conf, connection_pool) as conn: + if envelope: + msg = rpc_common.serialize_msg(msg) + conn.notify_send(topic, msg) + + +def cleanup(connection_pool): + if connection_pool: + connection_pool.empty() + + +def get_control_exchange(conf): + return conf.control_exchange diff --git a/oslo/log/openstack/common/rpc/common.py b/oslo/log/openstack/common/rpc/common.py new file mode 100644 index 0000000..f861558 --- /dev/null +++ b/oslo/log/openstack/common/rpc/common.py @@ -0,0 +1,508 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import sys +import traceback + +from oslo.config import cfg +import six + +from oslo.log.openstack.common.gettextutils import _, _LE +from oslo.log.openstack.common import importutils +from oslo.log.openstack.common import jsonutils +from oslo.log.openstack.common import local +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common import versionutils + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +_RPC_ENVELOPE_VERSION = '2.0' +'''RPC Envelope Version. + +This version number applies to the top level structure of messages sent out. +It does *not* apply to the message payload, which must be versioned +independently. For example, when using rpc APIs, a version number is applied +for changes to the API being exposed over rpc. This version number is handled +in the rpc proxy and dispatcher modules. + +This version number applies to the message envelope that is used in the +serialization done inside the rpc layer. See serialize_msg() and +deserialize_msg(). + +The current message format (version 2.0) is very simple. It is:: + + { + 'oslo.version': <RPC Envelope Version as a String>, + 'oslo.message': <Application Message Payload, JSON encoded> + } + +Message format version '1.0' is just considered to be the messages we sent +without a message envelope. + +So, the current message envelope just includes the envelope version. It may +eventually contain additional information, such as a signature for the message +payload. + +We will JSON encode the application message payload. The message envelope, +which includes the JSON encoded application message body, will be passed down +to the messaging libraries as a dict. +''' + +_VERSION_KEY = 'oslo.version' +_MESSAGE_KEY = 'oslo.message' + +_REMOTE_POSTFIX = '_Remote' + + +class RPCException(Exception): + msg_fmt = _("An unknown RPC related exception occurred.") + + def __init__(self, message=None, **kwargs): + self.kwargs = kwargs + + if not message: + try: + message = self.msg_fmt % kwargs + + except Exception: + # kwargs doesn't match a variable in the message + # log the issue and the kwargs + LOG.exception(_LE('Exception in string format operation')) + for name, value in six.iteritems(kwargs): + LOG.error("%s: %s" % (name, value)) + # at least get the core message out if something happened + message = self.msg_fmt + + super(RPCException, self).__init__(message) + + +class RemoteError(RPCException): + """Signifies that a remote class has raised an exception. + + Contains a string representation of the type of the original exception, + the value of the original exception, and the traceback. These are + sent to the parent as a joined string so printing the exception + contains all of the relevant info. + + """ + msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") + + def __init__(self, exc_type=None, value=None, traceback=None): + self.exc_type = exc_type + self.value = value + self.traceback = traceback + super(RemoteError, self).__init__(exc_type=exc_type, + value=value, + traceback=traceback) + + +class Timeout(RPCException): + """Signifies that a timeout has occurred. + + This exception is raised if the rpc_response_timeout is reached while + waiting for a response from the remote side. + """ + msg_fmt = _('Timeout while waiting on RPC response - ' + 'topic: "%(topic)s", RPC method: "%(method)s" ' + 'info: "%(info)s"') + + def __init__(self, info=None, topic=None, method=None): + """Initiates Timeout object. + + :param info: Extra info to convey to the user + :param topic: The topic that the rpc call was sent to + :param rpc_method_name: The name of the rpc method being + called + """ + self.info = info + self.topic = topic + self.method = method + super(Timeout, self).__init__( + None, + info=info or _('<unknown>'), + topic=topic or _('<unknown>'), + method=method or _('<unknown>')) + + +class DuplicateMessageError(RPCException): + msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.") + + +class InvalidRPCConnectionReuse(RPCException): + msg_fmt = _("Invalid reuse of an RPC connection.") + + +class UnsupportedRpcVersion(RPCException): + msg_fmt = _("Specified RPC version, %(version)s, not supported by " + "this endpoint.") + + +class UnsupportedRpcEnvelopeVersion(RPCException): + msg_fmt = _("Specified RPC envelope version, %(version)s, " + "not supported by this endpoint.") + + +class RpcVersionCapError(RPCException): + msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low") + + +class Connection(object): + """A connection, returned by rpc.create_connection(). + + This class represents a connection to the message bus used for rpc. + An instance of this class should never be created by users of the rpc API. + Use rpc.create_connection() instead. + """ + def close(self): + """Close the connection. + + This method must be called when the connection will no longer be used. + It will ensure that any resources associated with the connection, such + as a network connection, and cleaned up. + """ + raise NotImplementedError() + + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer on this connection. + + A consumer is associated with a message queue on the backend message + bus. The consumer will read messages from the queue, unpack them, and + dispatch them to the proxy object. The contents of the message pulled + off of the queue will determine which method gets called on the proxy + object. + + :param topic: This is a name associated with what to consume from. + Multiple instances of a service may consume from the same + topic. For example, all instances of nova-compute consume + from a queue called "compute". In that case, the + messages will get distributed amongst the consumers in a + round-robin fashion if fanout=False. If fanout=True, + every consumer associated with this topic will get a + copy of every message. + :param proxy: The object that will handle all incoming messages. + :param fanout: Whether or not this is a fanout topic. See the + documentation for the topic parameter for some + additional comments on this. + """ + raise NotImplementedError() + + def create_worker(self, topic, proxy, pool_name): + """Create a worker on this connection. + + A worker is like a regular consumer of messages directed to a + topic, except that it is part of a set of such consumers (the + "pool") which may run in parallel. Every pool of workers will + receive a given message, but only one worker in the pool will + be asked to process it. Load is distributed across the members + of the pool in round-robin fashion. + + :param topic: This is a name associated with what to consume from. + Multiple instances of a service may consume from the same + topic. + :param proxy: The object that will handle all incoming messages. + :param pool_name: String containing the name of the pool of workers + """ + raise NotImplementedError() + + def join_consumer_pool(self, callback, pool_name, topic, exchange_name): + """Register as a member of a group of consumers. + + Uses given topic from the specified exchange. + Exactly one member of a given pool will receive each message. + + A message will be delivered to multiple pools, if more than + one is created. + + :param callback: Callable to be invoked for each message. + :type callback: callable accepting one argument + :param pool_name: The name of the consumer pool. + :type pool_name: str + :param topic: The routing topic for desired messages. + :type topic: str + :param exchange_name: The name of the message exchange where + the client should attach. Defaults to + the configured exchange. + :type exchange_name: str + """ + raise NotImplementedError() + + def consume_in_thread(self): + """Spawn a thread to handle incoming messages. + + Spawn a thread that will be responsible for handling all incoming + messages for consumers that were set up on this connection. + + Message dispatching inside of this is expected to be implemented in a + non-blocking manner. An example implementation would be having this + thread pull messages in for all of the consumers, but utilize a thread + pool for dispatching the messages to the proxy objects. + """ + raise NotImplementedError() + + +def _safe_log(log_func, msg, msg_data): + """Sanitizes the msg_data field before logging.""" + SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass'] + + def _fix_passwords(d): + """Sanitizes the password fields in the dictionary.""" + for k in six.iterkeys(d): + if k.lower().find('password') != -1: + d[k] = '<SANITIZED>' + elif k.lower() in SANITIZE: + d[k] = '<SANITIZED>' + elif isinstance(d[k], list): + for e in d[k]: + if isinstance(e, dict): + _fix_passwords(e) + elif isinstance(d[k], dict): + _fix_passwords(d[k]) + return d + + return log_func(msg, _fix_passwords(copy.deepcopy(msg_data))) + + +def serialize_remote_exception(failure_info, log_failure=True): + """Prepares exception data to be sent over rpc. + + Failure_info should be a sys.exc_info() tuple. + + """ + tb = traceback.format_exception(*failure_info) + failure = failure_info[1] + if log_failure: + LOG.error(_LE("Returning exception %s to caller"), + six.text_type(failure)) + LOG.error(tb) + + kwargs = {} + if hasattr(failure, 'kwargs'): + kwargs = failure.kwargs + + # NOTE(matiu): With cells, it's possible to re-raise remote, remote + # exceptions. Lets turn it back into the original exception type. + cls_name = str(failure.__class__.__name__) + mod_name = str(failure.__class__.__module__) + if (cls_name.endswith(_REMOTE_POSTFIX) and + mod_name.endswith(_REMOTE_POSTFIX)): + cls_name = cls_name[:-len(_REMOTE_POSTFIX)] + mod_name = mod_name[:-len(_REMOTE_POSTFIX)] + + data = { + 'class': cls_name, + 'module': mod_name, + 'message': six.text_type(failure), + 'tb': tb, + 'args': failure.args, + 'kwargs': kwargs + } + + json_data = jsonutils.dumps(data) + + return json_data + + +def deserialize_remote_exception(conf, data): + failure = jsonutils.loads(str(data)) + + trace = failure.get('tb', []) + message = failure.get('message', "") + "\n" + "\n".join(trace) + name = failure.get('class') + module = failure.get('module') + + # NOTE(ameade): We DO NOT want to allow just any module to be imported, in + # order to prevent arbitrary code execution. + if module not in conf.allowed_rpc_exception_modules: + return RemoteError(name, failure.get('message'), trace) + + try: + mod = importutils.import_module(module) + klass = getattr(mod, name) + if not issubclass(klass, Exception): + raise TypeError("Can only deserialize Exceptions") + + failure = klass(*failure.get('args', []), **failure.get('kwargs', {})) + except (AttributeError, TypeError, ImportError): + return RemoteError(name, failure.get('message'), trace) + + ex_type = type(failure) + str_override = lambda self: message + new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,), + {'__str__': str_override, '__unicode__': str_override}) + new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX) + try: + # NOTE(ameade): Dynamically create a new exception type and swap it in + # as the new type for the exception. This only works on user defined + # Exceptions and not core python exceptions. This is important because + # we cannot necessarily change an exception message so we must override + # the __str__ method. + failure.__class__ = new_ex_type + except TypeError: + # NOTE(ameade): If a core exception then just add the traceback to the + # first exception argument. + failure.args = (message,) + failure.args[1:] + return failure + + +class CommonRpcContext(object): + def __init__(self, **kwargs): + self.values = kwargs + + def __getattr__(self, key): + try: + return self.values[key] + except KeyError: + raise AttributeError(key) + + def to_dict(self): + return copy.deepcopy(self.values) + + @classmethod + def from_dict(cls, values): + return cls(**values) + + def deepcopy(self): + return self.from_dict(self.to_dict()) + + def update_store(self): + local.store.context = self + + def elevated(self, read_deleted=None, overwrite=False): + """Return a version of this context with admin flag set.""" + # TODO(russellb) This method is a bit of a nova-ism. It makes + # some assumptions about the data in the request context sent + # across rpc, while the rest of this class does not. We could get + # rid of this if we changed the nova code that uses this to + # convert the RpcContext back to its native RequestContext doing + # something like nova.context.RequestContext.from_dict(ctxt.to_dict()) + + context = self.deepcopy() + context.values['is_admin'] = True + + context.values.setdefault('roles', []) + + if 'admin' not in context.values['roles']: + context.values['roles'].append('admin') + + if read_deleted is not None: + context.values['read_deleted'] = read_deleted + + return context + + +class ClientException(Exception): + """Encapsulates actual exception expected to be hit by a RPC proxy object. + + Merely instantiating it records the current exception information, which + will be passed back to the RPC client without exceptional logging. + """ + def __init__(self): + self._exc_info = sys.exc_info() + + +def catch_client_exception(exceptions, func, *args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + if type(e) in exceptions: + raise ClientException() + else: + raise + + +def client_exceptions(*exceptions): + """Decorator for manager methods that raise expected exceptions. + + Marking a Manager method with this decorator allows the declaration + of expected exceptions that the RPC layer should not consider fatal, + and not log as if they were generated in a real error scenario. Note + that this will cause listed exceptions to be wrapped in a + ClientException, which is used internally by the RPC layer. + """ + def outer(func): + def inner(*args, **kwargs): + return catch_client_exception(exceptions, func, *args, **kwargs) + return inner + return outer + + +# TODO(sirp): we should deprecate this in favor of +# using `versionutils.is_compatible` directly +def version_is_compatible(imp_version, version): + """Determine whether versions are compatible. + + :param imp_version: The version implemented + :param version: The version requested by an incoming message. + """ + return versionutils.is_compatible(version, imp_version) + + +def serialize_msg(raw_msg): + # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more + # information about this format. + msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION, + _MESSAGE_KEY: jsonutils.dumps(raw_msg)} + + return msg + + +def deserialize_msg(msg): + # NOTE(russellb): Hang on to your hats, this road is about to + # get a little bumpy. + # + # Robustness Principle: + # "Be strict in what you send, liberal in what you accept." + # + # At this point we have to do a bit of guessing about what it + # is we just received. Here is the set of possibilities: + # + # 1) We received a dict. This could be 2 things: + # + # a) Inspect it to see if it looks like a standard message envelope. + # If so, great! + # + # b) If it doesn't look like a standard message envelope, it could either + # be a notification, or a message from before we added a message + # envelope (referred to as version 1.0). + # Just return the message as-is. + # + # 2) It's any other non-dict type. Just return it and hope for the best. + # This case covers return values from rpc.call() from before message + # envelopes were used. (messages to call a method were always a dict) + + if not isinstance(msg, dict): + # See #2 above. + return msg + + base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY) + if not all(map(lambda key: key in msg, base_envelope_keys)): + # See #1.b above. + return msg + + # At this point we think we have the message envelope + # format we were expecting. (#1.a above) + + if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]): + raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY]) + + raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + + return raw_msg diff --git a/oslo/log/openstack/common/rpc/dispatcher.py b/oslo/log/openstack/common/rpc/dispatcher.py new file mode 100644 index 0000000..3851c7a --- /dev/null +++ b/oslo/log/openstack/common/rpc/dispatcher.py @@ -0,0 +1,178 @@ +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Code for rpc message dispatching. + +Messages that come in have a version number associated with them. RPC API +version numbers are in the form: + + Major.Minor + +For a given message with version X.Y, the receiver must be marked as able to +handle messages of version A.B, where: + + A = X + + B >= Y + +The Major version number would be incremented for an almost completely new API. +The Minor version number would be incremented for backwards compatible changes +to an existing API. A backwards compatible change could be something like +adding a new method, adding an argument to an existing method (but not +requiring it), or changing the type for an existing argument (but still +handling the old type as well). + +The conversion over to a versioned API must be done on both the client side and +server side of the API at the same time. However, as the code stands today, +there can be both versioned and unversioned APIs implemented in the same code +base. + +EXAMPLES +======== + +Nova was the first project to use versioned rpc APIs. Consider the compute rpc +API as an example. The client side is in nova/compute/rpcapi.py and the server +side is in nova/compute/manager.py. + + +Example 1) Adding a new method. +------------------------------- + +Adding a new method is a backwards compatible change. It should be added to +nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to +X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should +have a specific version specified to indicate the minimum API version that must +be implemented for the method to be supported. For example:: + + def get_host_uptime(self, ctxt, host): + topic = _compute_topic(self.topic, ctxt, host, None) + return self.call(ctxt, self.make_msg('get_host_uptime'), topic, + version='1.1') + +In this case, version '1.1' is the first version that supported the +get_host_uptime() method. + + +Example 2) Adding a new parameter. +---------------------------------- + +Adding a new parameter to an rpc method can be made backwards compatible. The +RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. +The implementation of the method must not expect the parameter to be present.:: + + def some_remote_method(self, arg1, arg2, newarg=None): + # The code needs to deal with newarg=None for cases + # where an older client sends a message without it. + pass + +On the client side, the same changes should be made as in example 1. The +minimum version that supports the new parameter should be specified. +""" + +import six + +from oslo.log.openstack.common.rpc import common as rpc_common +from oslo.log.openstack.common.rpc import serializer as rpc_serializer + + +class RpcDispatcher(object): + """Dispatch rpc messages according to the requested API version. + + This class can be used as the top level 'manager' for a service. It + contains a list of underlying managers that have an API_VERSION attribute. + """ + + def __init__(self, callbacks, serializer=None): + """Initialize the rpc dispatcher. + + :param callbacks: List of proxy objects that are an instance + of a class with rpc methods exposed. Each proxy + object should have an RPC_API_VERSION attribute. + :param serializer: The Serializer object that will be used to + deserialize arguments before the method call and + to serialize the result after it returns. + """ + self.callbacks = callbacks + if serializer is None: + serializer = rpc_serializer.NoOpSerializer() + self.serializer = serializer + super(RpcDispatcher, self).__init__() + + def _deserialize_args(self, context, kwargs): + """Helper method called to deserialize args before dispatch. + + This calls our serializer on each argument, returning a new set of + args that have been deserialized. + + :param context: The request context + :param kwargs: The arguments to be deserialized + :returns: A new set of deserialized args + """ + new_kwargs = dict() + for argname, arg in six.iteritems(kwargs): + new_kwargs[argname] = self.serializer.deserialize_entity(context, + arg) + return new_kwargs + + def dispatch(self, ctxt, version, method, namespace, **kwargs): + """Dispatch a message based on a requested version. + + :param ctxt: The request context + :param version: The requested API version from the incoming message + :param method: The method requested to be called by the incoming + message. + :param namespace: The namespace for the requested method. If None, + the dispatcher will look for a method on a callback + object with no namespace set. + :param kwargs: A dict of keyword arguments to be passed to the method. + + :returns: Whatever is returned by the underlying method that gets + called. + """ + if not version: + version = '1.0' + + had_compatible = False + for proxyobj in self.callbacks: + # Check for namespace compatibility + try: + cb_namespace = proxyobj.RPC_API_NAMESPACE + except AttributeError: + cb_namespace = None + + if namespace != cb_namespace: + continue + + # Check for version compatibility + try: + rpc_api_version = proxyobj.RPC_API_VERSION + except AttributeError: + rpc_api_version = '1.0' + + is_compatible = rpc_common.version_is_compatible(rpc_api_version, + version) + had_compatible = had_compatible or is_compatible + + if not hasattr(proxyobj, method): + continue + if is_compatible: + kwargs = self._deserialize_args(ctxt, kwargs) + result = getattr(proxyobj, method)(ctxt, **kwargs) + return self.serializer.serialize_entity(ctxt, result) + + if had_compatible: + raise AttributeError("No such RPC function '%s'" % method) + else: + raise rpc_common.UnsupportedRpcVersion(version=version) diff --git a/oslo/log/openstack/common/rpc/impl_fake.py b/oslo/log/openstack/common/rpc/impl_fake.py new file mode 100644 index 0000000..fa29f10 --- /dev/null +++ b/oslo/log/openstack/common/rpc/impl_fake.py @@ -0,0 +1,195 @@ +# Copyright 2011 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Fake RPC implementation which calls proxy methods directly with no +queues. Casts will block, but this is very useful for tests. +""" + +import inspect +# NOTE(russellb): We specifically want to use json, not our own jsonutils. +# jsonutils has some extra logic to automatically convert objects to primitive +# types so that they can be serialized. We want to catch all cases where +# non-primitive types make it into this code and treat it as an error. +import json +import time + +import eventlet +import six + +from oslo.log.openstack.common.rpc import common as rpc_common + +CONSUMERS = {} + + +class RpcContext(rpc_common.CommonRpcContext): + def __init__(self, **kwargs): + super(RpcContext, self).__init__(**kwargs) + self._response = [] + self._done = False + + def deepcopy(self): + values = self.to_dict() + new_inst = self.__class__(**values) + new_inst._response = self._response + new_inst._done = self._done + return new_inst + + def reply(self, reply=None, failure=None, ending=False): + if ending: + self._done = True + if not self._done: + self._response.append((reply, failure)) + + +class Consumer(object): + def __init__(self, topic, proxy): + self.topic = topic + self.proxy = proxy + + def call(self, context, version, method, namespace, args, timeout): + done = eventlet.event.Event() + + def _inner(): + ctxt = RpcContext.from_dict(context.to_dict()) + try: + rval = self.proxy.dispatch(context, version, method, + namespace, **args) + res = [] + # Caller might have called ctxt.reply() manually + for (reply, failure) in ctxt._response: + if failure: + six.reraise(failure[0], failure[1], failure[2]) + res.append(reply) + # if ending not 'sent'...we might have more data to + # return from the function itself + if not ctxt._done: + if inspect.isgenerator(rval): + for val in rval: + res.append(val) + else: + res.append(rval) + done.send(res) + except rpc_common.ClientException as e: + done.send_exception(e._exc_info[1]) + except Exception as e: + done.send_exception(e) + + thread = eventlet.greenthread.spawn(_inner) + + if timeout: + start_time = time.time() + while not done.ready(): + eventlet.greenthread.sleep(1) + cur_time = time.time() + if (cur_time - start_time) > timeout: + thread.kill() + raise rpc_common.Timeout() + + return done.wait() + + +class Connection(object): + """Connection object.""" + + def __init__(self): + self.consumers = [] + + def create_consumer(self, topic, proxy, fanout=False): + consumer = Consumer(topic, proxy) + self.consumers.append(consumer) + if topic not in CONSUMERS: + CONSUMERS[topic] = [] + CONSUMERS[topic].append(consumer) + + def close(self): + for consumer in self.consumers: + CONSUMERS[consumer.topic].remove(consumer) + self.consumers = [] + + def consume_in_thread(self): + pass + + +def create_connection(conf, new=True): + """Create a connection.""" + return Connection() + + +def check_serialize(msg): + """Make sure a message intended for rpc can be serialized.""" + json.dumps(msg) + + +def multicall(conf, context, topic, msg, timeout=None): + """Make a call that returns multiple times.""" + + check_serialize(msg) + + method = msg.get('method') + if not method: + return + args = msg.get('args', {}) + version = msg.get('version') + namespace = msg.get('namespace') + + try: + consumer = CONSUMERS[topic][0] + except (KeyError, IndexError): + raise rpc_common.Timeout("No consumers available") + else: + return consumer.call(context, version, method, namespace, args, + timeout) + + +def call(conf, context, topic, msg, timeout=None): + """Sends a message on a topic and wait for a response.""" + rv = multicall(conf, context, topic, msg, timeout) + # NOTE(vish): return the last result from the multicall + rv = list(rv) + if not rv: + return + return rv[-1] + + +def cast(conf, context, topic, msg): + check_serialize(msg) + try: + call(conf, context, topic, msg) + except Exception: + pass + + +def notify(conf, context, topic, msg, envelope): + check_serialize(msg) + + +def cleanup(): + pass + + +def fanout_cast(conf, context, topic, msg): + """Cast to all consumers of a topic.""" + check_serialize(msg) + method = msg.get('method') + if not method: + return + args = msg.get('args', {}) + version = msg.get('version') + namespace = msg.get('namespace') + + for consumer in CONSUMERS.get(topic, []): + try: + consumer.call(context, version, method, namespace, args, None) + except Exception: + pass diff --git a/oslo/log/openstack/common/rpc/impl_kombu.py b/oslo/log/openstack/common/rpc/impl_kombu.py new file mode 100644 index 0000000..07558cb --- /dev/null +++ b/oslo/log/openstack/common/rpc/impl_kombu.py @@ -0,0 +1,873 @@ +# Copyright 2011 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import functools +import itertools +import socket +import ssl +import time +import uuid + +import eventlet +import greenlet +import kombu +import kombu.connection +import kombu.entity +import kombu.messaging +from oslo.config import cfg +import six + +from oslo.log.openstack.common import excutils +from oslo.log.openstack.common.gettextutils import _, _LE, _LI +from oslo.log.openstack.common import network_utils +from oslo.log.openstack.common.rpc import amqp as rpc_amqp +from oslo.log.openstack.common.rpc import common as rpc_common +from oslo.log.openstack.common import sslutils + +kombu_opts = [ + cfg.StrOpt('kombu_ssl_version', + default='', + help='If SSL is enabled, the SSL version to use. Valid ' + 'values are TLSv1, SSLv23 and SSLv3. SSLv2 might ' + 'be available on some distributions.' + ), + cfg.StrOpt('kombu_ssl_keyfile', + default='', + help='SSL key file (valid only if SSL enabled)'), + cfg.StrOpt('kombu_ssl_certfile', + default='', + help='SSL cert file (valid only if SSL enabled)'), + cfg.StrOpt('kombu_ssl_ca_certs', + default='', + help='SSL certification authority file ' + '(valid only if SSL enabled)'), + cfg.FloatOpt('kombu_reconnect_delay', + default=1.0, + help='How long to wait before reconnecting in response to an ' + 'AMQP consumer cancel notification.'), + cfg.StrOpt('rabbit_host', + default='localhost', + help='The RabbitMQ broker address where a single node is used'), + cfg.IntOpt('rabbit_port', + default=5672, + help='The RabbitMQ broker port where a single node is used'), + cfg.ListOpt('rabbit_hosts', + default=['$rabbit_host:$rabbit_port'], + help='RabbitMQ HA cluster host:port pairs'), + cfg.BoolOpt('rabbit_use_ssl', + default=False, + help='Connect over SSL for RabbitMQ'), + cfg.StrOpt('rabbit_userid', + default='guest', + help='The RabbitMQ userid'), + cfg.StrOpt('rabbit_password', + default='guest', + help='The RabbitMQ password', + secret=True), + cfg.StrOpt('rabbit_virtual_host', + default='/', + help='The RabbitMQ virtual host'), + cfg.IntOpt('rabbit_retry_interval', + default=1, + help='How frequently to retry connecting with RabbitMQ'), + cfg.IntOpt('rabbit_retry_backoff', + default=2, + help='How long to backoff for between retries when connecting ' + 'to RabbitMQ'), + cfg.IntOpt('rabbit_max_retries', + default=0, + help='Maximum number of RabbitMQ connection retries. ' + 'Default is 0 (infinite retry count)'), + cfg.BoolOpt('rabbit_ha_queues', + default=False, + help='Use HA queues in RabbitMQ (x-ha-policy: all). ' + 'If you change this option, you must wipe the ' + 'RabbitMQ database.'), + +] + +cfg.CONF.register_opts(kombu_opts) + +LOG = rpc_common.LOG + + +def _get_queue_arguments(conf): + """Construct the arguments for declaring a queue. + + If the rabbit_ha_queues option is set, we declare a mirrored queue + as described here: + + http://www.rabbitmq.com/ha.html + + Setting x-ha-policy to all means that the queue will be mirrored + to all nodes in the cluster. + """ + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} + + +class ConsumerBase(object): + """Consumer base class.""" + + def __init__(self, channel, callback, tag, **kwargs): + """Declare a queue on an amqp channel. + + 'channel' is the amqp channel to use + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + queue name, exchange name, and other kombu options are + passed in here as a dictionary. + """ + self.callback = callback + self.tag = str(tag) + self.kwargs = kwargs + self.queue = None + self.ack_on_error = kwargs.get('ack_on_error', True) + self.reconnect(channel) + + def reconnect(self, channel): + """Re-declare the queue after a rabbit reconnect.""" + self.channel = channel + self.kwargs['channel'] = channel + self.queue = kombu.entity.Queue(**self.kwargs) + self.queue.declare() + + def _callback_handler(self, message, callback): + """Call callback with deserialized message. + + Messages that are processed without exception are ack'ed. + + If the message processing generates an exception, it will be + ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed. + """ + + try: + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) + except Exception: + if self.ack_on_error: + LOG.exception(_LE("Failed to process message" + " ... skipping it.")) + message.ack() + else: + LOG.exception(_LE("Failed to process message" + " ... will requeue.")) + message.requeue() + else: + message.ack() + + def consume(self, *args, **kwargs): + """Actually declare the consumer on the amqp channel. This will + start the flow of messages from the queue. Using the + Connection.iterconsume() iterator will process the messages, + calling the appropriate callback. + + If a callback is specified in kwargs, use that. Otherwise, + use the callback passed during __init__() + + If kwargs['nowait'] is True, then this call will block until + a message is read. + + """ + + options = {'consumer_tag': self.tag} + options['nowait'] = kwargs.get('nowait', False) + callback = kwargs.get('callback', self.callback) + if not callback: + raise ValueError("No callback defined") + + def _callback(raw_message): + message = self.channel.message_to_python(raw_message) + self._callback_handler(message, callback) + + self.queue.consume(*args, callback=_callback, **options) + + def cancel(self): + """Cancel the consuming from the queue, if it has started.""" + try: + self.queue.cancel(self.tag) + except KeyError as e: + # NOTE(comstud): Kludge to get around a amqplib bug + if str(e) != "u'%s'" % self.tag: + raise + self.queue = None + + +class DirectConsumer(ConsumerBase): + """Queue/consumer class for 'direct'.""" + + def __init__(self, conf, channel, msg_id, callback, tag, **kwargs): + """Init a 'direct' queue. + + 'channel' is the amqp channel to use + 'msg_id' is the msg_id to listen on + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + Other kombu options may be passed + """ + # Default options + options = {'durable': False, + 'queue_arguments': _get_queue_arguments(conf), + 'auto_delete': True, + 'exclusive': False} + options.update(kwargs) + exchange = kombu.entity.Exchange(name=msg_id, + type='direct', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(DirectConsumer, self).__init__(channel, + callback, + tag, + name=msg_id, + exchange=exchange, + routing_key=msg_id, + **options) + + +class TopicConsumer(ConsumerBase): + """Consumer class for 'topic'.""" + + def __init__(self, conf, channel, topic, callback, tag, name=None, + exchange_name=None, **kwargs): + """Init a 'topic' queue. + + :param channel: the amqp channel to use + :param topic: the topic to listen on + :paramtype topic: str + :param callback: the callback to call when messages are received + :param tag: a unique ID for the consumer on the channel + :param name: optional queue name, defaults to topic + :paramtype name: str + + Other kombu options may be passed as keyword arguments + """ + # Default options + options = {'durable': conf.amqp_durable_queues, + 'queue_arguments': _get_queue_arguments(conf), + 'auto_delete': conf.amqp_auto_delete, + 'exclusive': False} + options.update(kwargs) + exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) + exchange = kombu.entity.Exchange(name=exchange_name, + type='topic', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(TopicConsumer, self).__init__(channel, + callback, + tag, + name=name or topic, + exchange=exchange, + routing_key=topic, + **options) + + +class FanoutConsumer(ConsumerBase): + """Consumer class for 'fanout'.""" + + def __init__(self, conf, channel, topic, callback, tag, **kwargs): + """Init a 'fanout' queue. + + 'channel' is the amqp channel to use + 'topic' is the topic to listen on + 'callback' is the callback to call when messages are received + 'tag' is a unique ID for the consumer on the channel + + Other kombu options may be passed + """ + unique = uuid.uuid4().hex + exchange_name = '%s_fanout' % topic + queue_name = '%s_fanout_%s' % (topic, unique) + + # Default options + options = {'durable': False, + 'queue_arguments': _get_queue_arguments(conf), + 'auto_delete': True, + 'exclusive': False} + options.update(kwargs) + exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(FanoutConsumer, self).__init__(channel, callback, tag, + name=queue_name, + exchange=exchange, + routing_key=topic, + **options) + + +class Publisher(object): + """Base Publisher class.""" + + def __init__(self, channel, exchange_name, routing_key, **kwargs): + """Init the Publisher class with the exchange_name, routing_key, + and other options + """ + self.exchange_name = exchange_name + self.routing_key = routing_key + self.kwargs = kwargs + self.reconnect(channel) + + def reconnect(self, channel): + """Re-establish the Producer after a rabbit reconnection.""" + self.exchange = kombu.entity.Exchange(name=self.exchange_name, + **self.kwargs) + self.producer = kombu.messaging.Producer(exchange=self.exchange, + channel=channel, + routing_key=self.routing_key) + + def send(self, msg, timeout=None): + """Send a message.""" + if timeout: + # + # AMQP TTL is in milliseconds when set in the header. + # + self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) + else: + self.producer.publish(msg) + + +class DirectPublisher(Publisher): + """Publisher class for 'direct'.""" + def __init__(self, conf, channel, msg_id, **kwargs): + """init a 'direct' publisher. + + Kombu options may be passed as keyword args to override defaults + """ + + options = {'durable': False, + 'auto_delete': True, + 'exclusive': False} + options.update(kwargs) + super(DirectPublisher, self).__init__(channel, msg_id, msg_id, + type='direct', **options) + + +class TopicPublisher(Publisher): + """Publisher class for 'topic'.""" + def __init__(self, conf, channel, topic, **kwargs): + """init a 'topic' publisher. + + Kombu options may be passed as keyword args to override defaults + """ + options = {'durable': conf.amqp_durable_queues, + 'auto_delete': conf.amqp_auto_delete, + 'exclusive': False} + options.update(kwargs) + exchange_name = rpc_amqp.get_control_exchange(conf) + super(TopicPublisher, self).__init__(channel, + exchange_name, + topic, + type='topic', + **options) + + +class FanoutPublisher(Publisher): + """Publisher class for 'fanout'.""" + def __init__(self, conf, channel, topic, **kwargs): + """init a 'fanout' publisher. + + Kombu options may be passed as keyword args to override defaults + """ + options = {'durable': False, + 'auto_delete': True, + 'exclusive': False} + options.update(kwargs) + super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, + None, type='fanout', **options) + + +class NotifyPublisher(TopicPublisher): + """Publisher class for 'notify'.""" + + def __init__(self, conf, channel, topic, **kwargs): + self.durable = kwargs.pop('durable', conf.amqp_durable_queues) + self.queue_arguments = _get_queue_arguments(conf) + super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) + + def reconnect(self, channel): + super(NotifyPublisher, self).reconnect(channel) + + # NOTE(jerdfelt): Normally the consumer would create the queue, but + # we do this to ensure that messages don't get dropped if the + # consumer is started after we do + queue = kombu.entity.Queue(channel=channel, + exchange=self.exchange, + durable=self.durable, + name=self.routing_key, + routing_key=self.routing_key, + queue_arguments=self.queue_arguments) + queue.declare() + + +class Connection(object): + """Connection object.""" + + pool = None + + def __init__(self, conf, server_params=None): + self.consumers = [] + self.consumer_thread = None + self.proxy_callbacks = [] + self.conf = conf + self.max_retries = self.conf.rabbit_max_retries + # Try forever? + if self.max_retries <= 0: + self.max_retries = None + self.interval_start = self.conf.rabbit_retry_interval + self.interval_stepping = self.conf.rabbit_retry_backoff + # max retry-interval = 30 seconds + self.interval_max = 30 + self.memory_transport = False + + if server_params is None: + server_params = {} + # Keys to translate from server_params to kombu params + server_params_to_kombu_params = {'username': 'userid'} + + ssl_params = self._fetch_ssl_params() + params_list = [] + for adr in self.conf.rabbit_hosts: + hostname, port = network_utils.parse_host_port( + adr, default_port=self.conf.rabbit_port) + + params = { + 'hostname': hostname, + 'port': port, + 'userid': self.conf.rabbit_userid, + 'password': self.conf.rabbit_password, + 'virtual_host': self.conf.rabbit_virtual_host, + } + + for sp_key, value in six.iteritems(server_params): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value + + if self.conf.fake_rabbit: + params['transport'] = 'memory' + if self.conf.rabbit_use_ssl: + params['ssl'] = ssl_params + + params_list.append(params) + + self.params_list = params_list + + brokers_count = len(self.params_list) + self.next_broker_indices = itertools.cycle(range(brokers_count)) + + self.memory_transport = self.conf.fake_rabbit + + self.connection = None + self.reconnect() + + def _fetch_ssl_params(self): + """Handles fetching what ssl params should be used for the connection + (if any). + """ + ssl_params = dict() + + # http://docs.python.org/library/ssl.html - ssl.wrap_socket + if self.conf.kombu_ssl_version: + ssl_params['ssl_version'] = sslutils.validate_ssl_version( + self.conf.kombu_ssl_version) + if self.conf.kombu_ssl_keyfile: + ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile + if self.conf.kombu_ssl_certfile: + ssl_params['certfile'] = self.conf.kombu_ssl_certfile + if self.conf.kombu_ssl_ca_certs: + ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs + # We might want to allow variations in the + # future with this? + ssl_params['cert_reqs'] = ssl.CERT_REQUIRED + + # Return the extended behavior or just have the default behavior + return ssl_params or True + + def _connect(self, params): + """Connect to rabbit. Re-establish any queues that may have + been declared before if we are reconnecting. Exceptions should + be handled by the caller. + """ + if self.connection: + LOG.info(_LI("Reconnecting to AMQP server on " + "%(hostname)s:%(port)d") % params) + try: + # XXX(nic): when reconnecting to a RabbitMQ cluster + # with mirrored queues in use, the attempt to release the + # connection can hang "indefinitely" somewhere deep down + # in Kombu. Blocking the thread for a bit prior to + # release seems to kludge around the problem where it is + # otherwise reproduceable. + if self.conf.kombu_reconnect_delay > 0: + LOG.info(_("Delaying reconnect for %1.1f seconds...") % + self.conf.kombu_reconnect_delay) + time.sleep(self.conf.kombu_reconnect_delay) + + self.connection.release() + except self.connection_errors: + pass + # Setting this in case the next statement fails, though + # it shouldn't be doing any network operations, yet. + self.connection = None + self.connection = kombu.connection.BrokerConnection(**params) + self.connection_errors = self.connection.connection_errors + if self.memory_transport: + # Kludge to speed up tests. + self.connection.transport.polling_interval = 0.0 + self.consumer_num = itertools.count(1) + self.connection.connect() + self.channel = self.connection.channel() + # work around 'memory' transport bug in 1.1.3 + if self.memory_transport: + self.channel._new_queue('ae.undeliver') + for consumer in self.consumers: + consumer.reconnect(self.channel) + LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') % + params) + + def reconnect(self): + """Handles reconnecting and re-establishing queues. + Will retry up to self.max_retries number of times. + self.max_retries = 0 means to retry forever. + Sleep between tries, starting at self.interval_start + seconds, backing off self.interval_stepping number of seconds + each attempt. + """ + + attempt = 0 + while True: + params = self.params_list[next(self.next_broker_indices)] + attempt += 1 + try: + self._connect(params) + return + except (IOError, self.connection_errors) as e: + pass + except Exception as e: + # NOTE(comstud): Unfortunately it's possible for amqplib + # to return an error not covered by its transport + # connection_errors in the case of a timeout waiting for + # a protocol response. (See paste link in LP888621) + # So, we check all exceptions for 'timeout' in them + # and try to reconnect in this case. + if 'timeout' not in str(e): + raise + + log_info = {} + log_info['err_str'] = e + log_info['max_retries'] = self.max_retries + log_info.update(params) + + if self.max_retries and attempt == self.max_retries: + msg = _('Unable to connect to AMQP server on ' + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info + LOG.error(msg) + raise rpc_common.RPCException(msg) + + if attempt == 1: + sleep_time = self.interval_start or 1 + elif attempt > 1: + sleep_time += self.interval_stepping + if self.interval_max: + sleep_time = min(sleep_time, self.interval_max) + + log_info['sleep_time'] = sleep_time + LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is ' + 'unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) + time.sleep(sleep_time) + + def ensure(self, error_callback, method, *args, **kwargs): + while True: + try: + return method(*args, **kwargs) + except (self.connection_errors, socket.timeout, IOError) as e: + if error_callback: + error_callback(e) + except Exception as e: + # NOTE(comstud): Unfortunately it's possible for amqplib + # to return an error not covered by its transport + # connection_errors in the case of a timeout waiting for + # a protocol response. (See paste link in LP888621) + # So, we check all exceptions for 'timeout' in them + # and try to reconnect in this case. + if 'timeout' not in str(e): + raise + if error_callback: + error_callback(e) + self.reconnect() + + def get_channel(self): + """Convenience call for bin/clear_rabbit_queues.""" + return self.channel + + def close(self): + """Close/release this connection.""" + self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() + self.connection.release() + self.connection = None + + def reset(self): + """Reset a connection so it can be used again.""" + self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() + self.channel.close() + self.channel = self.connection.channel() + # work around 'memory' transport bug in 1.1.3 + if self.memory_transport: + self.channel._new_queue('ae.undeliver') + self.consumers = [] + + def declare_consumer(self, consumer_cls, topic, callback): + """Create a Consumer using the class that was passed in and + add it to our list of consumers + """ + + def _connect_error(exc): + log_info = {'topic': topic, 'err_str': exc} + LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': " + "%(err_str)s") % log_info) + + def _declare_consumer(): + consumer = consumer_cls(self.conf, self.channel, topic, callback, + six.next(self.consumer_num)) + self.consumers.append(consumer) + return consumer + + return self.ensure(_connect_error, _declare_consumer) + + def iterconsume(self, limit=None, timeout=None): + """Return an iterator that will consume from all queues/consumers.""" + + info = {'do_consume': True} + + def _error_callback(exc): + if isinstance(exc, socket.timeout): + LOG.debug('Timed out waiting for RPC response: %s' % + exc) + raise rpc_common.Timeout() + else: + LOG.exception(_LE('Failed to consume message from queue: %s') % + exc) + info['do_consume'] = True + + def _consume(): + if info['do_consume']: + queues_head = self.consumers[:-1] # not fanout. + queues_tail = self.consumers[-1] # fanout + for queue in queues_head: + queue.consume(nowait=True) + queues_tail.consume(nowait=False) + info['do_consume'] = False + return self.connection.drain_events(timeout=timeout) + + for iteration in itertools.count(0): + if limit and iteration >= limit: + raise StopIteration + yield self.ensure(_error_callback, _consume) + + def cancel_consumer_thread(self): + """Cancel a consumer thread.""" + if self.consumer_thread is not None: + self.consumer_thread.kill() + try: + self.consumer_thread.wait() + except greenlet.GreenletExit: + pass + self.consumer_thread = None + + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + + def publisher_send(self, cls, topic, msg, timeout=None, **kwargs): + """Send to a publisher based on the publisher class.""" + + def _error_callback(exc): + log_info = {'topic': topic, 'err_str': exc} + LOG.exception(_LE("Failed to publish message to topic " + "'%(topic)s': %(err_str)s") % log_info) + + def _publish(): + publisher = cls(self.conf, self.channel, topic, **kwargs) + publisher.send(msg, timeout) + + self.ensure(_error_callback, _publish) + + def declare_direct_consumer(self, topic, callback): + """Create a 'direct' queue. + In nova's use, this is generally a msg_id queue used for + responses for call/multicall + """ + self.declare_consumer(DirectConsumer, topic, callback) + + def declare_topic_consumer(self, topic, callback=None, queue_name=None, + exchange_name=None, ack_on_error=True): + """Create a 'topic' consumer.""" + self.declare_consumer(functools.partial(TopicConsumer, + name=queue_name, + exchange_name=exchange_name, + ack_on_error=ack_on_error, + ), + topic, callback) + + def declare_fanout_consumer(self, topic, callback): + """Create a 'fanout' consumer.""" + self.declare_consumer(FanoutConsumer, topic, callback) + + def direct_send(self, msg_id, msg): + """Send a 'direct' message.""" + self.publisher_send(DirectPublisher, msg_id, msg) + + def topic_send(self, topic, msg, timeout=None): + """Send a 'topic' message.""" + self.publisher_send(TopicPublisher, topic, msg, timeout) + + def fanout_send(self, topic, msg): + """Send a 'fanout' message.""" + self.publisher_send(FanoutPublisher, topic, msg) + + def notify_send(self, topic, msg, **kwargs): + """Send a notify message on a topic.""" + self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs) + + def consume(self, limit=None): + """Consume from all queues/consumers.""" + it = self.iterconsume(limit=limit) + while True: + try: + six.next(it) + except StopIteration: + return + + def consume_in_thread(self): + """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions + def _consumer_thread(): + try: + self.consume() + except greenlet.GreenletExit: + return + if self.consumer_thread is None: + self.consumer_thread = eventlet.spawn(_consumer_thread) + return self.consumer_thread + + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer that calls a method in a proxy object.""" + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) + + if fanout: + self.declare_fanout_consumer(topic, proxy_cb) + else: + self.declare_topic_consumer(topic, proxy_cb) + + def create_worker(self, topic, proxy, pool_name): + """Create a worker that calls a method in a proxy object.""" + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) + self.declare_topic_consumer(topic, proxy_cb, pool_name) + + def join_consumer_pool(self, callback, pool_name, topic, + exchange_name=None, ack_on_error=True): + """Register as a member of a group of consumers for a given topic from + the specified exchange. + + Exactly one member of a given pool will receive each message. + + A message will be delivered to multiple pools, if more than + one is created. + """ + callback_wrapper = rpc_amqp.CallbackWrapper( + conf=self.conf, + callback=callback, + connection_pool=rpc_amqp.get_connection_pool(self.conf, + Connection), + wait_for_consumers=not ack_on_error + ) + self.proxy_callbacks.append(callback_wrapper) + self.declare_topic_consumer( + queue_name=pool_name, + topic=topic, + exchange_name=exchange_name, + callback=callback_wrapper, + ack_on_error=ack_on_error, + ) + + +def create_connection(conf, new=True): + """Create a connection.""" + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def multicall(conf, context, topic, msg, timeout=None): + """Make a call that returns multiple times.""" + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def call(conf, context, topic, msg, timeout=None): + """Sends a message on a topic and wait for a response.""" + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def cast(conf, context, topic, msg): + """Sends a message on a topic without waiting for a response.""" + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def fanout_cast(conf, context, topic, msg): + """Sends a message on a fanout exchange without waiting for a response.""" + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def cast_to_server(conf, context, server_params, topic, msg): + """Sends a message on a topic to a specific server.""" + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def fanout_cast_to_server(conf, context, server_params, topic, msg): + """Sends a message on a fanout exchange to a specific server.""" + return rpc_amqp.fanout_cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def notify(conf, context, topic, msg, envelope): + """Sends a notification event on a topic.""" + return rpc_amqp.notify( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection), + envelope) + + +def cleanup(): + return rpc_amqp.cleanup(Connection.pool) diff --git a/oslo/log/openstack/common/rpc/impl_qpid.py b/oslo/log/openstack/common/rpc/impl_qpid.py new file mode 100644 index 0000000..6e06b91 --- /dev/null +++ b/oslo/log/openstack/common/rpc/impl_qpid.py @@ -0,0 +1,823 @@ +# Copyright 2011 OpenStack Foundation +# Copyright 2011 - 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import functools +import itertools +import time + +import eventlet +import greenlet +from oslo.config import cfg +import six + +from oslo.log.openstack.common import excutils +from oslo.log.openstack.common.gettextutils import _, _LE, _LI +from oslo.log.openstack.common import importutils +from oslo.log.openstack.common import jsonutils +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common.rpc import amqp as rpc_amqp +from oslo.log.openstack.common.rpc import common as rpc_common + +qpid_codec = importutils.try_import("qpid.codec010") +qpid_messaging = importutils.try_import("qpid.messaging") +qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") + +LOG = logging.getLogger(__name__) + +qpid_opts = [ + cfg.StrOpt('qpid_hostname', + default='localhost', + help='Qpid broker hostname'), + cfg.IntOpt('qpid_port', + default=5672, + help='Qpid broker port'), + cfg.ListOpt('qpid_hosts', + default=['$qpid_hostname:$qpid_port'], + help='Qpid HA cluster host:port pairs'), + cfg.StrOpt('qpid_username', + default='', + help='Username for qpid connection'), + cfg.StrOpt('qpid_password', + default='', + help='Password for qpid connection', + secret=True), + cfg.StrOpt('qpid_sasl_mechanisms', + default='', + help='Space separated list of SASL mechanisms to use for auth'), + cfg.IntOpt('qpid_heartbeat', + default=60, + help='Seconds between connection keepalive heartbeats'), + cfg.StrOpt('qpid_protocol', + default='tcp', + help="Transport to use, either 'tcp' or 'ssl'"), + cfg.BoolOpt('qpid_tcp_nodelay', + default=True, + help='Disable Nagle algorithm'), + # NOTE(russellb) If any additional versions are added (beyond 1 and 2), + # this file could probably use some additional refactoring so that the + # differences between each version are split into different classes. + cfg.IntOpt('qpid_topology_version', + default=1, + help="The qpid topology version to use. Version 1 is what " + "was originally used by impl_qpid. Version 2 includes " + "some backwards-incompatible changes that allow broker " + "federation to work. Users should update to version 2 " + "when they are able to take everything down, as it " + "requires a clean break."), +] + +cfg.CONF.register_opts(qpid_opts) + +JSON_CONTENT_TYPE = 'application/json; charset=utf8' + + +def raise_invalid_topology_version(conf): + msg = (_("Invalid value for qpid_topology_version: %d") % + conf.qpid_topology_version) + LOG.error(msg) + raise Exception(msg) + + +class ConsumerBase(object): + """Consumer base class.""" + + def __init__(self, conf, session, callback, node_name, node_opts, + link_name, link_opts): + """Declare a queue on an amqp session. + + 'session' is the amqp session to use + 'callback' is the callback to call when messages are received + 'node_name' is the first part of the Qpid address string, before ';' + 'node_opts' will be applied to the "x-declare" section of "node" + in the address string. + 'link_name' goes into the "name" field of the "link" in the address + string + 'link_opts' will be applied to the "x-declare" section of "link" + in the address string. + """ + self.callback = callback + self.receiver = None + self.session = None + + if conf.qpid_topology_version == 1: + addr_opts = { + "create": "always", + "node": { + "type": "topic", + "x-declare": { + "durable": True, + "auto-delete": True, + }, + }, + "link": { + "durable": True, + "x-declare": { + "durable": False, + "auto-delete": True, + "exclusive": False, + }, + }, + } + addr_opts["node"]["x-declare"].update(node_opts) + elif conf.qpid_topology_version == 2: + addr_opts = { + "link": { + "x-declare": { + "auto-delete": True, + "exclusive": False, + }, + }, + } + else: + raise_invalid_topology_version() + + addr_opts["link"]["x-declare"].update(link_opts) + if link_name: + addr_opts["link"]["name"] = link_name + + self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) + + self.connect(session) + + def connect(self, session): + """Declare the receiver on connect.""" + self._declare_receiver(session) + + def reconnect(self, session): + """Re-declare the receiver after a qpid reconnect.""" + self._declare_receiver(session) + + def _declare_receiver(self, session): + self.session = session + self.receiver = session.receiver(self.address) + self.receiver.capacity = 1 + + def _unpack_json_msg(self, msg): + """Load the JSON data in msg if msg.content_type indicates that it + is necessary. Put the loaded data back into msg.content and + update msg.content_type appropriately. + + A Qpid Message containing a dict will have a content_type of + 'amqp/map', whereas one containing a string that needs to be converted + back from JSON will have a content_type of JSON_CONTENT_TYPE. + + :param msg: a Qpid Message object + :returns: None + """ + if msg.content_type == JSON_CONTENT_TYPE: + msg.content = jsonutils.loads(msg.content) + msg.content_type = 'amqp/map' + + def consume(self): + """Fetch the message and pass it to the callback object.""" + message = self.receiver.fetch() + try: + self._unpack_json_msg(message) + msg = rpc_common.deserialize_msg(message.content) + self.callback(msg) + except Exception: + LOG.exception(_LE("Failed to process message... skipping it.")) + finally: + # TODO(sandy): Need support for optional ack_on_error. + self.session.acknowledge(message) + + def get_receiver(self): + return self.receiver + + def get_node_name(self): + return self.address.split(';')[0] + + +class DirectConsumer(ConsumerBase): + """Queue/consumer class for 'direct'.""" + + def __init__(self, conf, session, msg_id, callback): + """Init a 'direct' queue. + + 'session' is the amqp session to use + 'msg_id' is the msg_id to listen on + 'callback' is the callback to call when messages are received + """ + + link_opts = { + "auto-delete": conf.amqp_auto_delete, + "exclusive": True, + "durable": conf.amqp_durable_queues, + } + + if conf.qpid_topology_version == 1: + node_name = "%s/%s" % (msg_id, msg_id) + node_opts = {"type": "direct"} + link_name = msg_id + elif conf.qpid_topology_version == 2: + node_name = "amq.direct/%s" % msg_id + node_opts = {} + link_name = msg_id + else: + raise_invalid_topology_version() + + super(DirectConsumer, self).__init__(conf, session, callback, + node_name, node_opts, link_name, + link_opts) + + +class TopicConsumer(ConsumerBase): + """Consumer class for 'topic'.""" + + def __init__(self, conf, session, topic, callback, name=None, + exchange_name=None): + """Init a 'topic' queue. + + :param session: the amqp session to use + :param topic: is the topic to listen on + :paramtype topic: str + :param callback: the callback to call when messages are received + :param name: optional queue name, defaults to topic + """ + + exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) + link_opts = { + "auto-delete": conf.amqp_auto_delete, + "durable": conf.amqp_durable_queues, + } + + if conf.qpid_topology_version == 1: + node_name = "%s/%s" % (exchange_name, topic) + elif conf.qpid_topology_version == 2: + node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic) + else: + raise_invalid_topology_version() + + super(TopicConsumer, self).__init__(conf, session, callback, node_name, + {}, name or topic, link_opts) + + +class FanoutConsumer(ConsumerBase): + """Consumer class for 'fanout'.""" + + def __init__(self, conf, session, topic, callback): + """Init a 'fanout' queue. + + 'session' is the amqp session to use + 'topic' is the topic to listen on + 'callback' is the callback to call when messages are received + """ + self.conf = conf + + link_opts = {"exclusive": True} + + if conf.qpid_topology_version == 1: + node_name = "%s_fanout" % topic + node_opts = {"durable": False, "type": "fanout"} + elif conf.qpid_topology_version == 2: + node_name = "amq.topic/fanout/%s" % topic + node_opts = {} + else: + raise_invalid_topology_version() + + super(FanoutConsumer, self).__init__(conf, session, callback, + node_name, node_opts, None, + link_opts) + + +class Publisher(object): + """Base Publisher class.""" + + def __init__(self, conf, session, node_name, node_opts=None): + """Init the Publisher class with the exchange_name, routing_key, + and other options + """ + self.sender = None + self.session = session + + if conf.qpid_topology_version == 1: + addr_opts = { + "create": "always", + "node": { + "type": "topic", + "x-declare": { + "durable": False, + # auto-delete isn't implemented for exchanges in qpid, + # but put in here anyway + "auto-delete": True, + }, + }, + } + if node_opts: + addr_opts["node"]["x-declare"].update(node_opts) + + self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) + elif conf.qpid_topology_version == 2: + self.address = node_name + else: + raise_invalid_topology_version() + + self.reconnect(session) + + def reconnect(self, session): + """Re-establish the Sender after a reconnection.""" + self.sender = session.sender(self.address) + + def _pack_json_msg(self, msg): + """Qpid cannot serialize dicts containing strings longer than 65535 + characters. This function dumps the message content to a JSON + string, which Qpid is able to handle. + + :param msg: May be either a Qpid Message object or a bare dict. + :returns: A Qpid Message with its content field JSON encoded. + """ + try: + msg.content = jsonutils.dumps(msg.content) + except AttributeError: + # Need to have a Qpid message so we can set the content_type. + msg = qpid_messaging.Message(jsonutils.dumps(msg)) + msg.content_type = JSON_CONTENT_TYPE + return msg + + def send(self, msg): + """Send a message.""" + try: + # Check if Qpid can encode the message + check_msg = msg + if not hasattr(check_msg, 'content_type'): + check_msg = qpid_messaging.Message(msg) + content_type = check_msg.content_type + enc, dec = qpid_messaging.message.get_codec(content_type) + enc(check_msg.content) + except qpid_codec.CodecException: + # This means the message couldn't be serialized as a dict. + msg = self._pack_json_msg(msg) + self.sender.send(msg) + + +class DirectPublisher(Publisher): + """Publisher class for 'direct'.""" + def __init__(self, conf, session, msg_id): + """Init a 'direct' publisher.""" + + if conf.qpid_topology_version == 1: + node_name = "%s/%s" % (msg_id, msg_id) + node_opts = {"type": "direct"} + elif conf.qpid_topology_version == 2: + node_name = "amq.direct/%s" % msg_id + node_opts = {} + else: + raise_invalid_topology_version() + + super(DirectPublisher, self).__init__(conf, session, node_name, + node_opts) + + +class TopicPublisher(Publisher): + """Publisher class for 'topic'.""" + def __init__(self, conf, session, topic): + """Init a 'topic' publisher. + """ + exchange_name = rpc_amqp.get_control_exchange(conf) + + if conf.qpid_topology_version == 1: + node_name = "%s/%s" % (exchange_name, topic) + elif conf.qpid_topology_version == 2: + node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic) + else: + raise_invalid_topology_version() + + super(TopicPublisher, self).__init__(conf, session, node_name) + + +class FanoutPublisher(Publisher): + """Publisher class for 'fanout'.""" + def __init__(self, conf, session, topic): + """Init a 'fanout' publisher. + """ + + if conf.qpid_topology_version == 1: + node_name = "%s_fanout" % topic + node_opts = {"type": "fanout"} + elif conf.qpid_topology_version == 2: + node_name = "amq.topic/fanout/%s" % topic + node_opts = {} + else: + raise_invalid_topology_version() + + super(FanoutPublisher, self).__init__(conf, session, node_name, + node_opts) + + +class NotifyPublisher(Publisher): + """Publisher class for notifications.""" + def __init__(self, conf, session, topic): + """Init a 'topic' publisher. + """ + exchange_name = rpc_amqp.get_control_exchange(conf) + node_opts = {"durable": True} + + if conf.qpid_topology_version == 1: + node_name = "%s/%s" % (exchange_name, topic) + elif conf.qpid_topology_version == 2: + node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic) + else: + raise_invalid_topology_version() + + super(NotifyPublisher, self).__init__(conf, session, node_name, + node_opts) + + +class Connection(object): + """Connection object.""" + + pool = None + + def __init__(self, conf, server_params=None): + if not qpid_messaging: + raise ImportError("Failed to import qpid.messaging") + + self.connection = None + self.session = None + self.consumers = {} + self.consumer_thread = None + self.proxy_callbacks = [] + self.conf = conf + + if server_params and 'hostname' in server_params: + # NOTE(russellb) This enables support for cast_to_server. + server_params['qpid_hosts'] = [ + '%s:%d' % (server_params['hostname'], + server_params.get('port', 5672)) + ] + + params = { + 'qpid_hosts': self.conf.qpid_hosts, + 'username': self.conf.qpid_username, + 'password': self.conf.qpid_password, + } + params.update(server_params or {}) + + self.brokers = params['qpid_hosts'] + self.username = params['username'] + self.password = params['password'] + + brokers_count = len(self.brokers) + self.next_broker_indices = itertools.cycle(range(brokers_count)) + + self.reconnect() + + def connection_create(self, broker): + # Create the connection - this does not open the connection + self.connection = qpid_messaging.Connection(broker) + + # Check if flags are set and if so set them for the connection + # before we call open + self.connection.username = self.username + self.connection.password = self.password + + self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms + # Reconnection is done by self.reconnect() + self.connection.reconnect = False + self.connection.heartbeat = self.conf.qpid_heartbeat + self.connection.transport = self.conf.qpid_protocol + self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay + + def _register_consumer(self, consumer): + self.consumers[str(consumer.get_receiver())] = consumer + + def _lookup_consumer(self, receiver): + return self.consumers[str(receiver)] + + def reconnect(self): + """Handles reconnecting and re-establishing sessions and queues.""" + delay = 1 + while True: + # Close the session if necessary + if self.connection is not None and self.connection.opened(): + try: + self.connection.close() + except qpid_exceptions.MessagingError: + pass + + broker = self.brokers[next(self.next_broker_indices)] + + try: + self.connection_create(broker) + self.connection.open() + except qpid_exceptions.MessagingError as e: + msg_dict = dict(e=e, delay=delay) + msg = _LE("Unable to connect to AMQP server: %(e)s. " + "Sleeping %(delay)s seconds") % msg_dict + LOG.error(msg) + time.sleep(delay) + delay = min(delay + 1, 5) + else: + LOG.info(_LI('Connected to AMQP server on %s'), broker) + break + + self.session = self.connection.session() + + if self.consumers: + consumers = self.consumers + self.consumers = {} + + for consumer in six.itervalues(consumers): + consumer.reconnect(self.session) + self._register_consumer(consumer) + + LOG.debug("Re-established AMQP queues") + + def ensure(self, error_callback, method, *args, **kwargs): + while True: + try: + return method(*args, **kwargs) + except (qpid_exceptions.Empty, + qpid_exceptions.MessagingError) as e: + if error_callback: + error_callback(e) + self.reconnect() + + def close(self): + """Close/release this connection.""" + self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() + try: + self.connection.close() + except Exception: + # NOTE(dripton) Logging exceptions that happen during cleanup just + # causes confusion; there's really nothing useful we can do with + # them. + pass + self.connection = None + + def reset(self): + """Reset a connection so it can be used again.""" + self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() + self.session.close() + self.session = self.connection.session() + self.consumers = {} + + def declare_consumer(self, consumer_cls, topic, callback): + """Create a Consumer using the class that was passed in and + add it to our list of consumers + """ + def _connect_error(exc): + log_info = {'topic': topic, 'err_str': exc} + LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': " + "%(err_str)s") % log_info) + + def _declare_consumer(): + consumer = consumer_cls(self.conf, self.session, topic, callback) + self._register_consumer(consumer) + return consumer + + return self.ensure(_connect_error, _declare_consumer) + + def iterconsume(self, limit=None, timeout=None): + """Return an iterator that will consume from all queues/consumers.""" + + def _error_callback(exc): + if isinstance(exc, qpid_exceptions.Empty): + LOG.debug('Timed out waiting for RPC response: %s' % + exc) + raise rpc_common.Timeout() + else: + LOG.exception(_LE('Failed to consume message from queue: %s') % + exc) + + def _consume(): + nxt_receiver = self.session.next_receiver(timeout=timeout) + try: + self._lookup_consumer(nxt_receiver).consume() + except Exception: + LOG.exception(_LE("Error processing message. Skipping it.")) + + for iteration in itertools.count(0): + if limit and iteration >= limit: + raise StopIteration + yield self.ensure(_error_callback, _consume) + + def cancel_consumer_thread(self): + """Cancel a consumer thread.""" + if self.consumer_thread is not None: + self.consumer_thread.kill() + try: + self.consumer_thread.wait() + except greenlet.GreenletExit: + pass + self.consumer_thread = None + + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + + def publisher_send(self, cls, topic, msg): + """Send to a publisher based on the publisher class.""" + + def _connect_error(exc): + log_info = {'topic': topic, 'err_str': exc} + LOG.exception(_LE("Failed to publish message to topic " + "'%(topic)s': %(err_str)s") % log_info) + + def _publisher_send(): + publisher = cls(self.conf, self.session, topic) + publisher.send(msg) + + return self.ensure(_connect_error, _publisher_send) + + def declare_direct_consumer(self, topic, callback): + """Create a 'direct' queue. + In nova's use, this is generally a msg_id queue used for + responses for call/multicall + """ + self.declare_consumer(DirectConsumer, topic, callback) + + def declare_topic_consumer(self, topic, callback=None, queue_name=None, + exchange_name=None): + """Create a 'topic' consumer.""" + self.declare_consumer(functools.partial(TopicConsumer, + name=queue_name, + exchange_name=exchange_name, + ), + topic, callback) + + def declare_fanout_consumer(self, topic, callback): + """Create a 'fanout' consumer.""" + self.declare_consumer(FanoutConsumer, topic, callback) + + def direct_send(self, msg_id, msg): + """Send a 'direct' message.""" + self.publisher_send(DirectPublisher, msg_id, msg) + + def topic_send(self, topic, msg, timeout=None): + """Send a 'topic' message.""" + # + # We want to create a message with attributes, e.g. a TTL. We + # don't really need to keep 'msg' in its JSON format any longer + # so let's create an actual qpid message here and get some + # value-add on the go. + # + # WARNING: Request timeout happens to be in the same units as + # qpid's TTL (seconds). If this changes in the future, then this + # will need to be altered accordingly. + # + qpid_message = qpid_messaging.Message(content=msg, ttl=timeout) + self.publisher_send(TopicPublisher, topic, qpid_message) + + def fanout_send(self, topic, msg): + """Send a 'fanout' message.""" + self.publisher_send(FanoutPublisher, topic, msg) + + def notify_send(self, topic, msg, **kwargs): + """Send a notify message on a topic.""" + self.publisher_send(NotifyPublisher, topic, msg) + + def consume(self, limit=None): + """Consume from all queues/consumers.""" + it = self.iterconsume(limit=limit) + while True: + try: + six.next(it) + except StopIteration: + return + + def consume_in_thread(self): + """Consumer from all queues/consumers in a greenthread.""" + @excutils.forever_retry_uncaught_exceptions + def _consumer_thread(): + try: + self.consume() + except greenlet.GreenletExit: + return + if self.consumer_thread is None: + self.consumer_thread = eventlet.spawn(_consumer_thread) + return self.consumer_thread + + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer that calls a method in a proxy object.""" + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) + + if fanout: + consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) + else: + consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb) + + self._register_consumer(consumer) + + return consumer + + def create_worker(self, topic, proxy, pool_name): + """Create a worker that calls a method in a proxy object.""" + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) + + consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, + name=pool_name) + + self._register_consumer(consumer) + + return consumer + + def join_consumer_pool(self, callback, pool_name, topic, + exchange_name=None, ack_on_error=True): + """Register as a member of a group of consumers for a given topic from + the specified exchange. + + Exactly one member of a given pool will receive each message. + + A message will be delivered to multiple pools, if more than + one is created. + """ + callback_wrapper = rpc_amqp.CallbackWrapper( + conf=self.conf, + callback=callback, + connection_pool=rpc_amqp.get_connection_pool(self.conf, + Connection), + wait_for_consumers=not ack_on_error + ) + self.proxy_callbacks.append(callback_wrapper) + + consumer = TopicConsumer(conf=self.conf, + session=self.session, + topic=topic, + callback=callback_wrapper, + name=pool_name, + exchange_name=exchange_name) + + self._register_consumer(consumer) + return consumer + + +def create_connection(conf, new=True): + """Create a connection.""" + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def multicall(conf, context, topic, msg, timeout=None): + """Make a call that returns multiple times.""" + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def call(conf, context, topic, msg, timeout=None): + """Sends a message on a topic and wait for a response.""" + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def cast(conf, context, topic, msg): + """Sends a message on a topic without waiting for a response.""" + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def fanout_cast(conf, context, topic, msg): + """Sends a message on a fanout exchange without waiting for a response.""" + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def cast_to_server(conf, context, server_params, topic, msg): + """Sends a message on a topic to a specific server.""" + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def fanout_cast_to_server(conf, context, server_params, topic, msg): + """Sends a message on a fanout exchange to a specific server.""" + return rpc_amqp.fanout_cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) + + +def notify(conf, context, topic, msg, envelope): + """Sends a notification event on a topic.""" + return rpc_amqp.notify(conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection), + envelope) + + +def cleanup(): + return rpc_amqp.cleanup(Connection.pool) diff --git a/oslo/log/openstack/common/rpc/impl_zmq.py b/oslo/log/openstack/common/rpc/impl_zmq.py new file mode 100644 index 0000000..e4962f5 --- /dev/null +++ b/oslo/log/openstack/common/rpc/impl_zmq.py @@ -0,0 +1,818 @@ +# Copyright 2011 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import pprint +import re +import socket +import sys +import types +import uuid + +import eventlet +import greenlet +from oslo.config import cfg +import six +from six import moves + +from oslo.log.openstack.common import excutils +from oslo.log.openstack.common.gettextutils import _, _LE, _LI +from oslo.log.openstack.common import importutils +from oslo.log.openstack.common import jsonutils +from oslo.log.openstack.common.rpc import common as rpc_common + +zmq = importutils.try_import('eventlet.green.zmq') + +# for convenience, are not modified. +pformat = pprint.pformat +Timeout = eventlet.timeout.Timeout +LOG = rpc_common.LOG +RemoteError = rpc_common.RemoteError +RPCException = rpc_common.RPCException + +zmq_opts = [ + cfg.StrOpt('rpc_zmq_bind_address', default='*', + help='ZeroMQ bind address. Should be a wildcard (*), ' + 'an ethernet interface, or IP. ' + 'The "host" option should point or resolve to this ' + 'address.'), + + # The module.Class to use for matchmaking. + cfg.StrOpt( + 'rpc_zmq_matchmaker', + default=('oslo.log.openstack.common.rpc.' + 'matchmaker.MatchMakerLocalhost'), + help='MatchMaker driver', + ), + + # The following port is unassigned by IANA as of 2012-05-21 + cfg.IntOpt('rpc_zmq_port', default=9501, + help='ZeroMQ receiver listening port'), + + cfg.IntOpt('rpc_zmq_contexts', default=1, + help='Number of ZeroMQ contexts, defaults to 1'), + + cfg.IntOpt('rpc_zmq_topic_backlog', + help='Maximum number of ingress messages to locally buffer ' + 'per topic. Default is unlimited.'), + + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', + help='Directory for holding IPC sockets'), + + cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(), + help='Name of this node. Must be a valid hostname, FQDN, or ' + 'IP address. Must match "host" option, if running Nova.') +] + + +CONF = cfg.CONF +CONF.register_opts(zmq_opts) + +ZMQ_CTX = None # ZeroMQ Context, must be global. +matchmaker = None # memorized matchmaker object + + +def _serialize(data): + """Serialization wrapper. + + We prefer using JSON, but it cannot encode all types. + Error if a developer passes us bad data. + """ + try: + return jsonutils.dumps(data, ensure_ascii=True) + except TypeError: + with excutils.save_and_reraise_exception(): + LOG.error(_LE("JSON serialization failed.")) + + +def _deserialize(data): + """Deserialization wrapper.""" + LOG.debug("Deserializing: %s", data) + return jsonutils.loads(data) + + +class ZmqSocket(object): + """A tiny wrapper around ZeroMQ. + + Simplifies the send/recv protocol and connection management. + Can be used as a Context (supports the 'with' statement). + """ + + def __init__(self, addr, zmq_type, bind=True, subscribe=None): + self.sock = _get_ctxt().socket(zmq_type) + self.addr = addr + self.type = zmq_type + self.subscriptions = [] + + # Support failures on sending/receiving on wrong socket type. + self.can_recv = zmq_type in (zmq.PULL, zmq.SUB) + self.can_send = zmq_type in (zmq.PUSH, zmq.PUB) + self.can_sub = zmq_type in (zmq.SUB, ) + + # Support list, str, & None for subscribe arg (cast to list) + do_sub = { + list: subscribe, + str: [subscribe], + type(None): [] + }[type(subscribe)] + + for f in do_sub: + self.subscribe(f) + + str_data = {'addr': addr, 'type': self.socket_s(), + 'subscribe': subscribe, 'bind': bind} + + LOG.debug("Connecting to %(addr)s with %(type)s", str_data) + LOG.debug("-> Subscribed to %(subscribe)s", str_data) + LOG.debug("-> bind: %(bind)s", str_data) + + try: + if bind: + self.sock.bind(addr) + else: + self.sock.connect(addr) + except Exception: + raise RPCException(_("Could not open socket.")) + + def socket_s(self): + """Get socket type as string.""" + t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER', + 'DEALER') + return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type] + + def subscribe(self, msg_filter): + """Subscribe.""" + if not self.can_sub: + raise RPCException("Cannot subscribe on this socket.") + LOG.debug("Subscribing to %s", msg_filter) + + try: + self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter) + except Exception: + return + + self.subscriptions.append(msg_filter) + + def unsubscribe(self, msg_filter): + """Unsubscribe.""" + if msg_filter not in self.subscriptions: + return + self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter) + self.subscriptions.remove(msg_filter) + + def close(self): + if self.sock is None or self.sock.closed: + return + + # We must unsubscribe, or we'll leak descriptors. + if self.subscriptions: + for f in self.subscriptions: + try: + self.sock.setsockopt(zmq.UNSUBSCRIBE, f) + except Exception: + pass + self.subscriptions = [] + + try: + # Default is to linger + self.sock.close() + except Exception: + # While this is a bad thing to happen, + # it would be much worse if some of the code calling this + # were to fail. For now, lets log, and later evaluate + # if we can safely raise here. + LOG.error(_LE("ZeroMQ socket could not be closed.")) + self.sock = None + + def recv(self, **kwargs): + if not self.can_recv: + raise RPCException(_("You cannot recv on this socket.")) + return self.sock.recv_multipart(**kwargs) + + def send(self, data, **kwargs): + if not self.can_send: + raise RPCException(_("You cannot send on this socket.")) + self.sock.send_multipart(data, **kwargs) + + +class ZmqClient(object): + """Client for ZMQ sockets.""" + + def __init__(self, addr): + self.outq = ZmqSocket(addr, zmq.PUSH, bind=False) + + def cast(self, msg_id, topic, data, envelope): + msg_id = msg_id or 0 + + if not envelope: + self.outq.send(map(bytes, + (msg_id, topic, 'cast', _serialize(data)))) + return + + rpc_envelope = rpc_common.serialize_msg(data[1], envelope) + zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items()) + self.outq.send(map(bytes, + (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg)) + + def close(self): + self.outq.close() + + +class RpcContext(rpc_common.CommonRpcContext): + """Context that supports replying to a rpc.call.""" + def __init__(self, **kwargs): + self.replies = [] + super(RpcContext, self).__init__(**kwargs) + + def deepcopy(self): + values = self.to_dict() + values['replies'] = self.replies + return self.__class__(**values) + + def reply(self, reply=None, failure=None, ending=False): + if ending: + return + self.replies.append(reply) + + @classmethod + def marshal(self, ctx): + ctx_data = ctx.to_dict() + return _serialize(ctx_data) + + @classmethod + def unmarshal(self, data): + return RpcContext.from_dict(_deserialize(data)) + + +class InternalContext(object): + """Used by ConsumerBase as a private context for - methods.""" + + def __init__(self, proxy): + self.proxy = proxy + self.msg_waiter = None + + def _get_response(self, ctx, proxy, topic, data): + """Process a curried message and cast the result to topic.""" + LOG.debug("Running func with context: %s", ctx.to_dict()) + data.setdefault('version', None) + data.setdefault('args', {}) + + try: + result = proxy.dispatch( + ctx, data['version'], data['method'], + data.get('namespace'), **data['args']) + return ConsumerBase.normalize_reply(result, ctx.replies) + except greenlet.GreenletExit: + # ignore these since they are just from shutdowns + pass + except rpc_common.ClientException as e: + LOG.debug("Expected exception during message handling (%s)" % + e._exc_info[1]) + return {'exc': + rpc_common.serialize_remote_exception(e._exc_info, + log_failure=False)} + except Exception: + LOG.error(_LE("Exception during message handling")) + return {'exc': + rpc_common.serialize_remote_exception(sys.exc_info())} + + def reply(self, ctx, proxy, + msg_id=None, context=None, topic=None, msg=None): + """Reply to a casted call.""" + # NOTE(ewindisch): context kwarg exists for Grizzly compat. + # this may be able to be removed earlier than + # 'I' if ConsumerBase.process were refactored. + if type(msg) is list: + payload = msg[-1] + else: + payload = msg + + response = ConsumerBase.normalize_reply( + self._get_response(ctx, proxy, topic, payload), + ctx.replies) + + LOG.debug("Sending reply") + _multi_send(_cast, ctx, topic, { + 'method': '-process_reply', + 'args': { + 'msg_id': msg_id, # Include for Folsom compat. + 'response': response + } + }, _msg_id=msg_id) + + +class ConsumerBase(object): + """Base Consumer.""" + + def __init__(self): + self.private_ctx = InternalContext(None) + + @classmethod + def normalize_reply(self, result, replies): + # TODO(ewindisch): re-evaluate and document this method. + if isinstance(result, types.GeneratorType): + return list(result) + elif replies: + return replies + else: + return [result] + + def process(self, proxy, ctx, data): + data.setdefault('version', None) + data.setdefault('args', {}) + + # Method starting with - are + # processed internally. (non-valid method name) + method = data.get('method') + if not method: + LOG.error(_LE("RPC message did not include method.")) + return + + # Internal method + # uses internal context for safety. + if method == '-reply': + self.private_ctx.reply(ctx, proxy, **data['args']) + return + + proxy.dispatch(ctx, data['version'], + data['method'], data.get('namespace'), **data['args']) + + +class ZmqBaseReactor(ConsumerBase): + """A consumer class implementing a centralized casting broker (PULL-PUSH). + + Used for RoundRobin requests. + """ + + def __init__(self, conf): + super(ZmqBaseReactor, self).__init__() + + self.proxies = {} + self.threads = [] + self.sockets = [] + self.subscribe = {} + + self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) + + def register(self, proxy, in_addr, zmq_type_in, + in_bind=True, subscribe=None): + + LOG.info(_LI("Registering reactor")) + + if zmq_type_in not in (zmq.PULL, zmq.SUB): + raise RPCException("Bad input socktype") + + # Items push in. + inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind, + subscribe=subscribe) + + self.proxies[inq] = proxy + self.sockets.append(inq) + + LOG.info(_LI("In reactor registered")) + + def consume_in_thread(self): + @excutils.forever_retry_uncaught_exceptions + def _consume(sock): + LOG.info(_LI("Consuming socket")) + while True: + self.consume(sock) + + for k in self.proxies.keys(): + self.threads.append( + self.pool.spawn(_consume, k) + ) + + def wait(self): + for t in self.threads: + t.wait() + + def close(self): + for s in self.sockets: + s.close() + + for t in self.threads: + t.kill() + + +class ZmqProxy(ZmqBaseReactor): + """A consumer class implementing a topic-based proxy. + + Forwards to IPC sockets. + """ + + def __init__(self, conf): + super(ZmqProxy, self).__init__(conf) + pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\')) + self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep))) + + self.topic_proxy = {} + + def consume(self, sock): + ipc_dir = CONF.rpc_zmq_ipc_dir + + data = sock.recv(copy=False) + topic = data[1].bytes + + if topic.startswith('fanout~'): + sock_type = zmq.PUB + topic = topic.split('.', 1)[0] + elif topic.startswith('zmq_replies'): + sock_type = zmq.PUB + else: + sock_type = zmq.PUSH + + if topic not in self.topic_proxy: + def publisher(waiter): + LOG.info(_LI("Creating proxy for topic: %s"), topic) + + try: + # The topic is received over the network, + # don't trust this input. + if self.badchars.search(topic) is not None: + emsg = _("Topic contained dangerous characters.") + LOG.warn(emsg) + raise RPCException(emsg) + + out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % + (ipc_dir, topic), + sock_type, bind=True) + except RPCException: + waiter.send_exception(*sys.exc_info()) + return + + self.topic_proxy[topic] = eventlet.queue.LightQueue( + CONF.rpc_zmq_topic_backlog) + self.sockets.append(out_sock) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) + + waiter.send(True) + + while(True): + data = self.topic_proxy[topic].get() + out_sock.send(data, copy=False) + + wait_sock_creation = eventlet.event.Event() + eventlet.spawn(publisher, wait_sock_creation) + + try: + wait_sock_creation.wait() + except RPCException: + LOG.error(_LE("Topic socket file creation failed.")) + return + + try: + self.topic_proxy[topic].put_nowait(data) + except eventlet.queue.Full: + LOG.error(_LE("Local per-topic backlog buffer full for topic " + "%(topic)s. Dropping message.") % {'topic': topic}) + + def consume_in_thread(self): + """Runs the ZmqProxy service.""" + ipc_dir = CONF.rpc_zmq_ipc_dir + consume_in = "tcp://%s:%s" % \ + (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port) + consumption_proxy = InternalContext(None) + + try: + os.makedirs(ipc_dir) + except os.error: + if not os.path.isdir(ipc_dir): + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Required IPC directory does not exist at" + " %s") % (ipc_dir, )) + try: + self.register(consumption_proxy, + consume_in, + zmq.PULL) + except zmq.ZMQError: + if os.access(ipc_dir, os.X_OK): + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Permission denied to IPC directory at" + " %s") % (ipc_dir, )) + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) + + super(ZmqProxy, self).consume_in_thread() + + +def unflatten_envelope(packenv): + """Unflattens the RPC envelope. + + Takes a list and returns a dictionary. + i.e. [1,2,3,4] => {1: 2, 3: 4} + """ + i = iter(packenv) + h = {} + try: + while True: + k = six.next(i) + h[k] = six.next(i) + except StopIteration: + return h + + +class ZmqReactor(ZmqBaseReactor): + """A consumer class implementing a consumer for messages. + + Can also be used as a 1:1 proxy + """ + + def __init__(self, conf): + super(ZmqReactor, self).__init__(conf) + + def consume(self, sock): + # TODO(ewindisch): use zero-copy (i.e. references, not copying) + data = sock.recv() + LOG.debug("CONSUMER RECEIVED DATA: %s", data) + + proxy = self.proxies[sock] + + if data[2] == 'cast': # Legacy protocol + packenv = data[3] + + ctx, msg = _deserialize(packenv) + request = rpc_common.deserialize_msg(msg) + ctx = RpcContext.unmarshal(ctx) + elif data[2] == 'impl_zmq_v2': + packenv = data[4:] + + msg = unflatten_envelope(packenv) + request = rpc_common.deserialize_msg(msg) + + # Unmarshal only after verifying the message. + ctx = RpcContext.unmarshal(data[3]) + else: + LOG.error(_LE("ZMQ Envelope version unsupported or unknown.")) + return + + self.pool.spawn_n(self.process, proxy, ctx, request) + + +class Connection(rpc_common.Connection): + """Manages connections and threads.""" + + def __init__(self, conf): + self.topics = [] + self.reactor = ZmqReactor(conf) + + def create_consumer(self, topic, proxy, fanout=False): + # Register with matchmaker. + _get_matchmaker().register(topic, CONF.rpc_zmq_host) + + # Subscription scenarios + if fanout: + sock_type = zmq.SUB + subscribe = ('', fanout)[type(fanout) == str] + topic = 'fanout~' + topic.split('.', 1)[0] + else: + sock_type = zmq.PULL + subscribe = None + topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) + + if topic in self.topics: + LOG.info(_LI("Skipping topic registration. Already registered.")) + return + + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (CONF.rpc_zmq_ipc_dir, topic) + + LOG.debug("Consumer is a zmq.%s", + ['PULL', 'SUB'][sock_type == zmq.SUB]) + + self.reactor.register(proxy, inaddr, sock_type, + subscribe=subscribe, in_bind=False) + self.topics.append(topic) + + def close(self): + _get_matchmaker().stop_heartbeat() + for topic in self.topics: + _get_matchmaker().unregister(topic, CONF.rpc_zmq_host) + + self.reactor.close() + self.topics = [] + + def wait(self): + self.reactor.wait() + + def consume_in_thread(self): + _get_matchmaker().start_heartbeat() + self.reactor.consume_in_thread() + + +def _cast(addr, context, topic, msg, timeout=None, envelope=False, + _msg_id=None): + timeout_cast = timeout or CONF.rpc_cast_timeout + payload = [RpcContext.marshal(context), msg] + + with Timeout(timeout_cast, exception=rpc_common.Timeout): + try: + conn = ZmqClient(addr) + + # assumes cast can't return an exception + conn.cast(_msg_id, topic, payload, envelope) + except zmq.ZMQError: + raise RPCException("Cast failed. ZMQ Socket Exception") + finally: + if 'conn' in vars(): + conn.close() + + +def _call(addr, context, topic, msg, timeout=None, + envelope=False): + # timeout_response is how long we wait for a response + timeout = timeout or CONF.rpc_response_timeout + + # The msg_id is used to track replies. + msg_id = uuid.uuid4().hex + + # Replies always come into the reply service. + reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host + + LOG.debug("Creating payload") + # Curry the original request into a reply method. + mcontext = RpcContext.marshal(context) + payload = { + 'method': '-reply', + 'args': { + 'msg_id': msg_id, + 'topic': reply_topic, + # TODO(ewindisch): safe to remove mcontext in I. + 'msg': [mcontext, msg] + } + } + + LOG.debug("Creating queue socket for reply waiter") + + # Messages arriving async. + # TODO(ewindisch): have reply consumer with dynamic subscription mgmt + with Timeout(timeout, exception=rpc_common.Timeout): + try: + msg_waiter = ZmqSocket( + "ipc://%s/zmq_topic_zmq_replies.%s" % + (CONF.rpc_zmq_ipc_dir, + CONF.rpc_zmq_host), + zmq.SUB, subscribe=msg_id, bind=False + ) + + LOG.debug("Sending cast") + _cast(addr, context, topic, payload, envelope) + + LOG.debug("Cast sent; Waiting reply") + # Blocks until receives reply + msg = msg_waiter.recv() + LOG.debug("Received message: %s", msg) + LOG.debug("Unpacking response") + + if msg[2] == 'cast': # Legacy version + raw_msg = _deserialize(msg[-1])[-1] + elif msg[2] == 'impl_zmq_v2': + rpc_envelope = unflatten_envelope(msg[4:]) + raw_msg = rpc_common.deserialize_msg(rpc_envelope) + else: + raise rpc_common.UnsupportedRpcEnvelopeVersion( + _("Unsupported or unknown ZMQ envelope returned.")) + + responses = raw_msg['args']['response'] + # ZMQError trumps the Timeout error. + except zmq.ZMQError: + raise RPCException("ZMQ Socket Error") + except (IndexError, KeyError): + raise RPCException(_("RPC Message Invalid.")) + finally: + if 'msg_waiter' in vars(): + msg_waiter.close() + + # It seems we don't need to do all of the following, + # but perhaps it would be useful for multicall? + # One effect of this is that we're checking all + # responses for Exceptions. + for resp in responses: + if isinstance(resp, types.DictType) and 'exc' in resp: + raise rpc_common.deserialize_remote_exception(CONF, resp['exc']) + + return responses[-1] + + +def _multi_send(method, context, topic, msg, timeout=None, + envelope=False, _msg_id=None): + """Wraps the sending of messages. + + Dispatches to the matchmaker and sends message to all relevant hosts. + """ + conf = CONF + LOG.debug("%(msg)s" % {'msg': ' '.join(map(pformat, (topic, msg)))}) + + queues = _get_matchmaker().queues(topic) + LOG.debug("Sending message(s) to: %s", queues) + + # Don't stack if we have no matchmaker results + if not queues: + LOG.warn(_("No matchmaker results. Not casting.")) + # While not strictly a timeout, callers know how to handle + # this exception and a timeout isn't too big a lie. + raise rpc_common.Timeout(_("No match from matchmaker.")) + + # This supports brokerless fanout (addresses > 1) + for queue in queues: + (_topic, ip_addr) = queue + _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port) + + if method.__name__ == '_cast': + eventlet.spawn_n(method, _addr, context, + _topic, msg, timeout, envelope, + _msg_id) + return + return method(_addr, context, _topic, msg, timeout, + envelope) + + +def create_connection(conf, new=True): + return Connection(conf) + + +def multicall(conf, *args, **kwargs): + """Multiple calls.""" + return _multi_send(_call, *args, **kwargs) + + +def call(conf, *args, **kwargs): + """Send a message, expect a response.""" + data = _multi_send(_call, *args, **kwargs) + return data[-1] + + +def cast(conf, *args, **kwargs): + """Send a message expecting no reply.""" + _multi_send(_cast, *args, **kwargs) + + +def fanout_cast(conf, context, topic, msg, **kwargs): + """Send a message to all listening and expect no reply.""" + # NOTE(ewindisch): fanout~ is used because it avoid splitting on . + # and acts as a non-subtle hint to the matchmaker and ZmqProxy. + _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) + + +def notify(conf, context, topic, msg, envelope): + """Send notification event. + + Notifications are sent to topic-priority. + This differs from the AMQP drivers which send to topic.priority. + """ + # NOTE(ewindisch): dot-priority in rpc notifier does not + # work with our assumptions. + topic = topic.replace('.', '-') + cast(conf, context, topic, msg, envelope=envelope) + + +def cleanup(): + """Clean up resources in use by implementation.""" + global ZMQ_CTX + if ZMQ_CTX: + ZMQ_CTX.term() + ZMQ_CTX = None + + global matchmaker + matchmaker = None + + +def _get_ctxt(): + if not zmq: + raise ImportError("Failed to import eventlet.green.zmq") + + global ZMQ_CTX + if not ZMQ_CTX: + ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts) + return ZMQ_CTX + + +def _get_matchmaker(*args, **kwargs): + global matchmaker + if not matchmaker: + mm = CONF.rpc_zmq_matchmaker + if mm.endswith('matchmaker.MatchMakerRing'): + mm.replace('matchmaker', 'matchmaker_ring') + LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use' + ' %(new)s instead') % dict( + orig=CONF.rpc_zmq_matchmaker, new=mm)) + matchmaker = importutils.import_object(mm, *args, **kwargs) + return matchmaker diff --git a/oslo/log/openstack/common/rpc/matchmaker.py b/oslo/log/openstack/common/rpc/matchmaker.py new file mode 100644 index 0000000..f77c437 --- /dev/null +++ b/oslo/log/openstack/common/rpc/matchmaker.py @@ -0,0 +1,323 @@ +# Copyright 2011 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +The MatchMaker classes should except a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +import contextlib + +import eventlet +from oslo.config import cfg + +from oslo.log.openstack.common.gettextutils import _, _LI +from oslo.log.openstack.common import log as logging + + +matchmaker_opts = [ + cfg.IntOpt('matchmaker_heartbeat_freq', + default=300, + help='Heartbeat frequency'), + cfg.IntOpt('matchmaker_heartbeat_ttl', + default=600, + help='Heartbeat time-to-live.'), +] + +CONF = cfg.CONF +CONF.register_opts(matchmaker_opts) +LOG = logging.getLogger(__name__) +contextmanager = contextlib.contextmanager + + +class MatchMakerException(Exception): + """Signified a match could not be found.""" + message = _("Match not found by MatchMaker.") + + +class Exchange(object): + """Implements lookups. + + Subclass this to support hashtables, dns, etc. + """ + def __init__(self): + pass + + def run(self, key): + raise NotImplementedError() + + +class Binding(object): + """A binding on which to perform a lookup.""" + def __init__(self): + pass + + def test(self, key): + raise NotImplementedError() + + +class MatchMakerBase(object): + """Match Maker Base Class. + + Build off HeartbeatMatchMakerBase if building a heartbeat-capable + MatchMaker. + """ + def __init__(self): + # Array of tuples. Index [2] toggles negation, [3] is last-if-true + self.bindings = [] + + self.no_heartbeat_msg = _('Matchmaker does not implement ' + 'registration or heartbeat.') + + def register(self, key, host): + """Register a host on a backend. + + Heartbeats, if applicable, may keepalive registration. + """ + pass + + def ack_alive(self, key, host): + """Acknowledge that a key.host is alive. + + Used internally for updating heartbeats, but may also be used + publicly to acknowledge a system is alive (i.e. rpc message + successfully sent to host) + """ + pass + + def is_alive(self, topic, host): + """Checks if a host is alive.""" + pass + + def expire(self, topic, host): + """Explicitly expire a host's registration.""" + pass + + def send_heartbeats(self): + """Send all heartbeats. + + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + pass + + def unregister(self, key, host): + """Unregister a topic.""" + pass + + def start_heartbeat(self): + """Spawn heartbeat greenthread.""" + pass + + def stop_heartbeat(self): + """Destroys the heartbeat greenthread.""" + pass + + def add_binding(self, binding, rule, last=True): + self.bindings.append((binding, rule, False, last)) + + # NOTE(ewindisch): kept the following method in case we implement the + # underlying support. + # def add_negate_binding(self, binding, rule, last=True): + # self.bindings.append((binding, rule, True, last)) + + def queues(self, key): + workers = [] + + # bit is for negate bindings - if we choose to implement it. + # last stops processing rules if this matches. + for (binding, exchange, bit, last) in self.bindings: + if binding.test(key): + workers.extend(exchange.run(key)) + + # Support last. + if last: + return workers + return workers + + +class HeartbeatMatchMakerBase(MatchMakerBase): + """Base for a heart-beat capable MatchMaker. + + Provides common methods for registering, unregistering, and maintaining + heartbeats. + """ + def __init__(self): + self.hosts = set() + self._heart = None + self.host_topic = {} + + super(HeartbeatMatchMakerBase, self).__init__() + + def send_heartbeats(self): + """Send all heartbeats. + + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + for key, host in self.host_topic: + self.ack_alive(key, host) + + def ack_alive(self, key, host): + """Acknowledge that a host.topic is alive. + + Used internally for updating heartbeats, but may also be used + publicly to acknowledge a system is alive (i.e. rpc message + successfully sent to host) + """ + raise NotImplementedError("Must implement ack_alive") + + def backend_register(self, key, host): + """Implements registration logic. + + Called by register(self,key,host) + """ + raise NotImplementedError("Must implement backend_register") + + def backend_unregister(self, key, key_host): + """Implements de-registration logic. + + Called by unregister(self,key,host) + """ + raise NotImplementedError("Must implement backend_unregister") + + def register(self, key, host): + """Register a host on a backend. + + Heartbeats, if applicable, may keepalive registration. + """ + self.hosts.add(host) + self.host_topic[(key, host)] = host + key_host = '.'.join((key, host)) + + self.backend_register(key, key_host) + + self.ack_alive(key, host) + + def unregister(self, key, host): + """Unregister a topic.""" + if (key, host) in self.host_topic: + del self.host_topic[(key, host)] + + self.hosts.discard(host) + self.backend_unregister(key, '.'.join((key, host))) + + LOG.info(_LI("Matchmaker unregistered: %(key)s, %(host)s"), + {'key': key, 'host': host}) + + def start_heartbeat(self): + """Implementation of MatchMakerBase.start_heartbeat. + + Launches greenthread looping send_heartbeats(), + yielding for CONF.matchmaker_heartbeat_freq seconds + between iterations. + """ + if not self.hosts: + raise MatchMakerException( + _("Register before starting heartbeat.")) + + def do_heartbeat(): + while True: + self.send_heartbeats() + eventlet.sleep(CONF.matchmaker_heartbeat_freq) + + self._heart = eventlet.spawn(do_heartbeat) + + def stop_heartbeat(self): + """Destroys the heartbeat greenthread.""" + if self._heart: + self._heart.kill() + + +class DirectBinding(Binding): + """Specifies a host in the key via a '.' character. + + Although dots are used in the key, the behavior here is + that it maps directly to a host, thus direct. + """ + def test(self, key): + return '.' in key + + +class TopicBinding(Binding): + """Where a 'bare' key without dots. + + AMQP generally considers topic exchanges to be those *with* dots, + but we deviate here in terminology as the behavior here matches + that of a topic exchange (whereas where there are dots, behavior + matches that of a direct exchange. + """ + def test(self, key): + return '.' not in key + + +class FanoutBinding(Binding): + """Match on fanout keys, where key starts with 'fanout.' string.""" + def test(self, key): + return key.startswith('fanout~') + + +class StubExchange(Exchange): + """Exchange that does nothing.""" + def run(self, key): + return [(key, None)] + + +class LocalhostExchange(Exchange): + """Exchange where all direct topics are local.""" + def __init__(self, host='localhost'): + self.host = host + super(Exchange, self).__init__() + + def run(self, key): + return [('.'.join((key.split('.')[0], self.host)), self.host)] + + +class DirectExchange(Exchange): + """Exchange where all topic keys are split, sending to second half. + + i.e. "compute.host" sends a message to "compute.host" running on "host" + """ + def __init__(self): + super(Exchange, self).__init__() + + def run(self, key): + e = key.split('.', 1)[1] + return [(key, e)] + + +class MatchMakerLocalhost(MatchMakerBase): + """Match Maker where all bare topics resolve to localhost. + + Useful for testing. + """ + def __init__(self, host='localhost'): + super(MatchMakerLocalhost, self).__init__() + self.add_binding(FanoutBinding(), LocalhostExchange(host)) + self.add_binding(DirectBinding(), DirectExchange()) + self.add_binding(TopicBinding(), LocalhostExchange(host)) + + +class MatchMakerStub(MatchMakerBase): + """Match Maker where topics are untouched. + + Useful for testing, or for AMQP/brokered queues. + Will not work where knowledge of hosts is known (i.e. zeromq) + """ + def __init__(self): + super(MatchMakerStub, self).__init__() + + self.add_binding(FanoutBinding(), StubExchange()) + self.add_binding(DirectBinding(), StubExchange()) + self.add_binding(TopicBinding(), StubExchange()) diff --git a/oslo/log/openstack/common/rpc/matchmaker_redis.py b/oslo/log/openstack/common/rpc/matchmaker_redis.py new file mode 100644 index 0000000..071de30 --- /dev/null +++ b/oslo/log/openstack/common/rpc/matchmaker_redis.py @@ -0,0 +1,143 @@ +# Copyright 2013 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +The MatchMaker classes should accept a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +from oslo.config import cfg + +from oslo.log.openstack.common import importutils +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common.rpc import matchmaker as mm_common + +redis = importutils.try_import('redis') + + +matchmaker_redis_opts = [ + cfg.StrOpt('host', + default='127.0.0.1', + help='Host to locate redis'), + cfg.IntOpt('port', + default=6379, + help='Use this port to connect to redis host.'), + cfg.StrOpt('password', + help='Password for Redis server. (optional)'), +] + +CONF = cfg.CONF +opt_group = cfg.OptGroup(name='matchmaker_redis', + title='Options for Redis-based MatchMaker') +CONF.register_group(opt_group) +CONF.register_opts(matchmaker_redis_opts, opt_group) +LOG = logging.getLogger(__name__) + + +class RedisExchange(mm_common.Exchange): + def __init__(self, matchmaker): + self.matchmaker = matchmaker + self.redis = matchmaker.redis + super(RedisExchange, self).__init__() + + +class RedisTopicExchange(RedisExchange): + """Exchange where all topic keys are split, sending to second half. + + i.e. "compute.host" sends a message to "compute" running on "host" + """ + def run(self, topic): + while True: + member_name = self.redis.srandmember(topic) + + if not member_name: + # If this happens, there are no + # longer any members. + break + + if not self.matchmaker.is_alive(topic, member_name): + continue + + host = member_name.split('.', 1)[1] + return [(member_name, host)] + return [] + + +class RedisFanoutExchange(RedisExchange): + """Return a list of all hosts.""" + def run(self, topic): + topic = topic.split('~', 1)[1] + hosts = self.redis.smembers(topic) + good_hosts = filter( + lambda host: self.matchmaker.is_alive(topic, host), hosts) + + return [(x, x.split('.', 1)[1]) for x in good_hosts] + + +class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase): + """MatchMaker registering and looking-up hosts with a Redis server.""" + def __init__(self): + super(MatchMakerRedis, self).__init__() + + if not redis: + raise ImportError("Failed to import module redis.") + + self.redis = redis.Redis( + host=CONF.matchmaker_redis.host, + port=CONF.matchmaker_redis.port, + password=CONF.matchmaker_redis.password) + + self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self)) + self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange()) + self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self)) + + def ack_alive(self, key, host): + topic = "%s.%s" % (key, host) + if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl): + # If we could not update the expiration, the key + # might have been pruned. Re-register, creating a new + # key in Redis. + self.register(self.topic_host[host], host) + + def is_alive(self, topic, host): + if self.redis.ttl(host) == -1: + self.expire(topic, host) + return False + return True + + def expire(self, topic, host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.delete(host) + pipe.srem(topic, host) + pipe.execute() + + def backend_register(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.sadd(key, key_host) + + # No value is needed, we just + # care if it exists. Sets aren't viable + # because only keys can expire. + pipe.set(key_host, '') + + pipe.execute() + + def backend_unregister(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.srem(key, key_host) + pipe.delete(key_host) + pipe.execute() diff --git a/oslo/log/openstack/common/rpc/matchmaker_ring.py b/oslo/log/openstack/common/rpc/matchmaker_ring.py new file mode 100644 index 0000000..e060998 --- /dev/null +++ b/oslo/log/openstack/common/rpc/matchmaker_ring.py @@ -0,0 +1,106 @@ +# Copyright 2011-2013 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +The MatchMaker classes should except a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +import itertools +import json + +from oslo.config import cfg + +from oslo.log.openstack.common.gettextutils import _LW +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common.rpc import matchmaker as mm + + +matchmaker_opts = [ + # Matchmaker ring file + cfg.StrOpt('ringfile', + deprecated_name='matchmaker_ringfile', + deprecated_group='DEFAULT', + default='/etc/oslo/matchmaker_ring.json', + help='Matchmaker ring file (JSON)'), +] + +CONF = cfg.CONF +CONF.register_opts(matchmaker_opts, 'matchmaker_ring') +LOG = logging.getLogger(__name__) + + +class RingExchange(mm.Exchange): + """Match Maker where hosts are loaded from a static JSON formatted file. + + __init__ takes optional ring dictionary argument, otherwise + loads the ringfile from CONF.mathcmaker_ringfile. + """ + def __init__(self, ring=None): + super(RingExchange, self).__init__() + + if ring: + self.ring = ring + else: + with open(CONF.matchmaker_ring.ringfile, 'r') as fh: + self.ring = json.load(fh) + + self.ring0 = {} + for k in self.ring.keys(): + self.ring0[k] = itertools.cycle(self.ring[k]) + + def _ring_has(self, key): + return key in self.ring0 + + +class RoundRobinRingExchange(RingExchange): + """A Topic Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(RoundRobinRingExchange, self).__init__(ring) + + def run(self, key): + if not self._ring_has(key): + LOG.warn( + _LW("No key defining hosts for topic '%s', " + "see ringfile") % (key, ) + ) + return [] + host = next(self.ring0[key]) + return [(key + '.' + host, host)] + + +class FanoutRingExchange(RingExchange): + """Fanout Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(FanoutRingExchange, self).__init__(ring) + + def run(self, key): + # Assume starts with "fanout~", strip it for lookup. + nkey = key.split('fanout~')[1:][0] + if not self._ring_has(nkey): + LOG.warn( + _LW("No key defining hosts for topic '%s', " + "see ringfile") % (nkey, ) + ) + return [] + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + + +class MatchMakerRing(mm.MatchMakerBase): + """Match Maker where hosts are loaded from a static hashmap.""" + def __init__(self, ring=None): + super(MatchMakerRing, self).__init__() + self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring)) + self.add_binding(mm.DirectBinding(), mm.DirectExchange()) + self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring)) diff --git a/oslo/log/openstack/common/rpc/proxy.py b/oslo/log/openstack/common/rpc/proxy.py new file mode 100644 index 0000000..9ab61c1 --- /dev/null +++ b/oslo/log/openstack/common/rpc/proxy.py @@ -0,0 +1,225 @@ +# Copyright 2012-2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A helper class for proxy objects to remote APIs. + +For more information about rpc API version numbers, see: + rpc/dispatcher.py +""" + +import six + +from oslo.log.openstack.common import rpc +from oslo.log.openstack.common.rpc import common as rpc_common +from oslo.log.openstack.common.rpc import serializer as rpc_serializer + + +class RpcProxy(object): + """A helper class for rpc clients. + + This class is a wrapper around the RPC client API. It allows you to + specify the topic and API version in a single place. This is intended to + be used as a base class for a class that implements the client side of an + rpc API. + """ + + # The default namespace, which can be overridden in a subclass. + RPC_API_NAMESPACE = None + + def __init__(self, topic, default_version, version_cap=None, + serializer=None): + """Initialize an RpcProxy. + + :param topic: The topic to use for all messages. + :param default_version: The default API version to request in all + outgoing messages. This can be overridden on a per-message + basis. + :param version_cap: Optionally cap the maximum version used for sent + messages. + :param serializer: Optionally (de-)serialize entities with a + provided helper. + """ + self.topic = topic + self.default_version = default_version + self.version_cap = version_cap + if serializer is None: + serializer = rpc_serializer.NoOpSerializer() + self.serializer = serializer + super(RpcProxy, self).__init__() + + def _set_version(self, msg, vers): + """Helper method to set the version in a message. + + :param msg: The message having a version added to it. + :param vers: The version number to add to the message. + """ + v = vers if vers else self.default_version + if (self.version_cap and not + rpc_common.version_is_compatible(self.version_cap, v)): + raise rpc_common.RpcVersionCapError(version_cap=self.version_cap) + msg['version'] = v + + def _get_topic(self, topic): + """Return the topic to use for a message.""" + return topic if topic else self.topic + + def can_send_version(self, version): + """Check to see if a version is compatible with the version cap.""" + return (not self.version_cap or + rpc_common.version_is_compatible(self.version_cap, version)) + + @staticmethod + def make_namespaced_msg(method, namespace, **kwargs): + return {'method': method, 'namespace': namespace, 'args': kwargs} + + def make_msg(self, method, **kwargs): + return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE, + **kwargs) + + def _serialize_msg_args(self, context, kwargs): + """Helper method called to serialize message arguments. + + This calls our serializer on each argument, returning a new + set of args that have been serialized. + + :param context: The request context + :param kwargs: The arguments to serialize + :returns: A new set of serialized arguments + """ + new_kwargs = dict() + for argname, arg in six.iteritems(kwargs): + new_kwargs[argname] = self.serializer.serialize_entity(context, + arg) + return new_kwargs + + def call(self, context, msg, topic=None, version=None, timeout=None): + """rpc.call() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + :param timeout: (Optional) A timeout to use when waiting for the + response. If no timeout is specified, a default timeout will be + used that is usually sufficient. + + :returns: The return value from the remote method. + """ + self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) + real_topic = self._get_topic(topic) + try: + result = rpc.call(context, real_topic, msg, timeout) + return self.serializer.deserialize_entity(context, result) + except rpc.common.Timeout as exc: + raise rpc.common.Timeout( + exc.info, real_topic, msg.get('method')) + + def multicall(self, context, msg, topic=None, version=None, timeout=None): + """rpc.multicall() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + :param timeout: (Optional) A timeout to use when waiting for the + response. If no timeout is specified, a default timeout will be + used that is usually sufficient. + + :returns: An iterator that lets you process each of the returned values + from the remote method as they arrive. + """ + self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) + real_topic = self._get_topic(topic) + try: + result = rpc.multicall(context, real_topic, msg, timeout) + return self.serializer.deserialize_entity(context, result) + except rpc.common.Timeout as exc: + raise rpc.common.Timeout( + exc.info, real_topic, msg.get('method')) + + def cast(self, context, msg, topic=None, version=None): + """rpc.cast() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.cast() does not wait on any return value from the + remote method. + """ + self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) + rpc.cast(context, self._get_topic(topic), msg) + + def fanout_cast(self, context, msg, topic=None, version=None): + """rpc.fanout_cast() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.fanout_cast() does not wait on any return value + from the remote method. + """ + self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) + rpc.fanout_cast(context, self._get_topic(topic), msg) + + def cast_to_server(self, context, server_params, msg, topic=None, + version=None): + """rpc.cast_to_server() a remote method. + + :param context: The request context + :param server_params: Server parameters. See rpc.cast_to_server() for + details. + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.cast_to_server() does not wait on any + return values. + """ + self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) + rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) + + def fanout_cast_to_server(self, context, server_params, msg, topic=None, + version=None): + """rpc.fanout_cast_to_server() a remote method. + + :param context: The request context + :param server_params: Server parameters. See rpc.cast_to_server() for + details. + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.fanout_cast_to_server() does not wait on any + return values. + """ + self._set_version(msg, version) + msg['args'] = self._serialize_msg_args(context, msg['args']) + rpc.fanout_cast_to_server(context, server_params, + self._get_topic(topic), msg) diff --git a/oslo/log/openstack/common/rpc/serializer.py b/oslo/log/openstack/common/rpc/serializer.py new file mode 100644 index 0000000..9bc6e2a --- /dev/null +++ b/oslo/log/openstack/common/rpc/serializer.py @@ -0,0 +1,54 @@ +# Copyright 2013 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Provides the definition of an RPC serialization handler""" + +import abc + +import six + + +@six.add_metaclass(abc.ABCMeta) +class Serializer(object): + """Generic (de-)serialization definition base class.""" + + @abc.abstractmethod + def serialize_entity(self, context, entity): + """Serialize something to primitive form. + + :param context: Security context + :param entity: Entity to be serialized + :returns: Serialized form of entity + """ + pass + + @abc.abstractmethod + def deserialize_entity(self, context, entity): + """Deserialize something from primitive form. + + :param context: Security context + :param entity: Primitive to be deserialized + :returns: Deserialized form of entity + """ + pass + + +class NoOpSerializer(Serializer): + """A serializer that does nothing.""" + + def serialize_entity(self, context, entity): + return entity + + def deserialize_entity(self, context, entity): + return entity diff --git a/oslo/log/openstack/common/rpc/service.py b/oslo/log/openstack/common/rpc/service.py new file mode 100644 index 0000000..637694f --- /dev/null +++ b/oslo/log/openstack/common/rpc/service.py @@ -0,0 +1,75 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common import rpc +from oslo.log.openstack.common.rpc import dispatcher as rpc_dispatcher +from oslo.log.openstack.common import service + + +LOG = logging.getLogger(__name__) + + +class Service(service.Service): + """Service object for binaries running on hosts. + + A service enables rpc by listening to queues based on topic and host. + """ + def __init__(self, host, topic, manager=None, serializer=None): + super(Service, self).__init__() + self.host = host + self.topic = topic + self.serializer = serializer + if manager is None: + self.manager = self + else: + self.manager = manager + + def start(self): + super(Service, self).start() + + self.conn = rpc.create_connection(new=True) + LOG.debug("Creating Consumer connection for Service %s" % + self.topic) + + dispatcher = rpc_dispatcher.RpcDispatcher([self.manager], + self.serializer) + + # Share this same connection for these Consumers + self.conn.create_consumer(self.topic, dispatcher, fanout=False) + + node_topic = '%s.%s' % (self.topic, self.host) + self.conn.create_consumer(node_topic, dispatcher, fanout=False) + + self.conn.create_consumer(self.topic, dispatcher, fanout=True) + + # Hook to allow the manager to do other initializations after + # the rpc connection is created. + if callable(getattr(self.manager, 'initialize_service_hook', None)): + self.manager.initialize_service_hook(self) + + # Consume from all consumers in a thread + self.conn.consume_in_thread() + + def stop(self): + # Try to shut the connection down, but if we get any sort of + # errors, go ahead and ignore them.. as we're shutting down anyway + try: + self.conn.close() + except Exception: + pass + super(Service, self).stop() diff --git a/oslo/log/openstack/common/rpc/zmq_receiver.py b/oslo/log/openstack/common/rpc/zmq_receiver.py new file mode 100644 index 0000000..558f261 --- /dev/null +++ b/oslo/log/openstack/common/rpc/zmq_receiver.py @@ -0,0 +1,38 @@ +# Copyright 2011 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +eventlet.monkey_patch() + +import contextlib +import sys + +from oslo.config import cfg + +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common import rpc +from oslo.log.openstack.common.rpc import impl_zmq + +CONF = cfg.CONF +CONF.register_opts(rpc.rpc_opts) +CONF.register_opts(impl_zmq.zmq_opts) + + +def main(): + CONF(sys.argv[1:], project='oslo') + logging.setup("oslo") + + with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor: + reactor.consume_in_thread() + reactor.wait() diff --git a/oslo/log/openstack/common/service.py b/oslo/log/openstack/common/service.py new file mode 100644 index 0000000..29544d2 --- /dev/null +++ b/oslo/log/openstack/common/service.py @@ -0,0 +1,512 @@ +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Generic Node base class for all workers that run on hosts.""" + +import errno +import logging as std_logging +import os +import random +import signal +import sys +import time + +try: + # Importing just the symbol here because the io module does not + # exist in Python 2.6. + from io import UnsupportedOperation # noqa +except ImportError: + # Python 2.6 + UnsupportedOperation = None + +import eventlet +from eventlet import event +from oslo.config import cfg + +from oslo.log.openstack.common import eventlet_backdoor +from oslo.log.openstack.common.gettextutils import _LE, _LI, _LW +from oslo.log.openstack.common import importutils +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common import systemd +from oslo.log.openstack.common import threadgroup + + +rpc = importutils.try_import('oslo.log.openstack.common.rpc') +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +def _sighup_supported(): + return hasattr(signal, 'SIGHUP') + + +def _is_daemon(): + # The process group for a foreground process will match the + # process group of the controlling terminal. If those values do + # not match, or ioctl() fails on the stdout file handle, we assume + # the process is running in the background as a daemon. + # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics + try: + is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno()) + except OSError as err: + if err.errno == errno.ENOTTY: + # Assume we are a daemon because there is no terminal. + is_daemon = True + else: + raise + except UnsupportedOperation: + # Could not get the fileno for stdout, so we must be a daemon. + is_daemon = True + return is_daemon + + +def _is_sighup_and_daemon(signo): + if not (_sighup_supported() and signo == signal.SIGHUP): + # Avoid checking if we are a daemon, because the signal isn't + # SIGHUP. + return False + return _is_daemon() + + +def _signo_to_signame(signo): + signals = {signal.SIGTERM: 'SIGTERM', + signal.SIGINT: 'SIGINT'} + if _sighup_supported(): + signals[signal.SIGHUP] = 'SIGHUP' + return signals[signo] + + +def _set_signals_handler(handler): + signal.signal(signal.SIGTERM, handler) + signal.signal(signal.SIGINT, handler) + if _sighup_supported(): + signal.signal(signal.SIGHUP, handler) + + +class Launcher(object): + """Launch one or more services and wait for them to complete.""" + + def __init__(self): + """Initialize the service launcher. + + :returns: None + + """ + self.services = Services() + self.backdoor_port = eventlet_backdoor.initialize_if_enabled() + + def launch_service(self, service): + """Load and start the given service. + + :param service: The service you would like to start. + :returns: None + + """ + service.backdoor_port = self.backdoor_port + self.services.add(service) + + def stop(self): + """Stop all services which are currently running. + + :returns: None + + """ + self.services.stop() + + def wait(self): + """Waits until all services have been stopped, and then returns. + + :returns: None + + """ + self.services.wait() + + def restart(self): + """Reload config files and restart service. + + :returns: None + + """ + cfg.CONF.reload_config_files() + self.services.restart() + + +class SignalExit(SystemExit): + def __init__(self, signo, exccode=1): + super(SignalExit, self).__init__(exccode) + self.signo = signo + + +class ServiceLauncher(Launcher): + def _handle_signal(self, signo, frame): + # Allow the process to be killed again and die from natural causes + _set_signals_handler(signal.SIG_DFL) + raise SignalExit(signo) + + def handle_signal(self): + _set_signals_handler(self._handle_signal) + + def _wait_for_exit_or_signal(self, ready_callback=None): + status = None + signo = 0 + + LOG.debug('Full set of CONF:') + CONF.log_opt_values(LOG, std_logging.DEBUG) + + try: + if ready_callback: + ready_callback() + super(ServiceLauncher, self).wait() + except SignalExit as exc: + signame = _signo_to_signame(exc.signo) + LOG.info(_LI('Caught %s, exiting'), signame) + status = exc.code + signo = exc.signo + except SystemExit as exc: + status = exc.code + finally: + self.stop() + if rpc: + try: + rpc.cleanup() + except Exception: + # We're shutting down, so it doesn't matter at this point. + LOG.exception(_LE('Exception during rpc cleanup.')) + + return status, signo + + def wait(self, ready_callback=None): + systemd.notify_once() + while True: + self.handle_signal() + status, signo = self._wait_for_exit_or_signal(ready_callback) + if not _is_sighup_and_daemon(signo): + return status + self.restart() + + +class ServiceWrapper(object): + def __init__(self, service, workers): + self.service = service + self.workers = workers + self.children = set() + self.forktimes = [] + + +class ProcessLauncher(object): + def __init__(self, wait_interval=0.01): + """Constructor. + + :param wait_interval: The interval to sleep for between checks + of child process exit. + """ + self.children = {} + self.sigcaught = None + self.running = True + self.wait_interval = wait_interval + rfd, self.writepipe = os.pipe() + self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') + self.handle_signal() + + def handle_signal(self): + _set_signals_handler(self._handle_signal) + + def _handle_signal(self, signo, frame): + self.sigcaught = signo + self.running = False + + # Allow the process to be killed again and die from natural causes + _set_signals_handler(signal.SIG_DFL) + + def _pipe_watcher(self): + # This will block until the write end is closed when the parent + # dies unexpectedly + self.readpipe.read() + + LOG.info(_LI('Parent process has died unexpectedly, exiting')) + + sys.exit(1) + + def _child_process_handle_signal(self): + # Setup child signal handlers differently + def _sigterm(*args): + signal.signal(signal.SIGTERM, signal.SIG_DFL) + raise SignalExit(signal.SIGTERM) + + def _sighup(*args): + signal.signal(signal.SIGHUP, signal.SIG_DFL) + raise SignalExit(signal.SIGHUP) + + signal.signal(signal.SIGTERM, _sigterm) + if _sighup_supported(): + signal.signal(signal.SIGHUP, _sighup) + # Block SIGINT and let the parent send us a SIGTERM + signal.signal(signal.SIGINT, signal.SIG_IGN) + + def _child_wait_for_exit_or_signal(self, launcher): + status = 0 + signo = 0 + + # NOTE(johannes): All exceptions are caught to ensure this + # doesn't fallback into the loop spawning children. It would + # be bad for a child to spawn more children. + try: + launcher.wait() + except SignalExit as exc: + signame = _signo_to_signame(exc.signo) + LOG.info(_LI('Child caught %s, exiting'), signame) + status = exc.code + signo = exc.signo + except SystemExit as exc: + status = exc.code + except BaseException: + LOG.exception(_LE('Unhandled exception')) + status = 2 + finally: + launcher.stop() + + return status, signo + + def _child_process(self, service): + self._child_process_handle_signal() + + # Reopen the eventlet hub to make sure we don't share an epoll + # fd with parent and/or siblings, which would be bad + eventlet.hubs.use_hub() + + # Close write to ensure only parent has it open + os.close(self.writepipe) + # Create greenthread to watch for parent to close pipe + eventlet.spawn_n(self._pipe_watcher) + + # Reseed random number generator + random.seed() + + launcher = Launcher() + launcher.launch_service(service) + return launcher + + def _start_child(self, wrap): + if len(wrap.forktimes) > wrap.workers: + # Limit ourselves to one process a second (over the period of + # number of workers * 1 second). This will allow workers to + # start up quickly but ensure we don't fork off children that + # die instantly too quickly. + if time.time() - wrap.forktimes[0] < wrap.workers: + LOG.info(_LI('Forking too fast, sleeping')) + time.sleep(1) + + wrap.forktimes.pop(0) + + wrap.forktimes.append(time.time()) + + pid = os.fork() + if pid == 0: + launcher = self._child_process(wrap.service) + while True: + self._child_process_handle_signal() + status, signo = self._child_wait_for_exit_or_signal(launcher) + if not _is_sighup_and_daemon(signo): + break + launcher.restart() + + os._exit(status) + + LOG.info(_LI('Started child %d'), pid) + + wrap.children.add(pid) + self.children[pid] = wrap + + return pid + + def launch_service(self, service, workers=1): + wrap = ServiceWrapper(service, workers) + + LOG.info(_LI('Starting %d workers'), wrap.workers) + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + def _wait_child(self): + try: + # Don't block if no child processes have exited + pid, status = os.waitpid(0, os.WNOHANG) + if not pid: + return None + except OSError as exc: + if exc.errno not in (errno.EINTR, errno.ECHILD): + raise + return None + + if os.WIFSIGNALED(status): + sig = os.WTERMSIG(status) + LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'), + dict(pid=pid, sig=sig)) + else: + code = os.WEXITSTATUS(status) + LOG.info(_LI('Child %(pid)s exited with status %(code)d'), + dict(pid=pid, code=code)) + + if pid not in self.children: + LOG.warning(_LW('pid %d not in child list'), pid) + return None + + wrap = self.children.pop(pid) + wrap.children.remove(pid) + return wrap + + def _respawn_children(self): + while self.running: + wrap = self._wait_child() + if not wrap: + # Yield to other threads if no children have exited + # Sleep for a short time to avoid excessive CPU usage + # (see bug #1095346) + eventlet.greenthread.sleep(self.wait_interval) + continue + while self.running and len(wrap.children) < wrap.workers: + self._start_child(wrap) + + def wait(self): + """Loop waiting on children to die and respawning as necessary.""" + + systemd.notify_once() + LOG.debug('Full set of CONF:') + CONF.log_opt_values(LOG, std_logging.DEBUG) + + try: + while True: + self.handle_signal() + self._respawn_children() + # No signal means that stop was called. Don't clean up here. + if not self.sigcaught: + return + + signame = _signo_to_signame(self.sigcaught) + LOG.info(_LI('Caught %s, stopping children'), signame) + if not _is_sighup_and_daemon(self.sigcaught): + break + + for pid in self.children: + os.kill(pid, signal.SIGHUP) + self.running = True + self.sigcaught = None + except eventlet.greenlet.GreenletExit: + LOG.info(_LI("Wait called after thread killed. Cleaning up.")) + + self.stop() + + def stop(self): + """Terminate child processes and wait on each.""" + self.running = False + for pid in self.children: + try: + os.kill(pid, signal.SIGTERM) + except OSError as exc: + if exc.errno != errno.ESRCH: + raise + + # Wait for children to die + if self.children: + LOG.info(_LI('Waiting on %d children to exit'), len(self.children)) + while self.children: + self._wait_child() + + +class Service(object): + """Service object for binaries running on hosts.""" + + def __init__(self, threads=1000): + self.tg = threadgroup.ThreadGroup(threads) + + # signal that the service is done shutting itself down: + self._done = event.Event() + + def reset(self): + # NOTE(Fengqian): docs for Event.reset() recommend against using it + self._done = event.Event() + + def start(self): + pass + + def stop(self): + self.tg.stop() + self.tg.wait() + # Signal that service cleanup is done: + if not self._done.ready(): + self._done.send() + + def wait(self): + self._done.wait() + + +class Services(object): + + def __init__(self): + self.services = [] + self.tg = threadgroup.ThreadGroup() + self.done = event.Event() + + def add(self, service): + self.services.append(service) + self.tg.add_thread(self.run_service, service, self.done) + + def stop(self): + # wait for graceful shutdown of services: + for service in self.services: + service.stop() + service.wait() + + # Each service has performed cleanup, now signal that the run_service + # wrapper threads can now die: + if not self.done.ready(): + self.done.send() + + # reap threads: + self.tg.stop() + + def wait(self): + self.tg.wait() + + def restart(self): + self.stop() + self.done = event.Event() + for restart_service in self.services: + restart_service.reset() + self.tg.add_thread(self.run_service, restart_service, self.done) + + @staticmethod + def run_service(service, done): + """Service start wrapper. + + :param service: service to run + :param done: event to wait on until a shutdown is triggered + :returns: None + + """ + service.start() + done.wait() + + +def launch(service, workers=1): + if workers is None or workers == 1: + launcher = ServiceLauncher() + launcher.launch_service(service) + else: + launcher = ProcessLauncher() + launcher.launch_service(service, workers=workers) + + return launcher diff --git a/oslo/log/openstack/common/sslutils.py b/oslo/log/openstack/common/sslutils.py new file mode 100644 index 0000000..82e1d3e --- /dev/null +++ b/oslo/log/openstack/common/sslutils.py @@ -0,0 +1,95 @@ +# Copyright 2013 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import ssl + +from oslo.config import cfg + +from oslo.log.openstack.common.gettextutils import _ + + +ssl_opts = [ + cfg.StrOpt('ca_file', + help="CA certificate file to use to verify " + "connecting clients."), + cfg.StrOpt('cert_file', + help="Certificate file to use when starting " + "the server securely."), + cfg.StrOpt('key_file', + help="Private key file to use when starting " + "the server securely."), +] + + +CONF = cfg.CONF +CONF.register_opts(ssl_opts, "ssl") + + +def is_enabled(): + cert_file = CONF.ssl.cert_file + key_file = CONF.ssl.key_file + ca_file = CONF.ssl.ca_file + use_ssl = cert_file or key_file + + if cert_file and not os.path.exists(cert_file): + raise RuntimeError(_("Unable to find cert_file : %s") % cert_file) + + if ca_file and not os.path.exists(ca_file): + raise RuntimeError(_("Unable to find ca_file : %s") % ca_file) + + if key_file and not os.path.exists(key_file): + raise RuntimeError(_("Unable to find key_file : %s") % key_file) + + if use_ssl and (not cert_file or not key_file): + raise RuntimeError(_("When running server in SSL mode, you must " + "specify both a cert_file and key_file " + "option value in your configuration file")) + + return use_ssl + + +def wrap(sock): + ssl_kwargs = { + 'server_side': True, + 'certfile': CONF.ssl.cert_file, + 'keyfile': CONF.ssl.key_file, + 'cert_reqs': ssl.CERT_NONE, + } + + if CONF.ssl.ca_file: + ssl_kwargs['ca_certs'] = CONF.ssl.ca_file + ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED + + return ssl.wrap_socket(sock, **ssl_kwargs) + + +_SSL_PROTOCOLS = { + "tlsv1": ssl.PROTOCOL_TLSv1, + "sslv23": ssl.PROTOCOL_SSLv23, + "sslv3": ssl.PROTOCOL_SSLv3 +} + +try: + _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2 +except AttributeError: + pass + + +def validate_ssl_version(version): + key = version.lower() + try: + return _SSL_PROTOCOLS[key] + except KeyError: + raise RuntimeError(_("Invalid SSL version : %s") % version) diff --git a/oslo/log/openstack/common/strutils.py b/oslo/log/openstack/common/strutils.py new file mode 100644 index 0000000..4f1bdeb --- /dev/null +++ b/oslo/log/openstack/common/strutils.py @@ -0,0 +1,311 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +System-level utilities and helper functions. +""" + +import math +import re +import sys +import unicodedata + +import six + +from oslo.log.openstack.common.gettextutils import _ + + +UNIT_PREFIX_EXPONENT = { + 'k': 1, + 'K': 1, + 'Ki': 1, + 'M': 2, + 'Mi': 2, + 'G': 3, + 'Gi': 3, + 'T': 4, + 'Ti': 4, +} +UNIT_SYSTEM_INFO = { + 'IEC': (1024, re.compile(r'(^[-+]?\d*\.?\d+)([KMGT]i?)?(b|bit|B)$')), + 'SI': (1000, re.compile(r'(^[-+]?\d*\.?\d+)([kMGT])?(b|bit|B)$')), +} + +TRUE_STRINGS = ('1', 't', 'true', 'on', 'y', 'yes') +FALSE_STRINGS = ('0', 'f', 'false', 'off', 'n', 'no') + +SLUGIFY_STRIP_RE = re.compile(r"[^\w\s-]") +SLUGIFY_HYPHENATE_RE = re.compile(r"[-\s]+") + + +# NOTE(flaper87): The following globals are used by `mask_password` +_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password'] + +# NOTE(ldbragst): Let's build a list of regex objects using the list of +# _SANITIZE_KEYS we already have. This way, we only have to add the new key +# to the list of _SANITIZE_KEYS and we can generate regular expressions +# for XML and JSON automatically. +_SANITIZE_PATTERNS_2 = [] +_SANITIZE_PATTERNS_1 = [] + +# NOTE(amrith): Some regular expressions have only one parameter, some +# have two parameters. Use different lists of patterns here. +_FORMAT_PATTERNS_1 = [r'(%(key)s\s*[=]\s*)[^\s^\'^\"]+'] +_FORMAT_PATTERNS_2 = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])', + r'(%(key)s\s+[\"\']).*?([\"\'])', + r'([-]{2}%(key)s\s+)[^\'^\"^=^\s]+([\s]*)', + r'(<%(key)s>).*?(</%(key)s>)', + r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])', + r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])', + r'([\'"].*?%(key)s[\'"]\s*,\s*\'--?[A-z]+\'\s*,\s*u?' + '[\'"]).*?([\'"])', + r'(%(key)s\s*--?[A-z]+\s*)\S+(\s*)'] + +for key in _SANITIZE_KEYS: + for pattern in _FORMAT_PATTERNS_2: + reg_ex = re.compile(pattern % {'key': key}, re.DOTALL) + _SANITIZE_PATTERNS_2.append(reg_ex) + + for pattern in _FORMAT_PATTERNS_1: + reg_ex = re.compile(pattern % {'key': key}, re.DOTALL) + _SANITIZE_PATTERNS_1.append(reg_ex) + + +def int_from_bool_as_string(subject): + """Interpret a string as a boolean and return either 1 or 0. + + Any string value in: + + ('True', 'true', 'On', 'on', '1') + + is interpreted as a boolean True. + + Useful for JSON-decoded stuff and config file parsing + """ + return bool_from_string(subject) and 1 or 0 + + +def bool_from_string(subject, strict=False, default=False): + """Interpret a string as a boolean. + + A case-insensitive match is performed such that strings matching 't', + 'true', 'on', 'y', 'yes', or '1' are considered True and, when + `strict=False`, anything else returns the value specified by 'default'. + + Useful for JSON-decoded stuff and config file parsing. + + If `strict=True`, unrecognized values, including None, will raise a + ValueError which is useful when parsing values passed in from an API call. + Strings yielding False are 'f', 'false', 'off', 'n', 'no', or '0'. + """ + if not isinstance(subject, six.string_types): + subject = six.text_type(subject) + + lowered = subject.strip().lower() + + if lowered in TRUE_STRINGS: + return True + elif lowered in FALSE_STRINGS: + return False + elif strict: + acceptable = ', '.join( + "'%s'" % s for s in sorted(TRUE_STRINGS + FALSE_STRINGS)) + msg = _("Unrecognized value '%(val)s', acceptable values are:" + " %(acceptable)s") % {'val': subject, + 'acceptable': acceptable} + raise ValueError(msg) + else: + return default + + +def safe_decode(text, incoming=None, errors='strict'): + """Decodes incoming text/bytes string using `incoming` if they're not + already unicode. + + :param incoming: Text's current encoding + :param errors: Errors handling policy. See here for valid + values http://docs.python.org/2/library/codecs.html + :returns: text or a unicode `incoming` encoded + representation of it. + :raises TypeError: If text is not an instance of str + """ + if not isinstance(text, (six.string_types, six.binary_type)): + raise TypeError("%s can't be decoded" % type(text)) + + if isinstance(text, six.text_type): + return text + + if not incoming: + incoming = (sys.stdin.encoding or + sys.getdefaultencoding()) + + try: + return text.decode(incoming, errors) + except UnicodeDecodeError: + # Note(flaper87) If we get here, it means that + # sys.stdin.encoding / sys.getdefaultencoding + # didn't return a suitable encoding to decode + # text. This happens mostly when global LANG + # var is not set correctly and there's no + # default encoding. In this case, most likely + # python will use ASCII or ANSI encoders as + # default encodings but they won't be capable + # of decoding non-ASCII characters. + # + # Also, UTF-8 is being used since it's an ASCII + # extension. + return text.decode('utf-8', errors) + + +def safe_encode(text, incoming=None, + encoding='utf-8', errors='strict'): + """Encodes incoming text/bytes string using `encoding`. + + If incoming is not specified, text is expected to be encoded with + current python's default encoding. (`sys.getdefaultencoding`) + + :param incoming: Text's current encoding + :param encoding: Expected encoding for text (Default UTF-8) + :param errors: Errors handling policy. See here for valid + values http://docs.python.org/2/library/codecs.html + :returns: text or a bytestring `encoding` encoded + representation of it. + :raises TypeError: If text is not an instance of str + """ + if not isinstance(text, (six.string_types, six.binary_type)): + raise TypeError("%s can't be encoded" % type(text)) + + if not incoming: + incoming = (sys.stdin.encoding or + sys.getdefaultencoding()) + + if isinstance(text, six.text_type): + return text.encode(encoding, errors) + elif text and encoding != incoming: + # Decode text before encoding it with `encoding` + text = safe_decode(text, incoming, errors) + return text.encode(encoding, errors) + else: + return text + + +def string_to_bytes(text, unit_system='IEC', return_int=False): + """Converts a string into an float representation of bytes. + + The units supported for IEC :: + + Kb(it), Kib(it), Mb(it), Mib(it), Gb(it), Gib(it), Tb(it), Tib(it) + KB, KiB, MB, MiB, GB, GiB, TB, TiB + + The units supported for SI :: + + kb(it), Mb(it), Gb(it), Tb(it) + kB, MB, GB, TB + + Note that the SI unit system does not support capital letter 'K' + + :param text: String input for bytes size conversion. + :param unit_system: Unit system for byte size conversion. + :param return_int: If True, returns integer representation of text + in bytes. (default: decimal) + :returns: Numerical representation of text in bytes. + :raises ValueError: If text has an invalid value. + + """ + try: + base, reg_ex = UNIT_SYSTEM_INFO[unit_system] + except KeyError: + msg = _('Invalid unit system: "%s"') % unit_system + raise ValueError(msg) + match = reg_ex.match(text) + if match: + magnitude = float(match.group(1)) + unit_prefix = match.group(2) + if match.group(3) in ['b', 'bit']: + magnitude /= 8 + else: + msg = _('Invalid string format: %s') % text + raise ValueError(msg) + if not unit_prefix: + res = magnitude + else: + res = magnitude * pow(base, UNIT_PREFIX_EXPONENT[unit_prefix]) + if return_int: + return int(math.ceil(res)) + return res + + +def to_slug(value, incoming=None, errors="strict"): + """Normalize string. + + Convert to lowercase, remove non-word characters, and convert spaces + to hyphens. + + Inspired by Django's `slugify` filter. + + :param value: Text to slugify + :param incoming: Text's current encoding + :param errors: Errors handling policy. See here for valid + values http://docs.python.org/2/library/codecs.html + :returns: slugified unicode representation of `value` + :raises TypeError: If text is not an instance of str + """ + value = safe_decode(value, incoming, errors) + # NOTE(aababilov): no need to use safe_(encode|decode) here: + # encodings are always "ascii", error handling is always "ignore" + # and types are always known (first: unicode; second: str) + value = unicodedata.normalize("NFKD", value).encode( + "ascii", "ignore").decode("ascii") + value = SLUGIFY_STRIP_RE.sub("", value).strip().lower() + return SLUGIFY_HYPHENATE_RE.sub("-", value) + + +def mask_password(message, secret="***"): + """Replace password with 'secret' in message. + + :param message: The string which includes security information. + :param secret: value with which to replace passwords. + :returns: The unicode value of message with the password fields masked. + + For example: + + >>> mask_password("'adminPass' : 'aaaaa'") + "'adminPass' : '***'" + >>> mask_password("'admin_pass' : 'aaaaa'") + "'admin_pass' : '***'" + >>> mask_password('"password" : "aaaaa"') + '"password" : "***"' + >>> mask_password("'original_password' : 'aaaaa'") + "'original_password' : '***'" + >>> mask_password("u'original_password' : u'aaaaa'") + "u'original_password' : u'***'" + """ + message = six.text_type(message) + + # NOTE(ldbragst): Check to see if anything in message contains any key + # specified in _SANITIZE_KEYS, if not then just return the message since + # we don't have to mask any passwords. + if not any(key in message for key in _SANITIZE_KEYS): + return message + + substitute = r'\g<1>' + secret + r'\g<2>' + for pattern in _SANITIZE_PATTERNS_2: + message = re.sub(pattern, substitute, message) + + substitute = r'\g<1>' + secret + for pattern in _SANITIZE_PATTERNS_1: + message = re.sub(pattern, substitute, message) + + return message diff --git a/oslo/log/openstack/common/systemd.py b/oslo/log/openstack/common/systemd.py new file mode 100644 index 0000000..b23cfc0 --- /dev/null +++ b/oslo/log/openstack/common/systemd.py @@ -0,0 +1,106 @@ +# Copyright 2012-2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper module for systemd service readiness notification. +""" + +import os +import socket +import sys + +from oslo.log.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +def _abstractify(socket_name): + if socket_name.startswith('@'): + # abstract namespace socket + socket_name = '\0%s' % socket_name[1:] + return socket_name + + +def _sd_notify(unset_env, msg): + notify_socket = os.getenv('NOTIFY_SOCKET') + if notify_socket: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + try: + sock.connect(_abstractify(notify_socket)) + sock.sendall(msg) + if unset_env: + del os.environ['NOTIFY_SOCKET'] + except EnvironmentError: + LOG.debug("Systemd notification failed", exc_info=True) + finally: + sock.close() + + +def notify(): + """Send notification to Systemd that service is ready. + + For details see + http://www.freedesktop.org/software/systemd/man/sd_notify.html + """ + _sd_notify(False, 'READY=1') + + +def notify_once(): + """Send notification once to Systemd that service is ready. + + Systemd sets NOTIFY_SOCKET environment variable with the name of the + socket listening for notifications from services. + This method removes the NOTIFY_SOCKET environment variable to ensure + notification is sent only once. + """ + _sd_notify(True, 'READY=1') + + +def onready(notify_socket, timeout): + """Wait for systemd style notification on the socket. + + :param notify_socket: local socket address + :type notify_socket: string + :param timeout: socket timeout + :type timeout: float + :returns: 0 service ready + 1 service not ready + 2 timeout occurred + """ + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + sock.settimeout(timeout) + sock.bind(_abstractify(notify_socket)) + try: + msg = sock.recv(512) + except socket.timeout: + return 2 + finally: + sock.close() + if 'READY=1' in msg: + return 0 + else: + return 1 + + +if __name__ == '__main__': + # simple CLI for testing + if len(sys.argv) == 1: + notify() + elif len(sys.argv) >= 2: + timeout = float(sys.argv[1]) + notify_socket = os.getenv('NOTIFY_SOCKET') + if notify_socket: + retval = onready(notify_socket, timeout) + sys.exit(retval) diff --git a/oslo/log/openstack/common/threadgroup.py b/oslo/log/openstack/common/threadgroup.py new file mode 100644 index 0000000..624e532 --- /dev/null +++ b/oslo/log/openstack/common/threadgroup.py @@ -0,0 +1,147 @@ +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import threading + +import eventlet +from eventlet import greenpool + +from oslo.log.openstack.common import log as logging +from oslo.log.openstack.common import loopingcall + + +LOG = logging.getLogger(__name__) + + +def _thread_done(gt, *args, **kwargs): + """Callback function to be passed to GreenThread.link() when we spawn() + Calls the :class:`ThreadGroup` to notify if. + + """ + kwargs['group'].thread_done(kwargs['thread']) + + +class Thread(object): + """Wrapper around a greenthread, that holds a reference to the + :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when + it has done so it can be removed from the threads list. + """ + def __init__(self, thread, group): + self.thread = thread + self.thread.link(_thread_done, group=group, thread=self) + + def stop(self): + self.thread.kill() + + def wait(self): + return self.thread.wait() + + def link(self, func, *args, **kwargs): + self.thread.link(func, *args, **kwargs) + + +class ThreadGroup(object): + """The point of the ThreadGroup class is to: + + * keep track of timers and greenthreads (making it easier to stop them + when need be). + * provide an easy API to add timers. + """ + def __init__(self, thread_pool_size=10): + self.pool = greenpool.GreenPool(thread_pool_size) + self.threads = [] + self.timers = [] + + def add_dynamic_timer(self, callback, initial_delay=None, + periodic_interval_max=None, *args, **kwargs): + timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs) + timer.start(initial_delay=initial_delay, + periodic_interval_max=periodic_interval_max) + self.timers.append(timer) + + def add_timer(self, interval, callback, initial_delay=None, + *args, **kwargs): + pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) + pulse.start(interval=interval, + initial_delay=initial_delay) + self.timers.append(pulse) + + def add_thread(self, callback, *args, **kwargs): + gt = self.pool.spawn(callback, *args, **kwargs) + th = Thread(gt, self) + self.threads.append(th) + return th + + def thread_done(self, thread): + self.threads.remove(thread) + + def _stop_threads(self): + current = threading.current_thread() + + # Iterate over a copy of self.threads so thread_done doesn't + # modify the list while we're iterating + for x in self.threads[:]: + if x is current: + # don't kill the current thread. + continue + try: + x.stop() + except Exception as ex: + LOG.exception(ex) + + def stop_timers(self): + for x in self.timers: + try: + x.stop() + except Exception as ex: + LOG.exception(ex) + self.timers = [] + + def stop(self, graceful=False): + """stop function has the option of graceful=True/False. + + * In case of graceful=True, wait for all threads to be finished. + Never kill threads. + * In case of graceful=False, kill threads immediately. + """ + self.stop_timers() + if graceful: + # In case of graceful=True, wait for all threads to be + # finished, never kill threads + self.wait() + else: + # In case of graceful=False(Default), kill threads + # immediately + self._stop_threads() + + def wait(self): + for x in self.timers: + try: + x.wait() + except eventlet.greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex) + current = threading.current_thread() + + # Iterate over a copy of self.threads so thread_done doesn't + # modify the list while we're iterating + for x in self.threads[:]: + if x is current: + continue + try: + x.wait() + except eventlet.greenlet.GreenletExit: + pass + except Exception as ex: + LOG.exception(ex) diff --git a/oslo/log/openstack/common/timeutils.py b/oslo/log/openstack/common/timeutils.py new file mode 100644 index 0000000..c48da95 --- /dev/null +++ b/oslo/log/openstack/common/timeutils.py @@ -0,0 +1,210 @@ +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Time related utilities and helper functions. +""" + +import calendar +import datetime +import time + +import iso8601 +import six + + +# ISO 8601 extended time format with microseconds +_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f' +_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S' +PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND + + +def isotime(at=None, subsecond=False): + """Stringify time in ISO 8601 format.""" + if not at: + at = utcnow() + st = at.strftime(_ISO8601_TIME_FORMAT + if not subsecond + else _ISO8601_TIME_FORMAT_SUBSECOND) + tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC' + st += ('Z' if tz == 'UTC' else tz) + return st + + +def parse_isotime(timestr): + """Parse time from ISO 8601 format.""" + try: + return iso8601.parse_date(timestr) + except iso8601.ParseError as e: + raise ValueError(six.text_type(e)) + except TypeError as e: + raise ValueError(six.text_type(e)) + + +def strtime(at=None, fmt=PERFECT_TIME_FORMAT): + """Returns formatted utcnow.""" + if not at: + at = utcnow() + return at.strftime(fmt) + + +def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT): + """Turn a formatted time back into a datetime.""" + return datetime.datetime.strptime(timestr, fmt) + + +def normalize_time(timestamp): + """Normalize time in arbitrary timezone to UTC naive object.""" + offset = timestamp.utcoffset() + if offset is None: + return timestamp + return timestamp.replace(tzinfo=None) - offset + + +def is_older_than(before, seconds): + """Return True if before is older than seconds.""" + if isinstance(before, six.string_types): + before = parse_strtime(before).replace(tzinfo=None) + else: + before = before.replace(tzinfo=None) + + return utcnow() - before > datetime.timedelta(seconds=seconds) + + +def is_newer_than(after, seconds): + """Return True if after is newer than seconds.""" + if isinstance(after, six.string_types): + after = parse_strtime(after).replace(tzinfo=None) + else: + after = after.replace(tzinfo=None) + + return after - utcnow() > datetime.timedelta(seconds=seconds) + + +def utcnow_ts(): + """Timestamp version of our utcnow function.""" + if utcnow.override_time is None: + # NOTE(kgriffs): This is several times faster + # than going through calendar.timegm(...) + return int(time.time()) + + return calendar.timegm(utcnow().timetuple()) + + +def utcnow(): + """Overridable version of utils.utcnow.""" + if utcnow.override_time: + try: + return utcnow.override_time.pop(0) + except AttributeError: + return utcnow.override_time + return datetime.datetime.utcnow() + + +def iso8601_from_timestamp(timestamp): + """Returns an iso8601 formatted date from timestamp.""" + return isotime(datetime.datetime.utcfromtimestamp(timestamp)) + + +utcnow.override_time = None + + +def set_time_override(override_time=None): + """Overrides utils.utcnow. + + Make it return a constant time or a list thereof, one at a time. + + :param override_time: datetime instance or list thereof. If not + given, defaults to the current UTC time. + """ + utcnow.override_time = override_time or datetime.datetime.utcnow() + + +def advance_time_delta(timedelta): + """Advance overridden time using a datetime.timedelta.""" + assert utcnow.override_time is not None + try: + for dt in utcnow.override_time: + dt += timedelta + except TypeError: + utcnow.override_time += timedelta + + +def advance_time_seconds(seconds): + """Advance overridden time by seconds.""" + advance_time_delta(datetime.timedelta(0, seconds)) + + +def clear_time_override(): + """Remove the overridden time.""" + utcnow.override_time = None + + +def marshall_now(now=None): + """Make an rpc-safe datetime with microseconds. + + Note: tzinfo is stripped, but not required for relative times. + """ + if not now: + now = utcnow() + return dict(day=now.day, month=now.month, year=now.year, hour=now.hour, + minute=now.minute, second=now.second, + microsecond=now.microsecond) + + +def unmarshall_time(tyme): + """Unmarshall a datetime dict.""" + return datetime.datetime(day=tyme['day'], + month=tyme['month'], + year=tyme['year'], + hour=tyme['hour'], + minute=tyme['minute'], + second=tyme['second'], + microsecond=tyme['microsecond']) + + +def delta_seconds(before, after): + """Return the difference between two timing objects. + + Compute the difference in seconds between two date, time, or + datetime objects (as a float, to microsecond resolution). + """ + delta = after - before + return total_seconds(delta) + + +def total_seconds(delta): + """Return the total seconds of datetime.timedelta object. + + Compute total seconds of datetime.timedelta, datetime.timedelta + doesn't have method total_seconds in Python2.6, calculate it manually. + """ + try: + return delta.total_seconds() + except AttributeError: + return ((delta.days * 24 * 3600) + delta.seconds + + float(delta.microseconds) / (10 ** 6)) + + +def is_soon(dt, window): + """Determines if time is going to happen in the next window seconds. + + :param dt: the time + :param window: minimum seconds to remain to consider the time not soon + + :return: True if expiration is within the given duration + """ + soon = (utcnow() + datetime.timedelta(seconds=window)) + return normalize_time(dt) <= soon diff --git a/oslo/log/openstack/common/versionutils.py b/oslo/log/openstack/common/versionutils.py new file mode 100644 index 0000000..e0b39b8 --- /dev/null +++ b/oslo/log/openstack/common/versionutils.py @@ -0,0 +1,203 @@ +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helpers for comparing version strings. +""" + +import functools +import inspect + +import pkg_resources +import six + +from oslo.log.openstack.common.gettextutils import _ +from oslo.log.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class deprecated(object): + """A decorator to mark callables as deprecated. + + This decorator logs a deprecation message when the callable it decorates is + used. The message will include the release where the callable was + deprecated, the release where it may be removed and possibly an optional + replacement. + + Examples: + + 1. Specifying the required deprecated release + + >>> @deprecated(as_of=deprecated.ICEHOUSE) + ... def a(): pass + + 2. Specifying a replacement: + + >>> @deprecated(as_of=deprecated.ICEHOUSE, in_favor_of='f()') + ... def b(): pass + + 3. Specifying the release where the functionality may be removed: + + >>> @deprecated(as_of=deprecated.ICEHOUSE, remove_in=+1) + ... def c(): pass + + 4. Specifying the deprecated functionality will not be removed: + >>> @deprecated(as_of=deprecated.ICEHOUSE, remove_in=0) + ... def d(): pass + + 5. Specifying a replacement, deprecated functionality will not be removed: + >>> @deprecated(as_of=deprecated.ICEHOUSE, in_favor_of='f()', remove_in=0) + ... def e(): pass + + """ + + # NOTE(morganfainberg): Bexar is used for unit test purposes, it is + # expected we maintain a gap between Bexar and Folsom in this list. + BEXAR = 'B' + FOLSOM = 'F' + GRIZZLY = 'G' + HAVANA = 'H' + ICEHOUSE = 'I' + JUNO = 'J' + KILO = 'K' + + _RELEASES = { + # NOTE(morganfainberg): Bexar is used for unit test purposes, it is + # expected we maintain a gap between Bexar and Folsom in this list. + 'B': 'Bexar', + 'F': 'Folsom', + 'G': 'Grizzly', + 'H': 'Havana', + 'I': 'Icehouse', + 'J': 'Juno', + 'K': 'Kilo', + } + + _deprecated_msg_with_alternative = _( + '%(what)s is deprecated as of %(as_of)s in favor of ' + '%(in_favor_of)s and may be removed in %(remove_in)s.') + + _deprecated_msg_no_alternative = _( + '%(what)s is deprecated as of %(as_of)s and may be ' + 'removed in %(remove_in)s. It will not be superseded.') + + _deprecated_msg_with_alternative_no_removal = _( + '%(what)s is deprecated as of %(as_of)s in favor of %(in_favor_of)s.') + + _deprecated_msg_with_no_alternative_no_removal = _( + '%(what)s is deprecated as of %(as_of)s. It will not be superseded.') + + def __init__(self, as_of, in_favor_of=None, remove_in=2, what=None): + """Initialize decorator + + :param as_of: the release deprecating the callable. Constants + are define in this class for convenience. + :param in_favor_of: the replacement for the callable (optional) + :param remove_in: an integer specifying how many releases to wait + before removing (default: 2) + :param what: name of the thing being deprecated (default: the + callable's name) + + """ + self.as_of = as_of + self.in_favor_of = in_favor_of + self.remove_in = remove_in + self.what = what + + def __call__(self, func_or_cls): + if not self.what: + self.what = func_or_cls.__name__ + '()' + msg, details = self._build_message() + + if inspect.isfunction(func_or_cls): + + @six.wraps(func_or_cls) + def wrapped(*args, **kwargs): + LOG.deprecated(msg, details) + return func_or_cls(*args, **kwargs) + return wrapped + elif inspect.isclass(func_or_cls): + orig_init = func_or_cls.__init__ + + # TODO(tsufiev): change `functools` module to `six` as + # soon as six 1.7.4 (with fix for passing `assigned` + # argument to underlying `functools.wraps`) is released + # and added to the oslo.log-incubator requrements + @functools.wraps(orig_init, assigned=('__name__', '__doc__')) + def new_init(self, *args, **kwargs): + LOG.deprecated(msg, details) + orig_init(self, *args, **kwargs) + func_or_cls.__init__ = new_init + return func_or_cls + else: + raise TypeError('deprecated can be used only with functions or ' + 'classes') + + def _get_safe_to_remove_release(self, release): + # TODO(dstanek): this method will have to be reimplemented once + # when we get to the X release because once we get to the Y + # release, what is Y+2? + new_release = chr(ord(release) + self.remove_in) + if new_release in self._RELEASES: + return self._RELEASES[new_release] + else: + return new_release + + def _build_message(self): + details = dict(what=self.what, + as_of=self._RELEASES[self.as_of], + remove_in=self._get_safe_to_remove_release(self.as_of)) + + if self.in_favor_of: + details['in_favor_of'] = self.in_favor_of + if self.remove_in > 0: + msg = self._deprecated_msg_with_alternative + else: + # There are no plans to remove this function, but it is + # now deprecated. + msg = self._deprecated_msg_with_alternative_no_removal + else: + if self.remove_in > 0: + msg = self._deprecated_msg_no_alternative + else: + # There are no plans to remove this function, but it is + # now deprecated. + msg = self._deprecated_msg_with_no_alternative_no_removal + return msg, details + + +def is_compatible(requested_version, current_version, same_major=True): + """Determine whether `requested_version` is satisfied by + `current_version`; in other words, `current_version` is >= + `requested_version`. + + :param requested_version: version to check for compatibility + :param current_version: version to check against + :param same_major: if True, the major version must be identical between + `requested_version` and `current_version`. This is used when a + major-version difference indicates incompatibility between the two + versions. Since this is the common-case in practice, the default is + True. + :returns: True if compatible, False if not + """ + requested_parts = pkg_resources.parse_version(requested_version) + current_parts = pkg_resources.parse_version(current_version) + + if same_major and (requested_parts[0] != current_parts[0]): + return False + + return current_parts >= requested_parts diff --git a/requirements.txt b/requirements.txt index dbb4dd1..6c95fdd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,5 @@ -Babel>=0.9.6
\ No newline at end of file +Babel>=0.9.6 +six>=1.6.0 +iso8601>=0.1.9 +oslo.config>=1.2.0 +oslo.i18n>=0.1.0
\ No newline at end of file @@ -22,10 +22,12 @@ classifier = [files] packages = oslo - oslo.log namespace_packages = oslo +[pbr] +warnerrors = true + [build_sphinx] source-dir = doc/source build-dir = doc/build @@ -46,4 +48,4 @@ input_file = oslo.log/locale/oslo.log.pot [extract_messages] keywords = _ gettext ngettext l_ lazy_gettext mapping_file = babel.cfg -output_file = oslo.log/locale/oslo.log.pot
\ No newline at end of file +output_file = oslo.log/locale/oslo.log.pot @@ -19,4 +19,4 @@ import setuptools setuptools.setup( setup_requires=['pbr'], - pbr=True)
\ No newline at end of file + pbr=True) diff --git a/test-requirements.txt b/test-requirements.txt index 0d6d489..7b64565 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,6 +1,21 @@ -hacking>=0.5.6,<0.8 +hacking>=0.9.1,<0.10 + +discover +fixtures>=0.3.14 +python-subunit>=0.0.18 +testrepository>=0.0.18 +testscenarios>=0.4 +testtools>=0.9.34 oslotest -# These are needed for docs generation +# when we can require tox>= 1.4, this can go into tox.ini: +# [testenv:cover] +# deps = {[testenv]deps} coverage +coverage>=3.6 + +# this is required for the docs build jobs +sphinx>=1.2.1,<1.3 oslosphinx -sphinx>=1.1.2,!=1.2.0,<1.3
\ No newline at end of file + +# mocking framework +mock>=1.0 diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..19f5e72 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/tests/test_log.py b/tests/unit/__init__.py index de2a34a..19f5e72 100644 --- a/tests/test_log.py +++ b/tests/unit/__init__.py @@ -11,18 +11,3 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -""" -test_log ----------------------------------- - -Tests for `log` module. -""" - -from oslotest import base - - -class TestLog(base.TestCase): - - def test_something(self): - pass
\ No newline at end of file diff --git a/tests/unit/fixture/__init__.py b/tests/unit/fixture/__init__.py new file mode 100644 index 0000000..19f5e72 --- /dev/null +++ b/tests/unit/fixture/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/tests/unit/fixture/test_logging.py b/tests/unit/fixture/test_logging.py index 6f82b49..6826d8b 100644 --- a/tests/unit/fixture/test_logging.py +++ b/tests/unit/fixture/test_logging.py @@ -11,14 +11,14 @@ # under the License. -from openstack.common.fixture import logging as logging_fixture -from openstack.common import log as logging -from tests import utils +from oslo.log.fixture import logging as logging_fixture +from oslo.log import log as logging +from oslotest import base as test_base LOG = logging.getLogger(__name__) -class TestLoggingFixture(utils.BaseTestCase): +class TestLoggingFixture(test_base.BaseTestCase): def test_logging_handle_error(self): LOG.info('pid of first child is %(foo)s', 1) self.useFixture(logging_fixture.get_logging_handle_error_fixture()) diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py index a1cb05d..f3e9b8f 100644 --- a/tests/unit/test_context.py +++ b/tests/unit/test_context.py @@ -15,7 +15,7 @@ from oslotest import base as test_base -from openstack.common import context +from oslo.log import context class ContextTest(test_base.BaseTestCase): diff --git a/tests/unit/test_local.py b/tests/unit/test_local.py index f52edb2..277a20c 100644 --- a/tests/unit/test_local.py +++ b/tests/unit/test_local.py @@ -18,7 +18,7 @@ import threading from oslotest import base as test_base from six import moves -from openstack.common import local +from oslo.log import local class Dict(dict): diff --git a/tests/unit/test_log.py b/tests/unit/test_log.py index 8a39307..d9447bf 100644 --- a/tests/unit/test_log.py +++ b/tests/unit/test_log.py @@ -25,15 +25,15 @@ from oslotest import base as test_base from oslotest import moxstubout import six -from openstack.common import context -from openstack.common import fileutils -from openstack.common.fixture import config -from openstack.common import gettextutils -from openstack.common import jsonutils -from openstack.common import local -from openstack.common import log -from openstack.common import log_handler -from openstack.common.notifier import api as notifier +from oslo.log import context +from oslo.log import local +from oslo.log import log +from oslo.log.openstack.common import fileutils +from oslo.log.openstack.common.fixture import config +from oslo.log.openstack.common import gettextutils +from oslo.log.openstack.common import jsonutils +from oslo.log.openstack.common import log_handler +from oslo.log.openstack.common.notifier import api as notifier def _fake_context(): diff --git a/tools/run_cross_tests.sh b/tools/run_cross_tests.sh new file mode 100755 index 0000000..5e7bc11 --- /dev/null +++ b/tools/run_cross_tests.sh @@ -0,0 +1,91 @@ +#!/bin/bash +# +# Run cross-project tests +# +# Usage: +# +# run_cross_tests.sh project_dir venv + +# Fail the build if any command fails +set -e + +project_dir="$1" +venv="$2" + +if [ -z "$project_dir" -o -z "$venv" ] +then + cat - <<EOF +ERROR: Missing argument(s) + +Usage: + + $0 PROJECT_DIR VIRTUAL_ENV + +Example, run the python 2.7 tests for python-neutronclient: + + $0 /opt/stack/python-neutronclient py27 + +EOF + exit 1 +fi + +# Set up the virtualenv without running the tests +(cd $project_dir && tox --notest -e $venv) + +tox_envbin=$project_dir/.tox/$venv/bin + +our_name=$(python setup.py --name) + +# Replace the pip-installed package with the version in our source +# tree. Look to see if we are already installed before trying to +# uninstall ourselves, to avoid failures from packages that do not use us +# yet. +if $tox_envbin/pip freeze | grep -q $our_name +then + $tox_envbin/pip uninstall -y $our_name +fi +$tox_envbin/pip install -U . + +# Run the tests +(cd $project_dir && tox -e $venv) +result=$? + + +# The below checks are modified from +# openstack-infra/config/modules/jenkins/files/slave_scripts/run-unittests.sh. + +# They expect to be run in the project being tested. +cd $project_dir + +echo "Begin pip freeze output from test virtualenv:" +echo "======================================================================" +.tox/$venv/bin/pip freeze +echo "======================================================================" + +# We only want to run the next check if the tool is installed, so look +# for it before continuing. +if [ -f /usr/local/jenkins/slave_scripts/subunit2html.py -a -d ".testrepository" ] ; then + if [ -f ".testrepository/0.2" ] ; then + cp .testrepository/0.2 ./subunit_log.txt + elif [ -f ".testrepository/0" ] ; then + .tox/$venv/bin/subunit-1to2 < .testrepository/0 > ./subunit_log.txt + fi + .tox/$venv/bin/python /usr/local/jenkins/slave_scripts/subunit2html.py ./subunit_log.txt testr_results.html + gzip -9 ./subunit_log.txt + gzip -9 ./testr_results.html + + export PYTHON=.tox/$venv/bin/python + set -e + rancount=$(.tox/$venv/bin/testr last | sed -ne 's/Ran \([0-9]\+\).*tests in.*/\1/p') + if [ "$rancount" -eq "0" ] ; then + echo + echo "Zero tests were run. At least one test should have been run." + echo "Failing this test as a result" + echo + exit 1 + fi +fi + +# If we make it this far, report status based on the tests that were +# run. +exit $result @@ -1,6 +1,6 @@ [tox] minversion = 1.6 -envlist = py33,py26,py27,pypy,pep8 +envlist = py26,py27,py33,pypy,pep8 # NOTE(dhellmann): We cannot set skipdist=True # for oslo libraries because of the namespace package. #skipsdist = True @@ -31,8 +31,12 @@ commands = python setup.py testr --coverage --testr-args='{posargs}' [flake8] # H803 skipped on purpose per list discussion. # E123, E125 skipped as they are invalid PEP-8. +# H305 skipped b/c of hacking is confused with oslo.* namespace show-source = True -ignore = E123,E125,H803 +ignore = E123,E125,H305,H307,H405,H803,H904 builtins = _ -exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build
\ No newline at end of file +exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build,__init__.py + +[hacking] +import_exceptions = oslo.log.openstack.common.gettextutils |