diff options
author | Rémi Cardona <remi.cardona@logilab.fr> | 2015-06-26 12:07:34 +0200 |
---|---|---|
committer | Rémi Cardona <remi.cardona@logilab.fr> | 2015-06-26 12:07:34 +0200 |
commit | ebbc0e18257b3dc0fba24ec5416651d9ee8642b8 (patch) | |
tree | b1a2479ea1a805d723293311cb5deac5a714bc6c /logilab | |
parent | 1c9eddb0ad19842d1bc4edd19cf47f4847e06861 (diff) | |
download | logilab-common-ebbc0e18257b3dc0fba24ec5416651d9ee8642b8.tar.gz |
delete unused and untested modules (closes #2402)
It's spring^Wsummer cleaning time.
Diffstat (limited to 'logilab')
-rw-r--r-- | logilab/common/cli.py | 211 | ||||
-rw-r--r-- | logilab/common/contexts.py | 5 | ||||
-rw-r--r-- | logilab/common/corbautils.py | 117 | ||||
-rw-r--r-- | logilab/common/dbf.py | 231 | ||||
-rw-r--r-- | logilab/common/pyro_ext.py | 180 | ||||
-rw-r--r-- | logilab/common/xmlrpcutils.py | 131 |
6 files changed, 0 insertions, 875 deletions
diff --git a/logilab/common/cli.py b/logilab/common/cli.py deleted file mode 100644 index cdeef97..0000000 --- a/logilab/common/cli.py +++ /dev/null @@ -1,211 +0,0 @@ -# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of logilab-common. -# -# logilab-common is free software: you can redistribute it and/or modify it under -# the terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) any -# later version. -# -# logilab-common is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with logilab-common. If not, see <http://www.gnu.org/licenses/>. -"""Command line interface helper classes. - -It provides some default commands, a help system, a default readline -configuration with completion and persistent history. - -Example:: - - class BookShell(CLIHelper): - - def __init__(self): - # quit and help are builtins - # CMD_MAP keys are commands, values are topics - self.CMD_MAP['pionce'] = _("Sommeil") - self.CMD_MAP['ronfle'] = _("Sommeil") - CLIHelper.__init__(self) - - help_do_pionce = ("pionce", "pionce duree", _("met ton corps en veille")) - def do_pionce(self): - print('nap is good') - - help_do_ronfle = ("ronfle", "ronfle volume", _("met les autres en veille")) - def do_ronfle(self): - print('fuuuuuuuuuuuu rhhhhhrhrhrrh') - - cl = BookShell() -""" - -from __future__ import print_function - -__docformat__ = "restructuredtext en" - -from six.moves import builtins, input - -if not hasattr(builtins, '_'): - builtins._ = str - - -def init_readline(complete_method, histfile=None): - """Init the readline library if available.""" - try: - import readline - readline.parse_and_bind("tab: complete") - readline.set_completer(complete_method) - string = readline.get_completer_delims().replace(':', '') - readline.set_completer_delims(string) - if histfile is not None: - try: - readline.read_history_file(histfile) - except IOError: - pass - import atexit - atexit.register(readline.write_history_file, histfile) - except: - print('readline is not available :-(') - - -class Completer : - """Readline completer.""" - - def __init__(self, commands): - self.list = commands - - def complete(self, text, state): - """Hook called by readline when <tab> is pressed.""" - n = len(text) - matches = [] - for cmd in self.list : - if cmd[:n] == text : - matches.append(cmd) - try: - return matches[state] - except IndexError: - return None - - -class CLIHelper: - """An abstract command line interface client which recognize commands - and provide an help system. - """ - - CMD_MAP = {'help': _("Others"), - 'quit': _("Others"), - } - CMD_PREFIX = '' - - def __init__(self, histfile=None) : - self._topics = {} - self.commands = None - self._completer = Completer(self._register_commands()) - init_readline(self._completer.complete, histfile) - - def run(self): - """loop on user input, exit on EOF""" - while True: - try: - line = input('>>> ') - except EOFError: - print - break - s_line = line.strip() - if not s_line: - continue - args = s_line.split() - if args[0] in self.commands: - try: - cmd = 'do_%s' % self.commands[args[0]] - getattr(self, cmd)(*args[1:]) - except EOFError: - break - except: - import traceback - traceback.print_exc() - else: - try: - self.handle_line(s_line) - except: - import traceback - traceback.print_exc() - - def handle_line(self, stripped_line): - """Method to overload in the concrete class (should handle - lines which are not commands). - """ - raise NotImplementedError() - - - # private methods ######################################################### - - def _register_commands(self): - """ register available commands method and return the list of - commands name - """ - self.commands = {} - self._command_help = {} - commands = [attr[3:] for attr in dir(self) if attr[:3] == 'do_'] - for command in commands: - topic = self.CMD_MAP[command] - help_method = getattr(self, 'help_do_%s' % command) - self._topics.setdefault(topic, []).append(help_method) - self.commands[self.CMD_PREFIX + command] = command - self._command_help[command] = help_method - return self.commands.keys() - - def _print_help(self, cmd, syntax, explanation): - print(_('Command %s') % cmd) - print(_('Syntax: %s') % syntax) - print('\t', explanation) - print() - - - # predefined commands ##################################################### - - def do_help(self, command=None) : - """base input of the help system""" - if command in self._command_help: - self._print_help(*self._command_help[command]) - elif command is None or command not in self._topics: - print(_("Use help <topic> or help <command>.")) - print(_("Available topics are:")) - topics = sorted(self._topics.keys()) - for topic in topics: - print('\t', topic) - print() - print(_("Available commands are:")) - commands = self.commands.keys() - commands.sort() - for command in commands: - print('\t', command[len(self.CMD_PREFIX):]) - - else: - print(_('Available commands about %s:') % command) - print - for command_help_method in self._topics[command]: - try: - if callable(command_help_method): - self._print_help(*command_help_method()) - else: - self._print_help(*command_help_method) - except: - import traceback - traceback.print_exc() - print('ERROR in help method %s'% ( - command_help_method.__name__)) - - help_do_help = ("help", "help [topic|command]", - _("print help message for the given topic/command or \ -available topics when no argument")) - - def do_quit(self): - """quit the CLI""" - raise EOFError() - - def help_do_quit(self): - return ("quit", "quit", _("quit the application")) diff --git a/logilab/common/contexts.py b/logilab/common/contexts.py deleted file mode 100644 index d78c327..0000000 --- a/logilab/common/contexts.py +++ /dev/null @@ -1,5 +0,0 @@ -from warnings import warn -warn('logilab.common.contexts module is deprecated, use logilab.common.shellutils instead', - DeprecationWarning, stacklevel=1) - -from logilab.common.shellutils import tempfile, pushd diff --git a/logilab/common/corbautils.py b/logilab/common/corbautils.py deleted file mode 100644 index 65c301d..0000000 --- a/logilab/common/corbautils.py +++ /dev/null @@ -1,117 +0,0 @@ -# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of logilab-common. -# -# logilab-common is free software: you can redistribute it and/or modify it under -# the terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) any -# later version. -# -# logilab-common is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with logilab-common. If not, see <http://www.gnu.org/licenses/>. -"""A set of utility function to ease the use of OmniORBpy. - - - - -""" -__docformat__ = "restructuredtext en" - -from omniORB import CORBA, PortableServer -import CosNaming - -orb = None - -def get_orb(): - """ - returns a reference to the ORB. - The first call to the method initialized the ORB - This method is mainly used internally in the module. - """ - - global orb - if orb is None: - import sys - orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID) - return orb - -def get_root_context(): - """ - returns a reference to the NameService object. - This method is mainly used internally in the module. - """ - - orb = get_orb() - nss = orb.resolve_initial_references("NameService") - rootContext = nss._narrow(CosNaming.NamingContext) - assert rootContext is not None, "Failed to narrow root naming context" - return rootContext - -def register_object_name(object, namepath): - """ - Registers a object in the NamingService. - The name path is a list of 2-uples (id,kind) giving the path. - - For instance if the path of an object is [('foo',''),('bar','')], - it is possible to get a reference to the object using the URL - 'corbaname::hostname#foo/bar'. - [('logilab','rootmodule'),('chatbot','application'),('chatter','server')] - is mapped to - 'corbaname::hostname#logilab.rootmodule/chatbot.application/chatter.server' - - The get_object_reference() function can be used to resolve such a URL. - """ - context = get_root_context() - for id, kind in namepath[:-1]: - name = [CosNaming.NameComponent(id, kind)] - try: - context = context.bind_new_context(name) - except CosNaming.NamingContext.AlreadyBound as ex: - context = context.resolve(name)._narrow(CosNaming.NamingContext) - assert context is not None, \ - 'test context exists but is not a NamingContext' - - id, kind = namepath[-1] - name = [CosNaming.NameComponent(id, kind)] - try: - context.bind(name, object._this()) - except CosNaming.NamingContext.AlreadyBound as ex: - context.rebind(name, object._this()) - -def activate_POA(): - """ - This methods activates the Portable Object Adapter. - You need to call it to enable the reception of messages in your code, - on both the client and the server. - """ - orb = get_orb() - poa = orb.resolve_initial_references('RootPOA') - poaManager = poa._get_the_POAManager() - poaManager.activate() - -def run_orb(): - """ - Enters the ORB mainloop on the server. - You should not call this method on the client. - """ - get_orb().run() - -def get_object_reference(url): - """ - Resolves a corbaname URL to an object proxy. - See register_object_name() for examples URLs - """ - return get_orb().string_to_object(url) - -def get_object_string(host, namepath): - """given an host name and a name path as described in register_object_name, - return a corba string identifier - """ - strname = '/'.join(['.'.join(path_elt) for path_elt in namepath]) - return 'corbaname::%s#%s' % (host, strname) diff --git a/logilab/common/dbf.py b/logilab/common/dbf.py deleted file mode 100644 index ab142b2..0000000 --- a/logilab/common/dbf.py +++ /dev/null @@ -1,231 +0,0 @@ -# -*- coding: utf-8 -*- -# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of logilab-common. -# -# logilab-common is free software: you can redistribute it and/or modify it under -# the terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) any -# later version. -# -# logilab-common is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with logilab-common. If not, see <http://www.gnu.org/licenses/>. -"""This is a DBF reader which reads Visual Fox Pro DBF format with Memo field - -Usage: - ->>> rec = readDbf('test.dbf') ->>> for line in rec: ->>> print line['name'] - - -:date: 13/07/2007 - -http://www.physics.ox.ac.uk/users/santoso/Software.Repository.html -page says code is "available as is without any warranty or support". -""" -from __future__ import print_function - -import struct -import os, os.path -import sys -import csv -import tempfile - -from six.moves import range - -class Dbase: - def __init__(self): - self.fdb = None - self.fmemo = None - self.db_data = None - self.memo_data = None - self.fields = None - self.num_records = 0 - self.header = None - self.memo_file = '' - self.memo_header = None - self.memo_block_size = 0 - self.memo_header_len = 0 - - def _drop_after_NULL(self, txt): - for i in range(0, len(txt)): - if ord(struct.unpack('c', txt[i])[0])==0: - return txt[:i] - return txt - - def _reverse_endian(self, num): - if not len(num): - return 0 - val = struct.unpack('<L', num) - val = struct.pack('>L', val[0]) - val = struct.unpack('>L', val) - return val[0] - - def _assign_ids(self, lst, ids): - result = {} - idx = 0 - for item in lst: - id = ids[idx] - result[id] = item - idx += 1 - return result - - def open(self, db_name): - filesize = os.path.getsize(db_name) - if filesize <= 68: - raise IOError('The file is not large enough to be a dbf file') - - self.fdb = open(db_name, 'rb') - - self.memo_file = '' - if os.path.isfile(db_name[0:-1] + 't'): - self.memo_file = db_name[0:-1] + 't' - elif os.path.isfile(db_name[0:-3] + 'fpt'): - self.memo_file = db_name[0:-3] + 'fpt' - - if self.memo_file: - #Read memo file - self.fmemo = open(self.memo_file, 'rb') - self.memo_data = self.fmemo.read() - self.memo_header = self._assign_ids(struct.unpack('>6x1H', self.memo_data[:8]), ['Block size']) - block_size = self.memo_header['Block size'] - if not block_size: - block_size = 512 - self.memo_block_size = block_size - self.memo_header_len = block_size - memo_size = os.path.getsize(self.memo_file) - - #Start reading data file - data = self.fdb.read(32) - self.header = self._assign_ids(struct.unpack('<B 3B L 2H 20x', data), ['id', 'Year', 'Month', 'Day', '# of Records', 'Header Size', 'Record Size']) - self.header['id'] = hex(self.header['id']) - - self.num_records = self.header['# of Records'] - data = self.fdb.read(self.header['Header Size']-34) - self.fields = {} - x = 0 - header_pattern = '<11s c 4x B B 14x' - ids = ['Field Name', 'Field Type', 'Field Length', 'Field Precision'] - pattern_len = 32 - for offset in range(0, len(data), 32): - if ord(data[offset])==0x0d: - break - x += 1 - data_subset = data[offset: offset+pattern_len] - if len(data_subset) < pattern_len: - data_subset += ' '*(pattern_len-len(data_subset)) - self.fields[x] = self._assign_ids(struct.unpack(header_pattern, data_subset), ids) - self.fields[x]['Field Name'] = self._drop_after_NULL(self.fields[x]['Field Name']) - - self.fdb.read(3) - if self.header['# of Records']: - data_size = (self.header['# of Records'] * self.header['Record Size']) - 1 - self.db_data = self.fdb.read(data_size) - else: - self.db_data = '' - self.row_format = '<' - self.row_ids = [] - self.row_len = 0 - for key in self.fields: - field = self.fields[key] - self.row_format += '%ds ' % (field['Field Length']) - self.row_ids.append(field['Field Name']) - self.row_len += field['Field Length'] - - def close(self): - if self.fdb: - self.fdb.close() - if self.fmemo: - self.fmemo.close() - - def get_numrecords(self): - return self.num_records - - def get_record_with_names(self, rec_no): - """ - This function accept record number from 0 to N-1 - """ - if rec_no < 0 or rec_no > self.num_records: - raise Exception('Unable to extract data outside the range') - - offset = self.header['Record Size'] * rec_no - data = self.db_data[offset:offset+self.row_len] - record = self._assign_ids(struct.unpack(self.row_format, data), self.row_ids) - - if self.memo_file: - for key in self.fields: - field = self.fields[key] - f_type = field['Field Type'] - f_name = field['Field Name'] - c_data = record[f_name] - - if f_type=='M' or f_type=='G' or f_type=='B' or f_type=='P': - c_data = self._reverse_endian(c_data) - if c_data: - record[f_name] = self.read_memo(c_data-1).strip() - else: - record[f_name] = c_data.strip() - return record - - def read_memo_record(self, num, in_length): - """ - Read the record of given number. The second parameter is the length of - the record to read. It can be undefined, meaning read the whole record, - and it can be negative, meaning at most the length - """ - if in_length < 0: - in_length = -self.memo_block_size - - offset = self.memo_header_len + num * self.memo_block_size - self.fmemo.seek(offset) - if in_length<0: - in_length = -in_length - if in_length==0: - return '' - return self.fmemo.read(in_length) - - def read_memo(self, num): - result = '' - buffer = self.read_memo_record(num, -1) - if len(buffer)<=0: - return '' - length = struct.unpack('>L', buffer[4:4+4])[0] + 8 - - block_size = self.memo_block_size - if length < block_size: - return buffer[8:length] - rest_length = length - block_size - rest_data = self.read_memo_record(num+1, rest_length) - if len(rest_data)<=0: - return '' - return buffer[8:] + rest_data - -def readDbf(filename): - """ - Read the DBF file specified by the filename and - return the records as a list of dictionary. - - :param: filename File name of the DBF - :return: List of rows - """ - db = Dbase() - db.open(filename) - num = db.get_numrecords() - rec = [] - for i in range(0, num): - record = db.get_record_with_names(i) - rec.append(record) - db.close() - return rec - -if __name__=='__main__': - rec = readDbf('dbf/sptable.dbf') - for line in rec: - print('%s %s' % (line['GENUS'].strip(), line['SPECIES'].strip())) diff --git a/logilab/common/pyro_ext.py b/logilab/common/pyro_ext.py deleted file mode 100644 index 5204b1b..0000000 --- a/logilab/common/pyro_ext.py +++ /dev/null @@ -1,180 +0,0 @@ -# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of logilab-common. -# -# logilab-common is free software: you can redistribute it and/or modify it under -# the terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) any -# later version. -# -# logilab-common is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with logilab-common. If not, see <http://www.gnu.org/licenses/>. -"""Python Remote Object utilities - -Main functions available: - -* `register_object` to expose arbitrary object through pyro using delegation - approach and register it in the nameserver. -* `ns_unregister` unregister an object identifier from the nameserver. -* `ns_get_proxy` get a pyro proxy from a nameserver object identifier. -""" - -__docformat__ = "restructuredtext en" - -import logging -import tempfile - -from Pyro import core, naming, errors, util, config - -_LOGGER = logging.getLogger('pyro') -_MARKER = object() - -config.PYRO_STORAGE = tempfile.gettempdir() - -def ns_group_and_id(idstr, defaultnsgroup=_MARKER): - try: - nsgroup, nsid = idstr.rsplit('.', 1) - except ValueError: - if defaultnsgroup is _MARKER: - nsgroup = config.PYRO_NS_DEFAULTGROUP - else: - nsgroup = defaultnsgroup - nsid = idstr - if nsgroup is not None and not nsgroup.startswith(':'): - nsgroup = ':' + nsgroup - return nsgroup, nsid - -def host_and_port(hoststr): - if not hoststr: - return None, None - try: - hoststr, port = hoststr.split(':') - except ValueError: - port = None - else: - port = int(port) - return hoststr, port - -_DAEMONS = {} -_PYRO_OBJS = {} -def _get_daemon(daemonhost, start=True): - if not daemonhost in _DAEMONS: - if not start: - raise Exception('no daemon for %s' % daemonhost) - if not _DAEMONS: - core.initServer(banner=0) - host, port = host_and_port(daemonhost) - daemon = core.Daemon(host=host, port=port) - _DAEMONS[daemonhost] = daemon - return _DAEMONS[daemonhost] - - -def locate_ns(nshost): - """locate and return the pyro name server to the daemon""" - core.initClient(banner=False) - return naming.NameServerLocator().getNS(*host_and_port(nshost)) - - -def register_object(object, nsid, defaultnsgroup=_MARKER, - daemonhost=None, nshost=None, use_pyrons=True): - """expose the object as a pyro object and register it in the name-server - - if use_pyrons is False, then the object is exposed, but no - attempt to register it to a pyro nameserver is made. - - return the pyro daemon object - """ - nsgroup, nsid = ns_group_and_id(nsid, defaultnsgroup) - daemon = _get_daemon(daemonhost) - if use_pyrons: - nsd = locate_ns(nshost) - # make sure our namespace group exists - try: - nsd.createGroup(nsgroup) - except errors.NamingError: - pass - daemon.useNameServer(nsd) - # use Delegation approach - impl = core.ObjBase() - impl.delegateTo(object) - qnsid = '%s.%s' % (nsgroup, nsid) - uri = daemon.connect(impl, qnsid) - _PYRO_OBJS[qnsid] = str(uri) - _LOGGER.info('registered %s a pyro object using group %s and id %s', - object, nsgroup, nsid) - return daemon - -def get_object_uri(qnsid): - return _PYRO_OBJS[qnsid] - -def ns_unregister(nsid, defaultnsgroup=_MARKER, nshost=None): - """unregister the object with the given nsid from the pyro name server""" - nsgroup, nsid = ns_group_and_id(nsid, defaultnsgroup) - try: - nsd = locate_ns(nshost) - except errors.PyroError as ex: - # name server not responding - _LOGGER.error('can\'t locate pyro name server: %s', ex) - else: - try: - nsd.unregister('%s.%s' % (nsgroup, nsid)) - _LOGGER.info('%s unregistered from pyro name server', nsid) - except errors.NamingError: - _LOGGER.warning('%s not registered in pyro name server', nsid) - - -def ns_reregister(nsid, defaultnsgroup=_MARKER, nshost=None): - """reregister a pyro object into the name server. You only have to specify - the name-server id of the object (though you MUST have gone through - `register_object` for the given object previously). - - This is especially useful for long running server while the name server may - have been restarted, and its records lost. - """ - nsgroup, nsid = ns_group_and_id(nsid, defaultnsgroup) - qnsid = '%s.%s' % (nsgroup, nsid) - nsd = locate_ns(nshost) - try: - nsd.unregister(qnsid) - except errors.NamingError: - # make sure our namespace group exists - try: - nsd.createGroup(nsgroup) - except errors.NamingError: - pass - nsd.register(qnsid, _PYRO_OBJS[qnsid]) - -def ns_get_proxy(nsid, defaultnsgroup=_MARKER, nshost=None): - """ - if nshost is None, the nameserver is found by a broadcast. - """ - # resolve the Pyro object - nsgroup, nsid = ns_group_and_id(nsid, defaultnsgroup) - try: - nsd = locate_ns(nshost) - pyrouri = nsd.resolve('%s.%s' % (nsgroup, nsid)) - except errors.ProtocolError as ex: - raise errors.PyroError( - 'Could not connect to the Pyro name server (host: %s)' % nshost) - except errors.NamingError: - raise errors.PyroError( - 'Could not get proxy for %s (not registered in Pyro), ' - 'you may have to restart your server-side application' % nsid) - return core.getProxyForURI(pyrouri) - -def get_proxy(pyro_uri): - """get a proxy for the passed pyro uri without using a nameserver - """ - return core.getProxyForURI(pyro_uri) - -def set_pyro_log_threshold(level): - pyrologger = logging.getLogger('Pyro.%s' % str(id(util.Log))) - # remove handlers so only the root handler is used - pyrologger.handlers = [] - pyrologger.setLevel(level) diff --git a/logilab/common/xmlrpcutils.py b/logilab/common/xmlrpcutils.py deleted file mode 100644 index 1d30d82..0000000 --- a/logilab/common/xmlrpcutils.py +++ /dev/null @@ -1,131 +0,0 @@ -# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of logilab-common. -# -# logilab-common is free software: you can redistribute it and/or modify it under -# the terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) any -# later version. -# -# logilab-common is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with logilab-common. If not, see <http://www.gnu.org/licenses/>. -"""XML-RPC utilities.""" -__docformat__ = "restructuredtext en" - -import xmlrpclib -from base64 import encodestring -#from cStringIO import StringIO - -ProtocolError = xmlrpclib.ProtocolError - -## class BasicAuthTransport(xmlrpclib.Transport): -## def __init__(self, username=None, password=None): -## self.username = username -## self.password = password -## self.verbose = None -## self.has_ssl = httplib.__dict__.has_key("HTTPConnection") - -## def request(self, host, handler, request_body, verbose=None): -## # issue XML-RPC request -## if self.has_ssl: -## if host.startswith("https:"): h = httplib.HTTPSConnection(host) -## else: h = httplib.HTTPConnection(host) -## else: h = httplib.HTTP(host) - -## h.putrequest("POST", handler) - -## # required by HTTP/1.1 -## if not self.has_ssl: # HTTPConnection already does 1.1 -## h.putheader("Host", host) -## h.putheader("Connection", "close") - -## if request_body: h.send(request_body) -## if self.has_ssl: -## response = h.getresponse() -## if response.status != 200: -## raise xmlrpclib.ProtocolError(host + handler, -## response.status, -## response.reason, -## response.msg) -## file = response.fp -## else: -## errcode, errmsg, headers = h.getreply() -## if errcode != 200: -## raise xmlrpclib.ProtocolError(host + handler, errcode, -## errmsg, headers) - -## file = h.getfile() - -## return self.parse_response(file) - - - -class AuthMixin: - """basic http authentication mixin for xmlrpc transports""" - - def __init__(self, username, password, encoding): - self.verbose = 0 - self.username = username - self.password = password - self.encoding = encoding - - def request(self, host, handler, request_body, verbose=0): - """issue XML-RPC request""" - h = self.make_connection(host) - h.putrequest("POST", handler) - # required by XML-RPC - h.putheader("User-Agent", self.user_agent) - h.putheader("Content-Type", "text/xml") - h.putheader("Content-Length", str(len(request_body))) - h.putheader("Host", host) - h.putheader("Connection", "close") - # basic auth - if self.username is not None and self.password is not None: - h.putheader("AUTHORIZATION", "Basic %s" % encodestring( - "%s:%s" % (self.username, self.password)).replace("\012", "")) - h.endheaders() - # send body - if request_body: - h.send(request_body) - # get and check reply - errcode, errmsg, headers = h.getreply() - if errcode != 200: - raise ProtocolError(host + handler, errcode, errmsg, headers) - file = h.getfile() -## # FIXME: encoding ??? iirc, this fix a bug in xmlrpclib but... -## data = h.getfile().read() -## if self.encoding != 'UTF-8': -## data = data.replace("version='1.0'", -## "version='1.0' encoding='%s'" % self.encoding) -## result = StringIO() -## result.write(data) -## result.seek(0) -## return self.parse_response(result) - return self.parse_response(file) - -class BasicAuthTransport(AuthMixin, xmlrpclib.Transport): - """basic http authentication transport""" - -class BasicAuthSafeTransport(AuthMixin, xmlrpclib.SafeTransport): - """basic https authentication transport""" - - -def connect(url, user=None, passwd=None, encoding='ISO-8859-1'): - """return an xml rpc server on <url>, using user / password if specified - """ - if user or passwd: - assert user and passwd is not None - if url.startswith('https://'): - transport = BasicAuthSafeTransport(user, passwd, encoding) - else: - transport = BasicAuthTransport(user, passwd, encoding) - else: - transport = None - server = xmlrpclib.ServerProxy(url, transport, encoding=encoding) - return server |