author     Julien Danjou <julien@danjou.info>    2013-01-08 16:13:06 +0100
committer  Julien Danjou <julien@danjou.info>    2013-01-22 11:09:24 +0100
commit     4c5fc2204a0249cbea70cc20d287666ce5c72a57 (patch)
tree       79735391e6927fd0a19999a3cb12e90b93ea5a49
parent     04e54c4f3eb79e197244b1a01b6bdcb8340af779 (diff)
download   ceilometer-4c5fc2204a0249cbea70cc20d287666ce5c72a57.tar.gz

Update openstack.common

Change-Id: I952bc668ce10d05944eb0d2b06c8eff917c22af8
Signed-off-by: Julien Danjou <julien@danjou.info>
-rw-r--r--  ceilometer/api/acl.py                                   |    4
-rw-r--r--  ceilometer/api/v1/acl.py                                |    4
-rw-r--r--  ceilometer/openstack/common/cfg.py                      |  514
-rw-r--r--  ceilometer/openstack/common/eventlet_backdoor.py        |    2
-rw-r--r--  ceilometer/openstack/common/excutils.py                 |    6
-rw-r--r--  ceilometer/openstack/common/importutils.py              |   10
-rw-r--r--  ceilometer/openstack/common/jsonutils.py                |    2
-rw-r--r--  ceilometer/openstack/common/log.py                      |  109
-rw-r--r--  ceilometer/openstack/common/loopingcall.py              |    9
-rw-r--r--  ceilometer/openstack/common/notifier/api.py             |    7
-rw-r--r--  ceilometer/openstack/common/notifier/rpc_notifier.py    |    2
-rw-r--r--  ceilometer/openstack/common/notifier/rpc_notifier2.py   |   51
-rw-r--r--  ceilometer/openstack/common/policy.py                   |  840
-rw-r--r--  ceilometer/openstack/common/rpc/__init__.py             |   22
-rw-r--r--  ceilometer/openstack/common/rpc/amqp.py                 |   56
-rw-r--r--  ceilometer/openstack/common/rpc/common.py               |  191
-rw-r--r--  ceilometer/openstack/common/rpc/dispatcher.py           |   28
-rw-r--r--  ceilometer/openstack/common/rpc/impl_fake.py            |   13
-rw-r--r--  ceilometer/openstack/common/rpc/impl_kombu.py           |   28
-rw-r--r--  ceilometer/openstack/common/rpc/impl_qpid.py            |   69
-rw-r--r--  ceilometer/openstack/common/rpc/impl_zmq.py             |  199
-rw-r--r--  ceilometer/openstack/common/rpc/matchmaker.py           |    2
-rw-r--r--  ceilometer/openstack/common/service.py                  |   42
-rw-r--r--  ceilometer/openstack/common/setup.py                    |  205
-rw-r--r--  ceilometer/openstack/common/threadgroup.py              |   33
-rw-r--r--  ceilometer/openstack/common/timeutils.py                |   33
-rw-r--r--  ceilometer/openstack/common/version.py                  |  139
-rw-r--r--  ceilometer/policy.py                                    |   18
-rw-r--r--  ceilometer/publish.py                                   |    3
-rw-r--r--  ceilometer/service.py                                   |    2
-rw-r--r--  ceilometer/version.py                                   |    3
-rwxr-xr-x  setup.py                                                |    7
-rw-r--r--  tests/api/v2/test_statistics.py                         |    6
-rw-r--r--  tests/test_bin.py                                       |   13
-rw-r--r--  tools/pip-requires                                      |    1
-rw-r--r--  tox.ini                                                 |    2
36 files changed, 1773 insertions, 902 deletions
diff --git a/ceilometer/api/acl.py b/ceilometer/api/acl.py
index a912d7f1..d6edbe92 100644
--- a/ceilometer/api/acl.py
+++ b/ceilometer/api/acl.py
@@ -49,7 +49,5 @@ class AdminAuthHook(hooks.PecanHook):
def before(self, state):
headers = state.request.headers
- if not policy.check_is_admin(headers.get('X-Roles', "").split(","),
- headers.get('X-Tenant-Id'),
- headers.get('X-Tenant-Name')):
+ if not policy.check_is_admin(headers.get('X-Roles', "").split(",")):
raise exc.HTTPUnauthorized()
diff --git a/ceilometer/api/v1/acl.py b/ceilometer/api/v1/acl.py
index e64737f4..3a76612f 100644
--- a/ceilometer/api/v1/acl.py
+++ b/ceilometer/api/v1/acl.py
@@ -22,7 +22,5 @@ from ceilometer import policy
def get_limited_to_project(headers):
"""Return the tenant the request should be limited to."""
- if not policy.check_is_admin(headers.get('X-Roles', "").split(","),
- headers.get('X-Tenant-Id'),
- headers.get('X-Tenant-Name')):
+ if not policy.check_is_admin(headers.get('X-Roles', "").split(",")):
return headers.get('X-Tenant-Id')
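Both hunks above narrow the policy.check_is_admin() call to a single roles argument; the tenant id and name headers are no longer consulted. A minimal sketch of the resulting call pattern (the headers mapping is the WSGI request headers as in the hunks; the helper name is illustrative)::

    from ceilometer import policy

    def is_admin_request(headers):
        # Only the comma-separated X-Roles header matters now.
        roles = headers.get('X-Roles', "").split(",")
        return policy.check_is_admin(roles)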
diff --git a/ceilometer/openstack/common/cfg.py b/ceilometer/openstack/common/cfg.py
index b024da8c..1beb2988 100644
--- a/ceilometer/openstack/common/cfg.py
+++ b/ceilometer/openstack/common/cfg.py
@@ -205,27 +205,11 @@ Option values may reference other values using PEP 292 string substitution::
Note that interpolation can be avoided by using '$$'.
-For command line utilities that dispatch to other command line utilities, the
-disable_interspersed_args() method is available. If this this method is called,
-then parsing e.g.::
-
- script --verbose cmd --debug /tmp/mything
-
-will no longer return::
-
- ['cmd', '/tmp/mything']
-
-as the leftover arguments, but will instead return::
-
- ['cmd', '--debug', '/tmp/mything']
-
-i.e. argument parsing is stopped at the first non-option argument.
-
Options may be declared as required so that an error is raised if the user
does not supply a value for the option.
Options may be declared as secret so that their values are not leaked into
-log files:
+log files::
opts = [
cfg.StrOpt('s3_store_access_key', secret=True),
@@ -233,29 +217,53 @@ log files:
...
]
-This module also contains a global instance of the CommonConfigOpts class
-in order to support a common usage pattern in OpenStack:
+This module also contains a global instance of the ConfigOpts class
+in order to support a common usage pattern in OpenStack::
- from ceilometer.openstack.common import cfg
+ from ceilometer.openstack.common import cfg
- opts = [
- cfg.StrOpt('bind_host', default='0.0.0.0'),
- cfg.IntOpt('bind_port', default=9292),
- ]
+ opts = [
+ cfg.StrOpt('bind_host', default='0.0.0.0'),
+ cfg.IntOpt('bind_port', default=9292),
+ ]
+
+ CONF = cfg.CONF
+ CONF.register_opts(opts)
+
+ def start(server, app):
+ server.start(app, CONF.bind_port, CONF.bind_host)
- CONF = cfg.CONF
- CONF.register_opts(opts)
+Positional command line arguments are supported via a 'positional' Opt
+constructor argument::
- def start(server, app):
- server.start(app, CONF.bind_port, CONF.bind_host)
+ >>> conf = ConfigOpts()
+ >>> conf.register_cli_opt(MultiStrOpt('bar', positional=True))
+ True
+ >>> conf(['a', 'b'])
+ >>> conf.bar
+ ['a', 'b']
+
+It is also possible to use argparse "sub-parsers" to parse additional
+command line arguments using the SubCommandOpt class:
+
+ >>> def add_parsers(subparsers):
+ ... list_action = subparsers.add_parser('list')
+ ... list_action.add_argument('id')
+ ...
+ >>> conf = ConfigOpts()
+ >>> conf.register_cli_opt(SubCommandOpt('action', handler=add_parsers))
+ True
+ >>> conf(args=['list', '10'])
+ >>> conf.action.name, conf.action.id
+ ('list', '10')
"""
+import argparse
import collections
import copy
import functools
import glob
-import optparse
import os
import string
import sys
@@ -474,6 +482,13 @@ def _is_opt_registered(opts, opt):
return False
+def set_defaults(opts, **kwargs):
+ for opt in opts:
+ if opt.dest in kwargs:
+ opt.default = kwargs[opt.dest]
+ break
+
+
class Opt(object):
"""Base class for all configuration options.
@@ -489,6 +504,8 @@ class Opt(object):
a single character CLI option name
default:
the default value of the option
+ positional:
+ True if the option is a positional CLI argument
metavar:
the name shown as the argument to a CLI option in --help output
help:
@@ -497,8 +514,8 @@ class Opt(object):
multi = False
def __init__(self, name, dest=None, short=None, default=None,
- metavar=None, help=None, secret=False, required=False,
- deprecated_name=None):
+ positional=False, metavar=None, help=None,
+ secret=False, required=False, deprecated_name=None):
"""Construct an Opt object.
The only required parameter is the option's name. However, it is
@@ -508,6 +525,7 @@ class Opt(object):
:param dest: the name of the corresponding ConfigOpts property
:param short: a single character CLI option name
:param default: the default value of the option
+ :param positional: True if the option is a positional CLI argument
:param metavar: the option argument to show in --help
:param help: an explanation of how the option is used
:param secret: true iff the value should be obfuscated in log output
@@ -521,6 +539,7 @@ class Opt(object):
self.dest = dest
self.short = short
self.default = default
+ self.positional = positional
self.metavar = metavar
self.help = help
self.secret = secret
@@ -561,64 +580,73 @@ class Opt(object):
:param parser: the CLI option parser
:param group: an optional OptGroup object
"""
- container = self._get_optparse_container(parser, group)
- kwargs = self._get_optparse_kwargs(group)
- prefix = self._get_optparse_prefix('', group)
- self._add_to_optparse(container, self.name, self.short, kwargs, prefix,
- self.deprecated_name)
+ container = self._get_argparse_container(parser, group)
+ kwargs = self._get_argparse_kwargs(group)
+ prefix = self._get_argparse_prefix('', group)
+ self._add_to_argparse(container, self.name, self.short, kwargs, prefix,
+ self.positional, self.deprecated_name)
- def _add_to_optparse(self, container, name, short, kwargs, prefix='',
- deprecated_name=None):
- """Add an option to an optparse parser or group.
+ def _add_to_argparse(self, container, name, short, kwargs, prefix='',
+ positional=False, deprecated_name=None):
+ """Add an option to an argparse parser or group.
- :param container: an optparse.OptionContainer object
+ :param container: an argparse._ArgumentGroup object
:param name: the opt name
:param short: the short opt name
- :param kwargs: the keyword arguments for add_option()
+ :param kwargs: the keyword arguments for add_argument()
:param prefix: an optional prefix to prepend to the opt name
+ :param positional: whether the option is a positional CLI argument
:raises: DuplicateOptError if a naming conflict is detected
"""
- args = ['--' + prefix + name]
+ def hyphen(arg):
+ return arg if not positional else ''
+
+ args = [hyphen('--') + prefix + name]
if short:
- args += ['-' + short]
+ args.append(hyphen('-') + short)
if deprecated_name:
- args += ['--' + prefix + deprecated_name]
- for a in args:
- if container.has_option(a):
- raise DuplicateOptError(a)
- container.add_option(*args, **kwargs)
+ args.append(hyphen('--') + prefix + deprecated_name)
+
+ try:
+ container.add_argument(*args, **kwargs)
+ except argparse.ArgumentError as e:
+ raise DuplicateOptError(e)
- def _get_optparse_container(self, parser, group):
- """Returns an optparse.OptionContainer.
+ def _get_argparse_container(self, parser, group):
+ """Returns an argparse._ArgumentGroup.
- :param parser: an optparse.OptionParser
+ :param parser: an argparse.ArgumentParser
:param group: an (optional) OptGroup object
- :returns: an optparse.OptionGroup if a group is given, else the parser
+ :returns: an argparse._ArgumentGroup if group is given, else parser
"""
if group is not None:
- return group._get_optparse_group(parser)
+ return group._get_argparse_group(parser)
else:
return parser
- def _get_optparse_kwargs(self, group, **kwargs):
- """Build a dict of keyword arguments for optparse's add_option().
+ def _get_argparse_kwargs(self, group, **kwargs):
+ """Build a dict of keyword arguments for argparse's add_argument().
Most opt types extend this method to customize the behaviour of the
- options added to optparse.
+ options added to argparse.
:param group: an optional group
:param kwargs: optional keyword arguments to add to
:returns: a dict of keyword arguments
"""
- dest = self.dest
- if group is not None:
- dest = group.name + '_' + dest
- kwargs.update({'dest': dest,
+ if not self.positional:
+ dest = self.dest
+ if group is not None:
+ dest = group.name + '_' + dest
+ kwargs['dest'] = dest
+ else:
+ kwargs['nargs'] = '?'
+ kwargs.update({'default': None,
'metavar': self.metavar,
'help': self.help, })
return kwargs
- def _get_optparse_prefix(self, prefix, group):
+ def _get_argparse_prefix(self, prefix, group):
"""Build a prefix for the CLI option name, if required.
CLI options in a group are prefixed with the group's name in order
@@ -656,6 +684,11 @@ class BoolOpt(Opt):
_boolean_states = {'1': True, 'yes': True, 'true': True, 'on': True,
'0': False, 'no': False, 'false': False, 'off': False}
+ def __init__(self, *args, **kwargs):
+ if 'positional' in kwargs:
+ raise ValueError('positional boolean args not supported')
+ super(BoolOpt, self).__init__(*args, **kwargs)
+
def _get_from_config_parser(self, cparser, section):
"""Retrieve the opt value as a boolean from ConfigParser."""
def convert_bool(v):
@@ -671,21 +704,32 @@ class BoolOpt(Opt):
def _add_to_cli(self, parser, group=None):
"""Extends the base class method to add the --nooptname option."""
super(BoolOpt, self)._add_to_cli(parser, group)
- self._add_inverse_to_optparse(parser, group)
+ self._add_inverse_to_argparse(parser, group)
- def _add_inverse_to_optparse(self, parser, group):
+ def _add_inverse_to_argparse(self, parser, group):
"""Add the --nooptname option to the option parser."""
- container = self._get_optparse_container(parser, group)
- kwargs = self._get_optparse_kwargs(group, action='store_false')
- prefix = self._get_optparse_prefix('no', group)
+ container = self._get_argparse_container(parser, group)
+ kwargs = self._get_argparse_kwargs(group, action='store_false')
+ prefix = self._get_argparse_prefix('no', group)
kwargs["help"] = "The inverse of --" + self.name
- self._add_to_optparse(container, self.name, None, kwargs, prefix,
- self.deprecated_name)
+ self._add_to_argparse(container, self.name, None, kwargs, prefix,
+ self.positional, self.deprecated_name)
+
+ def _get_argparse_kwargs(self, group, action='store_true', **kwargs):
+ """Extends the base argparse keyword dict for boolean options."""
+
+ kwargs = super(BoolOpt, self)._get_argparse_kwargs(group, **kwargs)
+
+ # metavar has no effect for BoolOpt
+ if 'metavar' in kwargs:
+ del kwargs['metavar']
+
+ if action != 'store_true':
+ action = 'store_false'
- def _get_optparse_kwargs(self, group, action='store_true', **kwargs):
- """Extends the base optparse keyword dict for boolean options."""
- return super(BoolOpt,
- self)._get_optparse_kwargs(group, action=action, **kwargs)
+ kwargs['action'] = action
+
+ return kwargs
class IntOpt(Opt):
@@ -697,10 +741,10 @@ class IntOpt(Opt):
return [int(v) for v in self._cparser_get_with_deprecated(cparser,
section)]
- def _get_optparse_kwargs(self, group, **kwargs):
- """Extends the base optparse keyword dict for integer options."""
+ def _get_argparse_kwargs(self, group, **kwargs):
+ """Extends the base argparse keyword dict for integer options."""
return super(IntOpt,
- self)._get_optparse_kwargs(group, type='int', **kwargs)
+ self)._get_argparse_kwargs(group, type=int, **kwargs)
class FloatOpt(Opt):
@@ -712,10 +756,10 @@ class FloatOpt(Opt):
return [float(v) for v in
self._cparser_get_with_deprecated(cparser, section)]
- def _get_optparse_kwargs(self, group, **kwargs):
- """Extends the base optparse keyword dict for float options."""
- return super(FloatOpt,
- self)._get_optparse_kwargs(group, type='float', **kwargs)
+ def _get_argparse_kwargs(self, group, **kwargs):
+ """Extends the base argparse keyword dict for float options."""
+ return super(FloatOpt, self)._get_argparse_kwargs(group,
+ type=float, **kwargs)
class ListOpt(Opt):
@@ -725,23 +769,26 @@ class ListOpt(Opt):
is a list containing these strings.
"""
+ class _StoreListAction(argparse.Action):
+ """
+ An argparse action for parsing an option value into a list.
+ """
+ def __call__(self, parser, namespace, values, option_string=None):
+ if values is not None:
+ values = [a.strip() for a in values.split(',')]
+ setattr(namespace, self.dest, values)
+
def _get_from_config_parser(self, cparser, section):
"""Retrieve the opt value as a list from ConfigParser."""
- return [v.split(',') for v in
+ return [[a.strip() for a in v.split(',')] for v in
self._cparser_get_with_deprecated(cparser, section)]
- def _get_optparse_kwargs(self, group, **kwargs):
- """Extends the base optparse keyword dict for list options."""
- return super(ListOpt,
- self)._get_optparse_kwargs(group,
- type='string',
- action='callback',
- callback=self._parse_list,
- **kwargs)
-
- def _parse_list(self, option, opt, value, parser):
- """An optparse callback for parsing an option value into a list."""
- setattr(parser.values, self.dest, value.split(','))
+ def _get_argparse_kwargs(self, group, **kwargs):
+ """Extends the base argparse keyword dict for list options."""
+ return Opt._get_argparse_kwargs(self,
+ group,
+ action=ListOpt._StoreListAction,
+ **kwargs)
class MultiStrOpt(Opt):
@@ -752,10 +799,14 @@ class MultiStrOpt(Opt):
"""
multi = True
- def _get_optparse_kwargs(self, group, **kwargs):
- """Extends the base optparse keyword dict for multi str options."""
- return super(MultiStrOpt,
- self)._get_optparse_kwargs(group, action='append')
+ def _get_argparse_kwargs(self, group, **kwargs):
+ """Extends the base argparse keyword dict for multi str options."""
+ kwargs = super(MultiStrOpt, self)._get_argparse_kwargs(group)
+ if not self.positional:
+ kwargs['action'] = 'append'
+ else:
+ kwargs['nargs'] = '*'
+ return kwargs
def _cparser_get_with_deprecated(self, cparser, section):
"""If cannot find option as dest try deprecated_name alias."""
@@ -765,6 +816,57 @@ class MultiStrOpt(Opt):
return cparser.get(section, [self.dest], multi=True)
+class SubCommandOpt(Opt):
+
+ """
+ Sub-command options allow argparse sub-parsers to be used to parse
+ additional command line arguments.
+
+ The handler argument to the SubCommandOpt constructor is a callable
+ which is supplied an argparse subparsers object. Use this handler
+ callable to add sub-parsers.
+
+ The opt value is a SubCommandAttr object with the name of the chosen
+ sub-parser stored in the 'name' attribute and the values of other
+ sub-parser arguments available as additional attributes.
+ """
+
+ def __init__(self, name, dest=None, handler=None,
+ title=None, description=None, help=None):
+ """Construct an sub-command parsing option.
+
+ This behaves similarly to other Opt sub-classes but adds a
+ 'handler' argument. The handler is a callable which is supplied
+ a subparsers object when invoked. The add_parser() method on
+ this subparsers object can be used to register parsers for
+ sub-commands.
+
+ :param name: the option's name
+ :param dest: the name of the corresponding ConfigOpts property
+ :param title: title of the sub-commands group in help output
+ :param description: description of the group in help output
+ :param help: a help string giving an overview of available sub-commands
+ """
+ super(SubCommandOpt, self).__init__(name, dest=dest, help=help)
+ self.handler = handler
+ self.title = title
+ self.description = description
+
+ def _add_to_cli(self, parser, group=None):
+ """Add argparse sub-parsers and invoke the handler method."""
+ dest = self.dest
+ if group is not None:
+ dest = group.name + '_' + dest
+
+ subparsers = parser.add_subparsers(dest=dest,
+ title=self.title,
+ description=self.description,
+ help=self.help)
+
+ if not self.handler is None:
+ self.handler(subparsers)
+
+
class OptGroup(object):
"""
@@ -800,19 +902,20 @@ class OptGroup(object):
self.help = help
self._opts = {} # dict of dicts of (opt:, override:, default:)
- self._optparse_group = None
+ self._argparse_group = None
- def _register_opt(self, opt):
+ def _register_opt(self, opt, cli=False):
"""Add an opt to this group.
:param opt: an Opt object
+ :param cli: whether this is a CLI option
:returns: False if previously registered, True otherwise
:raises: DuplicateOptError if a naming conflict is detected
"""
if _is_opt_registered(self._opts, opt):
return False
- self._opts[opt.dest] = {'opt': opt}
+ self._opts[opt.dest] = {'opt': opt, 'cli': cli}
return True
@@ -824,16 +927,16 @@ class OptGroup(object):
if opt.dest in self._opts:
del self._opts[opt.dest]
- def _get_optparse_group(self, parser):
- """Build an optparse.OptionGroup for this group."""
- if self._optparse_group is None:
- self._optparse_group = optparse.OptionGroup(parser, self.title,
- self.help)
- return self._optparse_group
+ def _get_argparse_group(self, parser):
+ """Build an argparse._ArgumentGroup for this group."""
+ if self._argparse_group is None:
+ self._argparse_group = parser.add_argument_group(self.title,
+ self.help)
+ return self._argparse_group
def _clear(self):
"""Clear this group's option parsing state."""
- self._optparse_group = None
+ self._argparse_group = None
class ParseError(iniparser.ParseError):
@@ -928,26 +1031,31 @@ class ConfigOpts(collections.Mapping):
self._groups = {}
self._args = None
+
self._oparser = None
self._cparser = None
self._cli_values = {}
self.__cache = {}
self._config_opts = []
- self._disable_interspersed_args = False
- def _setup(self, project, prog, version, usage, default_config_files):
- """Initialize a ConfigOpts object for option parsing."""
+ def _pre_setup(self, project, prog, version, usage, default_config_files):
+ """Initialize a ConfigCliParser object for option parsing."""
+
if prog is None:
prog = os.path.basename(sys.argv[0])
if default_config_files is None:
default_config_files = find_config_files(project, prog)
- self._oparser = optparse.OptionParser(prog=prog,
- version=version,
- usage=usage)
- if self._disable_interspersed_args:
- self._oparser.disable_interspersed_args()
+ self._oparser = argparse.ArgumentParser(prog=prog, usage=usage)
+ self._oparser.add_argument('--version',
+ action='version',
+ version=version)
+
+ return prog, default_config_files
+
+ def _setup(self, project, prog, version, usage, default_config_files):
+ """Initialize a ConfigOpts object for option parsing."""
self._config_opts = [
MultiStrOpt('config-file',
@@ -1017,18 +1125,23 @@ class ConfigOpts(collections.Mapping):
:raises: SystemExit, ConfigFilesNotFoundError, ConfigFileParseError,
RequiredOptError, DuplicateOptError
"""
+
self.clear()
+ prog, default_config_files = self._pre_setup(project,
+ prog,
+ version,
+ usage,
+ default_config_files)
+
self._setup(project, prog, version, usage, default_config_files)
- self._cli_values, leftovers = self._parse_cli_opts(args)
+ self._cli_values = self._parse_cli_opts(args)
self._parse_config_files()
self._check_required_opts()
- return leftovers
-
def __getattr__(self, name):
"""Look up an option value and perform string substitution.
@@ -1062,17 +1175,21 @@ class ConfigOpts(collections.Mapping):
@__clear_cache
def clear(self):
- """Clear the state of the object to before it was called."""
+ """Clear the state of the object to before it was called.
+
+ Any subparsers added using the add_cli_subparsers() will also be
+ removed as a side-effect of this method.
+ """
self._args = None
self._cli_values.clear()
- self._oparser = None
+ self._oparser = argparse.ArgumentParser()
self._cparser = None
self.unregister_opts(self._config_opts)
for group in self._groups.values():
group._clear()
@__clear_cache
- def register_opt(self, opt, group=None):
+ def register_opt(self, opt, group=None, cli=False):
"""Register an option schema.
Registering an option schema makes any option value which is previously
@@ -1080,17 +1197,19 @@ class ConfigOpts(collections.Mapping):
as an attribute of this object.
:param opt: an instance of an Opt sub-class
+ :param cli: whether this is a CLI option
:param group: an optional OptGroup object or group name
:return: False if the opt was already registered, True otherwise
:raises: DuplicateOptError
"""
if group is not None:
- return self._get_group(group, autocreate=True)._register_opt(opt)
+ group = self._get_group(group, autocreate=True)
+ return group._register_opt(opt, cli)
if _is_opt_registered(self._opts, opt):
return False
- self._opts[opt.dest] = {'opt': opt}
+ self._opts[opt.dest] = {'opt': opt, 'cli': cli}
return True
@@ -1116,7 +1235,7 @@ class ConfigOpts(collections.Mapping):
if self._args is not None:
raise ArgsAlreadyParsedError("cannot register CLI option")
- return self.register_opt(opt, group, clear_cache=False)
+ return self.register_opt(opt, group, cli=True, clear_cache=False)
@__clear_cache
def register_cli_opts(self, opts, group=None):
@@ -1243,10 +1362,11 @@ class ConfigOpts(collections.Mapping):
for info in group._opts.values():
yield info, group
- def _all_opts(self):
- """A generator function for iteration opts."""
+ def _all_cli_opts(self):
+ """A generator function for iterating CLI opts."""
for info, group in self._all_opt_infos():
- yield info['opt'], group
+ if info['cli']:
+ yield info['opt'], group
def _unset_defaults_and_overrides(self):
"""Unset any default or override on all options."""
@@ -1254,31 +1374,6 @@ class ConfigOpts(collections.Mapping):
info.pop('default', None)
info.pop('override', None)
- def disable_interspersed_args(self):
- """Set parsing to stop on the first non-option.
-
- If this this method is called, then parsing e.g.
-
- script --verbose cmd --debug /tmp/mything
-
- will no longer return:
-
- ['cmd', '/tmp/mything']
-
- as the leftover arguments, but will instead return:
-
- ['cmd', '--debug', '/tmp/mything']
-
- i.e. argument parsing is stopped at the first non-option argument.
- """
- self._disable_interspersed_args = True
-
- def enable_interspersed_args(self):
- """Set parsing to not stop on the first non-option.
-
- This it the default behaviour."""
- self._disable_interspersed_args = False
-
def find_file(self, name):
"""Locate a file located alongside the config files.
@@ -1377,6 +1472,9 @@ class ConfigOpts(collections.Mapping):
info = self._get_opt_info(name, group)
opt = info['opt']
+ if isinstance(opt, SubCommandOpt):
+ return self.SubCommandAttr(self, group, opt.dest)
+
if 'override' in info:
return info['override']
@@ -1401,6 +1499,10 @@ class ConfigOpts(collections.Mapping):
if not opt.multi:
return value
+ # argparse ignores default=None for nargs='*'
+ if opt.positional and not value:
+ value = opt.default
+
return value + values
if values:
@@ -1523,12 +1625,10 @@ class ConfigOpts(collections.Mapping):
"""
self._args = args
- for opt, group in self._all_opts():
+ for opt, group in self._all_cli_opts():
opt._add_to_cli(self._oparser, group)
- values, leftovers = self._oparser.parse_args(args)
-
- return vars(values), leftovers
+ return vars(self._oparser.parse_args(args))
class GroupAttr(collections.Mapping):
@@ -1543,12 +1643,12 @@ class ConfigOpts(collections.Mapping):
:param conf: a ConfigOpts object
:param group: an OptGroup object
"""
- self.conf = conf
- self.group = group
+ self._conf = conf
+ self._group = group
def __getattr__(self, name):
"""Look up an option value and perform template substitution."""
- return self.conf._get(name, self.group)
+ return self._conf._get(name, self._group)
def __getitem__(self, key):
"""Look up an option value and perform string substitution."""
@@ -1556,16 +1656,50 @@ class ConfigOpts(collections.Mapping):
def __contains__(self, key):
"""Return True if key is the name of a registered opt or group."""
- return key in self.group._opts
+ return key in self._group._opts
def __iter__(self):
"""Iterate over all registered opt and group names."""
- for key in self.group._opts.keys():
+ for key in self._group._opts.keys():
yield key
def __len__(self):
"""Return the number of options and option groups."""
- return len(self.group._opts)
+ return len(self._group._opts)
+
+ class SubCommandAttr(object):
+
+ """
+ A helper class representing the name and arguments of an argparse
+ sub-parser.
+ """
+
+ def __init__(self, conf, group, dest):
+ """Construct a SubCommandAttr object.
+
+ :param conf: a ConfigOpts object
+ :param group: an OptGroup object
+ :param dest: the name of the sub-parser
+ """
+ self._conf = conf
+ self._group = group
+ self._dest = dest
+
+ def __getattr__(self, name):
+ """Look up a sub-parser name or argument value."""
+ if name == 'name':
+ name = self._dest
+ if self._group is not None:
+ name = self._group.name + '_' + name
+ return self._conf._cli_values[name]
+
+ if name in self._conf:
+ raise DuplicateOptError(name)
+
+ try:
+ return self._conf._cli_values[name]
+ except KeyError:
+ raise NoSuchOptError(name)
class StrSubWrapper(object):
@@ -1594,60 +1728,4 @@ class ConfigOpts(collections.Mapping):
return value
-class CommonConfigOpts(ConfigOpts):
-
- DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
- DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
-
- common_cli_opts = [
- BoolOpt('debug',
- short='d',
- default=False,
- help='Print debugging output'),
- BoolOpt('verbose',
- short='v',
- default=False,
- help='Print more verbose output'),
- ]
-
- logging_cli_opts = [
- StrOpt('log-config',
- metavar='PATH',
- help='If this option is specified, the logging configuration '
- 'file specified is used and overrides any other logging '
- 'options specified. Please see the Python logging module '
- 'documentation for details on logging configuration '
- 'files.'),
- StrOpt('log-format',
- default=DEFAULT_LOG_FORMAT,
- metavar='FORMAT',
- help='A logging.Formatter log message format string which may '
- 'use any of the available logging.LogRecord attributes. '
- 'Default: %default'),
- StrOpt('log-date-format',
- default=DEFAULT_LOG_DATE_FORMAT,
- metavar='DATE_FORMAT',
- help='Format string for %(asctime)s in log records. '
- 'Default: %default'),
- StrOpt('log-file',
- metavar='PATH',
- help='(Optional) Name of log file to output to. '
- 'If not set, logging will go to stdout.'),
- StrOpt('log-dir',
- help='(Optional) The directory to keep log files in '
- '(will be prepended to --logfile)'),
- BoolOpt('use-syslog',
- default=False,
- help='Use syslog for logging.'),
- StrOpt('syslog-log-facility',
- default='LOG_USER',
- help='syslog facility to receive log lines')
- ]
-
- def __init__(self):
- super(CommonConfigOpts, self).__init__()
- self.register_cli_opts(self.common_cli_opts)
- self.register_cli_opts(self.logging_cli_opts)
-
-
-CONF = CommonConfigOpts()
+CONF = ConfigOpts()
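The cfg.py changes above move option parsing from optparse to argparse and add positional arguments and sub-command options; the new module docstring documents both as doctests. The same sub-command flow as a standalone sketch, using only what the docstring shows (the 'list'/'id' names come straight from the doctest)::

    from ceilometer.openstack.common import cfg

    def add_parsers(subparsers):
        # Register a 'list' sub-command that takes a single 'id' argument.
        list_action = subparsers.add_parser('list')
        list_action.add_argument('id')

    conf = cfg.ConfigOpts()
    conf.register_cli_opt(cfg.SubCommandOpt('action', handler=add_parsers))
    conf(args=['list', '10'])
    assert (conf.action.name, conf.action.id) == ('list', '10')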
diff --git a/ceilometer/openstack/common/eventlet_backdoor.py b/ceilometer/openstack/common/eventlet_backdoor.py
index bf4e0e03..58e00589 100644
--- a/ceilometer/openstack/common/eventlet_backdoor.py
+++ b/ceilometer/openstack/common/eventlet_backdoor.py
@@ -46,7 +46,7 @@ def _find_objects(t):
def _print_greenthreads():
- for i, gt in enumerate(find_objects(greenlet.greenlet)):
+ for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print i, gt
traceback.print_stack(gt.gr_frame)
print
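The one-line fix above repairs a NameError: _print_greenthreads() referred to an undefined find_objects instead of the module-level _find_objects. That helper is not shown in this hunk; a plausible minimal implementation consistent with its use here would be::

    import gc

    def _find_objects(t):
        # Scan the garbage collector's view of live objects for instances of t.
        return [o for o in gc.get_objects() if isinstance(o, t)]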
diff --git a/ceilometer/openstack/common/excutils.py b/ceilometer/openstack/common/excutils.py
index 5dd48301..414cc27e 100644
--- a/ceilometer/openstack/common/excutils.py
+++ b/ceilometer/openstack/common/excutils.py
@@ -24,6 +24,8 @@ import logging
import sys
import traceback
+from ceilometer.openstack.common.gettextutils import _
+
@contextlib.contextmanager
def save_and_reraise_exception():
@@ -43,7 +45,7 @@ def save_and_reraise_exception():
try:
yield
except Exception:
- logging.error('Original exception being dropped: %s' %
- (traceback.format_exception(type_, value, tb)))
+ logging.error(_('Original exception being dropped: %s'),
+ traceback.format_exception(type_, value, tb))
raise
raise type_, value, tb
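Beyond marking the message for translation, the hunk leaves the purpose of save_and_reraise_exception() unchanged: run cleanup code while preserving the original exception. A usage sketch (the resource object and its methods are illustrative)::

    from ceilometer.openstack.common import excutils

    def do_work(resource):
        try:
            resource.process()
        except Exception:
            with excutils.save_and_reraise_exception():
                # If this cleanup raises, the saved exception is logged as
                # dropped (the message above) and the new error propagates;
                # otherwise the original exception is re-raised afterwards.
                resource.rollback()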
diff --git a/ceilometer/openstack/common/importutils.py b/ceilometer/openstack/common/importutils.py
index f45372b4..9dec764f 100644
--- a/ceilometer/openstack/common/importutils.py
+++ b/ceilometer/openstack/common/importutils.py
@@ -29,7 +29,7 @@ def import_class(import_str):
try:
__import__(mod_str)
return getattr(sys.modules[mod_str], class_str)
- except (ValueError, AttributeError), exc:
+ except (ValueError, AttributeError):
raise ImportError('Class %s cannot be found (%s)' %
(class_str,
traceback.format_exception(*sys.exc_info())))
@@ -57,3 +57,11 @@ def import_module(import_str):
"""Import a module."""
__import__(import_str)
return sys.modules[import_str]
+
+
+def try_import(import_str, default=None):
+ """Try to import a module and if it fails return default."""
+ try:
+ return import_module(import_str)
+ except ImportError:
+ return default
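The new try_import() helper gives a one-line idiom for optional dependencies. A usage sketch (lxml is just an example module name)::

    from ceilometer.openstack.common import importutils

    # Returns the module if importable, otherwise the supplied default.
    lxml = importutils.try_import('lxml')
    if lxml is None:
        pass  # degrade gracefully without the optional library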
diff --git a/ceilometer/openstack/common/jsonutils.py b/ceilometer/openstack/common/jsonutils.py
index dad84dfd..47ec79e8 100644
--- a/ceilometer/openstack/common/jsonutils.py
+++ b/ceilometer/openstack/common/jsonutils.py
@@ -120,7 +120,7 @@ def to_primitive(value, convert_instances=False, level=0):
level=level + 1)
else:
return value
- except TypeError, e:
+ except TypeError:
# Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list().
return unicode(value)
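This is one instance of a change made throughout the commit: the Python-2-only ``except E, e:`` comma syntax is replaced by ``except E as e:``, or by no binding at all where the exception object is unused, as here. A sketch of the pattern (names illustrative)::

    def describe(value):
        try:
            return ','.join(value)   # may raise TypeError for non-iterables
        except TypeError:            # no ', e' binding; the object is unused
            return unicode(value)    # Python 2 era, matching the module above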
diff --git a/ceilometer/openstack/common/log.py b/ceilometer/openstack/common/log.py
index ea8d2833..07219f77 100644
--- a/ceilometer/openstack/common/log.py
+++ b/ceilometer/openstack/common/log.py
@@ -47,21 +47,83 @@ from ceilometer.openstack.common import local
from ceilometer.openstack.common import notifier
+_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
+_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+
+common_cli_opts = [
+ cfg.BoolOpt('debug',
+ short='d',
+ default=False,
+ help='Print debugging output (set logging level to '
+ 'DEBUG instead of default WARNING level).'),
+ cfg.BoolOpt('verbose',
+ short='v',
+ default=False,
+ help='Print more verbose output (set logging level to '
+ 'INFO instead of default WARNING level).'),
+]
+
+logging_cli_opts = [
+ cfg.StrOpt('log-config',
+ metavar='PATH',
+ help='If this option is specified, the logging configuration '
+ 'file specified is used and overrides any other logging '
+ 'options specified. Please see the Python logging module '
+ 'documentation for details on logging configuration '
+ 'files.'),
+ cfg.StrOpt('log-format',
+ default=_DEFAULT_LOG_FORMAT,
+ metavar='FORMAT',
+ help='A logging.Formatter log message format string which may '
+ 'use any of the available logging.LogRecord attributes. '
+ 'Default: %(default)s'),
+ cfg.StrOpt('log-date-format',
+ default=_DEFAULT_LOG_DATE_FORMAT,
+ metavar='DATE_FORMAT',
+ help='Format string for %%(asctime)s in log records. '
+ 'Default: %(default)s'),
+ cfg.StrOpt('log-file',
+ metavar='PATH',
+ deprecated_name='logfile',
+ help='(Optional) Name of log file to output to. '
+ 'If not set, logging will go to stdout.'),
+ cfg.StrOpt('log-dir',
+ deprecated_name='logdir',
+ help='(Optional) The directory to keep log files in '
+ '(will be prepended to --log-file)'),
+ cfg.BoolOpt('use-syslog',
+ default=False,
+ help='Use syslog for logging.'),
+ cfg.StrOpt('syslog-log-facility',
+ default='LOG_USER',
+ help='syslog facility to receive log lines')
+]
+
+generic_log_opts = [
+ cfg.BoolOpt('use_stderr',
+ default=True,
+ help='Log output to standard error'),
+ cfg.StrOpt('logfile_mode',
+ default='0644',
+ help='Default file mode used when creating log files'),
+]
+
log_opts = [
cfg.StrOpt('logging_context_format_string',
- default='%(asctime)s %(levelname)s %(name)s [%(request_id)s '
- '%(user)s %(tenant)s] %(instance)s'
+ default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
+ '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
- default='%(asctime)s %(process)d %(levelname)s %(name)s [-]'
- ' %(instance)s%(message)s',
+ default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
+ '%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix',
- default='%(asctime)s %(process)d TRACE %(name)s %(instance)s',
+ default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
+ '%(instance)s',
help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels',
default=[
@@ -93,24 +155,9 @@ log_opts = [
'format it like this'),
]
-
-generic_log_opts = [
- cfg.StrOpt('logdir',
- default=None,
- help='Log output to a per-service log file in named directory'),
- cfg.StrOpt('logfile',
- default=None,
- help='Log output to a named file'),
- cfg.BoolOpt('use_stderr',
- default=True,
- help='Log output to standard error'),
- cfg.StrOpt('logfile_mode',
- default='0644',
- help='Default file mode used when creating log files'),
-]
-
-
CONF = cfg.CONF
+CONF.register_cli_opts(common_cli_opts)
+CONF.register_cli_opts(logging_cli_opts)
CONF.register_opts(generic_log_opts)
CONF.register_opts(log_opts)
@@ -148,8 +195,8 @@ def _get_binary_name():
def _get_log_file_path(binary=None):
- logfile = CONF.log_file or CONF.logfile
- logdir = CONF.log_dir or CONF.logdir
+ logfile = CONF.log_file
+ logdir = CONF.log_dir
if logfile and not logdir:
return logfile
@@ -174,7 +221,7 @@ class ContextAdapter(logging.LoggerAdapter):
self.log(logging.AUDIT, msg, *args, **kwargs)
def deprecated(self, msg, *args, **kwargs):
- stdmsg = _("Deprecated Config: %s") % msg
+ stdmsg = _("Deprecated: %s") % msg
if CONF.fatal_deprecations:
self.critical(stdmsg, *args, **kwargs)
raise DeprecatedConfig(msg=stdmsg)
@@ -289,6 +336,12 @@ def setup(product_name):
_setup_logging_from_conf(product_name)
+def set_defaults(logging_context_format_string):
+ cfg.set_defaults(log_opts,
+ logging_context_format_string=
+ logging_context_format_string)
+
+
def _find_facility_from_conf():
facility_names = logging.handlers.SysLogHandler.facility_names
facility = getattr(logging.handlers.SysLogHandler,
@@ -354,10 +407,12 @@ def _setup_logging_from_conf(product_name):
datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
- if CONF.verbose or CONF.debug:
+ if CONF.debug:
log_root.setLevel(logging.DEBUG)
- else:
+ elif CONF.verbose:
log_root.setLevel(logging.INFO)
+ else:
+ log_root.setLevel(logging.WARNING)
level = logging.NOTSET
for pair in CONF.default_log_levels:
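Taken together, the log.py changes move the debug/verbose and logging CLI options here from cfg.CommonConfigOpts and make the default root level WARNING, with --verbose raising it to INFO and --debug to DEBUG. A sketch of the resulting behaviour (project name and messages are illustrative; setup() appears in the hunk above, getLogger() is assumed from the unchanged part of the module)::

    from ceilometer.openstack.common import cfg
    from ceilometer.openstack.common import log

    CONF = cfg.CONF
    CONF(['--verbose'], project='ceilometer')
    log.setup('ceilometer')

    LOG = log.getLogger(__name__)
    LOG.info('shown: --verbose selects INFO')
    LOG.debug('suppressed: DEBUG still requires --debug')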
diff --git a/ceilometer/openstack/common/loopingcall.py b/ceilometer/openstack/common/loopingcall.py
index 2c81e6ca..0f07c838 100644
--- a/ceilometer/openstack/common/loopingcall.py
+++ b/ceilometer/openstack/common/loopingcall.py
@@ -24,6 +24,7 @@ from eventlet import greenthread
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging
+from ceilometer.openstack.common import timeutils
LOG = logging.getLogger(__name__)
@@ -62,10 +63,16 @@ class LoopingCall(object):
try:
while self._running:
+ start = timeutils.utcnow()
self.f(*self.args, **self.kw)
+ end = timeutils.utcnow()
if not self._running:
break
- greenthread.sleep(interval)
+ delay = interval - timeutils.delta_seconds(start, end)
+ if delay <= 0:
+ LOG.warn(_('task run outlasted interval by %s sec') %
+ -delay)
+ greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone, e:
self.stop()
done.send(e.retvalue)
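The loop now subtracts the callback's own runtime from the sleep, so iterations keep a fixed cadence instead of drifting by the task duration. The arithmetic, worked through for one iteration::

    # With interval = 5.0 and a callback that took 2.0 seconds:
    interval = 5.0
    elapsed = 2.0                       # timeutils.delta_seconds(start, end)
    delay = interval - elapsed          # 3.0 seconds left to sleep
    sleep_for = delay if delay > 0 else 0
    # If elapsed were 6.0, delay would be -1.0: the loop warns that the
    # task outlasted the interval and sleeps 0.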
diff --git a/ceilometer/openstack/common/notifier/api.py b/ceilometer/openstack/common/notifier/api.py
index 92f654cd..6e91c0cc 100644
--- a/ceilometer/openstack/common/notifier/api.py
+++ b/ceilometer/openstack/common/notifier/api.py
@@ -137,10 +137,11 @@ def notify(context, publisher_id, event_type, priority, payload):
for driver in _get_drivers():
try:
driver.notify(context, msg)
- except Exception, e:
+ except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system. "
- "Payload=%(payload)s") % locals())
+ "Payload=%(payload)s")
+ % dict(e=e, payload=payload))
_drivers = None
@@ -166,7 +167,7 @@ def add_driver(notification_driver):
try:
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
- except ImportError as e:
+ except ImportError:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
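The first hunk also swaps implicit ``% locals()`` interpolation for an explicit mapping, making the format arguments visible at the call site. The pattern in isolation (values illustrative)::

    e = ValueError('boom')
    payload = {'counter': 'cpu'}
    msg = ("Problem '%(e)s' attempting to send to notification system. "
           "Payload=%(payload)s") % dict(e=e, payload=payload)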
diff --git a/ceilometer/openstack/common/notifier/rpc_notifier.py b/ceilometer/openstack/common/notifier/rpc_notifier.py
index e51caba0..c5f554ee 100644
--- a/ceilometer/openstack/common/notifier/rpc_notifier.py
+++ b/ceilometer/openstack/common/notifier/rpc_notifier.py
@@ -41,6 +41,6 @@ def notify(context, message):
topic = '%s.%s' % (topic, priority)
try:
rpc.notify(context, topic, message)
- except Exception, e:
+ except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())
diff --git a/ceilometer/openstack/common/notifier/rpc_notifier2.py b/ceilometer/openstack/common/notifier/rpc_notifier2.py
new file mode 100644
index 00000000..833e0bbe
--- /dev/null
+++ b/ceilometer/openstack/common/notifier/rpc_notifier2.py
@@ -0,0 +1,51 @@
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+'''messaging based notification driver, with message envelopes'''
+
+from ceilometer.openstack.common import cfg
+from ceilometer.openstack.common import context as req_context
+from ceilometer.openstack.common.gettextutils import _
+from ceilometer.openstack.common import log as logging
+from ceilometer.openstack.common import rpc
+
+LOG = logging.getLogger(__name__)
+
+notification_topic_opt = cfg.ListOpt(
+ 'topics', default=['notifications', ],
+ help='AMQP topic(s) used for openstack notifications')
+
+opt_group = cfg.OptGroup(name='rpc_notifier2',
+ title='Options for rpc_notifier2')
+
+CONF = cfg.CONF
+CONF.register_group(opt_group)
+CONF.register_opt(notification_topic_opt, opt_group)
+
+
+def notify(context, message):
+ """Sends a notification via RPC"""
+ if not context:
+ context = req_context.get_admin_context()
+ priority = message.get('priority',
+ CONF.default_notification_level)
+ priority = priority.lower()
+ for topic in CONF.rpc_notifier2.topics:
+ topic = '%s.%s' % (topic, priority)
+ try:
+ rpc.notify(context, topic, message, envelope=True)
+ except Exception:
+ LOG.exception(_("Could not send notification to %(topic)s. "
+ "Payload=%(message)s"), locals())
diff --git a/ceilometer/openstack/common/policy.py b/ceilometer/openstack/common/policy.py
index 831dac25..3c482548 100644
--- a/ceilometer/openstack/common/policy.py
+++ b/ceilometer/openstack/common/policy.py
@@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-# Copyright (c) 2011 OpenStack, LLC.
+# Copyright (c) 2012 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -15,10 +15,52 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""Common Policy Engine Implementation"""
+"""
+Common Policy Engine Implementation
+Policies can be expressed in one of two forms: A list of lists, or a
+string written in the new policy language.
+
+In the list-of-lists representation, each check inside the innermost
+list is combined as with an "and" conjunction--for that check to pass,
+all the specified checks must pass. These innermost lists are then
+combined as with an "or" conjunction. This is the original way of
+expressing policies, but there now exists a new way: the policy
+language.
+
+In the policy language, each check is specified the same way as in the
+list-of-lists representation: a simple "a:b" pair that is matched to
+the correct code to perform that check. However, conjunction
+operators are available, allowing for more expressiveness in crafting
+policies.
+
+As an example, take the following rule, expressed in the list-of-lists
+representation::
+
+ [["role:admin"], ["project_id:%(project_id)s", "role:projectadmin"]]
+
+In the policy language, this becomes::
+
+ role:admin or (project_id:%(project_id)s and role:projectadmin)
+
+The policy language also has the "not" operator, allowing a richer
+policy rule::
+
+ project_id:%(project_id)s and not role:dunce
+
+Finally, two special policy checks should be mentioned; the policy
+check "@" will always accept an access, and the policy check "!" will
+always reject an access. (Note that if a rule is either the empty
+list ("[]") or the empty string, this is equivalent to the "@" policy
+check.) Of these, the "!" policy check is probably the most useful,
+as it allows particular rules to be explicitly disabled.
+"""
+
+import abc
import logging
+import re
import urllib
+
import urllib2
from ceilometer.openstack.common.gettextutils import _
@@ -28,218 +70,650 @@ from ceilometer.openstack.common import jsonutils
LOG = logging.getLogger(__name__)
-_BRAIN = None
+_rules = None
+_checks = {}
-def set_brain(brain):
- """Set the brain used by enforce().
+class Rules(dict):
+ """
+ A store for rules. Handles the default_rule setting directly.
+ """
- Defaults use Brain() if not set.
+ @classmethod
+ def load_json(cls, data, default_rule=None):
+ """
+ Allow loading of JSON rule data.
+ """
- """
- global _BRAIN
- _BRAIN = brain
+ # Suck in the JSON data and parse the rules
+ rules = dict((k, parse_rule(v)) for k, v in
+ jsonutils.loads(data).items())
+ return cls(rules, default_rule)
-def reset():
- """Clear the brain used by enforce()."""
- global _BRAIN
- _BRAIN = None
+ def __init__(self, rules=None, default_rule=None):
+ """Initialize the Rules store."""
+ super(Rules, self).__init__(rules or {})
+ self.default_rule = default_rule
-def enforce(match_list, target_dict, credentials_dict, exc=None,
- *args, **kwargs):
- """Enforces authorization of some rules against credentials.
+ def __missing__(self, key):
+ """Implements the default rule handling."""
- :param match_list: nested tuples of data to match against
+ # If the default rule isn't actually defined, do something
+ # reasonably intelligent
+ if not self.default_rule or self.default_rule not in self:
+ raise KeyError(key)
- The basic brain supports three types of match lists:
+ return self[self.default_rule]
- 1) rules
+ def __str__(self):
+ """Dumps a string representation of the rules."""
- looks like: ``('rule:compute:get_instance',)``
+ # Start by building the canonical strings for the rules
+ out_rules = {}
+ for key, value in self.items():
+ # Use empty string for singleton TrueCheck instances
+ if isinstance(value, TrueCheck):
+ out_rules[key] = ''
+ else:
+ out_rules[key] = str(value)
- Retrieves the named rule from the rules dict and recursively
- checks against the contents of the rule.
+ # Dump a pretty-printed JSON representation
+ return jsonutils.dumps(out_rules, indent=4)
- 2) roles
- looks like: ``('role:compute:admin',)``
+# Really have to figure out a way to deprecate this
+def set_rules(rules):
+ """Set the rules in use for policy checks."""
- Matches if the specified role is in credentials_dict['roles'].
+ global _rules
- 3) generic
+ _rules = rules
- looks like: ``('tenant_id:%(tenant_id)s',)``
- Substitutes values from the target dict into the match using
- the % operator and matches them against the creds dict.
+# Ditto
+def reset():
+ """Clear the rules used for policy checks."""
- Combining rules:
+ global _rules
- The brain returns True if any of the outer tuple of rules
- match and also True if all of the inner tuples match. You
- can use this to perform simple boolean logic. For
- example, the following rule would return True if the creds
- contain the role 'admin' OR the if the tenant_id matches
- the target dict AND the the creds contains the role
- 'compute_sysadmin':
+ _rules = None
- ::
- {
- "rule:combined": (
- 'role:admin',
- ('tenant_id:%(tenant_id)s', 'role:compute_sysadmin')
- )
- }
+def check(rule, target, creds, exc=None, *args, **kwargs):
+ """
+ Checks authorization of a rule against the target and credentials.
+
+ :param rule: The rule to evaluate.
+ :param target: As much information about the object being operated
+ on as possible, as a dictionary.
+ :param creds: As much information about the user performing the
+ action as possible, as a dictionary.
+ :param exc: Class of the exception to raise if the check fails.
+ Any remaining arguments passed to check() (both
+ positional and keyword arguments) will be passed to
+ the exception class. If exc is not provided, returns
+ False.
+
+ :return: Returns False if the policy does not allow the action and
+ exc is not provided; otherwise, returns a value that
+ evaluates to True. Note: for rules using the "case"
+ expression, this True value will be the specified string
+ from the expression.
+ """
- Note that rule and role are reserved words in the credentials match, so
- you can't match against properties with those names. Custom brains may
- also add new reserved words. For example, the HttpBrain adds http as a
- reserved word.
+ # Allow the rule to be a Check tree
+ if isinstance(rule, BaseCheck):
+ result = rule(target, creds)
+ elif not _rules:
+ # No rules to reference means we're going to fail closed
+ result = False
+ else:
+ try:
+ # Evaluate the rule
+ result = _rules[rule](target, creds)
+ except KeyError:
+ # If the rule doesn't exist, fail closed
+ result = False
- :param target_dict: dict of object properties
+ # If it is False, raise the exception if requested
+ if exc and result is False:
+ raise exc(*args, **kwargs)
- Target dicts contain as much information as we can about the object being
- operated on.
+ return result
- :param credentials_dict: dict of actor properties
- Credentials dicts contain as much information as we can about the user
- performing the action.
+class BaseCheck(object):
+ """
+ Abstract base class for Check classes.
+ """
- :param exc: exception to raise
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __str__(self):
+ """
+ Retrieve a string representation of the Check tree rooted at
+ this node.
+ """
+
+ pass
+
+ @abc.abstractmethod
+ def __call__(self, target, cred):
+ """
+ Perform the check. Returns False to reject the access or a
+ true value (not necessarily True) to accept the access.
+ """
+
+ pass
- Class of the exception to raise if the check fails. Any remaining
- arguments passed to enforce() (both positional and keyword arguments)
- will be passed to the exception class. If exc is not provided, returns
- False.
- :return: True if the policy allows the action
- :return: False if the policy does not allow the action and exc is not set
+class FalseCheck(BaseCheck):
"""
- global _BRAIN
- if not _BRAIN:
- _BRAIN = Brain()
- if not _BRAIN.check(match_list, target_dict, credentials_dict):
- if exc:
- raise exc(*args, **kwargs)
+ A policy check that always returns False (disallow).
+ """
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "!"
+
+ def __call__(self, target, cred):
+ """Check the policy."""
+
return False
- return True
-class Brain(object):
- """Implements policy checking."""
+class TrueCheck(BaseCheck):
+ """
+ A policy check that always returns True (allow).
+ """
- _checks = {}
+ def __str__(self):
+ """Return a string representation of this check."""
- @classmethod
- def _register(cls, name, func):
- cls._checks[name] = func
+ return "@"
- @classmethod
- def load_json(cls, data, default_rule=None):
- """Init a brain using json instead of a rules dictionary."""
- rules_dict = jsonutils.loads(data)
- return cls(rules=rules_dict, default_rule=default_rule)
+ def __call__(self, target, cred):
+ """Check the policy."""
- def __init__(self, rules=None, default_rule=None):
- if self.__class__ != Brain:
- LOG.warning(_("Inheritance-based rules are deprecated; use "
- "the default brain instead of %s.") %
- self.__class__.__name__)
+ return True
- self.rules = rules or {}
- self.default_rule = default_rule
- def add_rule(self, key, match):
- self.rules[key] = match
+class Check(BaseCheck):
+ """
+ A base class to allow for user-defined policy checks.
+ """
- def _check(self, match, target_dict, cred_dict):
- try:
- match_kind, match_value = match.split(':', 1)
- except Exception:
- LOG.exception(_("Failed to understand rule %(match)r") % locals())
- # If the rule is invalid, fail closed
- return False
+ def __init__(self, kind, match):
+ """
+ :param kind: The kind of the check, i.e., the field before the
+ ':'.
+ :param match: The match of the check, i.e., the field after
+ the ':'.
+ """
- func = None
- try:
- old_func = getattr(self, '_check_%s' % match_kind)
- except AttributeError:
- func = self._checks.get(match_kind, self._checks.get(None, None))
- else:
- LOG.warning(_("Inheritance-based rules are deprecated; update "
- "_check_%s") % match_kind)
- func = lambda brain, kind, value, target, cred: old_func(value,
- target,
- cred)
-
- if not func:
- LOG.error(_("No handler for matches of kind %s") % match_kind)
- # Fail closed
- return False
+ self.kind = kind
+ self.match = match
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "%s:%s" % (self.kind, self.match)
+
+
+class NotCheck(BaseCheck):
+ """
+ A policy check that inverts the result of another policy check.
+ Implements the "not" operator.
+ """
+
+ def __init__(self, rule):
+ """
+ Initialize the 'not' check.
+
+ :param rule: The rule to negate. Must be a Check.
+ """
+
+ self.rule = rule
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "not %s" % self.rule
- return func(self, match_kind, match_value, target_dict, cred_dict)
+ def __call__(self, target, cred):
+ """
+ Check the policy. Returns the logical inverse of the wrapped
+ check.
+ """
+
+ return not self.rule(target, cred)
+
+
+class AndCheck(BaseCheck):
+ """
+ A policy check that requires that a list of other checks all
+ return True. Implements the "and" operator.
+ """
- def check(self, match_list, target_dict, cred_dict):
- """Checks authorization of some rules against credentials.
+ def __init__(self, rules):
+ """
+ Initialize the 'and' check.
+
+ :param rules: A list of rules that will be tested.
+ """
- Detailed description of the check with examples in policy.enforce().
+ self.rules = rules
- :param match_list: nested tuples of data to match against
- :param target_dict: dict of object properties
- :param credentials_dict: dict of actor properties
+ def __str__(self):
+ """Return a string representation of this check."""
- :returns: True if the check passes
+ return "(%s)" % ' and '.join(str(r) for r in self.rules)
+ def __call__(self, target, cred):
+ """
+ Check the policy. Requires that all rules accept in order to
+ return True.
"""
- if not match_list:
- return True
- for and_list in match_list:
- if isinstance(and_list, basestring):
- and_list = (and_list,)
- if all([self._check(item, target_dict, cred_dict)
- for item in and_list]):
+
+ for rule in self.rules:
+ if not rule(target, cred):
+ return False
+
+ return True
+
+ def add_check(self, rule):
+ """
+ Allows addition of another rule to the list of rules that will
+ be tested. Returns the AndCheck object for convenience.
+ """
+
+ self.rules.append(rule)
+ return self
+
+
+class OrCheck(BaseCheck):
+ """
+ A policy check that requires that at least one of a list of other
+ checks returns True. Implements the "or" operator.
+ """
+
+ def __init__(self, rules):
+ """
+ Initialize the 'or' check.
+
+ :param rules: A list of rules that will be tested.
+ """
+
+ self.rules = rules
+
+ def __str__(self):
+ """Return a string representation of this check."""
+
+ return "(%s)" % ' or '.join(str(r) for r in self.rules)
+
+ def __call__(self, target, cred):
+ """
+ Check the policy. Requires that at least one rule accept in
+ order to return True.
+ """
+
+ for rule in self.rules:
+ if rule(target, cred):
return True
+
return False
+ def add_check(self, rule):
+ """
+ Allows addition of another rule to the list of rules that will
+ be tested. Returns the OrCheck object for convenience.
+ """
-class HttpBrain(Brain):
- """A brain that can check external urls for policy.
+ self.rules.append(rule)
+ return self
- Posts json blobs for target and credentials.
- Note that this brain is deprecated; the http check is registered
- by default.
+def _parse_check(rule):
+ """
+ Parse a single base check rule into an appropriate Check object.
+ """
+
+ # Handle the special checks
+ if rule == '!':
+ return FalseCheck()
+ elif rule == '@':
+ return TrueCheck()
+
+ try:
+ kind, match = rule.split(':', 1)
+ except Exception:
+ LOG.exception(_("Failed to understand rule %(rule)s") % locals())
+ # If the rule is invalid, we'll fail closed
+ return FalseCheck()
+
+ # Find what implements the check
+ if kind in _checks:
+ return _checks[kind](kind, match)
+ elif None in _checks:
+ return _checks[None](kind, match)
+ else:
+ LOG.error(_("No handler for matches of kind %s") % kind)
+ return FalseCheck()
+
+
+def _parse_list_rule(rule):
+ """
+ Provided for backwards compatibility. Translates the old
+ list-of-lists syntax into a tree of Check objects.
"""
- pass
+ # Empty rule defaults to True
+ if not rule:
+ return TrueCheck()
+
+ # Outer list is joined by "or"; inner list by "and"
+ or_list = []
+ for inner_rule in rule:
+ # Elide empty inner lists
+ if not inner_rule:
+ continue
+
+ # Handle bare strings
+ if isinstance(inner_rule, basestring):
+ inner_rule = [inner_rule]
+
+ # Parse the inner rules into Check objects
+ and_list = [_parse_check(r) for r in inner_rule]
+
+ # Append the appropriate check to the or_list
+ if len(and_list) == 1:
+ or_list.append(and_list[0])
+ else:
+ or_list.append(AndCheck(and_list))
+
+ # If we have only one check, omit the "or"
+ if len(or_list) == 0:
+ return FalseCheck()
+ elif len(or_list) == 1:
+ return or_list[0]
+
+ return OrCheck(or_list)
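
A minimal standalone sketch (assuming Python 3; the module above still targets
Python 2's basestring) of the semantics _parse_list_rule encodes: the outer
list is OR'd together, each inner list is AND'd, and a bare string counts as a
one-element inner list.

    def evaluate_list_rule(rule, check_one):
        """check_one(item) -> bool evaluates a single base check string."""
        if not rule:
            return True          # empty rule defaults to True
        for inner in rule:
            if not inner:
                continue         # elide empty inner lists
            if isinstance(inner, str):
                inner = [inner]  # bare string == one-element "and" list
            if all(check_one(item) for item in inner):
                return True
        return False

    # "admin" OR ("member" AND "owner")
    rule = ['role:admin', ['role:member', 'rule:owner']]
    granted = {'role:member', 'rule:owner'}
    print(evaluate_list_rule(rule, granted.__contains__))  # True
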
+
+
+# Used for tokenizing the policy language
+_tokenize_re = re.compile(r'\s+')
+
+
+def _parse_tokenize(rule):
+ """
+ Tokenizer for the policy language.
+
+ Most of the single-character tokens are specified in the
+ _tokenize_re; however, parentheses need to be handled specially,
+ because they can appear inside a check string. Thankfully, those
+ parentheses that appear inside a check string can never occur at
+ the very beginning or end ("%(variable)s" is the correct syntax).
+ """
+
+ for tok in _tokenize_re.split(rule):
+ # Skip empty tokens
+ if not tok or tok.isspace():
+ continue
+
+ # Handle leading parens on the token
+ clean = tok.lstrip('(')
+ for i in range(len(tok) - len(clean)):
+ yield '(', '('
+
+ # If it was only parentheses, continue
+ if not clean:
+ continue
+ else:
+ tok = clean
+
+ # Handle trailing parens on the token
+ clean = tok.rstrip(')')
+ trail = len(tok) - len(clean)
+
+ # Yield the cleaned token
+ lowered = clean.lower()
+ if lowered in ('and', 'or', 'not'):
+ # Special tokens
+ yield lowered, clean
+ elif clean:
+ # Not a special token, but not composed solely of ')'
+ if len(tok) >= 2 and ((tok[0], tok[-1]) in
+ [('"', '"'), ("'", "'")]):
+ # It's a quoted string
+ yield 'string', tok[1:-1]
+ else:
+ yield 'check', _parse_check(clean)
+
+ # Yield the trailing parens
+ for i in range(trail):
+ yield ')', ')'
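
A standalone sketch of the tokenizing strategy above (Python 3; the
quoted-string branch is omitted for brevity): split on whitespace, then peel
leading '(' and trailing ')' off each token, so the parentheses inside
"%(var)s" substitutions are never split apart.

    import re

    _tok_re = re.compile(r'\s+')

    def tokenize(rule):
        for tok in _tok_re.split(rule):
            if not tok:
                continue
            stripped = tok.lstrip('(')
            for _ in range(len(tok) - len(stripped)):
                yield '(', '('          # leading parens become tokens
            if not stripped:
                continue
            body = stripped.rstrip(')')
            trail = len(stripped) - len(body)
            if body.lower() in ('and', 'or', 'not'):
                yield body.lower(), body
            elif body:
                yield 'check', body     # the real code parses a Check here
            for _ in range(trail):
                yield ')', ')'          # trailing parens become tokens

    print(list(tokenize("(role:admin or role:member) and not rule:deny")))
    # [('(', '('), ('check', 'role:admin'), ('or', 'or'),
    #  ('check', 'role:member'), (')', ')'), ('and', 'and'),
    #  ('not', 'not'), ('check', 'rule:deny')]
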
+
+
+class ParseStateMeta(type):
+ """
+ Metaclass for the ParseState class. Facilitates identifying
+ reduction methods.
+ """
+
+ def __new__(mcs, name, bases, cls_dict):
+ """
+ Create the class. Injects the 'reducers' list, a list of
+ tuples matching token sequences to the names of the
+ corresponding reduction methods.
+ """
+
+ reducers = []
+
+ for key, value in cls_dict.items():
+ if not hasattr(value, 'reducers'):
+ continue
+ for reduction in value.reducers:
+ reducers.append((reduction, key))
+
+ cls_dict['reducers'] = reducers
+
+ return super(ParseStateMeta, mcs).__new__(mcs, name, bases, cls_dict)
+
+
+def reducer(*tokens):
+ """
+ Decorator for reduction methods. Arguments are a sequence of
+ tokens, in order, which should trigger running this reduction
+ method.
+ """
+
+ def decorator(func):
+ # Make sure we have a list of reducer sequences
+ if not hasattr(func, 'reducers'):
+ func.reducers = []
+
+ # Add the tokens to the list of reducer sequences
+ func.reducers.append(list(tokens))
+
+ return func
+
+ return decorator
+
+
+class ParseState(object):
+ """
+ Implement the core of parsing the policy language. Uses a greedy
+ reduction algorithm to reduce a sequence of tokens into a single
+ terminal, the value of which will be the root of the Check tree.
+
+ Note: error reporting is rather lacking. The best we can get with
+ this parser formulation is an overall "parse failed" error.
+ Fortunately, the policy language is simple enough that this
+ shouldn't be that big a problem.
+ """
+
+ __metaclass__ = ParseStateMeta
+
+ def __init__(self):
+ """Initialize the ParseState."""
+
+ self.tokens = []
+ self.values = []
+
+ def reduce(self):
+ """
+ Perform a greedy reduction of the token stream. If a reducer
+ method matches, it will be executed, then the reduce() method
+ will be called recursively to search for any more possible
+ reductions.
+ """
+
+ for reduction, methname in self.reducers:
+ if (len(self.tokens) >= len(reduction) and
+ self.tokens[-len(reduction):] == reduction):
+ # Get the reduction method
+ meth = getattr(self, methname)
+
+ # Reduce the token stream
+ results = meth(*self.values[-len(reduction):])
+
+ # Update the tokens and values
+ self.tokens[-len(reduction):] = [r[0] for r in results]
+ self.values[-len(reduction):] = [r[1] for r in results]
+
+ # Check for any more reductions
+ return self.reduce()
+
+ def shift(self, tok, value):
+ """Adds one more token to the state. Calls reduce()."""
+
+ self.tokens.append(tok)
+ self.values.append(value)
+
+ # Do a greedy reduce...
+ self.reduce()
+
+ @property
+ def result(self):
+ """
+ Obtain the final result of the parse. Raises ValueError if
+ the parse failed to reduce to a single result.
+ """
+
+ if len(self.values) != 1:
+ raise ValueError("Could not parse rule")
+ return self.values[0]
+
+ @reducer('(', 'check', ')')
+ @reducer('(', 'and_expr', ')')
+ @reducer('(', 'or_expr', ')')
+ def _wrap_check(self, _p1, check, _p2):
+ """Turn parenthesized expressions into a 'check' token."""
+
+ return [('check', check)]
+
+ @reducer('check', 'and', 'check')
+ def _make_and_expr(self, check1, _and, check2):
+ """
+ Create an 'and_expr' from two checks joined by the 'and'
+ operator.
+ """
+
+ return [('and_expr', AndCheck([check1, check2]))]
+
+ @reducer('and_expr', 'and', 'check')
+ def _extend_and_expr(self, and_expr, _and, check):
+ """
+ Extend an 'and_expr' by adding one more check.
+ """
+
+ return [('and_expr', and_expr.add_check(check))]
+
+ @reducer('check', 'or', 'check')
+ def _make_or_expr(self, check1, _or, check2):
+ """
+ Create an 'or_expr' from two checks joined by the 'or'
+ operator.
+ """
+
+ return [('or_expr', OrCheck([check1, check2]))]
+
+ @reducer('or_expr', 'or', 'check')
+ def _extend_or_expr(self, or_expr, _or, check):
+ """
+ Extend an 'or_expr' by adding one more check.
+ """
+
+ return [('or_expr', or_expr.add_check(check))]
+
+ @reducer('not', 'check')
+ def _make_not_expr(self, _not, check):
+ """Invert the result of another check."""
+
+ return [('check', NotCheck(check))]
+
+
+def _parse_text_rule(rule):
+ """
+ Translates a policy written in the policy language into a tree of
+ Check objects.
+ """
+
+ # Empty rule means always accept
+ if not rule:
+ return TrueCheck()
+
+ # Parse the token stream
+ state = ParseState()
+ for tok, value in _parse_tokenize(rule):
+ state.shift(tok, value)
+
+ try:
+ return state.result
+ except ValueError:
+ # Couldn't parse the rule
+ LOG.exception(_("Failed to understand rule %(rule)r") % locals())
+
+ # Fail closed
+ return FalseCheck()
+
+
+def parse_rule(rule):
+ """
+ Parses a policy rule into a tree of Check objects.
+ """
+
+ # If the rule is a string, it's in the policy language
+ if isinstance(rule, basestring):
+ return _parse_text_rule(rule)
+ return _parse_list_rule(rule)
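
To make the result of parse_rule concrete, here is a toy stand-in for the
Check tree it builds (Python 3; RoleCheck mirrors the registered check further
down, the And/Or classes are condensed versions of the ones above):

    class RoleCheck:
        def __init__(self, match): self.match = match
        def __call__(self, target, creds):
            return self.match.lower() in (r.lower() for r in creds['roles'])

    class AndCheck:
        def __init__(self, rules): self.rules = rules
        def __call__(self, target, creds):
            return all(r(target, creds) for r in self.rules)

    class OrCheck:
        def __init__(self, rules): self.rules = rules
        def __call__(self, target, creds):
            return any(r(target, creds) for r in self.rules)

    # roughly what "role:admin or (role:member and role:owner)" parses into
    tree = OrCheck([RoleCheck('admin'),
                    AndCheck([RoleCheck('member'), RoleCheck('owner')])])
    print(tree({}, {'roles': ['Member', 'owner']}))  # True (case-insensitive)
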
def register(name, func=None):
"""
- Register a function as a policy check.
+ Register a function or Check class as a policy check.
:param name: Gives the name of the check type, e.g., 'rule',
- 'role', etc. If name is None, a default function
+ 'role', etc. If name is None, a default check type
will be registered.
- :param func: If given, provides the function to register. If not
- given, returns a function taking one argument to
- specify the function to register, allowing use as a
- decorator.
+ :param func: If given, provides the function or class to register.
+ If not given, returns a function taking one argument
+ to specify the function or class to register,
+ allowing use as a decorator.
"""
- # Perform the actual decoration by registering the function.
- # Returns the function for compliance with the decorator
- # interface.
+ # Perform the actual decoration by registering the function or
+ # class. Returns the function or class for compliance with the
+ # decorator interface.
def decorator(func):
- # Register the function
- Brain._register(name, func)
+ _checks[name] = func
return func
- # If the function is given, do the registration
+ # If the function or class is given, do the registration
if func:
return decorator(func)
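
A hedged usage sketch of the decorator form, with a condensed copy of the
registry; the 'ip' check kind and the IpCheck class are purely illustrative
and not part of this module.

    _checks = {}

    def register(name, func=None):
        def decorator(func):
            _checks[name] = func
            return func
        return decorator(func) if func else decorator

    @register('ip')
    class IpCheck(object):
        def __init__(self, kind, match):
            self.kind, self.match = kind, match
        def __call__(self, target, creds):
            return creds.get('remote_address', '').startswith(self.match)

    check = _checks['ip']('ip', '10.0.')
    print(check({}, {'remote_address': '10.0.0.7'}))  # True
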
@@ -247,55 +721,59 @@ def register(name, func=None):
@register("rule")
-def _check_rule(brain, match_kind, match, target_dict, cred_dict):
- """Recursively checks credentials based on the brains rules."""
- try:
- new_match_list = brain.rules[match]
- except KeyError:
- if brain.default_rule and match != brain.default_rule:
- new_match_list = ('rule:%s' % brain.default_rule,)
- else:
- return False
+class RuleCheck(Check):
+ def __call__(self, target, creds):
+ """
+ Recursively checks credentials based on the defined rules.
+ """
- return brain.check(new_match_list, target_dict, cred_dict)
+ try:
+ return _rules[self.match](target, creds)
+ except KeyError:
+ # We don't have any matching rule; fail closed
+ return False
@register("role")
-def _check_role(brain, match_kind, match, target_dict, cred_dict):
- """Check that there is a matching role in the cred dict."""
- return match.lower() in [x.lower() for x in cred_dict['roles']]
+class RoleCheck(Check):
+ def __call__(self, target, creds):
+ """Check that there is a matching role in the cred dict."""
+
+ return self.match.lower() in [x.lower() for x in creds['roles']]
@register('http')
-def _check_http(brain, match_kind, match, target_dict, cred_dict):
- """Check http: rules by calling to a remote server.
+class HttpCheck(Check):
+ def __call__(self, target, creds):
+ """
+ Check http: rules by calling to a remote server.
- This example implementation simply verifies that the response is
- exactly 'True'. A custom brain using response codes could easily
- be implemented.
+ This example implementation simply verifies that the response
+ is exactly 'True'.
+ """
- """
- url = 'http:' + (match % target_dict)
- data = {'target': jsonutils.dumps(target_dict),
- 'credentials': jsonutils.dumps(cred_dict)}
- post_data = urllib.urlencode(data)
- f = urllib2.urlopen(url, post_data)
- return f.read() == "True"
+ url = ('http:' + self.match) % target
+ data = {'target': jsonutils.dumps(target),
+ 'credentials': jsonutils.dumps(creds)}
+ post_data = urllib.urlencode(data)
+ f = urllib2.urlopen(url, post_data)
+ return f.read() == "True"
@register(None)
-def _check_generic(brain, match_kind, match, target_dict, cred_dict):
- """Check an individual match.
-
- Matches look like:
+class GenericCheck(Check):
+ def __call__(self, target, creds):
+ """
+ Check an individual match.
- tenant:%(tenant_id)s
- role:compute:admin
+ Matches look like:
- """
+ tenant:%(tenant_id)s
+ role:compute:admin
+ """
- # TODO(termie): do dict inspection via dot syntax
- match = match % target_dict
- if match_kind in cred_dict:
- return match == unicode(cred_dict[match_kind])
- return False
+ # TODO(termie): do dict inspection via dot syntax
+ match = self.match % target
+ if self.kind in creds:
+ return match == unicode(creds[self.kind])
+ return False
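
A standalone sketch of the GenericCheck matching rule (Python 3, so str stands
in for unicode): the match string is %-interpolated against the target, then
compared to the credential named by the kind.

    def generic_check(kind, match, target, creds):
        value = match % target          # e.g. kind 'tenant', match '%(tenant_id)s'
        if kind in creds:
            return value == str(creds[kind])
        return False

    target = {'tenant_id': 'abc123'}
    creds = {'tenant': 'abc123'}
    print(generic_check('tenant', '%(tenant_id)s', target, creds))  # True
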
diff --git a/ceilometer/openstack/common/rpc/__init__.py b/ceilometer/openstack/common/rpc/__init__.py
index e20279bb..e91ac20e 100644
--- a/ceilometer/openstack/common/rpc/__init__.py
+++ b/ceilometer/openstack/common/rpc/__init__.py
@@ -50,25 +50,26 @@ rpc_opts = [
default=['ceilometer.openstack.common.exception',
'nova.exception',
'cinder.exception',
+ 'exceptions',
],
                help='Modules of exceptions that are permitted to be recreated '
'upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
- #
- # The following options are not registered here, but are expected to be
- # present. The project using this library must register these options with
- # the configuration so that project-specific defaults may be defined.
- #
- #cfg.StrOpt('control_exchange',
- # default='nova',
- # help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+ cfg.StrOpt('control_exchange',
+ default='openstack',
+ help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)
+def set_defaults(control_exchange):
+ cfg.set_defaults(rpc_opts,
+ control_exchange=control_exchange)
+
+
def create_connection(new=True):
"""Create a connection to the message bus used for rpc.
@@ -177,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
-def notify(context, topic, msg):
+def notify(context, topic, msg, envelope=False):
"""Send notification event.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the notification to.
:param msg: This is a dict of content of event.
+ :param envelope: Set to True to enable message envelope for notifications.
:returns: None
"""
- return _get_impl().notify(cfg.CONF, context, topic, msg)
+ return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup():
diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py
index e86e748a..a69b0ced 100644
--- a/ceilometer/openstack/common/rpc/amqp.py
+++ b/ceilometer/openstack/common/rpc/amqp.py
@@ -26,7 +26,6 @@ AMQP, but is deprecated and predates this code.
"""
import inspect
-import logging
import sys
import uuid
@@ -34,10 +33,10 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import semaphore
-from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import local
+from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.rpc import common as rpc_common
@@ -55,7 +54,7 @@ class Pool(pools.Pool):
# TODO(comstud): Timeout connections not used in a while
def create(self):
- LOG.debug('Pool creating new connection')
+ LOG.debug(_('Pool creating new connection'))
return self.connection_cls(self.conf)
def empty(self):
@@ -150,7 +149,7 @@ class ConnectionContext(rpc_common.Connection):
def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
- ending=False):
+ ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
@@ -158,7 +157,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
"""
with ConnectionContext(conf, connection_pool) as conn:
if failure:
- failure = rpc_common.serialize_remote_exception(failure)
+ failure = rpc_common.serialize_remote_exception(failure,
+ log_failure)
try:
msg = {'result': reply, 'failure': failure}
@@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure}
if ending:
msg['ending'] = True
- conn.direct_send(msg_id, msg)
+ conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
@@ -185,10 +185,10 @@ class RpcContext(rpc_common.CommonRpcContext):
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
- connection_pool=None):
+ connection_pool=None, log_failure=True):
if self.msg_id:
msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
- ending)
+ ending, log_failure)
if ending:
self.msg_id = None
@@ -282,11 +282,21 @@ class ProxyCallback(object):
ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
- except Exception as e:
- LOG.exception('Exception during message handling')
+ except rpc_common.ClientException as e:
+ LOG.debug(_('Expected exception during message handling (%s)') %
+ e._exc_info[1])
+ ctxt.reply(None, e._exc_info,
+ connection_pool=self.connection_pool,
+ log_failure=False)
+ except Exception:
+ LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
+ def wait(self):
+ """Wait for all callback threads to exit."""
+ self.pool.waitall()
+
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
@@ -349,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
- LOG.debug(_('Making asynchronous call on %s ...'), topic)
+ LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
@@ -358,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
return wait_msg
@@ -377,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool):
@@ -385,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@@ -393,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.topic_send(topic, msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg,
@@ -402,15 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.fanout_send(topic, msg)
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
-def notify(conf, context, topic, msg, connection_pool):
+def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
- event_type = msg.get('event_type')
- LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals())
+ LOG.debug(_('Sending %(event_type)s on %(topic)s'),
+ dict(event_type=msg.get('event_type'),
+ topic=topic))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
+ if envelope:
+ msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg)
@@ -420,7 +433,4 @@ def cleanup(connection_pool):
def get_control_exchange(conf):
- try:
- return conf.control_exchange
- except cfg.NoSuchOptError:
- return 'openstack'
+ return conf.control_exchange
diff --git a/ceilometer/openstack/common/rpc/common.py b/ceilometer/openstack/common/rpc/common.py
index 84f06c1b..0bfbb8a3 100644
--- a/ceilometer/openstack/common/rpc/common.py
+++ b/ceilometer/openstack/common/rpc/common.py
@@ -18,18 +18,61 @@
# under the License.
import copy
-import logging
+import sys
import traceback
+from ceilometer.openstack.common import cfg
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import local
+from ceilometer.openstack.common import log as logging
+CONF = cfg.CONF
LOG = logging.getLogger(__name__)
+'''RPC Envelope Version.
+
+This version number applies to the top level structure of messages sent out.
+It does *not* apply to the message payload, which must be versioned
+independently. For example, when using rpc APIs, a version number is applied
+for changes to the API being exposed over rpc. This version number is handled
+in the rpc proxy and dispatcher modules.
+
+This version number applies to the message envelope that is used in the
+serialization done inside the rpc layer. See serialize_msg() and
+deserialize_msg().
+
+The current message format (version 2.0) is very simple. It is:
+
+ {
+ 'ceilometer.version': <RPC Envelope Version as a String>,
+ 'ceilometer.message': <Application Message Payload, JSON encoded>
+ }
+
+Message format version '1.0' simply refers to the messages sent without a
+message envelope.
+
+So, the current message envelope just includes the envelope version. It may
+eventually contain additional information, such as a signature for the message
+payload.
+
+We will JSON encode the application message payload. The message envelope,
+which includes the JSON encoded application message body, will be passed down
+to the messaging libraries as a dict.
+'''
+_RPC_ENVELOPE_VERSION = '2.0'
+
+_VERSION_KEY = 'ceilometer.version'
+_MESSAGE_KEY = 'ceilometer.message'
+
+
+# TODO(russellb) Turn this on after Grizzly.
+_SEND_RPC_ENVELOPE = False
+
+
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
@@ -40,7 +83,7 @@ class RPCException(Exception):
try:
message = self.message % kwargs
- except Exception as e:
+ except Exception:
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
LOG.exception(_('Exception in string format operation'))
@@ -90,6 +133,11 @@ class UnsupportedRpcVersion(RPCException):
"this endpoint.")
+class UnsupportedRpcEnvelopeVersion(RPCException):
+ message = _("Specified RPC envelope version, %(version)s, "
+ "not supported by this endpoint.")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
@@ -164,8 +212,12 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
- SANITIZE = {'set_admin_password': ('new_pass',),
- 'run_instance': ('admin_password',), }
+ SANITIZE = {'set_admin_password': [('args', 'new_pass')],
+ 'run_instance': [('args', 'admin_password')],
+ 'route_message': [('args', 'message', 'args', 'method_info',
+ 'method_kwargs', 'password'),
+ ('args', 'message', 'args', 'method_info',
+ 'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data
@@ -177,14 +229,16 @@ def _safe_log(log_func, msg, msg_data):
msg_data = copy.deepcopy(msg_data)
if has_method:
- method = msg_data['method']
- if method in SANITIZE:
- args_to_sanitize = SANITIZE[method]
- for arg in args_to_sanitize:
- try:
- msg_data['args'][arg] = "<SANITIZED>"
- except KeyError:
- pass
+ for arg in SANITIZE.get(msg_data['method'], []):
+ try:
+ d = msg_data
+ for elem in arg[:-1]:
+ d = d[elem]
+ d[arg[-1]] = '<SANITIZED>'
+ except KeyError, e:
+ LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
+ {'item': arg,
+ 'err': e})
if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>'
@@ -195,7 +249,7 @@ def _safe_log(log_func, msg, msg_data):
return log_func(msg, msg_data)
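
A minimal sketch of the nested-path sanitization introduced above (Python 3):
each SANITIZE entry is now a tuple of keys to walk before masking the leaf,
rather than a single top-level 'args' key.

    import copy

    def sanitize(msg_data, paths):
        msg_data = copy.deepcopy(msg_data)
        for path in paths:
            d = msg_data
            try:
                for key in path[:-1]:
                    d = d[key]          # walk down to the parent dict
                d[path[-1]] = '<SANITIZED>'
            except KeyError:
                pass                    # the real code logs the miss
        return msg_data

    msg = {'method': 'run_instance',
           'args': {'admin_password': 'secret', 'flavor': 'm1.tiny'}}
    print(sanitize(msg, [('args', 'admin_password')]))
    # {'method': 'run_instance',
    #  'args': {'admin_password': '<SANITIZED>', 'flavor': 'm1.tiny'}}
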
-def serialize_remote_exception(failure_info):
+def serialize_remote_exception(failure_info, log_failure=True):
"""Prepares exception data to be sent over rpc.
Failure_info should be a sys.exc_info() tuple.
@@ -203,8 +257,9 @@ def serialize_remote_exception(failure_info):
"""
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
- LOG.error(_("Returning exception %s to caller"), unicode(failure))
- LOG.error(tb)
+ if log_failure:
+ LOG.error(_("Returning exception %s to caller"), unicode(failure))
+ LOG.error(tb)
kwargs = {}
if hasattr(failure, 'kwargs'):
@@ -258,7 +313,7 @@ def deserialize_remote_exception(conf, data):
# we cannot necessarily change an exception message so we must override
# the __str__ method.
failure.__class__ = new_ex_type
- except TypeError as e:
+ except TypeError:
# NOTE(ameade): If a core exception then just add the traceback to the
# first exception argument.
failure.args = (message,) + failure.args[1:]
@@ -309,3 +364,107 @@ class CommonRpcContext(object):
context.values['read_deleted'] = read_deleted
return context
+
+
+class ClientException(Exception):
+ """This encapsulates some actual exception that is expected to be
+ hit by an RPC proxy object. Merely instantiating it records the
+ current exception information, which will be passed back to the
+ RPC client without exceptional logging."""
+ def __init__(self):
+ self._exc_info = sys.exc_info()
+
+
+def catch_client_exception(exceptions, func, *args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception, e:
+ if type(e) in exceptions:
+ raise ClientException()
+ else:
+ raise
+
+
+def client_exceptions(*exceptions):
+ """Decorator for manager methods that raise expected exceptions.
+ Marking a Manager method with this decorator allows the declaration
+ of expected exceptions that the RPC layer should not consider fatal,
+ and not log as if they were generated in a real error scenario. Note
+ that this will cause listed exceptions to be wrapped in a
+ ClientException, which is used internally by the RPC layer."""
+ def outer(func):
+ def inner(*args, **kwargs):
+ return catch_client_exception(exceptions, func, *args, **kwargs)
+ return inner
+ return outer
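
A hedged usage sketch: a manager method declares its expected exceptions so
the RPC layer replies with them instead of logging a traceback.
InstanceNotFound and ComputeManager are illustrative names only.

    from ceilometer.openstack.common.rpc import common as rpc_common

    class InstanceNotFound(Exception):
        pass

    class ComputeManager(object):
        @rpc_common.client_exceptions(InstanceNotFound)
        def get_instance(self, context, instance_id):
            # Raising here gets wrapped in a ClientException, which records
            # sys.exc_info() so the caller receives the original error
            # without the server logging it as an unexpected failure.
            raise InstanceNotFound(instance_id)
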
+
+
+def version_is_compatible(imp_version, version):
+ """Determine whether versions are compatible.
+
+ :param imp_version: The version implemented
+ :param version: The version requested by an incoming message.
+ """
+ version_parts = version.split('.')
+ imp_version_parts = imp_version.split('.')
+ if int(version_parts[0]) != int(imp_version_parts[0]): # Major
+ return False
+ if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
+ return False
+ return True
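
A few worked cases of the major/minor rule above, assuming the function is in
scope: the majors must match exactly, and the implemented minor must be at
least the requested minor.

    assert version_is_compatible('1.5', '1.2')       # older request: OK
    assert not version_is_compatible('1.5', '1.6')   # requested minor too new
    assert not version_is_compatible('2.0', '1.9')   # major mismatch
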
+
+
+def serialize_msg(raw_msg, force_envelope=False):
+ if not _SEND_RPC_ENVELOPE and not force_envelope:
+ return raw_msg
+
+ # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
+ # information about this format.
+ msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
+ _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
+
+ return msg
+
+
+def deserialize_msg(msg):
+ # NOTE(russellb): Hang on to your hats, this road is about to
+ # get a little bumpy.
+ #
+ # Robustness Principle:
+ # "Be strict in what you send, liberal in what you accept."
+ #
+ # At this point we have to do a bit of guessing about what it
+ # is we just received. Here is the set of possibilities:
+ #
+ # 1) We received a dict. This could be 2 things:
+ #
+ # a) Inspect it to see if it looks like a standard message envelope.
+ # If so, great!
+ #
+ # b) If it doesn't look like a standard message envelope, it could either
+ # be a notification, or a message from before we added a message
+ # envelope (referred to as version 1.0).
+ # Just return the message as-is.
+ #
+ # 2) It's any other non-dict type. Just return it and hope for the best.
+ # This case covers return values from rpc.call() from before message
+ # envelopes were used. (messages to call a method were always a dict)
+
+ if not isinstance(msg, dict):
+ # See #2 above.
+ return msg
+
+ base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
+ if not all(map(lambda key: key in msg, base_envelope_keys)):
+ # See #1.b above.
+ return msg
+
+ # At this point we think we have the message envelope
+ # format we were expecting. (#1.a above)
+
+ if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+ raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
+
+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+
+ return raw_msg
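
A standalone sketch of the version-2.0 envelope round trip described above,
with json standing in for jsonutils and the envelope-version compatibility
check omitted for brevity.

    import json

    _VERSION_KEY, _MESSAGE_KEY = 'ceilometer.version', 'ceilometer.message'

    def serialize_msg(raw_msg):
        return {_VERSION_KEY: '2.0', _MESSAGE_KEY: json.dumps(raw_msg)}

    def deserialize_msg(msg):
        if not isinstance(msg, dict):
            return msg                  # pre-envelope rpc.call() return value
        if _VERSION_KEY not in msg or _MESSAGE_KEY not in msg:
            return msg                  # notification or 1.0-format message
        return json.loads(msg[_MESSAGE_KEY])

    payload = {'method': 'echo', 'args': {'value': 42}}
    assert deserialize_msg(serialize_msg(payload)) == payload
    assert deserialize_msg(payload) == payload      # 1.0 passthrough
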
diff --git a/ceilometer/openstack/common/rpc/dispatcher.py b/ceilometer/openstack/common/rpc/dispatcher.py
index d1136acb..927af0db 100644
--- a/ceilometer/openstack/common/rpc/dispatcher.py
+++ b/ceilometer/openstack/common/rpc/dispatcher.py
@@ -41,8 +41,8 @@ server side of the API at the same time. However, as the code stands today,
there can be both versioned and unversioned APIs implemented in the same code
base.
-
-EXAMPLES:
+EXAMPLES
+========
Nova was the first project to use versioned rpc APIs. Consider the compute rpc
API as an example. The client side is in nova/compute/rpcapi.py and the server
@@ -50,12 +50,13 @@ side is in nova/compute/manager.py.
Example 1) Adding a new method.
+-------------------------------
Adding a new method is a backwards compatible change. It should be added to
nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
have a specific version specified to indicate the minimum API version that must
-be implemented for the method to be supported. For example:
+be implemented for the method to be supported. For example::
def get_host_uptime(self, ctxt, host):
topic = _compute_topic(self.topic, ctxt, host, None)
@@ -67,10 +68,11 @@ get_host_uptime() method.
Example 2) Adding a new parameter.
+----------------------------------
Adding a new parameter to an rpc method can be made backwards compatible. The
RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
-The implementation of the method must not expect the parameter to be present.
+The implementation of the method must not expect the parameter to be present::
def some_remote_method(self, arg1, arg2, newarg=None):
# The code needs to deal with newarg=None for cases
@@ -101,21 +103,6 @@ class RpcDispatcher(object):
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
- @staticmethod
- def _is_compatible(mversion, version):
- """Determine whether versions are compatible.
-
- :param mversion: The API version implemented by a callback.
- :param version: The API version requested by an incoming message.
- """
- version_parts = version.split('.')
- mversion_parts = mversion.split('.')
- if int(version_parts[0]) != int(mversion_parts[0]): # Major
- return False
- if int(version_parts[1]) > int(mversion_parts[1]): # Minor
- return False
- return True
-
def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version.
@@ -137,7 +124,8 @@ class RpcDispatcher(object):
rpc_api_version = proxyobj.RPC_API_VERSION
else:
rpc_api_version = '1.0'
- is_compatible = self._is_compatible(rpc_api_version, version)
+ is_compatible = rpc_common.version_is_compatible(rpc_api_version,
+ version)
had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue
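
A minimal sketch (Python 3; the class and method names are illustrative) of
the dispatch rule this hunk delegates to rpc_common: pick the first callback
whose RPC_API_VERSION is compatible with the requested version and that
actually implements the method.

    def version_is_compatible(imp, req):
        imaj, imin = map(int, imp.split('.'))
        rmaj, rmin = map(int, req.split('.'))
        return imaj == rmaj and rmin <= imin

    class ComputeAPIv1:
        RPC_API_VERSION = '1.3'
        def get_host_uptime(self, ctxt, host):
            return 'v1 uptime for %s' % host

    def dispatch(callbacks, ctxt, version, method, **kwargs):
        for proxy in callbacks:
            if not version_is_compatible(
                    getattr(proxy, 'RPC_API_VERSION', '1.0'), version):
                continue
            if hasattr(proxy, method):
                return getattr(proxy, method)(ctxt, **kwargs)
        raise RuntimeError('unsupported version %s' % version)

    print(dispatch([ComputeAPIv1()], {}, '1.1', 'get_host_uptime', host='n1'))
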
diff --git a/ceilometer/openstack/common/rpc/impl_fake.py b/ceilometer/openstack/common/rpc/impl_fake.py
index f1efab2b..72041bfe 100644
--- a/ceilometer/openstack/common/rpc/impl_fake.py
+++ b/ceilometer/openstack/common/rpc/impl_fake.py
@@ -18,11 +18,15 @@ queues. Casts will block, but this is very useful for tests.
"""
import inspect
+# NOTE(russellb): We specifically want to use json, not our own jsonutils.
+# jsonutils has some extra logic to automatically convert objects to primitive
+# types so that they can be serialized. We want to catch all cases where
+# non-primitive types make it into this code and treat it as an error.
+import json
import time
import eventlet
-from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common.rpc import common as rpc_common
CONSUMERS = {}
@@ -75,6 +79,8 @@ class Consumer(object):
else:
res.append(rval)
done.send(res)
+ except rpc_common.ClientException as e:
+ done.send_exception(e._exc_info[1])
except Exception as e:
done.send_exception(e)
@@ -121,7 +127,7 @@ def create_connection(conf, new=True):
def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized."""
- jsonutils.dumps(msg)
+ json.dumps(msg)
def multicall(conf, context, topic, msg, timeout=None):
@@ -154,13 +160,14 @@ def call(conf, context, topic, msg, timeout=None):
def cast(conf, context, topic, msg):
+ check_serialize(msg)
try:
call(conf, context, topic, msg)
except Exception:
pass
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
check_serialize(msg)
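
A quick demonstration of why the fake driver now uses plain json: jsonutils
silently converts objects such as datetimes to primitives, which would hide
bugs that only surface on a real transport, while plain json fails fast.

    import datetime
    import json

    def check_serialize(msg):
        json.dumps(msg)                 # raises TypeError on non-primitives

    check_serialize({'when': '2013-01-08T16:13:06'})   # fine: primitives only
    try:
        check_serialize({'when': datetime.datetime(2013, 1, 8)})
    except TypeError as e:
        print('caught non-primitive payload:', e)
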
diff --git a/ceilometer/openstack/common/rpc/impl_kombu.py b/ceilometer/openstack/common/rpc/impl_kombu.py
index 6e06357d..e120e3bb 100644
--- a/ceilometer/openstack/common/rpc/impl_kombu.py
+++ b/ceilometer/openstack/common/rpc/impl_kombu.py
@@ -162,7 +162,8 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
- callback(message.payload)
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
@@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase):
# Default options
options = {'durable': False,
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=msg_id,
type='direct',
@@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase):
options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
durable=options['durable'],
@@ -316,7 +317,7 @@ class DirectPublisher(Publisher):
options = {'durable': False,
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options)
@@ -350,7 +351,7 @@ class FanoutPublisher(Publisher):
"""
options = {'durable': False,
'auto_delete': True,
- 'exclusive': True}
+ 'exclusive': False}
options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options)
@@ -387,6 +388,7 @@ class Connection(object):
def __init__(self, conf, server_params=None):
self.consumers = []
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
# Try forever?
@@ -469,7 +471,7 @@ class Connection(object):
LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params)
try:
- self.connection.close()
+ self.connection.release()
except self.connection_errors:
pass
# Setting this in case the next statement fails, though
@@ -573,12 +575,14 @@ class Connection(object):
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
@@ -644,6 +648,11 @@ class Connection(object):
pass
self.consumer_thread = None
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class"""
@@ -719,6 +728,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
self.declare_fanout_consumer(topic, proxy_cb)
@@ -730,6 +740,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
@@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(
conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py
index 0a6c4fac..0defe021 100644
--- a/ceilometer/openstack/common/rpc/impl_qpid.py
+++ b/ceilometer/openstack/common/rpc/impl_qpid.py
@@ -17,21 +17,23 @@
import functools
import itertools
-import logging
import time
import uuid
import eventlet
import greenlet
-import qpid.messaging
-import qpid.messaging.exceptions
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common.gettextutils import _
+from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils
+from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.rpc import amqp as rpc_amqp
from ceilometer.openstack.common.rpc import common as rpc_common
+qpid_messaging = importutils.try_import("qpid.messaging")
+qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
+
LOG = logging.getLogger(__name__)
qpid_opts = [
@@ -41,6 +43,9 @@ qpid_opts = [
cfg.StrOpt('qpid_port',
default='5672',
help='Qpid broker port'),
+ cfg.ListOpt('qpid_hosts',
+ default=['$qpid_hostname:$qpid_port'],
+ help='Qpid HA cluster host:port pairs'),
cfg.StrOpt('qpid_username',
default='',
help='Username for qpid connection'),
@@ -121,7 +126,8 @@ class ConsumerBase(object):
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
try:
- self.callback(message.content)
+ msg = rpc_common.deserialize_msg(message.content)
+ self.callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
@@ -271,28 +277,38 @@ class Connection(object):
pool = None
def __init__(self, conf, server_params=None):
+ if not qpid_messaging:
+ raise ImportError("Failed to import qpid.messaging")
+
self.session = None
self.consumers = {}
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
+ if server_params and 'hostname' in server_params:
+ # NOTE(russellb) This enables support for cast_to_server.
+ server_params['qpid_hosts'] = [
+ '%s:%d' % (server_params['hostname'],
+ server_params.get('port', 5672))
+ ]
+
params = {
- 'hostname': self.conf.qpid_hostname,
- 'port': self.conf.qpid_port,
+ 'qpid_hosts': self.conf.qpid_hosts,
'username': self.conf.qpid_username,
'password': self.conf.qpid_password,
}
params.update(server_params or {})
- self.broker = params['hostname'] + ":" + str(params['port'])
+ self.brokers = params['qpid_hosts']
self.username = params['username']
self.password = params['password']
- self.connection_create()
+ self.connection_create(self.brokers[0])
self.reconnect()
- def connection_create(self):
+ def connection_create(self, broker):
# Create the connection - this does not open the connection
- self.connection = qpid.messaging.Connection(self.broker)
+ self.connection = qpid_messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
@@ -317,15 +333,19 @@ class Connection(object):
if self.connection.opened():
try:
self.connection.close()
- except qpid.messaging.exceptions.ConnectionError:
+ except qpid_exceptions.ConnectionError:
pass
+ attempt = 0
delay = 1
while True:
+ broker = self.brokers[attempt % len(self.brokers)]
+ attempt += 1
+
try:
- self.connection_create()
+ self.connection_create(broker)
self.connection.open()
- except qpid.messaging.exceptions.ConnectionError, e:
+ except qpid_exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
@@ -333,10 +353,9 @@ class Connection(object):
time.sleep(delay)
delay = min(2 * delay, 60)
else:
+ LOG.info(_('Connected to AMQP server on %s'), broker)
break
- LOG.info(_('Connected to AMQP server on %s'), self.broker)
-
self.session = self.connection.session()
if self.consumers:
@@ -353,8 +372,8 @@ class Connection(object):
while True:
try:
return method(*args, **kwargs)
- except (qpid.messaging.exceptions.Empty,
- qpid.messaging.exceptions.ConnectionError), e:
+ except (qpid_exceptions.Empty,
+ qpid_exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
@@ -362,12 +381,14 @@ class Connection(object):
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.close()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.session.close()
self.session = self.connection.session()
self.consumers = {}
@@ -392,7 +413,7 @@ class Connection(object):
"""Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc):
- if isinstance(exc, qpid.messaging.exceptions.Empty):
+ if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
@@ -422,6 +443,11 @@ class Connection(object):
pass
self.consumer_thread = None
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
@@ -497,6 +523,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@@ -512,6 +539,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)
@@ -570,10 +598,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
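
A standalone sketch of the HA reconnect loop above: walk qpid_hosts
round-robin, doubling the sleep (capped at 60 seconds) until a broker accepts
the connection. connect(), fake_connect() and the broker names are stand-ins.

    def reconnect(brokers, connect, sleep):
        attempt, delay = 0, 1
        while True:
            broker = brokers[attempt % len(brokers)]   # round-robin rotation
            attempt += 1
            try:
                return connect(broker)
            except ConnectionError:
                sleep(delay)
                delay = min(2 * delay, 60)

    calls = iter([ConnectionError, ConnectionError, 'session'])
    def fake_connect(broker):
        result = next(calls)
        if result is ConnectionError:
            raise ConnectionError(broker)
        return '%s via %s' % (result, broker)

    print(reconnect(['a:5672', 'b:5672'], fake_connect, lambda s: None))
    # 'session via a:5672'  (third attempt wraps back to the first broker)
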
diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py
index 2db31faa..1c22ec8d 100644
--- a/ceilometer/openstack/common/rpc/impl_zmq.py
+++ b/ceilometer/openstack/common/rpc/impl_zmq.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import os
import pprint
import socket
import string
@@ -22,15 +23,16 @@ import types
import uuid
import eventlet
-from eventlet.green import zmq
import greenlet
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import jsonutils
+from ceilometer.openstack.common import processutils as utils
from ceilometer.openstack.common.rpc import common as rpc_common
+zmq = importutils.try_import('eventlet.green.zmq')
# for convenience, are not modified.
pformat = pprint.pformat
@@ -61,6 +63,10 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
+ cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
+ help='Maximum number of ingress messages to locally buffer '
+ 'per topic. Default is unlimited.'),
+
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
@@ -70,9 +76,9 @@ zmq_opts = [
]
-# These globals are defined in register_opts(conf),
-# a mandatory initialization call
-CONF = None
+CONF = cfg.CONF
+CONF.register_opts(zmq_opts)
+
ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
@@ -107,7 +113,7 @@ class ZmqSocket(object):
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
- self.sock = ZMQ_CTX.socket(zmq_type)
+ self.sock = _get_ctxt().socket(zmq_type)
self.addr = addr
self.type = zmq_type
self.subscriptions = []
@@ -181,11 +187,15 @@ class ZmqSocket(object):
pass
self.subscriptions = []
- # Linger -1 prevents lost/dropped messages
try:
- self.sock.close(linger=-1)
+ # Default is to linger
+ self.sock.close()
except Exception:
- pass
+ # While this is a bad thing to happen,
+ # it would be much worse if some of the code calling this
+                # were to fail. For now, let's log, and later evaluate
+ # if we can safely raise here.
+ LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
def recv(self):
@@ -202,10 +212,14 @@ class ZmqSocket(object):
class ZmqClient(object):
"""Client for ZMQ sockets."""
- def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+ def __init__(self, addr, socket_type=None, bind=False):
+ if socket_type is None:
+ socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data):
+ def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ if serialize:
+ data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
@@ -250,7 +264,7 @@ class InternalContext(object):
"""Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None)
- data.setdefault('args', [])
+ data.setdefault('args', {})
try:
result = proxy.dispatch(
@@ -259,7 +273,14 @@ class InternalContext(object):
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
+ except rpc_common.ClientException, e:
+ LOG.debug(_("Expected exception during message handling (%s)") %
+ e._exc_info[1])
+ return {'exc':
+ rpc_common.serialize_remote_exception(e._exc_info,
+ log_failure=False)}
except Exception:
+ LOG.error(_("Exception during message handling"))
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}
@@ -314,7 +335,7 @@ class ConsumerBase(object):
return
data.setdefault('version', None)
- data.setdefault('args', [])
+ data.setdefault('args', {})
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
@@ -404,12 +425,6 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {}
- ipc_dir = CONF.rpc_zmq_ipc_dir
-
- self.topic_proxy['zmq_replies'] = \
- ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
- zmq.PUB, bind=True)
- self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
@@ -426,7 +441,7 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = _deserialize(in_msg)
+ inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
@@ -435,20 +450,81 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUSH
if not topic in self.topic_proxy:
- outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
- self.topic_proxy[topic] = outq
- self.sockets.append(outq)
- LOG.info(_("Created topic proxy: %s"), topic)
+ def publisher(waiter):
+ LOG.info(_("Creating proxy for topic: %s"), topic)
- # It takes some time for a pub socket to open,
- # before we can have any faith in doing a send() to it.
- if sock_type == zmq.PUB:
- eventlet.sleep(.5)
+ try:
+ out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
+ (ipc_dir, topic),
+ sock_type, bind=True)
+ except RPCException:
+ waiter.send_exception(*sys.exc_info())
+ return
+
+ self.topic_proxy[topic] = eventlet.queue.LightQueue(
+ CONF.rpc_zmq_topic_backlog)
+ self.sockets.append(out_sock)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
+
+ waiter.send(True)
+
+                while True:
+ data = self.topic_proxy[topic].get()
+ out_sock.send(data)
+ LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
+ {'data': data})
+
+ wait_sock_creation = eventlet.event.Event()
+ eventlet.spawn(publisher, wait_sock_creation)
+
+ try:
+ wait_sock_creation.wait()
+ except RPCException:
+ LOG.error(_("Topic socket file creation failed."))
+ return
- LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
- self.topic_proxy[topic].send(data)
- LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+ try:
+ self.topic_proxy[topic].put_nowait(data)
+ LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
+ {'data': data})
+ except eventlet.queue.Full:
+ LOG.error(_("Local per-topic backlog buffer full for topic "
+ "%(topic)s. Dropping message.") % {'topic': topic})
+
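
A standalone sketch of the per-topic backlog pattern above, using the stdlib
queue in place of eventlet's LightQueue: put_nowait() drops the message (with
an error logged) once the rpc_zmq_topic_backlog bound is reached.

    import queue

    backlog = queue.Queue(maxsize=2)   # stands in for rpc_zmq_topic_backlog

    for data in ('m1', 'm2', 'm3'):
        try:
            backlog.put_nowait(data)
            print('queued', data)
        except queue.Full:
            print('backlog full, dropping', data)
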
+ def consume_in_thread(self):
+ """Runs the ZmqProxy service"""
+ ipc_dir = CONF.rpc_zmq_ipc_dir
+ consume_in = "tcp://%s:%s" % \
+ (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port)
+ consumption_proxy = InternalContext(None)
+
+ if not os.path.isdir(ipc_dir):
+ try:
+ utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
+ utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
+ ipc_dir, run_as_root=True)
+ utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+ except utils.ProcessExecutionError:
+ LOG.error(_("Could not create IPC directory %s") %
+ (ipc_dir, ))
+ raise
+
+ try:
+ self.register(consumption_proxy,
+ consume_in,
+ zmq.PULL,
+ out_bind=True)
+ except zmq.ZMQError:
+ LOG.error(_("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
+ raise
+
+ super(ZmqProxy, self).consume_in_thread()
class ZmqReactor(ZmqBaseReactor):
@@ -473,7 +549,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data
- ctx, request = _deserialize(in_msg)
+ ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
@@ -524,7 +600,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None):
+def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -533,7 +610,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload)
+ conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -541,7 +618,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn.close()
-def _call(addr, context, msg_id, topic, msg, timeout=None):
+def _call(addr, context, msg_id, topic, msg, timeout=None,
+ serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -576,7 +654,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, msg_id, topic, payload)
+ _cast(addr, context, msg_id, topic, payload,
+ serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
@@ -602,7 +681,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None):
+def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -611,7 +691,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
- queues = matchmaker.queues(topic)
+ queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
@@ -628,9 +708,11 @@ def _multi_send(method, context, topic, msg, timeout=None):
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout)
+ _topic, _topic, msg, timeout, serialize,
+ force_envelope)
return
- return method(_addr, context, _topic, _topic, msg, timeout)
+ return method(_addr, context, _topic, _topic, msg, timeout,
+ serialize, force_envelope)
def create_connection(conf, new=True):
@@ -669,38 +751,37 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
    topic = topic.replace('.', '-')
+ kwargs['serialize'] = kwargs.pop('envelope')
+ kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs)
def cleanup():
"""Clean up resources in use by implementation."""
global ZMQ_CTX
+ if ZMQ_CTX:
+ ZMQ_CTX.term()
+ ZMQ_CTX = None
+
global matchmaker
matchmaker = None
- ZMQ_CTX.term()
- ZMQ_CTX = None
-def register_opts(conf):
- """Registration of options for this driver."""
- #NOTE(ewindisch): ZMQ_CTX and matchmaker
- # are initialized here as this is as good
- # an initialization method as any.
+def _get_ctxt():
+ if not zmq:
+ raise ImportError("Failed to import eventlet.green.zmq")
- # We memoize through these globals
global ZMQ_CTX
- global matchmaker
- global CONF
-
- if not CONF:
- conf.register_opts(zmq_opts)
- CONF = conf
- # Don't re-set, if this method is called twice.
if not ZMQ_CTX:
- ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
+ ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
+ return ZMQ_CTX
+
+
+def _get_matchmaker():
+ global matchmaker
if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class'
- mm_path = conf.rpc_zmq_matchmaker.split('.')
+ mm_path = CONF.rpc_zmq_matchmaker.split('.')
mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1]
@@ -713,6 +794,4 @@ def register_opts(conf):
mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor()
-
-
-register_opts(cfg.CONF)
+ return matchmaker
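
A standalone sketch of the lazy-initialization pattern this driver moved to:
optional imports resolve to None via try_import, and module globals are
memoized on first use instead of being built at import time.

    import importlib

    def try_import(name, default=None):
        try:
            return importlib.import_module(name)
        except ImportError:
            return default

    zmq = try_import('eventlet.green.zmq')   # None if eventlet is absent

    _ctx = None
    def get_ctx():
        global _ctx
        if not zmq:
            raise ImportError('Failed to import eventlet.green.zmq')
        if _ctx is None:
            _ctx = zmq.Context(1)            # built on first use, then reused
        return _ctx
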
diff --git a/ceilometer/openstack/common/rpc/matchmaker.py b/ceilometer/openstack/common/rpc/matchmaker.py
index bf4f85d4..87679e8e 100644
--- a/ceilometer/openstack/common/rpc/matchmaker.py
+++ b/ceilometer/openstack/common/rpc/matchmaker.py
@@ -21,10 +21,10 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
import contextlib
import itertools
import json
-import logging
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common.gettextutils import _
+from ceilometer.openstack.common import log as logging
matchmaker_opts = [
diff --git a/ceilometer/openstack/common/service.py b/ceilometer/openstack/common/service.py
index 0f22d4a8..57e5bc00 100644
--- a/ceilometer/openstack/common/service.py
+++ b/ceilometer/openstack/common/service.py
@@ -27,20 +27,17 @@ import sys
import time
import eventlet
-import greenlet
import logging as std_logging
from ceilometer.openstack.common import cfg
from ceilometer.openstack.common import eventlet_backdoor
from ceilometer.openstack.common.gettextutils import _
+from ceilometer.openstack.common import importutils
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import threadgroup
-try:
- from ceilometer.openstack.common import rpc
-except ImportError:
- rpc = None
+rpc = importutils.try_import('ceilometer.openstack.common.rpc')
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@@ -54,7 +51,7 @@ class Launcher(object):
:returns: None
"""
- self._services = []
+ self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled()
@staticmethod
@@ -75,8 +72,7 @@ class Launcher(object):
:returns: None
"""
- gt = eventlet.spawn(self.run_service, service)
- self._services.append(gt)
+ self._services.add_thread(self.run_service, service)
def stop(self):
"""Stop all services which are currently running.
@@ -84,8 +80,7 @@ class Launcher(object):
:returns: None
"""
- for service in self._services:
- service.kill()
+ self._services.stop()
def wait(self):
"""Waits until all services have been stopped, and then returns.
@@ -93,11 +88,7 @@ class Launcher(object):
:returns: None
"""
- for service in self._services:
- try:
- service.wait()
- except greenlet.GreenletExit:
- pass
+ self._services.wait()
class SignalExit(SystemExit):
@@ -132,9 +123,9 @@ class ServiceLauncher(Launcher):
except SystemExit as exc:
status = exc.code
finally:
- self.stop()
if rpc:
rpc.cleanup()
+ self.stop()
return status
@@ -252,7 +243,10 @@ class ProcessLauncher(object):
def _wait_child(self):
try:
- pid, status = os.wait()
+ # Don't block if no child processes have exited
+ pid, status = os.waitpid(0, os.WNOHANG)
+ if not pid:
+ return None
except OSError as exc:
if exc.errno not in (errno.EINTR, errno.ECHILD):
raise
@@ -260,10 +254,12 @@ class ProcessLauncher(object):
if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
- LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
+ LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
+ dict(pid=pid, sig=sig))
else:
code = os.WEXITSTATUS(status)
- LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
+ LOG.info(_('Child %(pid)s exited with status %(code)d'),
+ dict(pid=pid, code=code))
if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid)
@@ -282,6 +278,10 @@ class ProcessLauncher(object):
while self.running:
wrap = self._wait_child()
if not wrap:
+ # Yield to other threads if no children have exited
+ # Sleep for a short time to avoid excessive CPU usage
+ # (see bug #1095346)
+ eventlet.greenthread.sleep(.01)
continue
while self.running and len(wrap.children) < wrap.workers:
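The reaping loop above relies on the standard non-blocking pattern: os.waitpid(0, os.WNOHANG) returns a pid of 0 when children exist but none have exited yet, so the supervisor backs off with a short sleep instead of spinning. A self-contained sketch of the same idea with plain processes (function name and demo are illustrative):

    import errno
    import os
    import time

    def reap_children():
        """Collect any exited children without blocking."""
        reaped = []
        while True:
            try:
                pid, status = os.waitpid(0, os.WNOHANG)
            except OSError as exc:
                if exc.errno != errno.ECHILD:   # ECHILD: no children at all
                    raise
                break
            if pid == 0:    # children exist, but none have exited yet
                break
            reaped.append(pid)
        return reaped

    if os.fork() == 0:
        os._exit(0)          # child exits immediately
    time.sleep(0.1)
    print(reap_children())   # -> [<pid of the child>]
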
@@ -309,8 +309,8 @@ class ProcessLauncher(object):
class Service(object):
"""Service object for binaries running on hosts."""
- def __init__(self):
- self.tg = threadgroup.ThreadGroup('service')
+ def __init__(self, threads=1000):
+ self.tg = threadgroup.ThreadGroup(threads)
def start(self):
pass
diff --git a/ceilometer/openstack/common/setup.py b/ceilometer/openstack/common/setup.py
index e6f72f03..81a3d20e 100644
--- a/ceilometer/openstack/common/setup.py
+++ b/ceilometer/openstack/common/setup.py
@@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
+# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -19,7 +20,7 @@
Utilities with minimum-depends for use in setup.py
"""
-import datetime
+import email
import os
import re
import subprocess
@@ -33,11 +34,12 @@ def parse_mailmap(mailmap='.mailmap'):
if os.path.exists(mailmap):
with open(mailmap, 'r') as fp:
for l in fp:
- l = l.strip()
- if not l.startswith('#') and ' ' in l:
- canonical_email, alias = [x for x in l.split(' ')
- if x.startswith('<')]
- mapping[alias] = canonical_email
+ try:
+ canonical_email, alias = re.match(
+ r'[^#]*?(<.+>).*(<.+>).*', l).groups()
+ except AttributeError:
+ continue
+ mapping[alias] = canonical_email
return mapping
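The regex-based parser is more tolerant than the old whitespace split: any line containing two bracketed addresses matches, whatever surrounds them, and a line without two <...> groups makes re.match() return None, so .groups() raises the AttributeError that the loop swallows. For example (sample line made up):

    import re

    line = 'Jane Doe <jane@example.com> Jane D <jane@users.example.org>\n'
    canonical_email, alias = re.match(
        r'[^#]*?(<.+>).*(<.+>).*', line).groups()
    # canonical_email == '<jane@example.com>'
    # alias           == '<jane@users.example.org>'
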
@@ -45,8 +47,8 @@ def canonicalize_emails(changelog, mapping):
"""Takes in a string and an email alias mapping and replaces all
instances of the aliases in the string with their real email.
"""
- for alias, email in mapping.iteritems():
- changelog = changelog.replace(alias, email)
+ for alias, email_address in mapping.iteritems():
+ changelog = changelog.replace(alias, email_address)
return changelog
@@ -106,23 +108,17 @@ def parse_dependency_links(requirements_files=['requirements.txt',
return dependency_links
-def write_requirements():
- venv = os.environ.get('VIRTUAL_ENV', None)
- if venv is not None:
- with open("requirements.txt", "w") as req_file:
- output = subprocess.Popen(["pip", "-E", venv, "freeze", "-l"],
- stdout=subprocess.PIPE)
- requirements = output.communicate()[0].strip()
- req_file.write(requirements)
-
-
-def _run_shell_command(cmd):
+def _run_shell_command(cmd, throw_on_error=False):
if os.name == 'nt':
output = subprocess.Popen(["cmd.exe", "/C", cmd],
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
else:
output = subprocess.Popen(["/bin/sh", "-c", cmd],
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
     out = output.communicate()
+    if output.returncode and throw_on_error:
+        raise Exception("%s returned %d" % (cmd, output.returncode))
if len(out) == 0:
return None
@@ -131,57 +127,6 @@ def _run_shell_command(cmd):
return out[0].strip()
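With the error check placed after communicate() (so that returncode is actually populated), callers can opt into failures; get_version_from_git() below uses this to detect an untagged revision:

    # Illustrative: on an untagged revision this raises something like
    # Exception('git describe --exact-match returned 128'); on a tagged
    # revision it returns the tag, e.g. '2013.1'.
    _run_shell_command("git describe --exact-match", throw_on_error=True)
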
-def _get_git_next_version_suffix(branch_name):
- datestamp = datetime.datetime.now().strftime('%Y%m%d')
- if branch_name == 'milestone-proposed':
- revno_prefix = "r"
- else:
- revno_prefix = ""
- _run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*")
- milestone_cmd = "git show meta/openstack/release:%s" % branch_name
- milestonever = _run_shell_command(milestone_cmd)
- if milestonever:
- first_half = "%s~%s" % (milestonever, datestamp)
- else:
- first_half = datestamp
-
- post_version = _get_git_post_version()
- # post version should look like:
- # 0.1.1.4.gcc9e28a
- # where the bit after the last . is the short sha, and the bit between
- # the last and second to last is the revno count
- (revno, sha) = post_version.split(".")[-2:]
- second_half = "%s%s.%s" % (revno_prefix, revno, sha)
- return ".".join((first_half, second_half))
-
-
-def _get_git_current_tag():
- return _run_shell_command("git tag --contains HEAD")
-
-
-def _get_git_tag_info():
- return _run_shell_command("git describe --tags")
-
-
-def _get_git_post_version():
- current_tag = _get_git_current_tag()
- if current_tag is not None:
- return current_tag
- else:
- tag_info = _get_git_tag_info()
- if tag_info is None:
- base_version = "0.0"
- cmd = "git --no-pager log --oneline"
- out = _run_shell_command(cmd)
- revno = len(out.split("\n"))
- sha = _run_shell_command("git describe --always")
- else:
- tag_infos = tag_info.split("-")
- base_version = "-".join(tag_infos[:-2])
- (revno, sha) = tag_infos[-2:]
- return "%s.%s.%s" % (base_version, revno, sha)
-
-
def write_git_changelog():
"""Write a changelog based on the git changelog."""
new_changelog = 'ChangeLog'
@@ -227,26 +172,6 @@ _rst_template = """%(heading)s
"""
-def read_versioninfo(project):
- """Read the versioninfo file. If it doesn't exist, we're in a github
- zipball, and there's really no way to know what version we really
- are, but that should be ok, because the utility of that should be
- just about nil if this code path is in use in the first place."""
- versioninfo_path = os.path.join(project, 'versioninfo')
- if os.path.exists(versioninfo_path):
- with open(versioninfo_path, 'r') as vinfo:
- version = vinfo.read().strip()
- else:
- version = "0.0.0"
- return version
-
-
-def write_versioninfo(project, version):
- """Write a simple file containing the version of the package."""
- with open(os.path.join(project, 'versioninfo'), 'w') as fil:
- fil.write("%s\n" % version)
-
-
def get_cmdclass():
"""Return dict of commands to run from setup.py."""
@@ -276,6 +201,9 @@ def get_cmdclass():
from sphinx.setup_command import BuildDoc
class LocalBuildDoc(BuildDoc):
+
+ builders = ['html', 'man']
+
def generate_autoindex(self):
print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
modules = {}
@@ -311,56 +239,83 @@ def get_cmdclass():
if not os.getenv('SPHINX_DEBUG'):
self.generate_autoindex()
- for builder in ['html', 'man']:
+ for builder in self.builders:
self.builder = builder
self.finalize_options()
self.project = self.distribution.get_name()
self.version = self.distribution.get_version()
self.release = self.distribution.get_version()
BuildDoc.run(self)
+
+ class LocalBuildLatex(LocalBuildDoc):
+ builders = ['latex']
+
cmdclass['build_sphinx'] = LocalBuildDoc
+ cmdclass['build_sphinx_latex'] = LocalBuildLatex
except ImportError:
pass
return cmdclass
-def get_git_branchname():
- for branch in _run_shell_command("git branch --color=never").split("\n"):
- if branch.startswith('*'):
- _branch_name = branch.split()[1].strip()
- if _branch_name == "(no":
- _branch_name = "no-branch"
- return _branch_name
-
+def get_version_from_git(pre_version):
+ """Return a version which is equal to the tag that's on the current
+ revision if there is one, or tag plus number of additional revisions
+ if the current revision has no tag."""
-def get_pre_version(projectname, base_version):
- """Return a version which is leading up to a version that will
- be released in the future."""
if os.path.isdir('.git'):
- current_tag = _get_git_current_tag()
- if current_tag is not None:
- version = current_tag
+ if pre_version:
+ try:
+ return _run_shell_command(
+ "git describe --exact-match",
+ throw_on_error=True).replace('-', '.')
+ except Exception:
+ sha = _run_shell_command("git log -n1 --pretty=format:%h")
+ describe = _run_shell_command("git describe --always")
+ revno = describe.rsplit("-", 2)[-2]
+ return "%s.a%s.g%s" % (pre_version, revno, sha)
else:
- branch_name = os.getenv('BRANCHNAME',
- os.getenv('GERRIT_REFNAME',
- get_git_branchname()))
- version_suffix = _get_git_next_version_suffix(branch_name)
- version = "%s~%s" % (base_version, version_suffix)
- write_versioninfo(projectname, version)
- return version
- else:
- version = read_versioninfo(projectname)
- return version
+ return _run_shell_command(
+ "git describe --always").replace('-', '.')
+ return None
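The three version shapes this can produce, sketched with made-up revision data:

    pre_version = '2013.1'
    sha = 'cafc56e'                      # git log -n1 --pretty=format:%h
    describe = '2012.2-15-gcafc56e'      # git describe --always
    revno = describe.rsplit('-', 2)[-2]  # -> '15'
    print('%s.a%s.g%s' % (pre_version, revno, sha))  # -> '2013.1.a15.gcafc56e'
    # Exactly on a tag with pre_version set:  '2013.1' (describe --exact-match)
    # Without pre_version:                    '2012.2.15.gcafc56e' ('-' -> '.')
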
-def get_post_version(projectname):
- """Return a version which is equal to the tag that's on the current
- revision if there is one, or tag plus number of additional revisions
- if the current revision has no tag."""
+def get_version_from_pkg_info(package_name):
+ """Get the version from PKG-INFO file if we can."""
+ try:
+ pkg_info_file = open('PKG-INFO', 'r')
+ except (IOError, OSError):
+ return None
+ try:
+ pkg_info = email.message_from_file(pkg_info_file)
+ except email.MessageError:
+ return None
+ # Check to make sure we're in our own dir
+ if pkg_info.get('Name', None) != package_name:
+ return None
+ return pkg_info.get('Version', None)
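PKG-INFO is RFC 2822-style metadata, which is why the stdlib email parser fits here; a sketch with inline contents (values illustrative):

    import email

    pkg_info = email.message_from_string(
        "Metadata-Version: 1.0\n"
        "Name: ceilometer\n"
        "Version: 2013.1.a15.gcafc56e\n")
    assert pkg_info.get('Name') == 'ceilometer'
    print(pkg_info.get('Version'))   # -> 2013.1.a15.gcafc56e
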
- if os.path.isdir('.git'):
- version = _get_git_post_version()
- write_versioninfo(projectname, version)
+
+def get_version(package_name, pre_version=None):
+ """Get the version of the project. First, try getting it from PKG-INFO, if
+ it exists. If it does, that means we're in a distribution tarball or that
+ install has happened. Otherwise, if there is no PKG-INFO file, pull the
+ version from git.
+
+ We do not support setup.py version sanity in git archive tarballs, nor do
+ we support packagers directly sucking our git repo into theirs. We expect
+ that a source tarball be made from our git repo - or that if someone wants
+ to make a source tarball from a fork of our repo with additional tags in it
+ that they understand and desire the results of doing that.
+ """
+ version = os.environ.get("OSLO_PACKAGE_VERSION", None)
+ if version:
+ return version
+ version = get_version_from_pkg_info(package_name)
+ if version:
+ return version
+ version = get_version_from_git(pre_version)
+ if version:
return version
- return read_versioninfo(projectname)
+ raise Exception("Versioning for this project requires either an sdist"
+ " tarball, or access to an upstream git repository.")
diff --git a/ceilometer/openstack/common/threadgroup.py b/ceilometer/openstack/common/threadgroup.py
index ae7bb0a8..801e852e 100644
--- a/ceilometer/openstack/common/threadgroup.py
+++ b/ceilometer/openstack/common/threadgroup.py
@@ -18,7 +18,6 @@ from eventlet import greenlet
from eventlet import greenpool
from eventlet import greenthread
-from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common import loopingcall
@@ -27,22 +26,19 @@ LOG = logging.getLogger(__name__)
def _thread_done(gt, *args, **kwargs):
- '''
- Callback function to be passed to GreenThread.link() when we spawn()
- Calls the ThreadGroup to notify if.
- '''
+ """ Callback function to be passed to GreenThread.link() when we spawn()
+ Calls the :class:`ThreadGroup` to notify if.
+
+ """
kwargs['group'].thread_done(kwargs['thread'])
class Thread(object):
+ """ Wrapper around a greenthread, that holds a reference to the
+ :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
+ it has done so it can be removed from the threads list.
"""
- Wrapper around a greenthread, that holds a reference to
- the ThreadGroup. The Thread will notify the ThreadGroup
- when it has done so it can be removed from the threads
- list.
- """
- def __init__(self, name, thread, group):
- self.name = name
+ def __init__(self, thread, group):
self.thread = thread
self.thread.link(_thread_done, group=group, thread=self)
@@ -54,14 +50,13 @@ class Thread(object):
class ThreadGroup(object):
- """
- The point of this class is to:
- - keep track of timers and greenthreads (making it easier to stop them
+ """ The point of the ThreadGroup classis to:
+
+ * keep track of timers and greenthreads (making it easier to stop them
when need be).
- - provide an easy API to add timers.
+ * provide an easy API to add timers.
"""
- def __init__(self, name, thread_pool_size=10):
- self.name = name
+ def __init__(self, thread_pool_size=10):
self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = []
self.timers = []
@@ -75,7 +70,7 @@ class ThreadGroup(object):
def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs)
- th = Thread(callback.__name__, gt, self)
+ th = Thread(gt, self)
self.threads.append(th)
def thread_done(self, thread):
diff --git a/ceilometer/openstack/common/timeutils.py b/ceilometer/openstack/common/timeutils.py
index 86004391..0f346087 100644
--- a/ceilometer/openstack/common/timeutils.py
+++ b/ceilometer/openstack/common/timeutils.py
@@ -71,11 +71,15 @@ def normalize_time(timestamp):
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
+ if isinstance(before, basestring):
+ before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
+ if isinstance(after, basestring):
+ after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
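Both helpers now accept timestamps as strings as well as datetimes; strings are run through parse_strtime() (which, assuming the module's default format, expects e.g. '%Y-%m-%dT%H:%M:%S.%f') and normalized to naive UTC before comparison:

    from ceilometer.openstack.common import timeutils

    # True for any timestamp more than a minute in the past:
    print(timeutils.is_older_than('2012-08-28T00:00:00.000000', 60))
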
@@ -87,7 +91,10 @@ def utcnow_ts():
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
- return utcnow.override_time
+ try:
+ return utcnow.override_time.pop(0)
+ except AttributeError:
+ return utcnow.override_time
return datetime.datetime.utcnow()
@@ -95,14 +102,21 @@ utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
- """Override utils.utcnow to return a constant time."""
+ """
+ Override utils.utcnow to return a constant time or a list thereof,
+ one at a time.
+ """
utcnow.override_time = override_time
def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
- utcnow.override_time += timedelta
+    try:
+        # Iterating raises TypeError when override_time is a single
+        # datetime; rebuild the list so the shift actually takes effect.
+        utcnow.override_time = [dt + timedelta
+                                for dt in utcnow.override_time]
+    except TypeError:
+        utcnow.override_time += timedelta
def advance_time_seconds(seconds):
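With these changes utcnow.override_time may be a list that is consumed one element per call, which lets tests script a sequence of clock readings; a sketch:

    import datetime
    from ceilometer.openstack.common import timeutils

    timeutils.set_time_override([datetime.datetime(2013, 1, 8, 12, 0),
                                 datetime.datetime(2013, 1, 8, 12, 5)])
    print(timeutils.utcnow())   # 2013-01-08 12:00:00, popped from the list
    print(timeutils.utcnow())   # 2013-01-08 12:05:00
    # A third call would raise IndexError once the list is exhausted,
    # since only AttributeError is caught around pop(0).
    timeutils.utcnow.override_time = None   # reset
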
@@ -135,3 +149,16 @@ def unmarshall_time(tyme):
minute=tyme['minute'],
second=tyme['second'],
microsecond=tyme['microsecond'])
+
+
+def delta_seconds(before, after):
+ """
+ Compute the difference in seconds between two date, time, or
+ datetime objects (as a float, to microsecond resolution).
+ """
+ delta = after - before
+ try:
+ return delta.total_seconds()
+ except AttributeError:
+ return ((delta.days * 24 * 3600) + delta.seconds +
+ float(delta.microseconds) / (10 ** 6))
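timedelta.total_seconds() only exists on Python 2.7 and later, hence the manual fallback for 2.6; both paths agree to microsecond resolution:

    import datetime
    from ceilometer.openstack.common.timeutils import delta_seconds

    before = datetime.datetime(2013, 1, 8, 12, 0, 0)
    after = datetime.datetime(2013, 1, 8, 12, 0, 1, 500000)
    print(delta_seconds(before, after))   # -> 1.5 on either code path
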
diff --git a/ceilometer/openstack/common/version.py b/ceilometer/openstack/common/version.py
index a19e4226..3653ad0d 100644
--- a/ceilometer/openstack/common/version.py
+++ b/ceilometer/openstack/common/version.py
@@ -1,6 +1,6 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
+# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -15,134 +15,65 @@
# under the License.
"""
-Utilities for consuming the auto-generated versioninfo files.
+Utilities for consuming the version from pkg_resources.
"""
-import datetime
import pkg_resources
-import setup
-
-
-class _deferred_version_string(object):
- """Internal helper class which provides delayed version calculation."""
- def __init__(self, version_info, prefix):
- self.version_info = version_info
- self.prefix = prefix
-
- def __str__(self):
- return "%s%s" % (self.prefix, self.version_info.version_string())
-
- def __repr__(self):
- return "%s%s" % (self.prefix, self.version_info.version_string())
-
class VersionInfo(object):
- def __init__(self, package, python_package=None, pre_version=None):
+ def __init__(self, package):
"""Object that understands versioning for a package
- :param package: name of the top level python namespace. For glance,
- this would be "glance" for python-glanceclient, it
- would be "glanceclient"
- :param python_package: optional name of the project name. For
- glance this can be left unset. For
- python-glanceclient, this would be
- "python-glanceclient"
- :param pre_version: optional version that the project is working to
+ :param package: name of the python package, such as glance, or
+ python-glanceclient
"""
self.package = package
- if python_package is None:
- self.python_package = package
- else:
- self.python_package = python_package
- self.pre_version = pre_version
+ self.release = None
self.version = None
+ self._cached_version = None
- def _generate_version(self):
- """Defer to the openstack.common.setup routines for making a
- version from git."""
- if self.pre_version is None:
- return setup.get_post_version(self.python_package)
- else:
- return setup.get_pre_version(self.python_package, self.pre_version)
+ def _get_version_from_pkg_resources(self):
+ """Get the version of the package from the pkg_resources record
+ associated with the package."""
+ requirement = pkg_resources.Requirement.parse(self.package)
+ provider = pkg_resources.get_provider(requirement)
+ return provider.version
- def _newer_version(self, pending_version):
- """Check to see if we're working with a stale version or not.
- We expect a version string that either looks like:
- 2012.2~f3~20120708.10.4426392
- which is an unreleased version of a pre-version, or:
- 0.1.1.4.gcc9e28a
- which is an unreleased version of a post-version, or:
- 0.1.1
- Which is a release and which should match tag.
- For now, if we have a date-embedded version, check to see if it's
- old, and if so re-generate. Otherwise, just deal with it.
- """
- try:
- version_date = int(self.version.split("~")[-1].split('.')[0])
- if version_date < int(datetime.date.today().strftime('%Y%m%d')):
- return self._generate_version()
- else:
- return pending_version
- except Exception:
- return pending_version
-
- def version_string_with_vcs(self, always=False):
+ def release_string(self):
"""Return the full version of the package including suffixes indicating
VCS status.
-
- For instance, if we are working towards the 2012.2 release,
- canonical_version_string should return 2012.2 if this is a final
- release, or else something like 2012.2~f1~20120705.20 if it's not.
-
- :param always: if true, skip all version caching
"""
- if always:
- self.version = self._generate_version()
+ if self.release is None:
+ self.release = self._get_version_from_pkg_resources()
- if self.version is None:
+ return self.release
- requirement = pkg_resources.Requirement.parse(self.python_package)
- versioninfo = "%s/versioninfo" % self.package
- try:
- raw_version = pkg_resources.resource_string(requirement,
- versioninfo)
- self.version = self._newer_version(raw_version.strip())
- except (IOError, pkg_resources.DistributionNotFound):
- self.version = self._generate_version()
+ def version_string(self):
+ """Return the short version minus any alpha/beta tags."""
+ if self.version is None:
+ parts = []
+ for part in self.release_string().split('.'):
+ if part[0].isdigit():
+ parts.append(part)
+ else:
+ break
+ self.version = ".".join(parts)
return self.version
- def canonical_version_string(self, always=False):
- """Return the simple version of the package excluding any suffixes.
-
- For instance, if we are working towards the 2012.2 release,
- canonical_version_string should return 2012.2 in all cases.
-
- :param always: if true, skip all version caching
- """
- return self.version_string_with_vcs(always).split('~')[0]
-
- def version_string(self, always=False):
- """Return the base version of the package.
-
- For instance, if we are working towards the 2012.2 release,
- version_string should return 2012.2 if this is a final release, or
- 2012.2-dev if it is not.
-
- :param always: if true, skip all version caching
- """
- version_parts = self.version_string_with_vcs(always).split('~')
- if len(version_parts) == 1:
- return version_parts[0]
- else:
- return '%s-dev' % (version_parts[0],)
+ # Compatibility functions
+ canonical_version_string = version_string
+ version_string_with_vcs = release_string
- def deferred_version_string(self, prefix=""):
+ def cached_version_string(self, prefix=""):
"""Generate an object which will expand in a string context to
         the results of version_string(). We do this so that we don't
         call into pkg_resources every time we start up a program when
         passing version information into the CONF constructor, but
         rather only do the calculation when and if a version is requested.
"""
- return _deferred_version_string(self, prefix)
+ if not self._cached_version:
+ self._cached_version = "%s%s" % (prefix,
+ self.version_string())
+ return self._cached_version
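To distinguish the accessors: release_string() is the full pkg_resources version, version_string() strips everything from the first non-numeric component onward, and cached_version_string() memoizes the prefixed result. A sketch that pre-seeds the release to avoid a real pkg_resources lookup (value illustrative):

    from ceilometer.openstack.common.version import VersionInfo

    info = VersionInfo('ceilometer')
    info.release = '2013.1.a15.gcafc56e'    # normally set from pkg_resources
    print(info.version_string())            # -> '2013.1'
    print(info.cached_version_string('v'))  # -> 'v2013.1', computed once
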
diff --git a/ceilometer/policy.py b/ceilometer/policy.py
index e468be25..e2c32cdf 100644
--- a/ceilometer/policy.py
+++ b/ceilometer/policy.py
@@ -49,26 +49,18 @@ def init():
if not _POLICY_PATH:
raise cfg.ConfigFilesNotFoundError([cfg.CONF.policy_file])
utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE,
- reload_func=_set_brain)
+ reload_func=_set_rules)
-def _set_brain(data):
+def _set_rules(data):
default_rule = cfg.CONF.policy_default_rule
- policy.set_brain(policy.Brain.load_json(data, default_rule))
+ policy.set_rules(policy.Rules.load_json(data, default_rule))
-def check_is_admin(roles, project_id, project_name):
+def check_is_admin(roles):
"""Whether or not roles contains 'admin' role according to policy setting.
"""
init()
- match_list = ('rule:context_is_admin',)
- target = {}
- credentials = {
- 'roles': roles,
- 'project_id': project_id,
- 'project_name': project_name,
- }
-
- return policy.enforce(match_list, target, credentials)
+ return policy.check('context_is_admin', {}, {'roles': roles})
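check_is_admin() now leans entirely on the rules engine: it passes the rule name, an empty target, and the roles as credentials. Assuming CONF has been set up so that init() can locate a policy file containing the usual rule (illustrative):

    {"context_is_admin": [["role:admin"]]}

a call looks like:

    from ceilometer import policy

    policy.check_is_admin(['admin', '_member_'])   # -> True
    policy.check_is_admin(['member'])              # -> False
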
diff --git a/ceilometer/publish.py b/ceilometer/publish.py
index 24aab9a6..2d52fa28 100644
--- a/ceilometer/publish.py
+++ b/ceilometer/publish.py
@@ -30,9 +30,6 @@ PUBLISH_OPTS = [
default='metering',
help='the topic ceilometer uses for metering messages',
),
- cfg.StrOpt('control_exchange',
- default='ceilometer',
- help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
diff --git a/ceilometer/service.py b/ceilometer/service.py
index 609faecd..d2975171 100644
--- a/ceilometer/service.py
+++ b/ceilometer/service.py
@@ -21,6 +21,7 @@ import os
import socket
from ceilometer.openstack.common import cfg
+from ceilometer.openstack.common import rpc
from ceilometer.openstack.common import context
from ceilometer.openstack.common import log
from ceilometer.openstack.common.rpc import service as rpc_service
@@ -77,5 +78,6 @@ def _sanitize_cmd_line(argv):
def prepare_service(argv=[]):
+ rpc.set_defaults(control_exchange='ceilometer')
cfg.CONF(argv[1:], project='ceilometer')
log.setup('ceilometer')
diff --git a/ceilometer/version.py b/ceilometer/version.py
index df88c050..40dc5d9b 100644
--- a/ceilometer/version.py
+++ b/ceilometer/version.py
@@ -22,5 +22,4 @@ from ceilometer.openstack.common import version as common_version
NEXT_VERSION = '2013.1'
-version_info = common_version.VersionInfo('ceilometer',
- pre_version=NEXT_VERSION)
+version_info = common_version.VersionInfo('ceilometer')
diff --git a/setup.py b/setup.py
index 21f8fa44..debc0c0a 100755
--- a/setup.py
+++ b/setup.py
@@ -22,13 +22,12 @@ import os
import setuptools
from ceilometer.openstack.common import setup as common_setup
-from ceilometer.version import version_info
+from ceilometer.version import NEXT_VERSION
requires = common_setup.parse_requirements(['tools/pip-requires'])
depend_links = common_setup.parse_dependency_links(['tools/pip-requires'])
url_base = 'http://tarballs.openstack.org/ceilometer/ceilometer-%s.tar.gz'
-version_string = version_info.canonical_version_string(always=True)
def directories(target_dir):
@@ -39,7 +38,7 @@ def directories(target_dir):
setuptools.setup(
name='ceilometer',
- version=version_string,
+ version=NEXT_VERSION,
description='cloud computing metering',
@@ -47,7 +46,7 @@ setuptools.setup(
author_email='ceilometer@lists.launchpad.net',
url='https://launchpad.net/ceilometer',
- download_url=url_base % version_string,
+ download_url=url_base % NEXT_VERSION,
classifiers=[
'Development Status :: 3 - Alpha',
diff --git a/tests/api/v2/test_statistics.py b/tests/api/v2/test_statistics.py
index fa440d9f..2be17091 100644
--- a/tests/api/v2/test_statistics.py
+++ b/tests/api/v2/test_statistics.py
@@ -35,17 +35,17 @@ class TestStatisticsDuration(unittest.TestCase):
# Create events relative to the range and pretend
# that the intervening events exist.
- self.early1 = datetime.datetime(2012, 8, 27, 7, 0)
+ self.early1 = datetime.datetime(2012, 8, 27, 7, 0)
self.early2 = datetime.datetime(2012, 8, 27, 17, 0)
self.start = datetime.datetime(2012, 8, 28, 0, 0)
- self.middle1 = datetime.datetime(2012, 8, 28, 8, 0)
+ self.middle1 = datetime.datetime(2012, 8, 28, 8, 0)
self.middle2 = datetime.datetime(2012, 8, 28, 18, 0)
self.end = datetime.datetime(2012, 8, 28, 23, 59)
- self.late1 = datetime.datetime(2012, 8, 29, 9, 0)
+ self.late1 = datetime.datetime(2012, 8, 29, 9, 0)
self.late2 = datetime.datetime(2012, 8, 29, 19, 0)
def test_nulls(self):
diff --git a/tests/test_bin.py b/tests/test_bin.py
index 19c60721..35a99c97 100644
--- a/tests/test_bin.py
+++ b/tests/test_bin.py
@@ -19,10 +19,21 @@
import subprocess
import unittest
+import tempfile
+import os
class BinDbsyncTestCase(unittest.TestCase):
+ def setUp(self):
+ self.tempfile = tempfile.mktemp()
+ with open(self.tempfile, 'w') as tmp:
+ tmp.write("[DEFAULT]\n")
+ tmp.write("database_connection=log://localhost\n")
+
def test_dbsync_run(self):
subp = subprocess.Popen(["../bin/ceilometer-dbsync",
- "--database_connection=log://localhost"])
+ "--config-file=%s" % self.tempfile])
self.assertEqual(subp.wait(), 0)
+
+ def tearDown(self):
+ os.unlink(self.tempfile)
diff --git a/tools/pip-requires b/tools/pip-requires
index 803bb217..7cb17f47 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -17,3 +17,4 @@ python-keystoneclient>=0.2,<0.3
python-swiftclient
lxml
requests<1.0
+extras
diff --git a/tox.ini b/tox.ini
index 571b83f8..c7d73202 100644
--- a/tox.ini
+++ b/tox.ini
@@ -21,7 +21,7 @@ commands = {toxinidir}/run_tests.sh --no-path-adjustment --with-coverage --cover
[testenv:pep8]
deps = pep8==1.3.3
-commands = pep8 --repeat --show-source ceilometer setup.py bin/ceilometer-agent-central bin/ceilometer-agent-compute bin/ceilometer-collector bin/ceilometer-api tests
+commands = pep8 --repeat --ignore=E125 --show-source ceilometer setup.py bin/ceilometer-agent-central bin/ceilometer-agent-compute bin/ceilometer-collector bin/ceilometer-api tests
[testenv:venv]
deps = -r{toxinidir}/tools/test-requires