diff options
author | Julien Danjou <julien@danjou.info> | 2013-01-08 16:13:06 +0100 |
---|---|---|
committer | Julien Danjou <julien@danjou.info> | 2013-01-22 11:09:24 +0100 |
commit | 4c5fc2204a0249cbea70cc20d287666ce5c72a57 (patch) | |
tree | 79735391e6927fd0a19999a3cb12e90b93ea5a49 | |
parent | 04e54c4f3eb79e197244b1a01b6bdcb8340af779 (diff) | |
download | ceilometer-4c5fc2204a0249cbea70cc20d287666ce5c72a57.tar.gz |
Update openstack.common
Change-Id: I952bc668ce10d05944eb0d2b06c8eff917c22af8
Signed-off-by: Julien Danjou <julien@danjou.info>
36 files changed, 1773 insertions, 902 deletions
diff --git a/ceilometer/api/acl.py b/ceilometer/api/acl.py index a912d7f1..d6edbe92 100644 --- a/ceilometer/api/acl.py +++ b/ceilometer/api/acl.py @@ -49,7 +49,5 @@ class AdminAuthHook(hooks.PecanHook): def before(self, state): headers = state.request.headers - if not policy.check_is_admin(headers.get('X-Roles', "").split(","), - headers.get('X-Tenant-Id'), - headers.get('X-Tenant-Name')): + if not policy.check_is_admin(headers.get('X-Roles', "").split(",")): raise exc.HTTPUnauthorized() diff --git a/ceilometer/api/v1/acl.py b/ceilometer/api/v1/acl.py index e64737f4..3a76612f 100644 --- a/ceilometer/api/v1/acl.py +++ b/ceilometer/api/v1/acl.py @@ -22,7 +22,5 @@ from ceilometer import policy def get_limited_to_project(headers): """Return the tenant the request should be limited to.""" - if not policy.check_is_admin(headers.get('X-Roles', "").split(","), - headers.get('X-Tenant-Id'), - headers.get('X-Tenant-Name')): + if not policy.check_is_admin(headers.get('X-Roles', "").split(",")): return headers.get('X-Tenant-Id') diff --git a/ceilometer/openstack/common/cfg.py b/ceilometer/openstack/common/cfg.py index b024da8c..1beb2988 100644 --- a/ceilometer/openstack/common/cfg.py +++ b/ceilometer/openstack/common/cfg.py @@ -205,27 +205,11 @@ Option values may reference other values using PEP 292 string substitution:: Note that interpolation can be avoided by using '$$'. -For command line utilities that dispatch to other command line utilities, the -disable_interspersed_args() method is available. If this this method is called, -then parsing e.g.:: - - script --verbose cmd --debug /tmp/mything - -will no longer return:: - - ['cmd', '/tmp/mything'] - -as the leftover arguments, but will instead return:: - - ['cmd', '--debug', '/tmp/mything'] - -i.e. argument parsing is stopped at the first non-option argument. - Options may be declared as required so that an error is raised if the user does not supply a value for the option. Options may be declared as secret so that their values are not leaked into -log files: +log files:: opts = [ cfg.StrOpt('s3_store_access_key', secret=True), @@ -233,29 +217,53 @@ log files: ... ] -This module also contains a global instance of the CommonConfigOpts class -in order to support a common usage pattern in OpenStack: +This module also contains a global instance of the ConfigOpts class +in order to support a common usage pattern in OpenStack:: - from ceilometer.openstack.common import cfg + from ceilometer.openstack.common import cfg - opts = [ - cfg.StrOpt('bind_host', default='0.0.0.0'), - cfg.IntOpt('bind_port', default=9292), - ] + opts = [ + cfg.StrOpt('bind_host', default='0.0.0.0'), + cfg.IntOpt('bind_port', default=9292), + ] + + CONF = cfg.CONF + CONF.register_opts(opts) + + def start(server, app): + server.start(app, CONF.bind_port, CONF.bind_host) - CONF = cfg.CONF - CONF.register_opts(opts) +Positional command line arguments are supported via a 'positional' Opt +constructor argument:: - def start(server, app): - server.start(app, CONF.bind_port, CONF.bind_host) + >>> conf = ConfigOpts() + >>> conf.register_cli_opt(MultiStrOpt('bar', positional=True)) + True + >>> conf(['a', 'b']) + >>> conf.bar + ['a', 'b'] + +It is also possible to use argparse "sub-parsers" to parse additional +command line arguments using the SubCommandOpt class: + + >>> def add_parsers(subparsers): + ... list_action = subparsers.add_parser('list') + ... list_action.add_argument('id') + ... + >>> conf = ConfigOpts() + >>> conf.register_cli_opt(SubCommandOpt('action', handler=add_parsers)) + True + >>> conf(args=['list', '10']) + >>> conf.action.name, conf.action.id + ('list', '10') """ +import argparse import collections import copy import functools import glob -import optparse import os import string import sys @@ -474,6 +482,13 @@ def _is_opt_registered(opts, opt): return False +def set_defaults(opts, **kwargs): + for opt in opts: + if opt.dest in kwargs: + opt.default = kwargs[opt.dest] + break + + class Opt(object): """Base class for all configuration options. @@ -489,6 +504,8 @@ class Opt(object): a single character CLI option name default: the default value of the option + positional: + True if the option is a positional CLI argument metavar: the name shown as the argument to a CLI option in --help output help: @@ -497,8 +514,8 @@ class Opt(object): multi = False def __init__(self, name, dest=None, short=None, default=None, - metavar=None, help=None, secret=False, required=False, - deprecated_name=None): + positional=False, metavar=None, help=None, + secret=False, required=False, deprecated_name=None): """Construct an Opt object. The only required parameter is the option's name. However, it is @@ -508,6 +525,7 @@ class Opt(object): :param dest: the name of the corresponding ConfigOpts property :param short: a single character CLI option name :param default: the default value of the option + :param positional: True if the option is a positional CLI argument :param metavar: the option argument to show in --help :param help: an explanation of how the option is used :param secret: true iff the value should be obfuscated in log output @@ -521,6 +539,7 @@ class Opt(object): self.dest = dest self.short = short self.default = default + self.positional = positional self.metavar = metavar self.help = help self.secret = secret @@ -561,64 +580,73 @@ class Opt(object): :param parser: the CLI option parser :param group: an optional OptGroup object """ - container = self._get_optparse_container(parser, group) - kwargs = self._get_optparse_kwargs(group) - prefix = self._get_optparse_prefix('', group) - self._add_to_optparse(container, self.name, self.short, kwargs, prefix, - self.deprecated_name) + container = self._get_argparse_container(parser, group) + kwargs = self._get_argparse_kwargs(group) + prefix = self._get_argparse_prefix('', group) + self._add_to_argparse(container, self.name, self.short, kwargs, prefix, + self.positional, self.deprecated_name) - def _add_to_optparse(self, container, name, short, kwargs, prefix='', - deprecated_name=None): - """Add an option to an optparse parser or group. + def _add_to_argparse(self, container, name, short, kwargs, prefix='', + positional=False, deprecated_name=None): + """Add an option to an argparse parser or group. - :param container: an optparse.OptionContainer object + :param container: an argparse._ArgumentGroup object :param name: the opt name :param short: the short opt name - :param kwargs: the keyword arguments for add_option() + :param kwargs: the keyword arguments for add_argument() :param prefix: an optional prefix to prepend to the opt name + :param position: whether the optional is a positional CLI argument :raises: DuplicateOptError if a naming confict is detected """ - args = ['--' + prefix + name] + def hyphen(arg): + return arg if not positional else '' + + args = [hyphen('--') + prefix + name] if short: - args += ['-' + short] + args.append(hyphen('-') + short) if deprecated_name: - args += ['--' + prefix + deprecated_name] - for a in args: - if container.has_option(a): - raise DuplicateOptError(a) - container.add_option(*args, **kwargs) + args.append(hyphen('--') + prefix + deprecated_name) + + try: + container.add_argument(*args, **kwargs) + except argparse.ArgumentError as e: + raise DuplicateOptError(e) - def _get_optparse_container(self, parser, group): - """Returns an optparse.OptionContainer. + def _get_argparse_container(self, parser, group): + """Returns an argparse._ArgumentGroup. - :param parser: an optparse.OptionParser + :param parser: an argparse.ArgumentParser :param group: an (optional) OptGroup object - :returns: an optparse.OptionGroup if a group is given, else the parser + :returns: an argparse._ArgumentGroup if group is given, else parser """ if group is not None: - return group._get_optparse_group(parser) + return group._get_argparse_group(parser) else: return parser - def _get_optparse_kwargs(self, group, **kwargs): - """Build a dict of keyword arguments for optparse's add_option(). + def _get_argparse_kwargs(self, group, **kwargs): + """Build a dict of keyword arguments for argparse's add_argument(). Most opt types extend this method to customize the behaviour of the - options added to optparse. + options added to argparse. :param group: an optional group :param kwargs: optional keyword arguments to add to :returns: a dict of keyword arguments """ - dest = self.dest - if group is not None: - dest = group.name + '_' + dest - kwargs.update({'dest': dest, + if not self.positional: + dest = self.dest + if group is not None: + dest = group.name + '_' + dest + kwargs['dest'] = dest + else: + kwargs['nargs'] = '?' + kwargs.update({'default': None, 'metavar': self.metavar, 'help': self.help, }) return kwargs - def _get_optparse_prefix(self, prefix, group): + def _get_argparse_prefix(self, prefix, group): """Build a prefix for the CLI option name, if required. CLI options in a group are prefixed with the group's name in order @@ -656,6 +684,11 @@ class BoolOpt(Opt): _boolean_states = {'1': True, 'yes': True, 'true': True, 'on': True, '0': False, 'no': False, 'false': False, 'off': False} + def __init__(self, *args, **kwargs): + if 'positional' in kwargs: + raise ValueError('positional boolean args not supported') + super(BoolOpt, self).__init__(*args, **kwargs) + def _get_from_config_parser(self, cparser, section): """Retrieve the opt value as a boolean from ConfigParser.""" def convert_bool(v): @@ -671,21 +704,32 @@ class BoolOpt(Opt): def _add_to_cli(self, parser, group=None): """Extends the base class method to add the --nooptname option.""" super(BoolOpt, self)._add_to_cli(parser, group) - self._add_inverse_to_optparse(parser, group) + self._add_inverse_to_argparse(parser, group) - def _add_inverse_to_optparse(self, parser, group): + def _add_inverse_to_argparse(self, parser, group): """Add the --nooptname option to the option parser.""" - container = self._get_optparse_container(parser, group) - kwargs = self._get_optparse_kwargs(group, action='store_false') - prefix = self._get_optparse_prefix('no', group) + container = self._get_argparse_container(parser, group) + kwargs = self._get_argparse_kwargs(group, action='store_false') + prefix = self._get_argparse_prefix('no', group) kwargs["help"] = "The inverse of --" + self.name - self._add_to_optparse(container, self.name, None, kwargs, prefix, - self.deprecated_name) + self._add_to_argparse(container, self.name, None, kwargs, prefix, + self.positional, self.deprecated_name) + + def _get_argparse_kwargs(self, group, action='store_true', **kwargs): + """Extends the base argparse keyword dict for boolean options.""" + + kwargs = super(BoolOpt, self)._get_argparse_kwargs(group, **kwargs) + + # metavar has no effect for BoolOpt + if 'metavar' in kwargs: + del kwargs['metavar'] + + if action != 'store_true': + action = 'store_false' - def _get_optparse_kwargs(self, group, action='store_true', **kwargs): - """Extends the base optparse keyword dict for boolean options.""" - return super(BoolOpt, - self)._get_optparse_kwargs(group, action=action, **kwargs) + kwargs['action'] = action + + return kwargs class IntOpt(Opt): @@ -697,10 +741,10 @@ class IntOpt(Opt): return [int(v) for v in self._cparser_get_with_deprecated(cparser, section)] - def _get_optparse_kwargs(self, group, **kwargs): - """Extends the base optparse keyword dict for integer options.""" + def _get_argparse_kwargs(self, group, **kwargs): + """Extends the base argparse keyword dict for integer options.""" return super(IntOpt, - self)._get_optparse_kwargs(group, type='int', **kwargs) + self)._get_argparse_kwargs(group, type=int, **kwargs) class FloatOpt(Opt): @@ -712,10 +756,10 @@ class FloatOpt(Opt): return [float(v) for v in self._cparser_get_with_deprecated(cparser, section)] - def _get_optparse_kwargs(self, group, **kwargs): - """Extends the base optparse keyword dict for float options.""" - return super(FloatOpt, - self)._get_optparse_kwargs(group, type='float', **kwargs) + def _get_argparse_kwargs(self, group, **kwargs): + """Extends the base argparse keyword dict for float options.""" + return super(FloatOpt, self)._get_argparse_kwargs(group, + type=float, **kwargs) class ListOpt(Opt): @@ -725,23 +769,26 @@ class ListOpt(Opt): is a list containing these strings. """ + class _StoreListAction(argparse.Action): + """ + An argparse action for parsing an option value into a list. + """ + def __call__(self, parser, namespace, values, option_string=None): + if values is not None: + values = [a.strip() for a in values.split(',')] + setattr(namespace, self.dest, values) + def _get_from_config_parser(self, cparser, section): """Retrieve the opt value as a list from ConfigParser.""" - return [v.split(',') for v in + return [[a.strip() for a in v.split(',')] for v in self._cparser_get_with_deprecated(cparser, section)] - def _get_optparse_kwargs(self, group, **kwargs): - """Extends the base optparse keyword dict for list options.""" - return super(ListOpt, - self)._get_optparse_kwargs(group, - type='string', - action='callback', - callback=self._parse_list, - **kwargs) - - def _parse_list(self, option, opt, value, parser): - """An optparse callback for parsing an option value into a list.""" - setattr(parser.values, self.dest, value.split(',')) + def _get_argparse_kwargs(self, group, **kwargs): + """Extends the base argparse keyword dict for list options.""" + return Opt._get_argparse_kwargs(self, + group, + action=ListOpt._StoreListAction, + **kwargs) class MultiStrOpt(Opt): @@ -752,10 +799,14 @@ class MultiStrOpt(Opt): """ multi = True - def _get_optparse_kwargs(self, group, **kwargs): - """Extends the base optparse keyword dict for multi str options.""" - return super(MultiStrOpt, - self)._get_optparse_kwargs(group, action='append') + def _get_argparse_kwargs(self, group, **kwargs): + """Extends the base argparse keyword dict for multi str options.""" + kwargs = super(MultiStrOpt, self)._get_argparse_kwargs(group) + if not self.positional: + kwargs['action'] = 'append' + else: + kwargs['nargs'] = '*' + return kwargs def _cparser_get_with_deprecated(self, cparser, section): """If cannot find option as dest try deprecated_name alias.""" @@ -765,6 +816,57 @@ class MultiStrOpt(Opt): return cparser.get(section, [self.dest], multi=True) +class SubCommandOpt(Opt): + + """ + Sub-command options allow argparse sub-parsers to be used to parse + additional command line arguments. + + The handler argument to the SubCommandOpt contructor is a callable + which is supplied an argparse subparsers object. Use this handler + callable to add sub-parsers. + + The opt value is SubCommandAttr object with the name of the chosen + sub-parser stored in the 'name' attribute and the values of other + sub-parser arguments available as additional attributes. + """ + + def __init__(self, name, dest=None, handler=None, + title=None, description=None, help=None): + """Construct an sub-command parsing option. + + This behaves similarly to other Opt sub-classes but adds a + 'handler' argument. The handler is a callable which is supplied + an subparsers object when invoked. The add_parser() method on + this subparsers object can be used to register parsers for + sub-commands. + + :param name: the option's name + :param dest: the name of the corresponding ConfigOpts property + :param title: title of the sub-commands group in help output + :param description: description of the group in help output + :param help: a help string giving an overview of available sub-commands + """ + super(SubCommandOpt, self).__init__(name, dest=dest, help=help) + self.handler = handler + self.title = title + self.description = description + + def _add_to_cli(self, parser, group=None): + """Add argparse sub-parsers and invoke the handler method.""" + dest = self.dest + if group is not None: + dest = group.name + '_' + dest + + subparsers = parser.add_subparsers(dest=dest, + title=self.title, + description=self.description, + help=self.help) + + if not self.handler is None: + self.handler(subparsers) + + class OptGroup(object): """ @@ -800,19 +902,20 @@ class OptGroup(object): self.help = help self._opts = {} # dict of dicts of (opt:, override:, default:) - self._optparse_group = None + self._argparse_group = None - def _register_opt(self, opt): + def _register_opt(self, opt, cli=False): """Add an opt to this group. :param opt: an Opt object + :param cli: whether this is a CLI option :returns: False if previously registered, True otherwise :raises: DuplicateOptError if a naming conflict is detected """ if _is_opt_registered(self._opts, opt): return False - self._opts[opt.dest] = {'opt': opt} + self._opts[opt.dest] = {'opt': opt, 'cli': cli} return True @@ -824,16 +927,16 @@ class OptGroup(object): if opt.dest in self._opts: del self._opts[opt.dest] - def _get_optparse_group(self, parser): - """Build an optparse.OptionGroup for this group.""" - if self._optparse_group is None: - self._optparse_group = optparse.OptionGroup(parser, self.title, - self.help) - return self._optparse_group + def _get_argparse_group(self, parser): + if self._argparse_group is None: + """Build an argparse._ArgumentGroup for this group.""" + self._argparse_group = parser.add_argument_group(self.title, + self.help) + return self._argparse_group def _clear(self): """Clear this group's option parsing state.""" - self._optparse_group = None + self._argparse_group = None class ParseError(iniparser.ParseError): @@ -928,26 +1031,31 @@ class ConfigOpts(collections.Mapping): self._groups = {} self._args = None + self._oparser = None self._cparser = None self._cli_values = {} self.__cache = {} self._config_opts = [] - self._disable_interspersed_args = False - def _setup(self, project, prog, version, usage, default_config_files): - """Initialize a ConfigOpts object for option parsing.""" + def _pre_setup(self, project, prog, version, usage, default_config_files): + """Initialize a ConfigCliParser object for option parsing.""" + if prog is None: prog = os.path.basename(sys.argv[0]) if default_config_files is None: default_config_files = find_config_files(project, prog) - self._oparser = optparse.OptionParser(prog=prog, - version=version, - usage=usage) - if self._disable_interspersed_args: - self._oparser.disable_interspersed_args() + self._oparser = argparse.ArgumentParser(prog=prog, usage=usage) + self._oparser.add_argument('--version', + action='version', + version=version) + + return prog, default_config_files + + def _setup(self, project, prog, version, usage, default_config_files): + """Initialize a ConfigOpts object for option parsing.""" self._config_opts = [ MultiStrOpt('config-file', @@ -1017,18 +1125,23 @@ class ConfigOpts(collections.Mapping): :raises: SystemExit, ConfigFilesNotFoundError, ConfigFileParseError, RequiredOptError, DuplicateOptError """ + self.clear() + prog, default_config_files = self._pre_setup(project, + prog, + version, + usage, + default_config_files) + self._setup(project, prog, version, usage, default_config_files) - self._cli_values, leftovers = self._parse_cli_opts(args) + self._cli_values = self._parse_cli_opts(args) self._parse_config_files() self._check_required_opts() - return leftovers - def __getattr__(self, name): """Look up an option value and perform string substitution. @@ -1062,17 +1175,21 @@ class ConfigOpts(collections.Mapping): @__clear_cache def clear(self): - """Clear the state of the object to before it was called.""" + """Clear the state of the object to before it was called. + + Any subparsers added using the add_cli_subparsers() will also be + removed as a side-effect of this method. + """ self._args = None self._cli_values.clear() - self._oparser = None + self._oparser = argparse.ArgumentParser() self._cparser = None self.unregister_opts(self._config_opts) for group in self._groups.values(): group._clear() @__clear_cache - def register_opt(self, opt, group=None): + def register_opt(self, opt, group=None, cli=False): """Register an option schema. Registering an option schema makes any option value which is previously @@ -1080,17 +1197,19 @@ class ConfigOpts(collections.Mapping): as an attribute of this object. :param opt: an instance of an Opt sub-class + :param cli: whether this is a CLI option :param group: an optional OptGroup object or group name :return: False if the opt was already register, True otherwise :raises: DuplicateOptError """ if group is not None: - return self._get_group(group, autocreate=True)._register_opt(opt) + group = self._get_group(group, autocreate=True) + return group._register_opt(opt, cli) if _is_opt_registered(self._opts, opt): return False - self._opts[opt.dest] = {'opt': opt} + self._opts[opt.dest] = {'opt': opt, 'cli': cli} return True @@ -1116,7 +1235,7 @@ class ConfigOpts(collections.Mapping): if self._args is not None: raise ArgsAlreadyParsedError("cannot register CLI option") - return self.register_opt(opt, group, clear_cache=False) + return self.register_opt(opt, group, cli=True, clear_cache=False) @__clear_cache def register_cli_opts(self, opts, group=None): @@ -1243,10 +1362,11 @@ class ConfigOpts(collections.Mapping): for info in group._opts.values(): yield info, group - def _all_opts(self): - """A generator function for iteration opts.""" + def _all_cli_opts(self): + """A generator function for iterating CLI opts.""" for info, group in self._all_opt_infos(): - yield info['opt'], group + if info['cli']: + yield info['opt'], group def _unset_defaults_and_overrides(self): """Unset any default or override on all options.""" @@ -1254,31 +1374,6 @@ class ConfigOpts(collections.Mapping): info.pop('default', None) info.pop('override', None) - def disable_interspersed_args(self): - """Set parsing to stop on the first non-option. - - If this this method is called, then parsing e.g. - - script --verbose cmd --debug /tmp/mything - - will no longer return: - - ['cmd', '/tmp/mything'] - - as the leftover arguments, but will instead return: - - ['cmd', '--debug', '/tmp/mything'] - - i.e. argument parsing is stopped at the first non-option argument. - """ - self._disable_interspersed_args = True - - def enable_interspersed_args(self): - """Set parsing to not stop on the first non-option. - - This it the default behaviour.""" - self._disable_interspersed_args = False - def find_file(self, name): """Locate a file located alongside the config files. @@ -1377,6 +1472,9 @@ class ConfigOpts(collections.Mapping): info = self._get_opt_info(name, group) opt = info['opt'] + if isinstance(opt, SubCommandOpt): + return self.SubCommandAttr(self, group, opt.dest) + if 'override' in info: return info['override'] @@ -1401,6 +1499,10 @@ class ConfigOpts(collections.Mapping): if not opt.multi: return value + # argparse ignores default=None for nargs='*' + if opt.positional and not value: + value = opt.default + return value + values if values: @@ -1523,12 +1625,10 @@ class ConfigOpts(collections.Mapping): """ self._args = args - for opt, group in self._all_opts(): + for opt, group in self._all_cli_opts(): opt._add_to_cli(self._oparser, group) - values, leftovers = self._oparser.parse_args(args) - - return vars(values), leftovers + return vars(self._oparser.parse_args(args)) class GroupAttr(collections.Mapping): @@ -1543,12 +1643,12 @@ class ConfigOpts(collections.Mapping): :param conf: a ConfigOpts object :param group: an OptGroup object """ - self.conf = conf - self.group = group + self._conf = conf + self._group = group def __getattr__(self, name): """Look up an option value and perform template substitution.""" - return self.conf._get(name, self.group) + return self._conf._get(name, self._group) def __getitem__(self, key): """Look up an option value and perform string substitution.""" @@ -1556,16 +1656,50 @@ class ConfigOpts(collections.Mapping): def __contains__(self, key): """Return True if key is the name of a registered opt or group.""" - return key in self.group._opts + return key in self._group._opts def __iter__(self): """Iterate over all registered opt and group names.""" - for key in self.group._opts.keys(): + for key in self._group._opts.keys(): yield key def __len__(self): """Return the number of options and option groups.""" - return len(self.group._opts) + return len(self._group._opts) + + class SubCommandAttr(object): + + """ + A helper class representing the name and arguments of an argparse + sub-parser. + """ + + def __init__(self, conf, group, dest): + """Construct a SubCommandAttr object. + + :param conf: a ConfigOpts object + :param group: an OptGroup object + :param dest: the name of the sub-parser + """ + self._conf = conf + self._group = group + self._dest = dest + + def __getattr__(self, name): + """Look up a sub-parser name or argument value.""" + if name == 'name': + name = self._dest + if self._group is not None: + name = self._group.name + '_' + name + return self._conf._cli_values[name] + + if name in self._conf: + raise DuplicateOptError(name) + + try: + return self._conf._cli_values[name] + except KeyError: + raise NoSuchOptError(name) class StrSubWrapper(object): @@ -1594,60 +1728,4 @@ class ConfigOpts(collections.Mapping): return value -class CommonConfigOpts(ConfigOpts): - - DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s" - DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" - - common_cli_opts = [ - BoolOpt('debug', - short='d', - default=False, - help='Print debugging output'), - BoolOpt('verbose', - short='v', - default=False, - help='Print more verbose output'), - ] - - logging_cli_opts = [ - StrOpt('log-config', - metavar='PATH', - help='If this option is specified, the logging configuration ' - 'file specified is used and overrides any other logging ' - 'options specified. Please see the Python logging module ' - 'documentation for details on logging configuration ' - 'files.'), - StrOpt('log-format', - default=DEFAULT_LOG_FORMAT, - metavar='FORMAT', - help='A logging.Formatter log message format string which may ' - 'use any of the available logging.LogRecord attributes. ' - 'Default: %default'), - StrOpt('log-date-format', - default=DEFAULT_LOG_DATE_FORMAT, - metavar='DATE_FORMAT', - help='Format string for %(asctime)s in log records. ' - 'Default: %default'), - StrOpt('log-file', - metavar='PATH', - help='(Optional) Name of log file to output to. ' - 'If not set, logging will go to stdout.'), - StrOpt('log-dir', - help='(Optional) The directory to keep log files in ' - '(will be prepended to --logfile)'), - BoolOpt('use-syslog', - default=False, - help='Use syslog for logging.'), - StrOpt('syslog-log-facility', - default='LOG_USER', - help='syslog facility to receive log lines') - ] - - def __init__(self): - super(CommonConfigOpts, self).__init__() - self.register_cli_opts(self.common_cli_opts) - self.register_cli_opts(self.logging_cli_opts) - - -CONF = CommonConfigOpts() +CONF = ConfigOpts() diff --git a/ceilometer/openstack/common/eventlet_backdoor.py b/ceilometer/openstack/common/eventlet_backdoor.py index bf4e0e03..58e00589 100644 --- a/ceilometer/openstack/common/eventlet_backdoor.py +++ b/ceilometer/openstack/common/eventlet_backdoor.py @@ -46,7 +46,7 @@ def _find_objects(t): def _print_greenthreads(): - for i, gt in enumerate(find_objects(greenlet.greenlet)): + for i, gt in enumerate(_find_objects(greenlet.greenlet)): print i, gt traceback.print_stack(gt.gr_frame) print diff --git a/ceilometer/openstack/common/excutils.py b/ceilometer/openstack/common/excutils.py index 5dd48301..414cc27e 100644 --- a/ceilometer/openstack/common/excutils.py +++ b/ceilometer/openstack/common/excutils.py @@ -24,6 +24,8 @@ import logging import sys import traceback +from ceilometer.openstack.common.gettextutils import _ + @contextlib.contextmanager def save_and_reraise_exception(): @@ -43,7 +45,7 @@ def save_and_reraise_exception(): try: yield except Exception: - logging.error('Original exception being dropped: %s' % - (traceback.format_exception(type_, value, tb))) + logging.error(_('Original exception being dropped: %s'), + traceback.format_exception(type_, value, tb)) raise raise type_, value, tb diff --git a/ceilometer/openstack/common/importutils.py b/ceilometer/openstack/common/importutils.py index f45372b4..9dec764f 100644 --- a/ceilometer/openstack/common/importutils.py +++ b/ceilometer/openstack/common/importutils.py @@ -29,7 +29,7 @@ def import_class(import_str): try: __import__(mod_str) return getattr(sys.modules[mod_str], class_str) - except (ValueError, AttributeError), exc: + except (ValueError, AttributeError): raise ImportError('Class %s cannot be found (%s)' % (class_str, traceback.format_exception(*sys.exc_info()))) @@ -57,3 +57,11 @@ def import_module(import_str): """Import a module.""" __import__(import_str) return sys.modules[import_str] + + +def try_import(import_str, default=None): + """Try to import a module and if it fails return default.""" + try: + return import_module(import_str) + except ImportError: + return default diff --git a/ceilometer/openstack/common/jsonutils.py b/ceilometer/openstack/common/jsonutils.py index dad84dfd..47ec79e8 100644 --- a/ceilometer/openstack/common/jsonutils.py +++ b/ceilometer/openstack/common/jsonutils.py @@ -120,7 +120,7 @@ def to_primitive(value, convert_instances=False, level=0): level=level + 1) else: return value - except TypeError, e: + except TypeError: # Class objects are tricky since they may define something like # __iter__ defined but it isn't callable as list(). return unicode(value) diff --git a/ceilometer/openstack/common/log.py b/ceilometer/openstack/common/log.py index ea8d2833..07219f77 100644 --- a/ceilometer/openstack/common/log.py +++ b/ceilometer/openstack/common/log.py @@ -47,21 +47,83 @@ from ceilometer.openstack.common import local from ceilometer.openstack.common import notifier +_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s" +_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" + +common_cli_opts = [ + cfg.BoolOpt('debug', + short='d', + default=False, + help='Print debugging output (set logging level to ' + 'DEBUG instead of default WARNING level).'), + cfg.BoolOpt('verbose', + short='v', + default=False, + help='Print more verbose output (set logging level to ' + 'INFO instead of default WARNING level).'), +] + +logging_cli_opts = [ + cfg.StrOpt('log-config', + metavar='PATH', + help='If this option is specified, the logging configuration ' + 'file specified is used and overrides any other logging ' + 'options specified. Please see the Python logging module ' + 'documentation for details on logging configuration ' + 'files.'), + cfg.StrOpt('log-format', + default=_DEFAULT_LOG_FORMAT, + metavar='FORMAT', + help='A logging.Formatter log message format string which may ' + 'use any of the available logging.LogRecord attributes. ' + 'Default: %(default)s'), + cfg.StrOpt('log-date-format', + default=_DEFAULT_LOG_DATE_FORMAT, + metavar='DATE_FORMAT', + help='Format string for %%(asctime)s in log records. ' + 'Default: %(default)s'), + cfg.StrOpt('log-file', + metavar='PATH', + deprecated_name='logfile', + help='(Optional) Name of log file to output to. ' + 'If not set, logging will go to stdout.'), + cfg.StrOpt('log-dir', + deprecated_name='logdir', + help='(Optional) The directory to keep log files in ' + '(will be prepended to --log-file)'), + cfg.BoolOpt('use-syslog', + default=False, + help='Use syslog for logging.'), + cfg.StrOpt('syslog-log-facility', + default='LOG_USER', + help='syslog facility to receive log lines') +] + +generic_log_opts = [ + cfg.BoolOpt('use_stderr', + default=True, + help='Log output to standard error'), + cfg.StrOpt('logfile_mode', + default='0644', + help='Default file mode used when creating log files'), +] + log_opts = [ cfg.StrOpt('logging_context_format_string', - default='%(asctime)s %(levelname)s %(name)s [%(request_id)s ' - '%(user)s %(tenant)s] %(instance)s' + default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s ' + '[%(request_id)s %(user)s %(tenant)s] %(instance)s' '%(message)s', help='format string to use for log messages with context'), cfg.StrOpt('logging_default_format_string', - default='%(asctime)s %(process)d %(levelname)s %(name)s [-]' - ' %(instance)s%(message)s', + default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' + '%(name)s [-] %(instance)s%(message)s', help='format string to use for log messages without context'), cfg.StrOpt('logging_debug_format_suffix', default='%(funcName)s %(pathname)s:%(lineno)d', help='data to append to log format when level is DEBUG'), cfg.StrOpt('logging_exception_prefix', - default='%(asctime)s %(process)d TRACE %(name)s %(instance)s', + default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s ' + '%(instance)s', help='prefix each line of exception output with this format'), cfg.ListOpt('default_log_levels', default=[ @@ -93,24 +155,9 @@ log_opts = [ 'format it like this'), ] - -generic_log_opts = [ - cfg.StrOpt('logdir', - default=None, - help='Log output to a per-service log file in named directory'), - cfg.StrOpt('logfile', - default=None, - help='Log output to a named file'), - cfg.BoolOpt('use_stderr', - default=True, - help='Log output to standard error'), - cfg.StrOpt('logfile_mode', - default='0644', - help='Default file mode used when creating log files'), -] - - CONF = cfg.CONF +CONF.register_cli_opts(common_cli_opts) +CONF.register_cli_opts(logging_cli_opts) CONF.register_opts(generic_log_opts) CONF.register_opts(log_opts) @@ -148,8 +195,8 @@ def _get_binary_name(): def _get_log_file_path(binary=None): - logfile = CONF.log_file or CONF.logfile - logdir = CONF.log_dir or CONF.logdir + logfile = CONF.log_file + logdir = CONF.log_dir if logfile and not logdir: return logfile @@ -174,7 +221,7 @@ class ContextAdapter(logging.LoggerAdapter): self.log(logging.AUDIT, msg, *args, **kwargs) def deprecated(self, msg, *args, **kwargs): - stdmsg = _("Deprecated Config: %s") % msg + stdmsg = _("Deprecated: %s") % msg if CONF.fatal_deprecations: self.critical(stdmsg, *args, **kwargs) raise DeprecatedConfig(msg=stdmsg) @@ -289,6 +336,12 @@ def setup(product_name): _setup_logging_from_conf(product_name) +def set_defaults(logging_context_format_string): + cfg.set_defaults(log_opts, + logging_context_format_string= + logging_context_format_string) + + def _find_facility_from_conf(): facility_names = logging.handlers.SysLogHandler.facility_names facility = getattr(logging.handlers.SysLogHandler, @@ -354,10 +407,12 @@ def _setup_logging_from_conf(product_name): datefmt=datefmt)) handler.setFormatter(LegacyFormatter(datefmt=datefmt)) - if CONF.verbose or CONF.debug: + if CONF.debug: log_root.setLevel(logging.DEBUG) - else: + elif CONF.verbose: log_root.setLevel(logging.INFO) + else: + log_root.setLevel(logging.WARNING) level = logging.NOTSET for pair in CONF.default_log_levels: diff --git a/ceilometer/openstack/common/loopingcall.py b/ceilometer/openstack/common/loopingcall.py index 2c81e6ca..0f07c838 100644 --- a/ceilometer/openstack/common/loopingcall.py +++ b/ceilometer/openstack/common/loopingcall.py @@ -24,6 +24,7 @@ from eventlet import greenthread from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log as logging +from ceilometer.openstack.common import timeutils LOG = logging.getLogger(__name__) @@ -62,10 +63,16 @@ class LoopingCall(object): try: while self._running: + start = timeutils.utcnow() self.f(*self.args, **self.kw) + end = timeutils.utcnow() if not self._running: break - greenthread.sleep(interval) + delay = interval - timeutils.delta_seconds(start, end) + if delay <= 0: + LOG.warn(_('task run outlasted interval by %s sec') % + -delay) + greenthread.sleep(delay if delay > 0 else 0) except LoopingCallDone, e: self.stop() done.send(e.retvalue) diff --git a/ceilometer/openstack/common/notifier/api.py b/ceilometer/openstack/common/notifier/api.py index 92f654cd..6e91c0cc 100644 --- a/ceilometer/openstack/common/notifier/api.py +++ b/ceilometer/openstack/common/notifier/api.py @@ -137,10 +137,11 @@ def notify(context, publisher_id, event_type, priority, payload): for driver in _get_drivers(): try: driver.notify(context, msg) - except Exception, e: + except Exception as e: LOG.exception(_("Problem '%(e)s' attempting to " "send to notification system. " - "Payload=%(payload)s") % locals()) + "Payload=%(payload)s") + % dict(e=e, payload=payload)) _drivers = None @@ -166,7 +167,7 @@ def add_driver(notification_driver): try: driver = importutils.import_module(notification_driver) _drivers[notification_driver] = driver - except ImportError as e: + except ImportError: LOG.exception(_("Failed to load notifier %s. " "These notifications will not be sent.") % notification_driver) diff --git a/ceilometer/openstack/common/notifier/rpc_notifier.py b/ceilometer/openstack/common/notifier/rpc_notifier.py index e51caba0..c5f554ee 100644 --- a/ceilometer/openstack/common/notifier/rpc_notifier.py +++ b/ceilometer/openstack/common/notifier/rpc_notifier.py @@ -41,6 +41,6 @@ def notify(context, message): topic = '%s.%s' % (topic, priority) try: rpc.notify(context, topic, message) - except Exception, e: + except Exception: LOG.exception(_("Could not send notification to %(topic)s. " "Payload=%(message)s"), locals()) diff --git a/ceilometer/openstack/common/notifier/rpc_notifier2.py b/ceilometer/openstack/common/notifier/rpc_notifier2.py new file mode 100644 index 00000000..833e0bbe --- /dev/null +++ b/ceilometer/openstack/common/notifier/rpc_notifier2.py @@ -0,0 +1,51 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +'''messaging based notification driver, with message envelopes''' + +from ceilometer.openstack.common import cfg +from ceilometer.openstack.common import context as req_context +from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import log as logging +from ceilometer.openstack.common import rpc + +LOG = logging.getLogger(__name__) + +notification_topic_opt = cfg.ListOpt( + 'topics', default=['notifications', ], + help='AMQP topic(s) used for openstack notifications') + +opt_group = cfg.OptGroup(name='rpc_notifier2', + title='Options for rpc_notifier2') + +CONF = cfg.CONF +CONF.register_group(opt_group) +CONF.register_opt(notification_topic_opt, opt_group) + + +def notify(context, message): + """Sends a notification via RPC""" + if not context: + context = req_context.get_admin_context() + priority = message.get('priority', + CONF.default_notification_level) + priority = priority.lower() + for topic in CONF.rpc_notifier2.topics: + topic = '%s.%s' % (topic, priority) + try: + rpc.notify(context, topic, message, envelope=True) + except Exception: + LOG.exception(_("Could not send notification to %(topic)s. " + "Payload=%(message)s"), locals()) diff --git a/ceilometer/openstack/common/policy.py b/ceilometer/openstack/common/policy.py index 831dac25..3c482548 100644 --- a/ceilometer/openstack/common/policy.py +++ b/ceilometer/openstack/common/policy.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2011 OpenStack, LLC. +# Copyright (c) 2012 OpenStack, LLC. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -15,10 +15,52 @@ # License for the specific language governing permissions and limitations # under the License. -"""Common Policy Engine Implementation""" +""" +Common Policy Engine Implementation +Policies can be expressed in one of two forms: A list of lists, or a +string written in the new policy language. + +In the list-of-lists representation, each check inside the innermost +list is combined as with an "and" conjunction--for that check to pass, +all the specified checks must pass. These innermost lists are then +combined as with an "or" conjunction. This is the original way of +expressing policies, but there now exists a new way: the policy +language. + +In the policy language, each check is specified the same way as in the +list-of-lists representation: a simple "a:b" pair that is matched to +the correct code to perform that check. However, conjunction +operators are available, allowing for more expressiveness in crafting +policies. + +As an example, take the following rule, expressed in the list-of-lists +representation:: + + [["role:admin"], ["project_id:%(project_id)s", "role:projectadmin"]] + +In the policy language, this becomes:: + + role:admin or (project_id:%(project_id)s and role:projectadmin) + +The policy language also has the "not" operator, allowing a richer +policy rule:: + + project_id:%(project_id)s and not role:dunce + +Finally, two special policy checks should be mentioned; the policy +check "@" will always accept an access, and the policy check "!" will +always reject an access. (Note that if a rule is either the empty +list ("[]") or the empty string, this is equivalent to the "@" policy +check.) Of these, the "!" policy check is probably the most useful, +as it allows particular rules to be explicitly disabled. +""" + +import abc import logging +import re import urllib + import urllib2 from ceilometer.openstack.common.gettextutils import _ @@ -28,218 +70,650 @@ from ceilometer.openstack.common import jsonutils LOG = logging.getLogger(__name__) -_BRAIN = None +_rules = None +_checks = {} -def set_brain(brain): - """Set the brain used by enforce(). +class Rules(dict): + """ + A store for rules. Handles the default_rule setting directly. + """ - Defaults use Brain() if not set. + @classmethod + def load_json(cls, data, default_rule=None): + """ + Allow loading of JSON rule data. + """ - """ - global _BRAIN - _BRAIN = brain + # Suck in the JSON data and parse the rules + rules = dict((k, parse_rule(v)) for k, v in + jsonutils.loads(data).items()) + return cls(rules, default_rule) -def reset(): - """Clear the brain used by enforce().""" - global _BRAIN - _BRAIN = None + def __init__(self, rules=None, default_rule=None): + """Initialize the Rules store.""" + super(Rules, self).__init__(rules or {}) + self.default_rule = default_rule -def enforce(match_list, target_dict, credentials_dict, exc=None, - *args, **kwargs): - """Enforces authorization of some rules against credentials. + def __missing__(self, key): + """Implements the default rule handling.""" - :param match_list: nested tuples of data to match against + # If the default rule isn't actually defined, do something + # reasonably intelligent + if not self.default_rule or self.default_rule not in self: + raise KeyError(key) - The basic brain supports three types of match lists: + return self[self.default_rule] - 1) rules + def __str__(self): + """Dumps a string representation of the rules.""" - looks like: ``('rule:compute:get_instance',)`` + # Start by building the canonical strings for the rules + out_rules = {} + for key, value in self.items(): + # Use empty string for singleton TrueCheck instances + if isinstance(value, TrueCheck): + out_rules[key] = '' + else: + out_rules[key] = str(value) - Retrieves the named rule from the rules dict and recursively - checks against the contents of the rule. + # Dump a pretty-printed JSON representation + return jsonutils.dumps(out_rules, indent=4) - 2) roles - looks like: ``('role:compute:admin',)`` +# Really have to figure out a way to deprecate this +def set_rules(rules): + """Set the rules in use for policy checks.""" - Matches if the specified role is in credentials_dict['roles']. + global _rules - 3) generic + _rules = rules - looks like: ``('tenant_id:%(tenant_id)s',)`` - Substitutes values from the target dict into the match using - the % operator and matches them against the creds dict. +# Ditto +def reset(): + """Clear the rules used for policy checks.""" - Combining rules: + global _rules - The brain returns True if any of the outer tuple of rules - match and also True if all of the inner tuples match. You - can use this to perform simple boolean logic. For - example, the following rule would return True if the creds - contain the role 'admin' OR the if the tenant_id matches - the target dict AND the the creds contains the role - 'compute_sysadmin': + _rules = None - :: - { - "rule:combined": ( - 'role:admin', - ('tenant_id:%(tenant_id)s', 'role:compute_sysadmin') - ) - } +def check(rule, target, creds, exc=None, *args, **kwargs): + """ + Checks authorization of a rule against the target and credentials. + + :param rule: The rule to evaluate. + :param target: As much information about the object being operated + on as possible, as a dictionary. + :param creds: As much information about the user performing the + action as possible, as a dictionary. + :param exc: Class of the exception to raise if the check fails. + Any remaining arguments passed to check() (both + positional and keyword arguments) will be passed to + the exception class. If exc is not provided, returns + False. + + :return: Returns False if the policy does not allow the action and + exc is not provided; otherwise, returns a value that + evaluates to True. Note: for rules using the "case" + expression, this True value will be the specified string + from the expression. + """ - Note that rule and role are reserved words in the credentials match, so - you can't match against properties with those names. Custom brains may - also add new reserved words. For example, the HttpBrain adds http as a - reserved word. + # Allow the rule to be a Check tree + if isinstance(rule, BaseCheck): + result = rule(target, creds) + elif not _rules: + # No rules to reference means we're going to fail closed + result = False + else: + try: + # Evaluate the rule + result = _rules[rule](target, creds) + except KeyError: + # If the rule doesn't exist, fail closed + result = False - :param target_dict: dict of object properties + # If it is False, raise the exception if requested + if exc and result is False: + raise exc(*args, **kwargs) - Target dicts contain as much information as we can about the object being - operated on. + return result - :param credentials_dict: dict of actor properties - Credentials dicts contain as much information as we can about the user - performing the action. +class BaseCheck(object): + """ + Abstract base class for Check classes. + """ - :param exc: exception to raise + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __str__(self): + """ + Retrieve a string representation of the Check tree rooted at + this node. + """ + + pass + + @abc.abstractmethod + def __call__(self, target, cred): + """ + Perform the check. Returns False to reject the access or a + true value (not necessary True) to accept the access. + """ + + pass - Class of the exception to raise if the check fails. Any remaining - arguments passed to enforce() (both positional and keyword arguments) - will be passed to the exception class. If exc is not provided, returns - False. - :return: True if the policy allows the action - :return: False if the policy does not allow the action and exc is not set +class FalseCheck(BaseCheck): """ - global _BRAIN - if not _BRAIN: - _BRAIN = Brain() - if not _BRAIN.check(match_list, target_dict, credentials_dict): - if exc: - raise exc(*args, **kwargs) + A policy check that always returns False (disallow). + """ + + def __str__(self): + """Return a string representation of this check.""" + + return "!" + + def __call__(self, target, cred): + """Check the policy.""" + return False - return True -class Brain(object): - """Implements policy checking.""" +class TrueCheck(BaseCheck): + """ + A policy check that always returns True (allow). + """ - _checks = {} + def __str__(self): + """Return a string representation of this check.""" - @classmethod - def _register(cls, name, func): - cls._checks[name] = func + return "@" - @classmethod - def load_json(cls, data, default_rule=None): - """Init a brain using json instead of a rules dictionary.""" - rules_dict = jsonutils.loads(data) - return cls(rules=rules_dict, default_rule=default_rule) + def __call__(self, target, cred): + """Check the policy.""" - def __init__(self, rules=None, default_rule=None): - if self.__class__ != Brain: - LOG.warning(_("Inheritance-based rules are deprecated; use " - "the default brain instead of %s.") % - self.__class__.__name__) + return True - self.rules = rules or {} - self.default_rule = default_rule - def add_rule(self, key, match): - self.rules[key] = match +class Check(BaseCheck): + """ + A base class to allow for user-defined policy checks. + """ - def _check(self, match, target_dict, cred_dict): - try: - match_kind, match_value = match.split(':', 1) - except Exception: - LOG.exception(_("Failed to understand rule %(match)r") % locals()) - # If the rule is invalid, fail closed - return False + def __init__(self, kind, match): + """ + :param kind: The kind of the check, i.e., the field before the + ':'. + :param match: The match of the check, i.e., the field after + the ':'. + """ - func = None - try: - old_func = getattr(self, '_check_%s' % match_kind) - except AttributeError: - func = self._checks.get(match_kind, self._checks.get(None, None)) - else: - LOG.warning(_("Inheritance-based rules are deprecated; update " - "_check_%s") % match_kind) - func = lambda brain, kind, value, target, cred: old_func(value, - target, - cred) - - if not func: - LOG.error(_("No handler for matches of kind %s") % match_kind) - # Fail closed - return False + self.kind = kind + self.match = match + + def __str__(self): + """Return a string representation of this check.""" + + return "%s:%s" % (self.kind, self.match) + + +class NotCheck(BaseCheck): + """ + A policy check that inverts the result of another policy check. + Implements the "not" operator. + """ + + def __init__(self, rule): + """ + Initialize the 'not' check. + + :param rule: The rule to negate. Must be a Check. + """ + + self.rule = rule + + def __str__(self): + """Return a string representation of this check.""" + + return "not %s" % self.rule - return func(self, match_kind, match_value, target_dict, cred_dict) + def __call__(self, target, cred): + """ + Check the policy. Returns the logical inverse of the wrapped + check. + """ + + return not self.rule(target, cred) + + +class AndCheck(BaseCheck): + """ + A policy check that requires that a list of other checks all + return True. Implements the "and" operator. + """ - def check(self, match_list, target_dict, cred_dict): - """Checks authorization of some rules against credentials. + def __init__(self, rules): + """ + Initialize the 'and' check. + + :param rules: A list of rules that will be tested. + """ - Detailed description of the check with examples in policy.enforce(). + self.rules = rules - :param match_list: nested tuples of data to match against - :param target_dict: dict of object properties - :param credentials_dict: dict of actor properties + def __str__(self): + """Return a string representation of this check.""" - :returns: True if the check passes + return "(%s)" % ' and '.join(str(r) for r in self.rules) + def __call__(self, target, cred): + """ + Check the policy. Requires that all rules accept in order to + return True. """ - if not match_list: - return True - for and_list in match_list: - if isinstance(and_list, basestring): - and_list = (and_list,) - if all([self._check(item, target_dict, cred_dict) - for item in and_list]): + + for rule in self.rules: + if not rule(target, cred): + return False + + return True + + def add_check(self, rule): + """ + Allows addition of another rule to the list of rules that will + be tested. Returns the AndCheck object for convenience. + """ + + self.rules.append(rule) + return self + + +class OrCheck(BaseCheck): + """ + A policy check that requires that at least one of a list of other + checks returns True. Implements the "or" operator. + """ + + def __init__(self, rules): + """ + Initialize the 'or' check. + + :param rules: A list of rules that will be tested. + """ + + self.rules = rules + + def __str__(self): + """Return a string representation of this check.""" + + return "(%s)" % ' or '.join(str(r) for r in self.rules) + + def __call__(self, target, cred): + """ + Check the policy. Requires that at least one rule accept in + order to return True. + """ + + for rule in self.rules: + if rule(target, cred): return True + return False + def add_check(self, rule): + """ + Allows addition of another rule to the list of rules that will + be tested. Returns the OrCheck object for convenience. + """ -class HttpBrain(Brain): - """A brain that can check external urls for policy. + self.rules.append(rule) + return self - Posts json blobs for target and credentials. - Note that this brain is deprecated; the http check is registered - by default. +def _parse_check(rule): + """ + Parse a single base check rule into an appropriate Check object. + """ + + # Handle the special checks + if rule == '!': + return FalseCheck() + elif rule == '@': + return TrueCheck() + + try: + kind, match = rule.split(':', 1) + except Exception: + LOG.exception(_("Failed to understand rule %(rule)s") % locals()) + # If the rule is invalid, we'll fail closed + return FalseCheck() + + # Find what implements the check + if kind in _checks: + return _checks[kind](kind, match) + elif None in _checks: + return _checks[None](kind, match) + else: + LOG.error(_("No handler for matches of kind %s") % kind) + return FalseCheck() + + +def _parse_list_rule(rule): + """ + Provided for backwards compatibility. Translates the old + list-of-lists syntax into a tree of Check objects. """ - pass + # Empty rule defaults to True + if not rule: + return TrueCheck() + + # Outer list is joined by "or"; inner list by "and" + or_list = [] + for inner_rule in rule: + # Elide empty inner lists + if not inner_rule: + continue + + # Handle bare strings + if isinstance(inner_rule, basestring): + inner_rule = [inner_rule] + + # Parse the inner rules into Check objects + and_list = [_parse_check(r) for r in inner_rule] + + # Append the appropriate check to the or_list + if len(and_list) == 1: + or_list.append(and_list[0]) + else: + or_list.append(AndCheck(and_list)) + + # If we have only one check, omit the "or" + if len(or_list) == 0: + return FalseCheck() + elif len(or_list) == 1: + return or_list[0] + + return OrCheck(or_list) + + +# Used for tokenizing the policy language +_tokenize_re = re.compile(r'\s+') + + +def _parse_tokenize(rule): + """ + Tokenizer for the policy language. + + Most of the single-character tokens are specified in the + _tokenize_re; however, parentheses need to be handled specially, + because they can appear inside a check string. Thankfully, those + parentheses that appear inside a check string can never occur at + the very beginning or end ("%(variable)s" is the correct syntax). + """ + + for tok in _tokenize_re.split(rule): + # Skip empty tokens + if not tok or tok.isspace(): + continue + + # Handle leading parens on the token + clean = tok.lstrip('(') + for i in range(len(tok) - len(clean)): + yield '(', '(' + + # If it was only parentheses, continue + if not clean: + continue + else: + tok = clean + + # Handle trailing parens on the token + clean = tok.rstrip(')') + trail = len(tok) - len(clean) + + # Yield the cleaned token + lowered = clean.lower() + if lowered in ('and', 'or', 'not'): + # Special tokens + yield lowered, clean + elif clean: + # Not a special token, but not composed solely of ')' + if len(tok) >= 2 and ((tok[0], tok[-1]) in + [('"', '"'), ("'", "'")]): + # It's a quoted string + yield 'string', tok[1:-1] + else: + yield 'check', _parse_check(clean) + + # Yield the trailing parens + for i in range(trail): + yield ')', ')' + + +class ParseStateMeta(type): + """ + Metaclass for the ParseState class. Facilitates identifying + reduction methods. + """ + + def __new__(mcs, name, bases, cls_dict): + """ + Create the class. Injects the 'reducers' list, a list of + tuples matching token sequences to the names of the + corresponding reduction methods. + """ + + reducers = [] + + for key, value in cls_dict.items(): + if not hasattr(value, 'reducers'): + continue + for reduction in value.reducers: + reducers.append((reduction, key)) + + cls_dict['reducers'] = reducers + + return super(ParseStateMeta, mcs).__new__(mcs, name, bases, cls_dict) + + +def reducer(*tokens): + """ + Decorator for reduction methods. Arguments are a sequence of + tokens, in order, which should trigger running this reduction + method. + """ + + def decorator(func): + # Make sure we have a list of reducer sequences + if not hasattr(func, 'reducers'): + func.reducers = [] + + # Add the tokens to the list of reducer sequences + func.reducers.append(list(tokens)) + + return func + + return decorator + + +class ParseState(object): + """ + Implement the core of parsing the policy language. Uses a greedy + reduction algorithm to reduce a sequence of tokens into a single + terminal, the value of which will be the root of the Check tree. + + Note: error reporting is rather lacking. The best we can get with + this parser formulation is an overall "parse failed" error. + Fortunately, the policy language is simple enough that this + shouldn't be that big a problem. + """ + + __metaclass__ = ParseStateMeta + + def __init__(self): + """Initialize the ParseState.""" + + self.tokens = [] + self.values = [] + + def reduce(self): + """ + Perform a greedy reduction of the token stream. If a reducer + method matches, it will be executed, then the reduce() method + will be called recursively to search for any more possible + reductions. + """ + + for reduction, methname in self.reducers: + if (len(self.tokens) >= len(reduction) and + self.tokens[-len(reduction):] == reduction): + # Get the reduction method + meth = getattr(self, methname) + + # Reduce the token stream + results = meth(*self.values[-len(reduction):]) + + # Update the tokens and values + self.tokens[-len(reduction):] = [r[0] for r in results] + self.values[-len(reduction):] = [r[1] for r in results] + + # Check for any more reductions + return self.reduce() + + def shift(self, tok, value): + """Adds one more token to the state. Calls reduce().""" + + self.tokens.append(tok) + self.values.append(value) + + # Do a greedy reduce... + self.reduce() + + @property + def result(self): + """ + Obtain the final result of the parse. Raises ValueError if + the parse failed to reduce to a single result. + """ + + if len(self.values) != 1: + raise ValueError("Could not parse rule") + return self.values[0] + + @reducer('(', 'check', ')') + @reducer('(', 'and_expr', ')') + @reducer('(', 'or_expr', ')') + def _wrap_check(self, _p1, check, _p2): + """Turn parenthesized expressions into a 'check' token.""" + + return [('check', check)] + + @reducer('check', 'and', 'check') + def _make_and_expr(self, check1, _and, check2): + """ + Create an 'and_expr' from two checks joined by the 'and' + operator. + """ + + return [('and_expr', AndCheck([check1, check2]))] + + @reducer('and_expr', 'and', 'check') + def _extend_and_expr(self, and_expr, _and, check): + """ + Extend an 'and_expr' by adding one more check. + """ + + return [('and_expr', and_expr.add_check(check))] + + @reducer('check', 'or', 'check') + def _make_or_expr(self, check1, _or, check2): + """ + Create an 'or_expr' from two checks joined by the 'or' + operator. + """ + + return [('or_expr', OrCheck([check1, check2]))] + + @reducer('or_expr', 'or', 'check') + def _extend_or_expr(self, or_expr, _or, check): + """ + Extend an 'or_expr' by adding one more check. + """ + + return [('or_expr', or_expr.add_check(check))] + + @reducer('not', 'check') + def _make_not_expr(self, _not, check): + """Invert the result of another check.""" + + return [('check', NotCheck(check))] + + +def _parse_text_rule(rule): + """ + Translates a policy written in the policy language into a tree of + Check objects. + """ + + # Empty rule means always accept + if not rule: + return TrueCheck() + + # Parse the token stream + state = ParseState() + for tok, value in _parse_tokenize(rule): + state.shift(tok, value) + + try: + return state.result + except ValueError: + # Couldn't parse the rule + LOG.exception(_("Failed to understand rule %(rule)r") % locals()) + + # Fail closed + return FalseCheck() + + +def parse_rule(rule): + """ + Parses a policy rule into a tree of Check objects. + """ + + # If the rule is a string, it's in the policy language + if isinstance(rule, basestring): + return _parse_text_rule(rule) + return _parse_list_rule(rule) def register(name, func=None): """ - Register a function as a policy check. + Register a function or Check class as a policy check. :param name: Gives the name of the check type, e.g., 'rule', - 'role', etc. If name is None, a default function + 'role', etc. If name is None, a default check type will be registered. - :param func: If given, provides the function to register. If not - given, returns a function taking one argument to - specify the function to register, allowing use as a - decorator. + :param func: If given, provides the function or class to register. + If not given, returns a function taking one argument + to specify the function or class to register, + allowing use as a decorator. """ - # Perform the actual decoration by registering the function. - # Returns the function for compliance with the decorator - # interface. + # Perform the actual decoration by registering the function or + # class. Returns the function or class for compliance with the + # decorator interface. def decorator(func): - # Register the function - Brain._register(name, func) + _checks[name] = func return func - # If the function is given, do the registration + # If the function or class is given, do the registration if func: return decorator(func) @@ -247,55 +721,59 @@ def register(name, func=None): @register("rule") -def _check_rule(brain, match_kind, match, target_dict, cred_dict): - """Recursively checks credentials based on the brains rules.""" - try: - new_match_list = brain.rules[match] - except KeyError: - if brain.default_rule and match != brain.default_rule: - new_match_list = ('rule:%s' % brain.default_rule,) - else: - return False +class RuleCheck(Check): + def __call__(self, target, creds): + """ + Recursively checks credentials based on the defined rules. + """ - return brain.check(new_match_list, target_dict, cred_dict) + try: + return _rules[self.match](target, creds) + except KeyError: + # We don't have any matching rule; fail closed + return False @register("role") -def _check_role(brain, match_kind, match, target_dict, cred_dict): - """Check that there is a matching role in the cred dict.""" - return match.lower() in [x.lower() for x in cred_dict['roles']] +class RoleCheck(Check): + def __call__(self, target, creds): + """Check that there is a matching role in the cred dict.""" + + return self.match.lower() in [x.lower() for x in creds['roles']] @register('http') -def _check_http(brain, match_kind, match, target_dict, cred_dict): - """Check http: rules by calling to a remote server. +class HttpCheck(Check): + def __call__(self, target, creds): + """ + Check http: rules by calling to a remote server. - This example implementation simply verifies that the response is - exactly 'True'. A custom brain using response codes could easily - be implemented. + This example implementation simply verifies that the response + is exactly 'True'. + """ - """ - url = 'http:' + (match % target_dict) - data = {'target': jsonutils.dumps(target_dict), - 'credentials': jsonutils.dumps(cred_dict)} - post_data = urllib.urlencode(data) - f = urllib2.urlopen(url, post_data) - return f.read() == "True" + url = ('http:' + self.match) % target + data = {'target': jsonutils.dumps(target), + 'credentials': jsonutils.dumps(creds)} + post_data = urllib.urlencode(data) + f = urllib2.urlopen(url, post_data) + return f.read() == "True" @register(None) -def _check_generic(brain, match_kind, match, target_dict, cred_dict): - """Check an individual match. - - Matches look like: +class GenericCheck(Check): + def __call__(self, target, creds): + """ + Check an individual match. - tenant:%(tenant_id)s - role:compute:admin + Matches look like: - """ + tenant:%(tenant_id)s + role:compute:admin + """ - # TODO(termie): do dict inspection via dot syntax - match = match % target_dict - if match_kind in cred_dict: - return match == unicode(cred_dict[match_kind]) - return False + # TODO(termie): do dict inspection via dot syntax + match = self.match % target + if self.kind in creds: + return match == unicode(creds[self.kind]) + return False diff --git a/ceilometer/openstack/common/rpc/__init__.py b/ceilometer/openstack/common/rpc/__init__.py index e20279bb..e91ac20e 100644 --- a/ceilometer/openstack/common/rpc/__init__.py +++ b/ceilometer/openstack/common/rpc/__init__.py @@ -50,25 +50,26 @@ rpc_opts = [ default=['ceilometer.openstack.common.exception', 'nova.exception', 'cinder.exception', + 'exceptions', ], help='Modules of exceptions that are permitted to be recreated' 'upon receiving exception data from an rpc call.'), cfg.BoolOpt('fake_rabbit', default=False, help='If passed, use a fake RabbitMQ provider'), - # - # The following options are not registered here, but are expected to be - # present. The project using this library must register these options with - # the configuration so that project-specific defaults may be defined. - # - #cfg.StrOpt('control_exchange', - # default='nova', - # help='AMQP exchange to connect to if using RabbitMQ or Qpid'), + cfg.StrOpt('control_exchange', + default='openstack', + help='AMQP exchange to connect to if using RabbitMQ or Qpid'), ] cfg.CONF.register_opts(rpc_opts) +def set_defaults(control_exchange): + cfg.set_defaults(rpc_opts, + control_exchange=control_exchange) + + def create_connection(new=True): """Create a connection to the message bus used for rpc. @@ -177,17 +178,18 @@ def multicall(context, topic, msg, timeout=None): return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) -def notify(context, topic, msg): +def notify(context, topic, msg, envelope=False): """Send notification event. :param context: Information that identifies the user that has made this request. :param topic: The topic to send the notification to. :param msg: This is a dict of content of event. + :param envelope: Set to True to enable message envelope for notifications. :returns: None """ - return _get_impl().notify(cfg.CONF, context, topic, msg) + return _get_impl().notify(cfg.CONF, context, topic, msg, envelope) def cleanup(): diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py index e86e748a..a69b0ced 100644 --- a/ceilometer/openstack/common/rpc/amqp.py +++ b/ceilometer/openstack/common/rpc/amqp.py @@ -26,7 +26,6 @@ AMQP, but is deprecated and predates this code. """ import inspect -import logging import sys import uuid @@ -34,10 +33,10 @@ from eventlet import greenpool from eventlet import pools from eventlet import semaphore -from ceilometer.openstack.common import cfg from ceilometer.openstack.common import excutils from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import local +from ceilometer.openstack.common import log as logging from ceilometer.openstack.common.rpc import common as rpc_common @@ -55,7 +54,7 @@ class Pool(pools.Pool): # TODO(comstud): Timeout connections not used in a while def create(self): - LOG.debug('Pool creating new connection') + LOG.debug(_('Pool creating new connection')) return self.connection_cls(self.conf) def empty(self): @@ -150,7 +149,7 @@ class ConnectionContext(rpc_common.Connection): def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, - ending=False): + ending=False, log_failure=True): """Sends a reply or an error on the channel signified by msg_id. Failure should be a sys.exc_info() tuple. @@ -158,7 +157,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, """ with ConnectionContext(conf, connection_pool) as conn: if failure: - failure = rpc_common.serialize_remote_exception(failure) + failure = rpc_common.serialize_remote_exception(failure, + log_failure) try: msg = {'result': reply, 'failure': failure} @@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, 'failure': failure} if ending: msg['ending'] = True - conn.direct_send(msg_id, msg) + conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) class RpcContext(rpc_common.CommonRpcContext): @@ -185,10 +185,10 @@ class RpcContext(rpc_common.CommonRpcContext): return self.__class__(**values) def reply(self, reply=None, failure=None, ending=False, - connection_pool=None): + connection_pool=None, log_failure=True): if self.msg_id: msg_reply(self.conf, self.msg_id, connection_pool, reply, failure, - ending) + ending, log_failure) if ending: self.msg_id = None @@ -282,11 +282,21 @@ class ProxyCallback(object): ctxt.reply(rval, None, connection_pool=self.connection_pool) # This final None tells multicall that it is done. ctxt.reply(ending=True, connection_pool=self.connection_pool) - except Exception as e: - LOG.exception('Exception during message handling') + except rpc_common.ClientException as e: + LOG.debug(_('Expected exception during message handling (%s)') % + e._exc_info[1]) + ctxt.reply(None, e._exc_info, + connection_pool=self.connection_pool, + log_failure=False) + except Exception: + LOG.exception(_('Exception during message handling')) ctxt.reply(None, sys.exc_info(), connection_pool=self.connection_pool) + def wait(self): + """Wait for all callback threads to exit.""" + self.pool.waitall() + class MulticallWaiter(object): def __init__(self, conf, connection, timeout): @@ -349,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): # that will continue to use the connection. When it's done, # connection.close() will get called which will put it back into # the pool - LOG.debug(_('Making asynchronous call on %s ...'), topic) + LOG.debug(_('Making synchronous call on %s ...'), topic) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug(_('MSG_ID is %s') % (msg_id)) @@ -358,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): conn = ConnectionContext(conf, connection_pool) wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) return wait_msg @@ -377,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous cast on %s...'), topic) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast(conf, context, topic, msg, connection_pool): @@ -385,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous fanout cast...')) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) def cast_to_server(conf, context, server_params, topic, msg, connection_pool): @@ -393,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast_to_server(conf, context, server_params, topic, msg, @@ -402,15 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) -def notify(conf, context, topic, msg, connection_pool): +def notify(conf, context, topic, msg, connection_pool, envelope): """Sends a notification event on a topic.""" - event_type = msg.get('event_type') - LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals()) + LOG.debug(_('Sending %(event_type)s on %(topic)s'), + dict(event_type=msg.get('event_type'), + topic=topic)) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: + if envelope: + msg = rpc_common.serialize_msg(msg, force_envelope=True) conn.notify_send(topic, msg) @@ -420,7 +433,4 @@ def cleanup(connection_pool): def get_control_exchange(conf): - try: - return conf.control_exchange - except cfg.NoSuchOptError: - return 'openstack' + return conf.control_exchange diff --git a/ceilometer/openstack/common/rpc/common.py b/ceilometer/openstack/common/rpc/common.py index 84f06c1b..0bfbb8a3 100644 --- a/ceilometer/openstack/common/rpc/common.py +++ b/ceilometer/openstack/common/rpc/common.py @@ -18,18 +18,61 @@ # under the License. import copy -import logging +import sys import traceback +from ceilometer.openstack.common import cfg from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import local +from ceilometer.openstack.common import log as logging +CONF = cfg.CONF LOG = logging.getLogger(__name__) +'''RPC Envelope Version. + +This version number applies to the top level structure of messages sent out. +It does *not* apply to the message payload, which must be versioned +independently. For example, when using rpc APIs, a version number is applied +for changes to the API being exposed over rpc. This version number is handled +in the rpc proxy and dispatcher modules. + +This version number applies to the message envelope that is used in the +serialization done inside the rpc layer. See serialize_msg() and +deserialize_msg(). + +The current message format (version 2.0) is very simple. It is: + + { + 'ceilometer.version': <RPC Envelope Version as a String>, + 'ceilometer.message': <Application Message Payload, JSON encoded> + } + +Message format version '1.0' is just considered to be the messages we sent +without a message envelope. + +So, the current message envelope just includes the envelope version. It may +eventually contain additional information, such as a signature for the message +payload. + +We will JSON encode the application message payload. The message envelope, +which includes the JSON encoded application message body, will be passed down +to the messaging libraries as a dict. +''' +_RPC_ENVELOPE_VERSION = '2.0' + +_VERSION_KEY = 'ceilometer.version' +_MESSAGE_KEY = 'ceilometer.message' + + +# TODO(russellb) Turn this on after Grizzly. +_SEND_RPC_ENVELOPE = False + + class RPCException(Exception): message = _("An unknown RPC related exception occurred.") @@ -40,7 +83,7 @@ class RPCException(Exception): try: message = self.message % kwargs - except Exception as e: + except Exception: # kwargs doesn't match a variable in the message # log the issue and the kwargs LOG.exception(_('Exception in string format operation')) @@ -90,6 +133,11 @@ class UnsupportedRpcVersion(RPCException): "this endpoint.") +class UnsupportedRpcEnvelopeVersion(RPCException): + message = _("Specified RPC envelope version, %(version)s, " + "not supported by this endpoint.") + + class Connection(object): """A connection, returned by rpc.create_connection(). @@ -164,8 +212,12 @@ class Connection(object): def _safe_log(log_func, msg, msg_data): """Sanitizes the msg_data field before logging.""" - SANITIZE = {'set_admin_password': ('new_pass',), - 'run_instance': ('admin_password',), } + SANITIZE = {'set_admin_password': [('args', 'new_pass')], + 'run_instance': [('args', 'admin_password')], + 'route_message': [('args', 'message', 'args', 'method_info', + 'method_kwargs', 'password'), + ('args', 'message', 'args', 'method_info', + 'method_kwargs', 'admin_password')]} has_method = 'method' in msg_data and msg_data['method'] in SANITIZE has_context_token = '_context_auth_token' in msg_data @@ -177,14 +229,16 @@ def _safe_log(log_func, msg, msg_data): msg_data = copy.deepcopy(msg_data) if has_method: - method = msg_data['method'] - if method in SANITIZE: - args_to_sanitize = SANITIZE[method] - for arg in args_to_sanitize: - try: - msg_data['args'][arg] = "<SANITIZED>" - except KeyError: - pass + for arg in SANITIZE.get(msg_data['method'], []): + try: + d = msg_data + for elem in arg[:-1]: + d = d[elem] + d[arg[-1]] = '<SANITIZED>' + except KeyError, e: + LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'), + {'item': arg, + 'err': e}) if has_context_token: msg_data['_context_auth_token'] = '<SANITIZED>' @@ -195,7 +249,7 @@ def _safe_log(log_func, msg, msg_data): return log_func(msg, msg_data) -def serialize_remote_exception(failure_info): +def serialize_remote_exception(failure_info, log_failure=True): """Prepares exception data to be sent over rpc. Failure_info should be a sys.exc_info() tuple. @@ -203,8 +257,9 @@ def serialize_remote_exception(failure_info): """ tb = traceback.format_exception(*failure_info) failure = failure_info[1] - LOG.error(_("Returning exception %s to caller"), unicode(failure)) - LOG.error(tb) + if log_failure: + LOG.error(_("Returning exception %s to caller"), unicode(failure)) + LOG.error(tb) kwargs = {} if hasattr(failure, 'kwargs'): @@ -258,7 +313,7 @@ def deserialize_remote_exception(conf, data): # we cannot necessarily change an exception message so we must override # the __str__ method. failure.__class__ = new_ex_type - except TypeError as e: + except TypeError: # NOTE(ameade): If a core exception then just add the traceback to the # first exception argument. failure.args = (message,) + failure.args[1:] @@ -309,3 +364,107 @@ class CommonRpcContext(object): context.values['read_deleted'] = read_deleted return context + + +class ClientException(Exception): + """This encapsulates some actual exception that is expected to be + hit by an RPC proxy object. Merely instantiating it records the + current exception information, which will be passed back to the + RPC client without exceptional logging.""" + def __init__(self): + self._exc_info = sys.exc_info() + + +def catch_client_exception(exceptions, func, *args, **kwargs): + try: + return func(*args, **kwargs) + except Exception, e: + if type(e) in exceptions: + raise ClientException() + else: + raise + + +def client_exceptions(*exceptions): + """Decorator for manager methods that raise expected exceptions. + Marking a Manager method with this decorator allows the declaration + of expected exceptions that the RPC layer should not consider fatal, + and not log as if they were generated in a real error scenario. Note + that this will cause listed exceptions to be wrapped in a + ClientException, which is used internally by the RPC layer.""" + def outer(func): + def inner(*args, **kwargs): + return catch_client_exception(exceptions, func, *args, **kwargs) + return inner + return outer + + +def version_is_compatible(imp_version, version): + """Determine whether versions are compatible. + + :param imp_version: The version implemented + :param version: The version requested by an incoming message. + """ + version_parts = version.split('.') + imp_version_parts = imp_version.split('.') + if int(version_parts[0]) != int(imp_version_parts[0]): # Major + return False + if int(version_parts[1]) > int(imp_version_parts[1]): # Minor + return False + return True + + +def serialize_msg(raw_msg, force_envelope=False): + if not _SEND_RPC_ENVELOPE and not force_envelope: + return raw_msg + + # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more + # information about this format. + msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION, + _MESSAGE_KEY: jsonutils.dumps(raw_msg)} + + return msg + + +def deserialize_msg(msg): + # NOTE(russellb): Hang on to your hats, this road is about to + # get a little bumpy. + # + # Robustness Principle: + # "Be strict in what you send, liberal in what you accept." + # + # At this point we have to do a bit of guessing about what it + # is we just received. Here is the set of possibilities: + # + # 1) We received a dict. This could be 2 things: + # + # a) Inspect it to see if it looks like a standard message envelope. + # If so, great! + # + # b) If it doesn't look like a standard message envelope, it could either + # be a notification, or a message from before we added a message + # envelope (referred to as version 1.0). + # Just return the message as-is. + # + # 2) It's any other non-dict type. Just return it and hope for the best. + # This case covers return values from rpc.call() from before message + # envelopes were used. (messages to call a method were always a dict) + + if not isinstance(msg, dict): + # See #2 above. + return msg + + base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY) + if not all(map(lambda key: key in msg, base_envelope_keys)): + # See #1.b above. + return msg + + # At this point we think we have the message envelope + # format we were expecting. (#1.a above) + + if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]): + raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY]) + + raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + + return raw_msg diff --git a/ceilometer/openstack/common/rpc/dispatcher.py b/ceilometer/openstack/common/rpc/dispatcher.py index d1136acb..927af0db 100644 --- a/ceilometer/openstack/common/rpc/dispatcher.py +++ b/ceilometer/openstack/common/rpc/dispatcher.py @@ -41,8 +41,8 @@ server side of the API at the same time. However, as the code stands today, there can be both versioned and unversioned APIs implemented in the same code base. - -EXAMPLES: +EXAMPLES +======== Nova was the first project to use versioned rpc APIs. Consider the compute rpc API as an example. The client side is in nova/compute/rpcapi.py and the server @@ -50,12 +50,13 @@ side is in nova/compute/manager.py. Example 1) Adding a new method. +------------------------------- Adding a new method is a backwards compatible change. It should be added to nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should have a specific version specified to indicate the minimum API version that must -be implemented for the method to be supported. For example: +be implemented for the method to be supported. For example:: def get_host_uptime(self, ctxt, host): topic = _compute_topic(self.topic, ctxt, host, None) @@ -67,10 +68,11 @@ get_host_uptime() method. Example 2) Adding a new parameter. +---------------------------------- Adding a new parameter to an rpc method can be made backwards compatible. The RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. -The implementation of the method must not expect the parameter to be present. +The implementation of the method must not expect the parameter to be present.:: def some_remote_method(self, arg1, arg2, newarg=None): # The code needs to deal with newarg=None for cases @@ -101,21 +103,6 @@ class RpcDispatcher(object): self.callbacks = callbacks super(RpcDispatcher, self).__init__() - @staticmethod - def _is_compatible(mversion, version): - """Determine whether versions are compatible. - - :param mversion: The API version implemented by a callback. - :param version: The API version requested by an incoming message. - """ - version_parts = version.split('.') - mversion_parts = mversion.split('.') - if int(version_parts[0]) != int(mversion_parts[0]): # Major - return False - if int(version_parts[1]) > int(mversion_parts[1]): # Minor - return False - return True - def dispatch(self, ctxt, version, method, **kwargs): """Dispatch a message based on a requested version. @@ -137,7 +124,8 @@ class RpcDispatcher(object): rpc_api_version = proxyobj.RPC_API_VERSION else: rpc_api_version = '1.0' - is_compatible = self._is_compatible(rpc_api_version, version) + is_compatible = rpc_common.version_is_compatible(rpc_api_version, + version) had_compatible = had_compatible or is_compatible if not hasattr(proxyobj, method): continue diff --git a/ceilometer/openstack/common/rpc/impl_fake.py b/ceilometer/openstack/common/rpc/impl_fake.py index f1efab2b..72041bfe 100644 --- a/ceilometer/openstack/common/rpc/impl_fake.py +++ b/ceilometer/openstack/common/rpc/impl_fake.py @@ -18,11 +18,15 @@ queues. Casts will block, but this is very useful for tests. """ import inspect +# NOTE(russellb): We specifically want to use json, not our own jsonutils. +# jsonutils has some extra logic to automatically convert objects to primitive +# types so that they can be serialized. We want to catch all cases where +# non-primitive types make it into this code and treat it as an error. +import json import time import eventlet -from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common.rpc import common as rpc_common CONSUMERS = {} @@ -75,6 +79,8 @@ class Consumer(object): else: res.append(rval) done.send(res) + except rpc_common.ClientException as e: + done.send_exception(e._exc_info[1]) except Exception as e: done.send_exception(e) @@ -121,7 +127,7 @@ def create_connection(conf, new=True): def check_serialize(msg): """Make sure a message intended for rpc can be serialized.""" - jsonutils.dumps(msg) + json.dumps(msg) def multicall(conf, context, topic, msg, timeout=None): @@ -154,13 +160,14 @@ def call(conf, context, topic, msg, timeout=None): def cast(conf, context, topic, msg): + check_serialize(msg) try: call(conf, context, topic, msg) except Exception: pass -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): check_serialize(msg) diff --git a/ceilometer/openstack/common/rpc/impl_kombu.py b/ceilometer/openstack/common/rpc/impl_kombu.py index 6e06357d..e120e3bb 100644 --- a/ceilometer/openstack/common/rpc/impl_kombu.py +++ b/ceilometer/openstack/common/rpc/impl_kombu.py @@ -162,7 +162,8 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) try: - callback(message.payload) + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) message.ack() except Exception: LOG.exception(_("Failed to process message... skipping it.")) @@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase): # Default options options = {'durable': False, 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) exchange = kombu.entity.Exchange(name=msg_id, type='direct', @@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase): options = {'durable': False, 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', durable=options['durable'], @@ -316,7 +317,7 @@ class DirectPublisher(Publisher): options = {'durable': False, 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) super(DirectPublisher, self).__init__(channel, msg_id, msg_id, type='direct', **options) @@ -350,7 +351,7 @@ class FanoutPublisher(Publisher): """ options = {'durable': False, 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, None, type='fanout', **options) @@ -387,6 +388,7 @@ class Connection(object): def __init__(self, conf, server_params=None): self.consumers = [] self.consumer_thread = None + self.proxy_callbacks = [] self.conf = conf self.max_retries = self.conf.rabbit_max_retries # Try forever? @@ -469,7 +471,7 @@ class Connection(object): LOG.info(_("Reconnecting to AMQP server on " "%(hostname)s:%(port)d") % params) try: - self.connection.close() + self.connection.release() except self.connection_errors: pass # Setting this in case the next statement fails, though @@ -573,12 +575,14 @@ class Connection(object): def close(self): """Close/release this connection""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.connection.release() self.connection = None def reset(self): """Reset a connection so it can be used again""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.channel.close() self.channel = self.connection.channel() # work around 'memory' transport bug in 1.1.3 @@ -644,6 +648,11 @@ class Connection(object): pass self.consumer_thread = None + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + def publisher_send(self, cls, topic, msg, **kwargs): """Send to a publisher based on the publisher class""" @@ -719,6 +728,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) if fanout: self.declare_fanout_consumer(topic, proxy_cb) @@ -730,6 +740,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) self.declare_topic_consumer(topic, proxy_cb, pool_name) @@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify( conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py index 0a6c4fac..0defe021 100644 --- a/ceilometer/openstack/common/rpc/impl_qpid.py +++ b/ceilometer/openstack/common/rpc/impl_qpid.py @@ -17,21 +17,23 @@ import functools import itertools -import logging import time import uuid import eventlet import greenlet -import qpid.messaging -import qpid.messaging.exceptions from ceilometer.openstack.common import cfg from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import importutils from ceilometer.openstack.common import jsonutils +from ceilometer.openstack.common import log as logging from ceilometer.openstack.common.rpc import amqp as rpc_amqp from ceilometer.openstack.common.rpc import common as rpc_common +qpid_messaging = importutils.try_import("qpid.messaging") +qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") + LOG = logging.getLogger(__name__) qpid_opts = [ @@ -41,6 +43,9 @@ qpid_opts = [ cfg.StrOpt('qpid_port', default='5672', help='Qpid broker port'), + cfg.ListOpt('qpid_hosts', + default=['$qpid_hostname:$qpid_port'], + help='Qpid HA cluster host:port pairs'), cfg.StrOpt('qpid_username', default='', help='Username for qpid connection'), @@ -121,7 +126,8 @@ class ConsumerBase(object): """Fetch the message and pass it to the callback object""" message = self.receiver.fetch() try: - self.callback(message.content) + msg = rpc_common.deserialize_msg(message.content) + self.callback(msg) except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: @@ -271,28 +277,38 @@ class Connection(object): pool = None def __init__(self, conf, server_params=None): + if not qpid_messaging: + raise ImportError("Failed to import qpid.messaging") + self.session = None self.consumers = {} self.consumer_thread = None + self.proxy_callbacks = [] self.conf = conf + if server_params and 'hostname' in server_params: + # NOTE(russellb) This enables support for cast_to_server. + server_params['qpid_hosts'] = [ + '%s:%d' % (server_params['hostname'], + server_params.get('port', 5672)) + ] + params = { - 'hostname': self.conf.qpid_hostname, - 'port': self.conf.qpid_port, + 'qpid_hosts': self.conf.qpid_hosts, 'username': self.conf.qpid_username, 'password': self.conf.qpid_password, } params.update(server_params or {}) - self.broker = params['hostname'] + ":" + str(params['port']) + self.brokers = params['qpid_hosts'] self.username = params['username'] self.password = params['password'] - self.connection_create() + self.connection_create(self.brokers[0]) self.reconnect() - def connection_create(self): + def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(self.broker) + self.connection = qpid_messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -317,15 +333,19 @@ class Connection(object): if self.connection.opened(): try: self.connection.close() - except qpid.messaging.exceptions.ConnectionError: + except qpid_exceptions.ConnectionError: pass + attempt = 0 delay = 1 while True: + broker = self.brokers[attempt % len(self.brokers)] + attempt += 1 + try: - self.connection_create() + self.connection_create(broker) self.connection.open() - except qpid.messaging.exceptions.ConnectionError, e: + except qpid_exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) msg = _("Unable to connect to AMQP server: %(e)s. " "Sleeping %(delay)s seconds") % msg_dict @@ -333,10 +353,9 @@ class Connection(object): time.sleep(delay) delay = min(2 * delay, 60) else: + LOG.info(_('Connected to AMQP server on %s'), broker) break - LOG.info(_('Connected to AMQP server on %s'), self.broker) - self.session = self.connection.session() if self.consumers: @@ -353,8 +372,8 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except (qpid.messaging.exceptions.Empty, - qpid.messaging.exceptions.ConnectionError), e: + except (qpid_exceptions.Empty, + qpid_exceptions.ConnectionError), e: if error_callback: error_callback(e) self.reconnect() @@ -362,12 +381,14 @@ class Connection(object): def close(self): """Close/release this connection""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.connection.close() self.connection = None def reset(self): """Reset a connection so it can be used again""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.session.close() self.session = self.connection.session() self.consumers = {} @@ -392,7 +413,7 @@ class Connection(object): """Return an iterator that will consume from all queues/consumers""" def _error_callback(exc): - if isinstance(exc, qpid.messaging.exceptions.Empty): + if isinstance(exc, qpid_exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % str(exc)) raise rpc_common.Timeout() @@ -422,6 +443,11 @@ class Connection(object): pass self.consumer_thread = None + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + def publisher_send(self, cls, topic, msg): """Send to a publisher based on the publisher class""" @@ -497,6 +523,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) if fanout: consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) @@ -512,6 +539,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, name=pool_name) @@ -570,10 +598,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py index 2db31faa..1c22ec8d 100644 --- a/ceilometer/openstack/common/rpc/impl_zmq.py +++ b/ceilometer/openstack/common/rpc/impl_zmq.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import os import pprint import socket import string @@ -22,15 +23,16 @@ import types import uuid import eventlet -from eventlet.green import zmq import greenlet from ceilometer.openstack.common import cfg from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils from ceilometer.openstack.common import jsonutils +from ceilometer.openstack.common import processutils as utils from ceilometer.openstack.common.rpc import common as rpc_common +zmq = importutils.try_import('eventlet.green.zmq') # for convenience, are not modified. pformat = pprint.pformat @@ -61,6 +63,10 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), + cfg.IntOpt('rpc_zmq_topic_backlog', default=None, + help='Maximum number of ingress messages to locally buffer ' + 'per topic. Default is unlimited.'), + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', help='Directory for holding IPC sockets'), @@ -70,9 +76,9 @@ zmq_opts = [ ] -# These globals are defined in register_opts(conf), -# a mandatory initialization call -CONF = None +CONF = cfg.CONF +CONF.register_opts(zmq_opts) + ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object @@ -107,7 +113,7 @@ class ZmqSocket(object): """ def __init__(self, addr, zmq_type, bind=True, subscribe=None): - self.sock = ZMQ_CTX.socket(zmq_type) + self.sock = _get_ctxt().socket(zmq_type) self.addr = addr self.type = zmq_type self.subscriptions = [] @@ -181,11 +187,15 @@ class ZmqSocket(object): pass self.subscriptions = [] - # Linger -1 prevents lost/dropped messages try: - self.sock.close(linger=-1) + # Default is to linger + self.sock.close() except Exception: - pass + # While this is a bad thing to happen, + # it would be much worse if some of the code calling this + # were to fail. For now, lets log, and later evaluate + # if we can safely raise here. + LOG.error("ZeroMQ socket could not be closed.") self.sock = None def recv(self): @@ -202,10 +212,14 @@ class ZmqSocket(object): class ZmqClient(object): """Client for ZMQ sockets.""" - def __init__(self, addr, socket_type=zmq.PUSH, bind=False): + def __init__(self, addr, socket_type=None, bind=False): + if socket_type is None: + socket_type = zmq.PUSH self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data): + def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): + if serialize: + data = rpc_common.serialize_msg(data, force_envelope) self.outq.send([str(msg_id), str(topic), str('cast'), _serialize(data)]) @@ -250,7 +264,7 @@ class InternalContext(object): """Process a curried message and cast the result to topic.""" LOG.debug(_("Running func with context: %s"), ctx.to_dict()) data.setdefault('version', None) - data.setdefault('args', []) + data.setdefault('args', {}) try: result = proxy.dispatch( @@ -259,7 +273,14 @@ class InternalContext(object): except greenlet.GreenletExit: # ignore these since they are just from shutdowns pass + except rpc_common.ClientException, e: + LOG.debug(_("Expected exception during message handling (%s)") % + e._exc_info[1]) + return {'exc': + rpc_common.serialize_remote_exception(e._exc_info, + log_failure=False)} except Exception: + LOG.error(_("Exception during message handling")) return {'exc': rpc_common.serialize_remote_exception(sys.exc_info())} @@ -314,7 +335,7 @@ class ConsumerBase(object): return data.setdefault('version', None) - data.setdefault('args', []) + data.setdefault('args', {}) proxy.dispatch(ctx, data['version'], data['method'], **data['args']) @@ -404,12 +425,6 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).__init__(conf) self.topic_proxy = {} - ipc_dir = CONF.rpc_zmq_ipc_dir - - self.topic_proxy['zmq_replies'] = \ - ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), - zmq.PUB, bind=True) - self.sockets.append(self.topic_proxy['zmq_replies']) def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir @@ -426,7 +441,7 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUB elif topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = _deserialize(in_msg) + inside = rpc_common.deserialize_msg(_deserialize(in_msg)) msg_id = inside[-1]['args']['msg_id'] response = inside[-1]['args']['response'] LOG.debug(_("->response->%s"), response) @@ -435,20 +450,81 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUSH if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) + def publisher(waiter): + LOG.info(_("Creating proxy for topic: %s"), topic) - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) + try: + out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % + (ipc_dir, topic), + sock_type, bind=True) + except RPCException: + waiter.send_exception(*sys.exc_info()) + return + + self.topic_proxy[topic] = eventlet.queue.LightQueue( + CONF.rpc_zmq_topic_backlog) + self.sockets.append(out_sock) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) + + waiter.send(True) + + while(True): + data = self.topic_proxy[topic].get() + out_sock.send(data) + LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % + {'data': data}) + + wait_sock_creation = eventlet.event.Event() + eventlet.spawn(publisher, wait_sock_creation) + + try: + wait_sock_creation.wait() + except RPCException: + LOG.error(_("Topic socket file creation failed.")) + return - LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) - self.topic_proxy[topic].send(data) - LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) + try: + self.topic_proxy[topic].put_nowait(data) + LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") % + {'data': data}) + except eventlet.queue.Full: + LOG.error(_("Local per-topic backlog buffer full for topic " + "%(topic)s. Dropping message.") % {'topic': topic}) + + def consume_in_thread(self): + """Runs the ZmqProxy service""" + ipc_dir = CONF.rpc_zmq_ipc_dir + consume_in = "tcp://%s:%s" % \ + (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port) + consumption_proxy = InternalContext(None) + + if not os.path.isdir(ipc_dir): + try: + utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) + utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), + ipc_dir, run_as_root=True) + utils.execute('chmod', '750', ipc_dir, run_as_root=True) + except utils.ProcessExecutionError: + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) + raise + + try: + self.register(consumption_proxy, + consume_in, + zmq.PULL, + out_bind=True) + except zmq.ZMQError: + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) + raise + + super(ZmqProxy, self).consume_in_thread() class ZmqReactor(ZmqBaseReactor): @@ -473,7 +549,7 @@ class ZmqReactor(ZmqBaseReactor): msg_id, topic, style, in_msg = data - ctx, request = _deserialize(in_msg) + ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg)) ctx = RpcContext.unmarshal(ctx) proxy = self.proxies[sock] @@ -524,7 +600,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None): +def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, + force_envelope=False): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -533,7 +610,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None): conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload) + conn.cast(msg_id, topic, payload, serialize, force_envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -541,7 +618,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None): conn.close() -def _call(addr, context, msg_id, topic, msg, timeout=None): +def _call(addr, context, msg_id, topic, msg, timeout=None, + serialize=True, force_envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -576,7 +654,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): ) LOG.debug(_("Sending cast")) - _cast(addr, context, msg_id, topic, payload) + _cast(addr, context, msg_id, topic, payload, + serialize=serialize, force_envelope=force_envelope) LOG.debug(_("Cast sent; Waiting reply")) # Blocks until receives reply @@ -602,7 +681,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None): +def _multi_send(method, context, topic, msg, timeout=None, serialize=True, + force_envelope=False): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -611,7 +691,7 @@ def _multi_send(method, context, topic, msg, timeout=None): conf = CONF LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) - queues = matchmaker.queues(topic) + queues = _get_matchmaker().queues(topic) LOG.debug(_("Sending message(s) to: %s"), queues) # Don't stack if we have no matchmaker results @@ -628,9 +708,11 @@ def _multi_send(method, context, topic, msg, timeout=None): if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout) + _topic, _topic, msg, timeout, serialize, + force_envelope) return - return method(_addr, context, _topic, _topic, msg, timeout) + return method(_addr, context, _topic, _topic, msg, timeout, + serialize, force_envelope) def create_connection(conf, new=True): @@ -669,38 +751,37 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') + kwargs['serialize'] = kwargs.pop('envelope') + kwargs['force_envelope'] = True cast(conf, context, topic, msg, **kwargs) def cleanup(): """Clean up resources in use by implementation.""" global ZMQ_CTX + if ZMQ_CTX: + ZMQ_CTX.term() + ZMQ_CTX = None + global matchmaker matchmaker = None - ZMQ_CTX.term() - ZMQ_CTX = None -def register_opts(conf): - """Registration of options for this driver.""" - #NOTE(ewindisch): ZMQ_CTX and matchmaker - # are initialized here as this is as good - # an initialization method as any. +def _get_ctxt(): + if not zmq: + raise ImportError("Failed to import eventlet.green.zmq") - # We memoize through these globals global ZMQ_CTX - global matchmaker - global CONF - - if not CONF: - conf.register_opts(zmq_opts) - CONF = conf - # Don't re-set, if this method is called twice. if not ZMQ_CTX: - ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) + ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts) + return ZMQ_CTX + + +def _get_matchmaker(): + global matchmaker if not matchmaker: # rpc_zmq_matchmaker should be set to a 'module.Class' - mm_path = conf.rpc_zmq_matchmaker.split('.') + mm_path = CONF.rpc_zmq_matchmaker.split('.') mm_module = '.'.join(mm_path[:-1]) mm_class = mm_path[-1] @@ -713,6 +794,4 @@ def register_opts(conf): mm_impl = importutils.import_module(mm_module) mm_constructor = getattr(mm_impl, mm_class) matchmaker = mm_constructor() - - -register_opts(cfg.CONF) + return matchmaker diff --git a/ceilometer/openstack/common/rpc/matchmaker.py b/ceilometer/openstack/common/rpc/matchmaker.py index bf4f85d4..87679e8e 100644 --- a/ceilometer/openstack/common/rpc/matchmaker.py +++ b/ceilometer/openstack/common/rpc/matchmaker.py @@ -21,10 +21,10 @@ return keys for direct exchanges, per (approximate) AMQP parlance. import contextlib import itertools import json -import logging from ceilometer.openstack.common import cfg from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import log as logging matchmaker_opts = [ diff --git a/ceilometer/openstack/common/service.py b/ceilometer/openstack/common/service.py index 0f22d4a8..57e5bc00 100644 --- a/ceilometer/openstack/common/service.py +++ b/ceilometer/openstack/common/service.py @@ -27,20 +27,17 @@ import sys import time import eventlet -import greenlet import logging as std_logging from ceilometer.openstack.common import cfg from ceilometer.openstack.common import eventlet_backdoor from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import importutils from ceilometer.openstack.common import log as logging from ceilometer.openstack.common import threadgroup -try: - from ceilometer.openstack.common import rpc -except ImportError: - rpc = None +rpc = importutils.try_import('ceilometer.openstack.common.rpc') CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -54,7 +51,7 @@ class Launcher(object): :returns: None """ - self._services = [] + self._services = threadgroup.ThreadGroup() eventlet_backdoor.initialize_if_enabled() @staticmethod @@ -75,8 +72,7 @@ class Launcher(object): :returns: None """ - gt = eventlet.spawn(self.run_service, service) - self._services.append(gt) + self._services.add_thread(self.run_service, service) def stop(self): """Stop all services which are currently running. @@ -84,8 +80,7 @@ class Launcher(object): :returns: None """ - for service in self._services: - service.kill() + self._services.stop() def wait(self): """Waits until all services have been stopped, and then returns. @@ -93,11 +88,7 @@ class Launcher(object): :returns: None """ - for service in self._services: - try: - service.wait() - except greenlet.GreenletExit: - pass + self._services.wait() class SignalExit(SystemExit): @@ -132,9 +123,9 @@ class ServiceLauncher(Launcher): except SystemExit as exc: status = exc.code finally: - self.stop() if rpc: rpc.cleanup() + self.stop() return status @@ -252,7 +243,10 @@ class ProcessLauncher(object): def _wait_child(self): try: - pid, status = os.wait() + # Don't block if no child processes have exited + pid, status = os.waitpid(0, os.WNOHANG) + if not pid: + return None except OSError as exc: if exc.errno not in (errno.EINTR, errno.ECHILD): raise @@ -260,10 +254,12 @@ class ProcessLauncher(object): if os.WIFSIGNALED(status): sig = os.WTERMSIG(status) - LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals()) + LOG.info(_('Child %(pid)d killed by signal %(sig)d'), + dict(pid=pid, sig=sig)) else: code = os.WEXITSTATUS(status) - LOG.info(_('Child %(pid)d exited with status %(code)d'), locals()) + LOG.info(_('Child %(pid)s exited with status %(code)d'), + dict(pid=pid, code=code)) if pid not in self.children: LOG.warning(_('pid %d not in child list'), pid) @@ -282,6 +278,10 @@ class ProcessLauncher(object): while self.running: wrap = self._wait_child() if not wrap: + # Yield to other threads if no children have exited + # Sleep for a short time to avoid excessive CPU usage + # (see bug #1095346) + eventlet.greenthread.sleep(.01) continue while self.running and len(wrap.children) < wrap.workers: @@ -309,8 +309,8 @@ class ProcessLauncher(object): class Service(object): """Service object for binaries running on hosts.""" - def __init__(self): - self.tg = threadgroup.ThreadGroup('service') + def __init__(self, threads=1000): + self.tg = threadgroup.ThreadGroup(threads) def start(self): pass diff --git a/ceilometer/openstack/common/setup.py b/ceilometer/openstack/common/setup.py index e6f72f03..81a3d20e 100644 --- a/ceilometer/openstack/common/setup.py +++ b/ceilometer/openstack/common/setup.py @@ -1,6 +1,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2011 OpenStack LLC. +# Copyright 2012-2013 Hewlett-Packard Development Company, L.P. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -19,7 +20,7 @@ Utilities with minimum-depends for use in setup.py """ -import datetime +import email import os import re import subprocess @@ -33,11 +34,12 @@ def parse_mailmap(mailmap='.mailmap'): if os.path.exists(mailmap): with open(mailmap, 'r') as fp: for l in fp: - l = l.strip() - if not l.startswith('#') and ' ' in l: - canonical_email, alias = [x for x in l.split(' ') - if x.startswith('<')] - mapping[alias] = canonical_email + try: + canonical_email, alias = re.match( + r'[^#]*?(<.+>).*(<.+>).*', l).groups() + except AttributeError: + continue + mapping[alias] = canonical_email return mapping @@ -45,8 +47,8 @@ def canonicalize_emails(changelog, mapping): """Takes in a string and an email alias mapping and replaces all instances of the aliases in the string with their real email. """ - for alias, email in mapping.iteritems(): - changelog = changelog.replace(alias, email) + for alias, email_address in mapping.iteritems(): + changelog = changelog.replace(alias, email_address) return changelog @@ -106,23 +108,17 @@ def parse_dependency_links(requirements_files=['requirements.txt', return dependency_links -def write_requirements(): - venv = os.environ.get('VIRTUAL_ENV', None) - if venv is not None: - with open("requirements.txt", "w") as req_file: - output = subprocess.Popen(["pip", "-E", venv, "freeze", "-l"], - stdout=subprocess.PIPE) - requirements = output.communicate()[0].strip() - req_file.write(requirements) - - -def _run_shell_command(cmd): +def _run_shell_command(cmd, throw_on_error=False): if os.name == 'nt': output = subprocess.Popen(["cmd.exe", "/C", cmd], - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) else: output = subprocess.Popen(["/bin/sh", "-c", cmd], - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + if output.returncode and throw_on_error: + raise Exception("%s returned %d" % cmd, output.returncode) out = output.communicate() if len(out) == 0: return None @@ -131,57 +127,6 @@ def _run_shell_command(cmd): return out[0].strip() -def _get_git_next_version_suffix(branch_name): - datestamp = datetime.datetime.now().strftime('%Y%m%d') - if branch_name == 'milestone-proposed': - revno_prefix = "r" - else: - revno_prefix = "" - _run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*") - milestone_cmd = "git show meta/openstack/release:%s" % branch_name - milestonever = _run_shell_command(milestone_cmd) - if milestonever: - first_half = "%s~%s" % (milestonever, datestamp) - else: - first_half = datestamp - - post_version = _get_git_post_version() - # post version should look like: - # 0.1.1.4.gcc9e28a - # where the bit after the last . is the short sha, and the bit between - # the last and second to last is the revno count - (revno, sha) = post_version.split(".")[-2:] - second_half = "%s%s.%s" % (revno_prefix, revno, sha) - return ".".join((first_half, second_half)) - - -def _get_git_current_tag(): - return _run_shell_command("git tag --contains HEAD") - - -def _get_git_tag_info(): - return _run_shell_command("git describe --tags") - - -def _get_git_post_version(): - current_tag = _get_git_current_tag() - if current_tag is not None: - return current_tag - else: - tag_info = _get_git_tag_info() - if tag_info is None: - base_version = "0.0" - cmd = "git --no-pager log --oneline" - out = _run_shell_command(cmd) - revno = len(out.split("\n")) - sha = _run_shell_command("git describe --always") - else: - tag_infos = tag_info.split("-") - base_version = "-".join(tag_infos[:-2]) - (revno, sha) = tag_infos[-2:] - return "%s.%s.%s" % (base_version, revno, sha) - - def write_git_changelog(): """Write a changelog based on the git changelog.""" new_changelog = 'ChangeLog' @@ -227,26 +172,6 @@ _rst_template = """%(heading)s """ -def read_versioninfo(project): - """Read the versioninfo file. If it doesn't exist, we're in a github - zipball, and there's really no way to know what version we really - are, but that should be ok, because the utility of that should be - just about nil if this code path is in use in the first place.""" - versioninfo_path = os.path.join(project, 'versioninfo') - if os.path.exists(versioninfo_path): - with open(versioninfo_path, 'r') as vinfo: - version = vinfo.read().strip() - else: - version = "0.0.0" - return version - - -def write_versioninfo(project, version): - """Write a simple file containing the version of the package.""" - with open(os.path.join(project, 'versioninfo'), 'w') as fil: - fil.write("%s\n" % version) - - def get_cmdclass(): """Return dict of commands to run from setup.py.""" @@ -276,6 +201,9 @@ def get_cmdclass(): from sphinx.setup_command import BuildDoc class LocalBuildDoc(BuildDoc): + + builders = ['html', 'man'] + def generate_autoindex(self): print "**Autodocumenting from %s" % os.path.abspath(os.curdir) modules = {} @@ -311,56 +239,83 @@ def get_cmdclass(): if not os.getenv('SPHINX_DEBUG'): self.generate_autoindex() - for builder in ['html', 'man']: + for builder in self.builders: self.builder = builder self.finalize_options() self.project = self.distribution.get_name() self.version = self.distribution.get_version() self.release = self.distribution.get_version() BuildDoc.run(self) + + class LocalBuildLatex(LocalBuildDoc): + builders = ['latex'] + cmdclass['build_sphinx'] = LocalBuildDoc + cmdclass['build_sphinx_latex'] = LocalBuildLatex except ImportError: pass return cmdclass -def get_git_branchname(): - for branch in _run_shell_command("git branch --color=never").split("\n"): - if branch.startswith('*'): - _branch_name = branch.split()[1].strip() - if _branch_name == "(no": - _branch_name = "no-branch" - return _branch_name - +def get_version_from_git(pre_version): + """Return a version which is equal to the tag that's on the current + revision if there is one, or tag plus number of additional revisions + if the current revision has no tag.""" -def get_pre_version(projectname, base_version): - """Return a version which is leading up to a version that will - be released in the future.""" if os.path.isdir('.git'): - current_tag = _get_git_current_tag() - if current_tag is not None: - version = current_tag + if pre_version: + try: + return _run_shell_command( + "git describe --exact-match", + throw_on_error=True).replace('-', '.') + except Exception: + sha = _run_shell_command("git log -n1 --pretty=format:%h") + describe = _run_shell_command("git describe --always") + revno = describe.rsplit("-", 2)[-2] + return "%s.a%s.g%s" % (pre_version, revno, sha) else: - branch_name = os.getenv('BRANCHNAME', - os.getenv('GERRIT_REFNAME', - get_git_branchname())) - version_suffix = _get_git_next_version_suffix(branch_name) - version = "%s~%s" % (base_version, version_suffix) - write_versioninfo(projectname, version) - return version - else: - version = read_versioninfo(projectname) - return version + return _run_shell_command( + "git describe --always").replace('-', '.') + return None -def get_post_version(projectname): - """Return a version which is equal to the tag that's on the current - revision if there is one, or tag plus number of additional revisions - if the current revision has no tag.""" +def get_version_from_pkg_info(package_name): + """Get the version from PKG-INFO file if we can.""" + try: + pkg_info_file = open('PKG-INFO', 'r') + except (IOError, OSError): + return None + try: + pkg_info = email.message_from_file(pkg_info_file) + except email.MessageError: + return None + # Check to make sure we're in our own dir + if pkg_info.get('Name', None) != package_name: + return None + return pkg_info.get('Version', None) - if os.path.isdir('.git'): - version = _get_git_post_version() - write_versioninfo(projectname, version) + +def get_version(package_name, pre_version=None): + """Get the version of the project. First, try getting it from PKG-INFO, if + it exists. If it does, that means we're in a distribution tarball or that + install has happened. Otherwise, if there is no PKG-INFO file, pull the + version from git. + + We do not support setup.py version sanity in git archive tarballs, nor do + we support packagers directly sucking our git repo into theirs. We expect + that a source tarball be made from our git repo - or that if someone wants + to make a source tarball from a fork of our repo with additional tags in it + that they understand and desire the results of doing that. + """ + version = os.environ.get("OSLO_PACKAGE_VERSION", None) + if version: + return version + version = get_version_from_pkg_info(package_name) + if version: + return version + version = get_version_from_git(pre_version) + if version: return version - return read_versioninfo(projectname) + raise Exception("Versioning for this project requires either an sdist" + " tarball, or access to an upstream git repository.") diff --git a/ceilometer/openstack/common/threadgroup.py b/ceilometer/openstack/common/threadgroup.py index ae7bb0a8..801e852e 100644 --- a/ceilometer/openstack/common/threadgroup.py +++ b/ceilometer/openstack/common/threadgroup.py @@ -18,7 +18,6 @@ from eventlet import greenlet from eventlet import greenpool from eventlet import greenthread -from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log as logging from ceilometer.openstack.common import loopingcall @@ -27,22 +26,19 @@ LOG = logging.getLogger(__name__) def _thread_done(gt, *args, **kwargs): - ''' - Callback function to be passed to GreenThread.link() when we spawn() - Calls the ThreadGroup to notify if. - ''' + """ Callback function to be passed to GreenThread.link() when we spawn() + Calls the :class:`ThreadGroup` to notify if. + + """ kwargs['group'].thread_done(kwargs['thread']) class Thread(object): + """ Wrapper around a greenthread, that holds a reference to the + :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when + it has done so it can be removed from the threads list. """ - Wrapper around a greenthread, that holds a reference to - the ThreadGroup. The Thread will notify the ThreadGroup - when it has done so it can be removed from the threads - list. - """ - def __init__(self, name, thread, group): - self.name = name + def __init__(self, thread, group): self.thread = thread self.thread.link(_thread_done, group=group, thread=self) @@ -54,14 +50,13 @@ class Thread(object): class ThreadGroup(object): - """ - The point of this class is to: - - keep track of timers and greenthreads (making it easier to stop them + """ The point of the ThreadGroup classis to: + + * keep track of timers and greenthreads (making it easier to stop them when need be). - - provide an easy API to add timers. + * provide an easy API to add timers. """ - def __init__(self, name, thread_pool_size=10): - self.name = name + def __init__(self, thread_pool_size=10): self.pool = greenpool.GreenPool(thread_pool_size) self.threads = [] self.timers = [] @@ -75,7 +70,7 @@ class ThreadGroup(object): def add_thread(self, callback, *args, **kwargs): gt = self.pool.spawn(callback, *args, **kwargs) - th = Thread(callback.__name__, gt, self) + th = Thread(gt, self) self.threads.append(th) def thread_done(self, thread): diff --git a/ceilometer/openstack/common/timeutils.py b/ceilometer/openstack/common/timeutils.py index 86004391..0f346087 100644 --- a/ceilometer/openstack/common/timeutils.py +++ b/ceilometer/openstack/common/timeutils.py @@ -71,11 +71,15 @@ def normalize_time(timestamp): def is_older_than(before, seconds): """Return True if before is older than seconds.""" + if isinstance(before, basestring): + before = parse_strtime(before).replace(tzinfo=None) return utcnow() - before > datetime.timedelta(seconds=seconds) def is_newer_than(after, seconds): """Return True if after is newer than seconds.""" + if isinstance(after, basestring): + after = parse_strtime(after).replace(tzinfo=None) return after - utcnow() > datetime.timedelta(seconds=seconds) @@ -87,7 +91,10 @@ def utcnow_ts(): def utcnow(): """Overridable version of utils.utcnow.""" if utcnow.override_time: - return utcnow.override_time + try: + return utcnow.override_time.pop(0) + except AttributeError: + return utcnow.override_time return datetime.datetime.utcnow() @@ -95,14 +102,21 @@ utcnow.override_time = None def set_time_override(override_time=datetime.datetime.utcnow()): - """Override utils.utcnow to return a constant time.""" + """ + Override utils.utcnow to return a constant time or a list thereof, + one at a time. + """ utcnow.override_time = override_time def advance_time_delta(timedelta): """Advance overridden time using a datetime.timedelta.""" assert(not utcnow.override_time is None) - utcnow.override_time += timedelta + try: + for dt in utcnow.override_time: + dt += timedelta + except TypeError: + utcnow.override_time += timedelta def advance_time_seconds(seconds): @@ -135,3 +149,16 @@ def unmarshall_time(tyme): minute=tyme['minute'], second=tyme['second'], microsecond=tyme['microsecond']) + + +def delta_seconds(before, after): + """ + Compute the difference in seconds between two date, time, or + datetime objects (as a float, to microsecond resolution). + """ + delta = after - before + try: + return delta.total_seconds() + except AttributeError: + return ((delta.days * 24 * 3600) + delta.seconds + + float(delta.microseconds) / (10 ** 6)) diff --git a/ceilometer/openstack/common/version.py b/ceilometer/openstack/common/version.py index a19e4226..3653ad0d 100644 --- a/ceilometer/openstack/common/version.py +++ b/ceilometer/openstack/common/version.py @@ -1,6 +1,6 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 OpenStack LLC +# Copyright 2012-2013 Hewlett-Packard Development Company, L.P. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -15,134 +15,65 @@ # under the License. """ -Utilities for consuming the auto-generated versioninfo files. +Utilities for consuming the version from pkg_resources. """ -import datetime import pkg_resources -import setup - - -class _deferred_version_string(object): - """Internal helper class which provides delayed version calculation.""" - def __init__(self, version_info, prefix): - self.version_info = version_info - self.prefix = prefix - - def __str__(self): - return "%s%s" % (self.prefix, self.version_info.version_string()) - - def __repr__(self): - return "%s%s" % (self.prefix, self.version_info.version_string()) - class VersionInfo(object): - def __init__(self, package, python_package=None, pre_version=None): + def __init__(self, package): """Object that understands versioning for a package - :param package: name of the top level python namespace. For glance, - this would be "glance" for python-glanceclient, it - would be "glanceclient" - :param python_package: optional name of the project name. For - glance this can be left unset. For - python-glanceclient, this would be - "python-glanceclient" - :param pre_version: optional version that the project is working to + :param package: name of the python package, such as glance, or + python-glanceclient """ self.package = package - if python_package is None: - self.python_package = package - else: - self.python_package = python_package - self.pre_version = pre_version + self.release = None self.version = None + self._cached_version = None - def _generate_version(self): - """Defer to the openstack.common.setup routines for making a - version from git.""" - if self.pre_version is None: - return setup.get_post_version(self.python_package) - else: - return setup.get_pre_version(self.python_package, self.pre_version) + def _get_version_from_pkg_resources(self): + """Get the version of the package from the pkg_resources record + associated with the package.""" + requirement = pkg_resources.Requirement.parse(self.package) + provider = pkg_resources.get_provider(requirement) + return provider.version - def _newer_version(self, pending_version): - """Check to see if we're working with a stale version or not. - We expect a version string that either looks like: - 2012.2~f3~20120708.10.4426392 - which is an unreleased version of a pre-version, or: - 0.1.1.4.gcc9e28a - which is an unreleased version of a post-version, or: - 0.1.1 - Which is a release and which should match tag. - For now, if we have a date-embedded version, check to see if it's - old, and if so re-generate. Otherwise, just deal with it. - """ - try: - version_date = int(self.version.split("~")[-1].split('.')[0]) - if version_date < int(datetime.date.today().strftime('%Y%m%d')): - return self._generate_version() - else: - return pending_version - except Exception: - return pending_version - - def version_string_with_vcs(self, always=False): + def release_string(self): """Return the full version of the package including suffixes indicating VCS status. - - For instance, if we are working towards the 2012.2 release, - canonical_version_string should return 2012.2 if this is a final - release, or else something like 2012.2~f1~20120705.20 if it's not. - - :param always: if true, skip all version caching """ - if always: - self.version = self._generate_version() + if self.release is None: + self.release = self._get_version_from_pkg_resources() - if self.version is None: + return self.release - requirement = pkg_resources.Requirement.parse(self.python_package) - versioninfo = "%s/versioninfo" % self.package - try: - raw_version = pkg_resources.resource_string(requirement, - versioninfo) - self.version = self._newer_version(raw_version.strip()) - except (IOError, pkg_resources.DistributionNotFound): - self.version = self._generate_version() + def version_string(self): + """Return the short version minus any alpha/beta tags.""" + if self.version is None: + parts = [] + for part in self.release_string().split('.'): + if part[0].isdigit(): + parts.append(part) + else: + break + self.version = ".".join(parts) return self.version - def canonical_version_string(self, always=False): - """Return the simple version of the package excluding any suffixes. - - For instance, if we are working towards the 2012.2 release, - canonical_version_string should return 2012.2 in all cases. - - :param always: if true, skip all version caching - """ - return self.version_string_with_vcs(always).split('~')[0] - - def version_string(self, always=False): - """Return the base version of the package. - - For instance, if we are working towards the 2012.2 release, - version_string should return 2012.2 if this is a final release, or - 2012.2-dev if it is not. - - :param always: if true, skip all version caching - """ - version_parts = self.version_string_with_vcs(always).split('~') - if len(version_parts) == 1: - return version_parts[0] - else: - return '%s-dev' % (version_parts[0],) + # Compatibility functions + canonical_version_string = version_string + version_string_with_vcs = release_string - def deferred_version_string(self, prefix=""): + def cached_version_string(self, prefix=""): """Generate an object which will expand in a string context to the results of version_string(). We do this so that don't call into pkg_resources every time we start up a program when passing version information into the CONF constructor, but rather only do the calculation when and if a version is requested """ - return _deferred_version_string(self, prefix) + if not self._cached_version: + self._cached_version = "%s%s" % (prefix, + self.version_string()) + return self._cached_version diff --git a/ceilometer/policy.py b/ceilometer/policy.py index e468be25..e2c32cdf 100644 --- a/ceilometer/policy.py +++ b/ceilometer/policy.py @@ -49,26 +49,18 @@ def init(): if not _POLICY_PATH: raise cfg.ConfigFilesNotFoundError([cfg.CONF.policy_file]) utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE, - reload_func=_set_brain) + reload_func=_set_rules) -def _set_brain(data): +def _set_rules(data): default_rule = cfg.CONF.policy_default_rule - policy.set_brain(policy.Brain.load_json(data, default_rule)) + policy.set_rules(policy.Rules.load_json(data, default_rule)) -def check_is_admin(roles, project_id, project_name): +def check_is_admin(roles): """Whether or not roles contains 'admin' role according to policy setting. """ init() - match_list = ('rule:context_is_admin',) - target = {} - credentials = { - 'roles': roles, - 'project_id': project_id, - 'project_name': project_name, - } - - return policy.enforce(match_list, target, credentials) + return policy.check('context_is_admin', {}, {'roles': roles}) diff --git a/ceilometer/publish.py b/ceilometer/publish.py index 24aab9a6..2d52fa28 100644 --- a/ceilometer/publish.py +++ b/ceilometer/publish.py @@ -30,9 +30,6 @@ PUBLISH_OPTS = [ default='metering', help='the topic ceilometer uses for metering messages', ), - cfg.StrOpt('control_exchange', - default='ceilometer', - help='AMQP exchange to connect to if using RabbitMQ or Qpid'), ] diff --git a/ceilometer/service.py b/ceilometer/service.py index 609faecd..d2975171 100644 --- a/ceilometer/service.py +++ b/ceilometer/service.py @@ -21,6 +21,7 @@ import os import socket from ceilometer.openstack.common import cfg +from ceilometer.openstack.common import rpc from ceilometer.openstack.common import context from ceilometer.openstack.common import log from ceilometer.openstack.common.rpc import service as rpc_service @@ -77,5 +78,6 @@ def _sanitize_cmd_line(argv): def prepare_service(argv=[]): + rpc.set_defaults(control_exchange='ceilometer') cfg.CONF(argv[1:], project='ceilometer') log.setup('ceilometer') diff --git a/ceilometer/version.py b/ceilometer/version.py index df88c050..40dc5d9b 100644 --- a/ceilometer/version.py +++ b/ceilometer/version.py @@ -22,5 +22,4 @@ from ceilometer.openstack.common import version as common_version NEXT_VERSION = '2013.1' -version_info = common_version.VersionInfo('ceilometer', - pre_version=NEXT_VERSION) +version_info = common_version.VersionInfo('ceilometer') @@ -22,13 +22,12 @@ import os import setuptools from ceilometer.openstack.common import setup as common_setup -from ceilometer.version import version_info +from ceilometer.version import NEXT_VERSION requires = common_setup.parse_requirements(['tools/pip-requires']) depend_links = common_setup.parse_dependency_links(['tools/pip-requires']) url_base = 'http://tarballs.openstack.org/ceilometer/ceilometer-%s.tar.gz' -version_string = version_info.canonical_version_string(always=True) def directories(target_dir): @@ -39,7 +38,7 @@ def directories(target_dir): setuptools.setup( name='ceilometer', - version=version_string, + version=NEXT_VERSION, description='cloud computing metering', @@ -47,7 +46,7 @@ setuptools.setup( author_email='ceilometer@lists.launchpad.net', url='https://launchpad.net/ceilometer', - download_url=url_base % version_string, + download_url=url_base % NEXT_VERSION, classifiers=[ 'Development Status :: 3 - Alpha', diff --git a/tests/api/v2/test_statistics.py b/tests/api/v2/test_statistics.py index fa440d9f..2be17091 100644 --- a/tests/api/v2/test_statistics.py +++ b/tests/api/v2/test_statistics.py @@ -35,17 +35,17 @@ class TestStatisticsDuration(unittest.TestCase): # Create events relative to the range and pretend # that the intervening events exist. - self.early1 = datetime.datetime(2012, 8, 27, 7, 0) + self.early1 = datetime.datetime(2012, 8, 27, 7, 0) self.early2 = datetime.datetime(2012, 8, 27, 17, 0) self.start = datetime.datetime(2012, 8, 28, 0, 0) - self.middle1 = datetime.datetime(2012, 8, 28, 8, 0) + self.middle1 = datetime.datetime(2012, 8, 28, 8, 0) self.middle2 = datetime.datetime(2012, 8, 28, 18, 0) self.end = datetime.datetime(2012, 8, 28, 23, 59) - self.late1 = datetime.datetime(2012, 8, 29, 9, 0) + self.late1 = datetime.datetime(2012, 8, 29, 9, 0) self.late2 = datetime.datetime(2012, 8, 29, 19, 0) def test_nulls(self): diff --git a/tests/test_bin.py b/tests/test_bin.py index 19c60721..35a99c97 100644 --- a/tests/test_bin.py +++ b/tests/test_bin.py @@ -19,10 +19,21 @@ import subprocess import unittest +import tempfile +import os class BinDbsyncTestCase(unittest.TestCase): + def setUp(self): + self.tempfile = tempfile.mktemp() + with open(self.tempfile, 'w') as tmp: + tmp.write("[DEFAULT]\n") + tmp.write("database_connection=log://localhost\n") + def test_dbsync_run(self): subp = subprocess.Popen(["../bin/ceilometer-dbsync", - "--database_connection=log://localhost"]) + "--config-file=%s" % self.tempfile]) self.assertEqual(subp.wait(), 0) + + def tearDown(self): + os.unlink(self.tempfile) diff --git a/tools/pip-requires b/tools/pip-requires index 803bb217..7cb17f47 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -17,3 +17,4 @@ python-keystoneclient>=0.2,<0.3 python-swiftclient lxml requests<1.0 +extras @@ -21,7 +21,7 @@ commands = {toxinidir}/run_tests.sh --no-path-adjustment --with-coverage --cover [testenv:pep8] deps = pep8==1.3.3 -commands = pep8 --repeat --show-source ceilometer setup.py bin/ceilometer-agent-central bin/ceilometer-agent-compute bin/ceilometer-collector bin/ceilometer-api tests +commands = pep8 --repeat --ignore=E125 --show-source ceilometer setup.py bin/ceilometer-agent-central bin/ceilometer-agent-compute bin/ceilometer-collector bin/ceilometer-api tests [testenv:venv] deps = -r{toxinidir}/tools/test-requires |