# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of logilab-common. # # logilab-common is free software: you can redistribute it and/or modify it under # the terms of the GNU Lesser General Public License as published by the Free # Software Foundation, either version 2.1 of the License, or (at your option) any # later version. # # logilab-common is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License along # with logilab-common. If not, see . """Classes to handle advanced configuration in simple to complex applications. Allows to load the configuration from a file or from command line options, to generate a sample configuration file or to display program's usage. Fills the gap between optik/optparse and ConfigParser by adding data types (which are also available as a standalone optik extension in the `optik_ext` module). Quick start: simplest usage --------------------------- .. python :: >>> import sys >>> from logilab.common.configuration import Configuration >>> options = [('dothis', {'type':'yn', 'default': True, 'metavar': ''}), ... ('value', {'type': 'string', 'metavar': ''}), ... ('multiple', {'type': 'csv', 'default': ('yop',), ... 'metavar': '', ... 'help': 'you can also document the option'}), ... ('number', {'type': 'int', 'default':2, 'metavar':''}), ... ] >>> config = Configuration(options=options, name='My config') >>> print config['dothis'] True >>> print config['value'] None >>> print config['multiple'] ('yop',) >>> print config['number'] 2 >>> print config.help() Usage: [options] Options: -h, --help show this help message and exit --dothis= --value= --multiple= you can also document the option [current: none] --number= >>> f = open('myconfig.ini', 'w') >>> f.write('''[MY CONFIG] ... number = 3 ... dothis = no ... multiple = 1,2,3 ... ''') >>> f.close() >>> config.load_file_configuration('myconfig.ini') >>> print config['dothis'] False >>> print config['value'] None >>> print config['multiple'] ['1', '2', '3'] >>> print config['number'] 3 >>> sys.argv = ['mon prog', '--value', 'bacon', '--multiple', '4,5,6', ... 'nonoptionargument'] >>> print config.load_command_line_configuration() ['nonoptionargument'] >>> print config['value'] bacon >>> config.generate_config() # class for simple configurations which don't need the # manager / providers model and prefer delegation to inheritance # # configuration values are accessible through a dict like interface # [MY CONFIG] dothis=no value=bacon # you can also document the option multiple=4,5,6 number=3 Note : starting with Python 2.7 ConfigParser is able to take into account the order of occurrences of the options into a file (by using an OrderedDict). If you have two options changing some common state, like a 'disable-all-stuff' and a 'enable-some-stuff-a', their order of appearance will be significant : the last specified in the file wins. For earlier version of python and logilab.common newer than 0.61 the behaviour is unspecified. """ from __future__ import print_function __docformat__ = "restructuredtext en" __all__ = ('OptionsManagerMixIn', 'OptionsProviderMixIn', 'ConfigurationMixIn', 'Configuration', 'OptionsManager2ConfigurationAdapter') import os import sys import re from os.path import exists, expanduser from copy import copy from warnings import warn from six import string_types from six.moves import range, configparser as cp, input from logilab.common.compat import str_encode as _encode from logilab.common.deprecation import deprecated from logilab.common.textutils import normalize_text, unquote from logilab.common import optik_ext OptionError = optik_ext.OptionError REQUIRED = [] class UnsupportedAction(Exception): """raised by set_option when it doesn't know what to do for an action""" def _get_encoding(encoding, stream): encoding = encoding or getattr(stream, 'encoding', None) if not encoding: import locale encoding = locale.getpreferredencoding() return encoding # validation functions ######################################################## # validators will return the validated value or raise optparse.OptionValueError # XXX add to documentation def choice_validator(optdict, name, value): """validate and return a converted value for option of type 'choice' """ if not value in optdict['choices']: msg = "option %s: invalid value: %r, should be in %s" raise optik_ext.OptionValueError(msg % (name, value, optdict['choices'])) return value def multiple_choice_validator(optdict, name, value): """validate and return a converted value for option of type 'choice' """ choices = optdict['choices'] values = optik_ext.check_csv(None, name, value) for value in values: if not value in choices: msg = "option %s: invalid value: %r, should be in %s" raise optik_ext.OptionValueError(msg % (name, value, choices)) return values def csv_validator(optdict, name, value): """validate and return a converted value for option of type 'csv' """ return optik_ext.check_csv(None, name, value) def yn_validator(optdict, name, value): """validate and return a converted value for option of type 'yn' """ return optik_ext.check_yn(None, name, value) def named_validator(optdict, name, value): """validate and return a converted value for option of type 'named' """ return optik_ext.check_named(None, name, value) def file_validator(optdict, name, value): """validate and return a filepath for option of type 'file'""" return optik_ext.check_file(None, name, value) def color_validator(optdict, name, value): """validate and return a valid color for option of type 'color'""" return optik_ext.check_color(None, name, value) def password_validator(optdict, name, value): """validate and return a string for option of type 'password'""" return optik_ext.check_password(None, name, value) def date_validator(optdict, name, value): """validate and return a mx DateTime object for option of type 'date'""" return optik_ext.check_date(None, name, value) def time_validator(optdict, name, value): """validate and return a time object for option of type 'time'""" return optik_ext.check_time(None, name, value) def bytes_validator(optdict, name, value): """validate and return an integer for option of type 'bytes'""" return optik_ext.check_bytes(None, name, value) VALIDATORS = {'string': unquote, 'int': int, 'float': float, 'file': file_validator, 'font': unquote, 'color': color_validator, 'regexp': re.compile, 'csv': csv_validator, 'yn': yn_validator, 'bool': yn_validator, 'named': named_validator, 'password': password_validator, 'date': date_validator, 'time': time_validator, 'bytes': bytes_validator, 'choice': choice_validator, 'multiple_choice': multiple_choice_validator, } def _call_validator(opttype, optdict, option, value): if opttype not in VALIDATORS: raise Exception('Unsupported type "%s"' % opttype) try: return VALIDATORS[opttype](optdict, option, value) except TypeError: try: return VALIDATORS[opttype](value) except optik_ext.OptionValueError: raise except: raise optik_ext.OptionValueError('%s value (%r) should be of type %s' % (option, value, opttype)) # user input functions ######################################################## # user input functions will ask the user for input on stdin then validate # the result and return the validated value or raise optparse.OptionValueError # XXX add to documentation def input_password(optdict, question='password:'): from getpass import getpass while True: value = getpass(question) value2 = getpass('confirm: ') if value == value2: return value print('password mismatch, try again') def input_string(optdict, question): value = input(question).strip() return value or None def _make_input_function(opttype): def input_validator(optdict, question): while True: value = input(question) if not value.strip(): return None try: return _call_validator(opttype, optdict, None, value) except optik_ext.OptionValueError as ex: msg = str(ex).split(':', 1)[-1].strip() print('bad value: %s' % msg) return input_validator INPUT_FUNCTIONS = { 'string': input_string, 'password': input_password, } for opttype in VALIDATORS.keys(): INPUT_FUNCTIONS.setdefault(opttype, _make_input_function(opttype)) # utility functions ############################################################ def expand_default(self, option): """monkey patch OptionParser.expand_default since we have a particular way to handle defaults to avoid overriding values in the configuration file """ if self.parser is None or not self.default_tag: return option.help optname = option._long_opts[0][2:] try: provider = self.parser.options_manager._all_options[optname] except KeyError: value = None else: optdict = provider.get_option_def(optname) optname = provider.option_attrname(optname, optdict) value = getattr(provider.config, optname, optdict) value = format_option_value(optdict, value) if value is optik_ext.NO_DEFAULT or not value: value = self.NO_DEFAULT_VALUE return option.help.replace(self.default_tag, str(value)) def _validate(value, optdict, name=''): """return a validated value for an option according to its type optional argument name is only used for error message formatting """ try: _type = optdict['type'] except KeyError: # FIXME return value return _call_validator(_type, optdict, name, value) convert = deprecated('[0.60] convert() was renamed _validate()')(_validate) # format and output functions ################################################## def comment(string): """return string as a comment""" lines = [line.strip() for line in string.splitlines()] return '# ' + ('%s# ' % os.linesep).join(lines) def format_time(value): if not value: return '0' if value != int(value): return '%.2fs' % value value = int(value) nbmin, nbsec = divmod(value, 60) if nbsec: return '%ss' % value nbhour, nbmin_ = divmod(nbmin, 60) if nbmin_: return '%smin' % nbmin nbday, nbhour_ = divmod(nbhour, 24) if nbhour_: return '%sh' % nbhour return '%sd' % nbday def format_bytes(value): if not value: return '0' if value != int(value): return '%.2fB' % value value = int(value) prevunit = 'B' for unit in ('KB', 'MB', 'GB', 'TB'): next, remain = divmod(value, 1024) if remain: return '%s%s' % (value, prevunit) prevunit = unit value = next return '%s%s' % (value, unit) def format_option_value(optdict, value): """return the user input's value from a 'compiled' value""" if isinstance(value, (list, tuple)): value = ','.join(value) elif isinstance(value, dict): value = ','.join(['%s:%s' % (k, v) for k, v in value.items()]) elif hasattr(value, 'match'): # optdict.get('type') == 'regexp' # compiled regexp value = value.pattern elif optdict.get('type') == 'yn': value = value and 'yes' or 'no' elif isinstance(value, string_types) and value.isspace(): value = "'%s'" % value elif optdict.get('type') == 'time' and isinstance(value, (float, int, long)): value = format_time(value) elif optdict.get('type') == 'bytes' and hasattr(value, '__int__'): value = format_bytes(value) return value def ini_format_section(stream, section, options, encoding=None, doc=None): """format an options section using the INI format""" encoding = _get_encoding(encoding, stream) if doc: print(_encode(comment(doc), encoding), file=stream) print('[%s]' % section, file=stream) ini_format(stream, options, encoding) def ini_format(stream, options, encoding): """format options using the INI format""" for optname, optdict, value in options: value = format_option_value(optdict, value) help = optdict.get('help') if help: help = normalize_text(help, line_len=79, indent='# ') print(file=stream) print(_encode(help, encoding), file=stream) else: print(file=stream) if value is None: print('#%s=' % optname, file=stream) else: value = _encode(value, encoding).strip() print('%s=%s' % (optname, value), file=stream) format_section = ini_format_section def rest_format_section(stream, section, options, encoding=None, doc=None): """format an options section using as ReST formatted output""" encoding = _get_encoding(encoding, stream) if section: print >> stream, '%s\n%s' % (section, "'"*len(section)) if doc: print >> stream, _encode(normalize_text(doc, line_len=79, indent=''), encoding) print >> stream for optname, optdict, value in options: help = optdict.get('help') print >> stream, ':%s:' % optname if help: help = normalize_text(help, line_len=79, indent=' ') print >> stream, _encode(help, encoding) if value: value = _encode(format_option_value(optdict, value), encoding) print >> stream, '' print >> stream, ' Default: ``%s``' % value.replace("`` ", "```` ``") # Options Manager ############################################################## class OptionsManagerMixIn(object): """MixIn to handle a configuration from both a configuration file and command line options """ def __init__(self, usage, config_file=None, version=None, quiet=0): self.config_file = config_file self.reset_parsers(usage, version=version) # list of registered options providers self.options_providers = [] # dictionary associating option name to checker self._all_options = {} self._short_options = {} self._nocallback_options = {} self._mygroups = dict() # verbosity self.quiet = quiet self._maxlevel = 0 def reset_parsers(self, usage='', version=None): # configuration file parser self.cfgfile_parser = cp.ConfigParser() # command line parser self.cmdline_parser = optik_ext.OptionParser(usage=usage, version=version) self.cmdline_parser.options_manager = self self._optik_option_attrs = set(self.cmdline_parser.option_class.ATTRS) def register_options_provider(self, provider, own_group=True): """register an options provider""" assert provider.priority <= 0, "provider's priority can't be >= 0" for i in range(len(self.options_providers)): if provider.priority > self.options_providers[i].priority: self.options_providers.insert(i, provider) break else: self.options_providers.append(provider) non_group_spec_options = [option for option in provider.options if 'group' not in option[1]] groups = getattr(provider, 'option_groups', ()) if own_group and non_group_spec_options: self.add_option_group(provider.name.upper(), provider.__doc__, non_group_spec_options, provider) else: for opt, optdict in non_group_spec_options: self.add_optik_option(provider, self.cmdline_parser, opt, optdict) for gname, gdoc in groups: gname = gname.upper() goptions = [option for option in provider.options if option[1].get('group', '').upper() == gname] self.add_option_group(gname, gdoc, goptions, provider) def add_option_group(self, group_name, doc, options, provider): """add an option group including the listed options """ assert options # add option group to the command line parser if group_name in self._mygroups: group = self._mygroups[group_name] else: group = optik_ext.OptionGroup(self.cmdline_parser, title=group_name.capitalize()) self.cmdline_parser.add_option_group(group) group.level = provider.level self._mygroups[group_name] = group # add section to the config file if group_name != "DEFAULT": self.cfgfile_parser.add_section(group_name) # add provider's specific options for opt, optdict in options: self.add_optik_option(provider, group, opt, optdict) def add_optik_option(self, provider, optikcontainer, opt, optdict): if 'inputlevel' in optdict: warn('[0.50] "inputlevel" in option dictionary for %s is deprecated,' ' use "level"' % opt, DeprecationWarning) optdict['level'] = optdict.pop('inputlevel') args, optdict = self.optik_option(provider, opt, optdict) option = optikcontainer.add_option(*args, **optdict) self._all_options[opt] = provider self._maxlevel = max(self._maxlevel, option.level or 0) def optik_option(self, provider, opt, optdict): """get our personal option definition and return a suitable form for use with optik/optparse """ optdict = copy(optdict) others = {} if 'action' in optdict: self._nocallback_options[provider] = opt else: optdict['action'] = 'callback' optdict['callback'] = self.cb_set_provider_option # default is handled here and *must not* be given to optik if you # want the whole machinery to work if 'default' in optdict: if ('help' in optdict and optdict.get('default') is not None and not optdict['action'] in ('store_true', 'store_false')): optdict['help'] += ' [current: %default]' del optdict['default'] args = ['--' + str(opt)] if 'short' in optdict: self._short_options[optdict['short']] = opt args.append('-' + optdict['short']) del optdict['short'] # cleanup option definition dict before giving it to optik for key in list(optdict.keys()): if not key in self._optik_option_attrs: optdict.pop(key) return args, optdict def cb_set_provider_option(self, option, opt, value, parser): """optik callback for option setting""" if opt.startswith('--'): # remove -- on long option opt = opt[2:] else: # short option, get its long equivalent opt = self._short_options[opt[1:]] # trick since we can't set action='store_true' on options if value is None: value = 1 self.global_set_option(opt, value) def global_set_option(self, opt, value): """set option on the correct option provider""" self._all_options[opt].set_option(opt, value) def generate_config(self, stream=None, skipsections=(), encoding=None): """write a configuration file according to the current configuration into the given stream or stdout """ options_by_section = {} sections = [] for provider in self.options_providers: for section, options in provider.options_by_section(): if section is None: section = provider.name if section in skipsections: continue options = [(n, d, v) for (n, d, v) in options if d.get('type') is not None] if not options: continue if not section in sections: sections.append(section) alloptions = options_by_section.setdefault(section, []) alloptions += options stream = stream or sys.stdout encoding = _get_encoding(encoding, stream) printed = False for section in sections: if printed: print('\n', file=stream) format_section(stream, section.upper(), options_by_section[section], encoding) printed = True def generate_manpage(self, pkginfo, section=1, stream=None): """write a man page for the current configuration into the given stream or stdout """ self._monkeypatch_expand_default() try: optik_ext.generate_manpage(self.cmdline_parser, pkginfo, section, stream=stream or sys.stdout, level=self._maxlevel) finally: self._unmonkeypatch_expand_default() # initialization methods ################################################## def load_provider_defaults(self): """initialize configuration using default values""" for provider in self.options_providers: provider.load_defaults() def load_file_configuration(self, config_file=None): """load the configuration from file""" self.read_config_file(config_file) self.load_config_file() def read_config_file(self, config_file=None): """read the configuration file but do not load it (i.e. dispatching values to each options provider) """ helplevel = 1 while helplevel <= self._maxlevel: opt = '-'.join(['long'] * helplevel) + '-help' if opt in self._all_options: break # already processed def helpfunc(option, opt, val, p, level=helplevel): print(self.help(level)) sys.exit(0) helpmsg = '%s verbose help.' % ' '.join(['more'] * helplevel) optdict = {'action' : 'callback', 'callback' : helpfunc, 'help' : helpmsg} provider = self.options_providers[0] self.add_optik_option(provider, self.cmdline_parser, opt, optdict) provider.options += ( (opt, optdict), ) helplevel += 1 if config_file is None: config_file = self.config_file if config_file is not None: config_file = expanduser(config_file) if config_file and exists(config_file): parser = self.cfgfile_parser parser.read([config_file]) # normalize sections'title for sect, values in parser._sections.items(): if not sect.isupper() and values: parser._sections[sect.upper()] = values elif not self.quiet: msg = 'No config file found, using default configuration' print(msg, file=sys.stderr) return def input_config(self, onlysection=None, inputlevel=0, stream=None): """interactively get configuration values by asking to the user and generate a configuration file """ if onlysection is not None: onlysection = onlysection.upper() for provider in self.options_providers: for section, option, optdict in provider.all_options(): if onlysection is not None and section != onlysection: continue if not 'type' in optdict: # ignore action without type (callback, store_true...) continue provider.input_option(option, optdict, inputlevel) # now we can generate the configuration file if stream is not None: self.generate_config(stream) def load_config_file(self): """dispatch values previously read from a configuration file to each options provider) """ parser = self.cfgfile_parser for section in parser.sections(): for option, value in parser.items(section): try: self.global_set_option(option, value) except (KeyError, OptionError): # TODO handle here undeclared options appearing in the config file continue def load_configuration(self, **kwargs): """override configuration according to given parameters """ for opt, opt_value in kwargs.items(): opt = opt.replace('_', '-') provider = self._all_options[opt] provider.set_option(opt, opt_value) def load_command_line_configuration(self, args=None): """override configuration according to command line parameters return additional arguments """ self._monkeypatch_expand_default() try: if args is None: args = sys.argv[1:] else: args = list(args) (options, args) = self.cmdline_parser.parse_args(args=args) for provider in self._nocallback_options.keys(): config = provider.config for attr in config.__dict__.keys(): value = getattr(options, attr, None) if value is None: continue setattr(config, attr, value) return args finally: self._unmonkeypatch_expand_default() # help methods ############################################################ def add_help_section(self, title, description, level=0): """add a dummy option section for help purpose """ group = optik_ext.OptionGroup(self.cmdline_parser, title=title.capitalize(), description=description) group.level = level self._maxlevel = max(self._maxlevel, level) self.cmdline_parser.add_option_group(group) def _monkeypatch_expand_default(self): # monkey patch optik_ext to deal with our default values try: self.__expand_default_backup = optik_ext.HelpFormatter.expand_default optik_ext.HelpFormatter.expand_default = expand_default except AttributeError: # python < 2.4: nothing to be done pass def _unmonkeypatch_expand_default(self): # remove monkey patch if hasattr(optik_ext.HelpFormatter, 'expand_default'): # unpatch optik_ext to avoid side effects optik_ext.HelpFormatter.expand_default = self.__expand_default_backup def help(self, level=0): """return the usage string for available options """ self.cmdline_parser.formatter.output_level = level self._monkeypatch_expand_default() try: return self.cmdline_parser.format_help() finally: self._unmonkeypatch_expand_default() class Method(object): """used to ease late binding of default method (so you can define options on the class using default methods on the configuration instance) """ def __init__(self, methname): self.method = methname self._inst = None def bind(self, instance): """bind the method to its instance""" if self._inst is None: self._inst = instance def __call__(self, *args, **kwargs): assert self._inst, 'unbound method' return getattr(self._inst, self.method)(*args, **kwargs) # Options Provider ############################################################# class OptionsProviderMixIn(object): """Mixin to provide options to an OptionsManager""" # those attributes should be overridden priority = -1 name = 'default' options = () level = 0 def __init__(self): self.config = optik_ext.Values() for option in self.options: try: option, optdict = option except ValueError: raise Exception('Bad option: %r' % option) if isinstance(optdict.get('default'), Method): optdict['default'].bind(self) elif isinstance(optdict.get('callback'), Method): optdict['callback'].bind(self) self.load_defaults() def load_defaults(self): """initialize the provider using default values""" for opt, optdict in self.options: action = optdict.get('action') if action != 'callback': # callback action have no default default = self.option_default(opt, optdict) if default is REQUIRED: continue self.set_option(opt, default, action, optdict) def option_default(self, opt, optdict=None): """return the default value for an option""" if optdict is None: optdict = self.get_option_def(opt) default = optdict.get('default') if callable(default): default = default() return default def option_attrname(self, opt, optdict=None): """get the config attribute corresponding to opt """ if optdict is None: optdict = self.get_option_def(opt) return optdict.get('dest', opt.replace('-', '_')) option_name = deprecated('[0.60] OptionsProviderMixIn.option_name() was renamed to option_attrname()')(option_attrname) def option_value(self, opt): """get the current value for the given option""" return getattr(self.config, self.option_attrname(opt), None) def set_option(self, opt, value, action=None, optdict=None): """method called to set an option (registered in the options list) """ if optdict is None: optdict = self.get_option_def(opt) if value is not None: value = _validate(value, optdict, opt) if action is None: action = optdict.get('action', 'store') if optdict.get('type') == 'named': # XXX need specific handling optname = self.option_attrname(opt, optdict) currentvalue = getattr(self.config, optname, None) if currentvalue: currentvalue.update(value) value = currentvalue if action == 'store': setattr(self.config, self.option_attrname(opt, optdict), value) elif action in ('store_true', 'count'): setattr(self.config, self.option_attrname(opt, optdict), 0) elif action == 'store_false': setattr(self.config, self.option_attrname(opt, optdict), 1) elif action == 'append': opt = self.option_attrname(opt, optdict) _list = getattr(self.config, opt, None) if _list is None: if isinstance(value, (list, tuple)): _list = value elif value is not None: _list = [] _list.append(value) setattr(self.config, opt, _list) elif isinstance(_list, tuple): setattr(self.config, opt, _list + (value,)) else: _list.append(value) elif action == 'callback': optdict['callback'](None, opt, value, None) else: raise UnsupportedAction(action) def input_option(self, option, optdict, inputlevel=99): default = self.option_default(option, optdict) if default is REQUIRED: defaultstr = '(required): ' elif optdict.get('level', 0) > inputlevel: return elif optdict['type'] == 'password' or default is None: defaultstr = ': ' else: defaultstr = '(default: %s): ' % format_option_value(optdict, default) print(':%s:' % option) print(optdict.get('help') or option) inputfunc = INPUT_FUNCTIONS[optdict['type']] value = inputfunc(optdict, defaultstr) while default is REQUIRED and not value: print('please specify a value') value = inputfunc(optdict, '%s: ' % option) if value is None and default is not None: value = default self.set_option(option, value, optdict=optdict) def get_option_def(self, opt): """return the dictionary defining an option given it's name""" assert self.options for option in self.options: if option[0] == opt: return option[1] raise OptionError('no such option %s in section %r' % (opt, self.name), opt) def all_options(self): """return an iterator on available options for this provider option are actually described by a 3-uple: (section, option name, option dictionary) """ for section, options in self.options_by_section(): if section is None: if self.name is None: continue section = self.name.upper() for option, optiondict, value in options: yield section, option, optiondict def options_by_section(self): """return an iterator on options grouped by section (section, [list of (optname, optdict, optvalue)]) """ sections = {} for optname, optdict in self.options: sections.setdefault(optdict.get('group'), []).append( (optname, optdict, self.option_value(optname))) if None in sections: yield None, sections.pop(None) for section, options in sections.items(): yield section.upper(), options def options_and_values(self, options=None): if options is None: options = self.options for optname, optdict in options: yield (optname, optdict, self.option_value(optname)) # configuration ################################################################ class ConfigurationMixIn(OptionsManagerMixIn, OptionsProviderMixIn): """basic mixin for simple configurations which don't need the manager / providers model """ def __init__(self, *args, **kwargs): if not args: kwargs.setdefault('usage', '') kwargs.setdefault('quiet', 1) OptionsManagerMixIn.__init__(self, *args, **kwargs) OptionsProviderMixIn.__init__(self) if not getattr(self, 'option_groups', None): self.option_groups = [] for option, optdict in self.options: try: gdef = (optdict['group'].upper(), '') except KeyError: continue if not gdef in self.option_groups: self.option_groups.append(gdef) self.register_options_provider(self, own_group=False) def register_options(self, options): """add some options to the configuration""" options_by_group = {} for optname, optdict in options: options_by_group.setdefault(optdict.get('group', self.name.upper()), []).append((optname, optdict)) for group, options in options_by_group.items(): self.add_option_group(group, None, options, self) self.options += tuple(options) def load_defaults(self): OptionsProviderMixIn.load_defaults(self) def __iter__(self): return iter(self.config.__dict__.iteritems()) def __getitem__(self, key): try: return getattr(self.config, self.option_attrname(key)) except (optik_ext.OptionValueError, AttributeError): raise KeyError(key) def __setitem__(self, key, value): self.set_option(key, value) def get(self, key, default=None): try: return getattr(self.config, self.option_attrname(key)) except (OptionError, AttributeError): return default class Configuration(ConfigurationMixIn): """class for simple configurations which don't need the manager / providers model and prefer delegation to inheritance configuration values are accessible through a dict like interface """ def __init__(self, config_file=None, options=None, name=None, usage=None, doc=None, version=None): if options is not None: self.options = options if name is not None: self.name = name if doc is not None: self.__doc__ = doc super(Configuration, self).__init__(config_file=config_file, usage=usage, version=version) class OptionsManager2ConfigurationAdapter(object): """Adapt an option manager to behave like a `logilab.common.configuration.Configuration` instance """ def __init__(self, provider): self.config = provider def __getattr__(self, key): return getattr(self.config, key) def __getitem__(self, key): provider = self.config._all_options[key] try: return getattr(provider.config, provider.option_attrname(key)) except AttributeError: raise KeyError(key) def __setitem__(self, key, value): self.config.global_set_option(self.config.option_attrname(key), value) def get(self, key, default=None): provider = self.config._all_options[key] try: return getattr(provider.config, provider.option_attrname(key)) except AttributeError: return default # other functions ############################################################## def read_old_config(newconfig, changes, configfile): """initialize newconfig from a deprecated configuration file possible changes: * ('renamed', oldname, newname) * ('moved', option, oldgroup, newgroup) * ('typechanged', option, oldtype, newvalue) """ # build an index of changes changesindex = {} for action in changes: if action[0] == 'moved': option, oldgroup, newgroup = action[1:] changesindex.setdefault(option, []).append((action[0], oldgroup, newgroup)) continue if action[0] == 'renamed': oldname, newname = action[1:] changesindex.setdefault(newname, []).append((action[0], oldname)) continue if action[0] == 'typechanged': option, oldtype, newvalue = action[1:] changesindex.setdefault(option, []).append((action[0], oldtype, newvalue)) continue if action[1] in ('added', 'removed'): continue # nothing to do here raise Exception('unknown change %s' % action[0]) # build a config object able to read the old config options = [] for optname, optdef in newconfig.options: for action in changesindex.pop(optname, ()): if action[0] == 'moved': oldgroup, newgroup = action[1:] optdef = optdef.copy() optdef['group'] = oldgroup elif action[0] == 'renamed': optname = action[1] elif action[0] == 'typechanged': oldtype = action[1] optdef = optdef.copy() optdef['type'] = oldtype options.append((optname, optdef)) if changesindex: raise Exception('unapplied changes: %s' % changesindex) oldconfig = Configuration(options=options, name=newconfig.name) # read the old config oldconfig.load_file_configuration(configfile) # apply values reverting changes changes.reverse() done = set() for action in changes: if action[0] == 'renamed': oldname, newname = action[1:] newconfig[newname] = oldconfig[oldname] done.add(newname) elif action[0] == 'typechanged': optname, oldtype, newvalue = action[1:] newconfig[optname] = newvalue done.add(optname) for optname, optdef in newconfig.options: if optdef.get('type') and not optname in done: newconfig.set_option(optname, oldconfig[optname], optdict=optdef) def merge_options(options, optgroup=None): """preprocess a list of options and remove duplicates, returning a new list (tuple actually) of options. Options dictionaries are copied to avoid later side-effect. Also, if `otpgroup` argument is specified, ensure all options are in the given group. """ alloptions = {} options = list(options) for i in range(len(options)-1, -1, -1): optname, optdict = options[i] if optname in alloptions: options.pop(i) alloptions[optname].update(optdict) else: optdict = optdict.copy() options[i] = (optname, optdict) alloptions[optname] = optdict if optgroup is not None: alloptions[optname]['group'] = optgroup return tuple(options)