diff options
author | root <devnull@localhost> | 2006-04-26 10:48:09 +0000 |
---|---|---|
committer | root <devnull@localhost> | 2006-04-26 10:48:09 +0000 |
commit | 8b1e1c104bdff504b3e775b450432e6462b8d09b (patch) | |
tree | 0367359f6a18f318741f387d82dc3dcfd8139950 /__init__.py | |
download | logilab-common-8b1e1c104bdff504b3e775b450432e6462b8d09b.tar.gz |
forget the past.
forget the past.
Diffstat (limited to '__init__.py')
-rw-r--r-- | __init__.py | 309 |
1 files changed, 309 insertions, 0 deletions
diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..d1917b7 --- /dev/null +++ b/__init__.py @@ -0,0 +1,309 @@ +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +""" Copyright (c) 2000-2002 LOGILAB S.A. (Paris, FRANCE). + http://www.logilab.fr/ -- mailto:contact@logilab.fr + +Logilab common libraries +""" + +from __future__ import nested_scopes + +__revision__ = "$Id: __init__.py,v 1.30 2006-03-24 10:37:48 syt Exp $" + +# FIXME: move all those functions in a separated module + +def intersection(list1, list2): + """return the intersection of list1 and list2""" + intersect_dict, result = {}, [] + for item in list1: + intersect_dict[item] = 1 + for item in list2: + if intersect_dict.has_key(item): + result.append(item) + return result + +def difference(list1, list2): + """return elements of list1 not in list2""" + tmp, result = {}, [] + for i in list2: + tmp[i] = 1 + for i in list1: + if not tmp.has_key(i): + result.append(i) + return result + +def union(list1, list2): + """return list1 union list2""" + tmp = {} + for i in list1: + tmp[i] = 1 + for i in list2: + tmp[i] = 1 + return tmp.keys() + +def make_domains(lists): + """ + given a list of lists, return a list of domain for each list to produce all + combinaisons of possibles values + + ex: (['a', 'b'], ['c','d', 'e']) + -> (['a', 'b', 'a', 'b', 'a', 'b'], + ['c', 'c', 'd', 'd', 'e', 'e']) + """ + domains = [] + for iterable in lists: + new_domain = iterable[:] + for i in range(len(domains)): + domains[i] = domains[i]*len(iterable) + if domains: + missing = (len(domains[0]) - len(iterable)) / len(iterable) + i = 0 + for j in range(len(iterable)): + value = iterable[j] + for dummy in range(missing): + new_domain.insert(i, value) + i += 1 + i += 1 + domains.append(new_domain) + return domains + + +def flatten(iterable, tr_func=None, results=None): + """flatten a list of list with any level + + if tr_func is not None, it should be a one argument function that'll be called + on each final element + """ + if results is None: + results = [] + for val in iterable: + if type(val) in (type(()), type([])): + flatten(val, tr_func, results) + elif tr_func is None: + results.append(val) + else: + results.append(tr_func(val)) + return results + + +def get_cycles(graph_dict, vertices=None): + '''given a dictionnary representing an ordered graph (i.e. key are vertices + and values is a list of destination vertices representing edges), return a + list of detected cycles + ''' + if not graph_dict: + return () + result = [] + if vertices is None: + vertices = graph_dict.keys() + for vertice in vertices: + _get_cycles(graph_dict, vertice, [], result) + return result + +def _get_cycles(graph_dict, vertice=None, path=None, result=None): + """recursive function doing the real work for get_cycles""" + if vertice in path: + cycle = [vertice] + for i in range(len(path)-1, 0, -1): + node = path[i] + if node == vertice: + break + cycle.insert(0, node) + # make a canonical representation + start_from = min(cycle) + index = cycle.index(start_from) + cycle = cycle[index:] + cycle[0:index] + # append it to result if not already in + if not cycle in result: + result.append(cycle) + return + path.append(vertice) + try: + for node in graph_dict[vertice]: + _get_cycles(graph_dict, node, path, result) + except KeyError: + pass + path.pop() + + +def cached(callableobj, keyarg=None): + """simple decorator to cache result of method call""" + #print callableobj, keyarg, callableobj.func_code.co_argcount + if callableobj.func_code.co_argcount == 1 or keyarg == 0: + + def cache_wrapper1(self, *args): + cache = '_%s_cache_' % callableobj.__name__ + #print 'cache1?', cache + try: + return getattr(self, cache) + except AttributeError: + #print 'miss' + value = callableobj(self, *args) + setattr(self, cache, value) + return value + return cache_wrapper1 + + elif keyarg: + + def cache_wrapper2(self, *args, **kwargs): + cache = '_%s_cache_' % callableobj.__name__ + key = args[keyarg-1] + #print 'cache2?', cache, self, key + try: + _cache = getattr(self, cache) + except AttributeError: + #print 'init' + _cache = {} + setattr(self, cache, _cache) + try: + return _cache[key] + except KeyError: + #print 'miss', self, cache, key + _cache[key] = callableobj(self, *args, **kwargs) + return _cache[key] + return cache_wrapper2 + def cache_wrapper3(self, *args): + cache = '_%s_cache_' % callableobj.__name__ + #print 'cache3?', cache, self, args + try: + _cache = getattr(self, cache) + except AttributeError: + #print 'init' + _cache = {} + setattr(self, cache, _cache) + try: + return _cache[args] + except KeyError: + #print 'miss' + _cache[args] = callableobj(self, *args) + return _cache[args] + return cache_wrapper3 + +import sys + +class ProgressBar(object): + """a simple text progression bar""" + + def __init__(self, nbops, size=20., stream=sys.stdout): + self._dotevery = max(nbops / size, 1) + self._fstr = '\r[%-20s]' + self._dotcount, self._dots = 1, [] + self._stream = stream + + def update(self): + """update the progression bar""" + self._dotcount += 1 + if self._dotcount >= self._dotevery: + self._dotcount = 1 + self._dots.append('.') + self._stream.write(self._fstr % ''.join(self._dots)) + self._stream.flush() + + +import tempfile +import os +import time +from os.path import exists + +class Execute: + """This is a deadlock save version of popen2 (no stdin), that returns + an object with errorlevel, out and err + """ + + def __init__(self, command): + outfile = tempfile.mktemp() + errfile = tempfile.mktemp() + self.status = os.system("( %s ) >%s 2>%s" % + (command, outfile, errfile)) >> 8 + self.out = open(outfile,"r").read() + self.err = open(errfile,"r").read() + os.remove(outfile) + os.remove(errfile) + +def acquire_lock(lock_file, max_try=10, delay=10): + """acquire a lock represented by a file on the file system""" + count = 0 + while max_try <= 0 or count < max_try: + if not exists(lock_file): + break + count += 1 + time.sleep(delay) + else: + raise Exception('Unable to acquire %s' % lock_file) + stream = open(lock_file, 'w') + stream.write(str(os.getpid())) + stream.close() + +def release_lock(lock_file): + """release a lock represented by a file on the file system""" + os.remove(lock_file) + + +## Deprecation utilities ######################### + +from warnings import warn + +class deprecated(type): + """metaclass to print a warning on instantiation of a deprecated class""" + + def __call__(cls, *args, **kwargs): + msg = getattr(cls, "__deprecation_warning__", + "%s is deprecated" % cls.__name__) + warn(msg, DeprecationWarning, stacklevel=2) + return type.__call__(cls, *args, **kwargs) + + +def class_renamed(old_name, new_class, message=None): + """automatically creates a class which fires a DeprecationWarning + when instantiated. + + >>> Set = class_renamed('Set', set, 'Set is now replaced by set') + >>> s = Set() + sample.py:57: DeprecationWarning: Set is now replaced by set + s = Set() + >>> + """ + clsdict = {} + if message is not None: + clsdict['__deprecation_warning__'] = message + try: + # new-style class + return deprecated(old_name, (new_class,), clsdict) + except (NameError, TypeError): + # old-style class + class DeprecatedClass(new_class): + """FIXME: There might be a better way to handle old/new-style class + """ + def __init__(self, *args, **kwargs): + warn(message, DeprecationWarning, stacklevel=2) + new_class.__init__(self, *args, **kwargs) + return DeprecatedClass + + +def deprecated_function(new_func, message=None): + """creates a function which fires a DeprecationWarning when used + + For example, if <bar> is deprecated in favour of <foo> : + >>> bar = deprecated_function(foo, 'bar is deprecated') + >>> bar() + sample.py:57: DeprecationWarning: bar is deprecated + bar() + >>> + """ + if message is None: + message = "this function is deprecated, use %s instead" % ( + new_func.func_name) + def deprecated(*args, **kwargs): + warn(message, DeprecationWarning, stacklevel=2) + return new_func(*args, **kwargs) + return deprecated |