diff options
author | Sylvain Th?nault <sylvain.thenault@logilab.fr> | 2010-02-26 17:41:30 +0100 |
---|---|---|
committer | Sylvain Th?nault <sylvain.thenault@logilab.fr> | 2010-02-26 17:41:30 +0100 |
commit | 4111087387eac61b811d0cff8f53784dc77213db (patch) | |
tree | 5b0ed04ec773d9e14c23c1ef56f832b0ec4692f7 | |
parent | d8b855b674ba51018a46d6b440e8a7eadd9bf77a (diff) | |
parent | c895beff74efbf82142b1052b2cc4760a1b61ae1 (diff) | |
download | logilab-common-4111087387eac61b811d0cff8f53784dc77213db.tar.gz |
backport stable
-rw-r--r-- | _pyodbcwrap.py | 443 | ||||
-rw-r--r-- | db.py | 67 |
2 files changed, 1 insertions, 509 deletions
diff --git a/_pyodbcwrap.py b/_pyodbcwrap.py deleted file mode 100644 index ae92a28..0000000 --- a/_pyodbcwrap.py +++ /dev/null @@ -1,443 +0,0 @@ -""" -Wrapper around Pyodbc to alleviate bad perf issue when using sqlserver -in multithreaded mode. - -:copyright: 2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr -:license: General Public License version 2 - http://www.gnu.org/licenses -""" - -## Communication protocol: - -## command len\n -## len bytes (pickled data) - -## commands: -## NEW (classname, args, kwargs) -> object id -## CALL (obj id, method, args, kwargs) -> (VAL length\n return value) -## (REF 0 object_id\n) -## (EXC length\n exception) -## (NONE 0) -## DEL (object id) - -import sys -import subprocess -from cPickle import dumps, loads, HIGHEST_PROTOCOL -import struct # if we need binary protocol -from exceptions import * - -if sys.platform == 'win32': - #pylint:disable-msg=F0401 - import pyodbc as dbapimodule - from pyodbc import * - import pyodbc -else: - import psycopg2 as dbapimodule - from psycopg2 import * - from psycopg2 import _psycopg - -class ProtocolError(OperationalError): - pass - - -def connect(*args, **kwargs): - return ConnectionProxy._new_connection(*args, **kwargs) - - -class Binary(object): - def __init__(self, value): - self.value = value[:] - - -class Proxy(object): - remote_class = None # Provide remote class name, None means _new will fail - def __init__(self, obj_id, pipe): - self._id = obj_id - self._pipe = pipe - - - def _call(self, method, args=None, kwargs=None, proxy_class=None): - try: - _in = self._pipe.stdout - _out = self._pipe.stdin - if args is None: - args = () - if kwargs is None: - kwargs = {} - data = dumps((self._id, method, args, kwargs), HIGHEST_PROTOCOL) - length = len(data) - _out.write('CALL %d\r\n' % length) - _out.write(data) - _out.flush() - raw_answer = _in.readline() - if raw_answer == '': - raise ProtocolError('connection lost while waiting answer ' - 'to CALL %s %s %s %s' % (self._id, - method, - args, - kwargs)) - answer_head = raw_answer.strip().split() - if answer_head[0] not in ('REF', 'VAL', 'EXC', 'NONE'): - head = ' '.join(answer_head) - raise ProtocolError('Unknown answer header [%r]' % head) - length = int(answer_head[1]) - if answer_head[0] == 'REF': - if proxy_class is None: - raise ProtocolError('unexpected REF message') - obj_id = int(answer_head[2]) - return proxy_class(obj_id, self._pipe) - elif answer_head[0] == 'NONE': - return None - else: - data = _in.read(length) - value = loads(data) - if answer_head[0] == 'VAL': - return value - else: # 'EXC': - exc_class_name, args = value - exc = globals()[exc_class_name](*args) - raise exc - except IOError, exc: - raise ProtocolError('IOError %s' % exc) - - @classmethod - def _new(cls, pipe, args, kwargs): - if cls.remote_class is None: - raise TypeError('%s is note remotely instanciable' % cls.__name__) - try: - data = dumps((cls.remote_class, args, kwargs), HIGHEST_PROTOCOL) - length = len(data) - pipe.stdin.write('NEW %d\r\n' % length) - pipe.stdin.write(data) - pipe.stdin.flush() - raw_answer = pipe.stdout.readline() - if raw_answer == '': - raise ProtocolError('connection lost while waiting answer ' - 'to NEW %s %s %s' % (cls.remote_class, - args, - kwargs)) - answer_head = raw_answer.strip().split() - if answer_head[0] not in ('REF', 'EXC'): - head = ' '.join(answer_head) - raise ProtocolError('Unknown answer header [%r]' % head) - length = int(answer_head[1]) - if answer_head[0] == 'REF': - obj_id = int(answer_head[2]) - return cls(obj_id, pipe) - else: # exception - data = pipe.stdout.read(length) - value = loads(data) - exc_class_name, args = value - exc = globals()[exc_class_name](*args) - raise exc - except IOError, exc: - raise ProtocolError('IOError %s' % exc) - - def __del__(self): - data = dumps(self._id, HIGHEST_PROTOCOL) - try: - self._pipe.stdin.write('DEL %d\r\n' % len(data)) - self._pipe.stdin.write(data) - self._pipe.stdin.flush() - except IOError: - pass - - def __getattr__(self, name): - return self._call('__getattr__', (name,)) - -class ConnectionProxy(Proxy): - remote_class = 'RemoteConnection' - @classmethod - def _new_connection(cls, *args, **kwargs): - cmd = [sys.executable, '-u', __file__] - pipe = subprocess.Popen(cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=sys.stderr) - return cls._new(pipe, args, kwargs) - - def cursor(self): - return self._call('cursor', proxy_class=CursorProxy) - - def close(self): - self._call('close') - - def commit(self): - self._call('commit') - - def rollback(self): - self._call('rollback') - - #pycopg2 specific - def set_isolation_level(self, *args, **kwargs): - self._call('set_isolation_level', args, kwargs) - - -class CursorProxy(Proxy): - def execute(self, sql, params=()): - self._call('execute', (sql, params)) - - def executemany(self, sql, params=()): - self._call('executemany', (sql, params)) - - def fetchone(self): - return self._call('fetchone', proxy_class=RowProxy) - - def fetchmany(self, size): - rows = self._call('fetchmany', (size,), proxy_class=RowListProxy) - if isinstance(rows, RowListProxy): - rows = [r for r in rows] - return rows - - def fetchall(self): - rows = self._call('fetchall', proxy_class=RowListProxy) - if isinstance(rows, RowListProxy): - rows = [r for r in rows] - return rows - - def close(self): - return self._call('close') - - # pyodbc specific - def tables(self): - return self._call('tables') - -class RowProxy(Proxy): - def __iter__(self): - i = 0 - while True: - try: - col = self[i] - except IndexError: - break - yield col - i += 1 - - def __getitem__(self, index): - return self._call('__getitem__', (index,), proxy_class=BinaryProxy) - -class BinaryProxy(Proxy): - def getbinary(self): - return self._call('getvalue') - -class RowListProxy(Proxy): - def __iter__(self): - i = 0 - while True: - try: - row = self[i] - except IndexError: - break - yield row - i += 1 - - def __getitem__(self, index): - return self._call('__getitem__', (index,), proxy_class=RowProxy) - -if __name__ == '__main__': - import os - #log = open('/dev/null', 'a') - log = sys.stderr - import traceback - from logilab.common._pyodbcwrap import Binary # otherwise the isinstance test below will fail - - class RemoteControler(object): - def __init__(self, read=sys.stdin, write=sys.stdout): - self.objects = {} - self.input = read - self.output = write - self.__objcount = 0 - - def unregister(self, obj_id): - try: - del self.objects[obj_id] - #print >> log, 'DELETED', obj_id - except KeyError: - pass - - def register(self, obj): - self.__objcount += 1 - self.objects[self.__objcount] = obj - obj.obj_id = self.__objcount - return self.__objcount - - def control_loop(self): - while True: - try: - line = self.input.readline() - except KeyboardInterrupt: - break - if line == '': - break - command, length = line.strip().split() - #print >> log, command - length = int(length) - if length > 0: - try: - data = loads(self.input.read(length)) - #print >> log, data - except KeyboardInterrupt: - break - else: - data = None - - if command == 'CALL': - obj_id, method, args, kwargs = data - obj = self.objects[obj_id] - meth = getattr(obj, method) - try: - result = meth(*args, **kwargs) - #print >> log, 'result:', result - if result is None: - msg_head = 'NONE 0\r\n' - msg_data = '' - elif isinstance(result, RemoteWrapper): - msg_head = 'REF 0 %d\r\n' % result.obj_id - msg_data = '' - else: - msg_data = dumps(result) - msg_head = 'VAL %d\r\n' % len(msg_data) - except Exception, exc: - if not isinstance(exc, IndexError): - traceback.print_exc(file=sys.stderr) - msg_data = dumps((exc.__class__.__name__, exc.args)) - msg_head = 'EXC %d\r\n' % len(msg_data) - #print >> log, 'EXC' - elif command == 'NEW': - class_name, args, kwargs = data - klass = globals()[class_name] - try: - result = klass(*args, **kwargs) - msg_head = 'REF 0 %d\r\n' % result.obj_id - msg_data = '' - except Exception, exc: - traceback.print_exc(file=sys.stderr) - msg_data = dumps((exc.__class__.__name__, exc.args)) - msg_head = 'EXC %d\r\n' % len(msg_data) - - elif command == 'DEL': - self.unregister(data) - msg_head = None - msg_data = None - - if msg_head is not None: - try: - self.output.write(msg_head) - self.output.flush() - if msg_data: - self.output.write(msg_data) - self.output.flush() - except IOError: - break - #print >> log, 'STATS:', len(self.objects), 'objs alive' - - class RemoteWrapper(object): - def __init__(self, wrapped): - self.wrapped = wrapped - REMOTE_CONTROLER.register(self) - - def __getattr__(self, name): - return getattr(self.wrapped, name) - - def __del__(self): - if REMOTE_CONTROLER is not None: - REMOTE_CONTROLER.unregister(self.obj_id) - - def __str__(self): - return '<%s %d>' % (self.__class__.__name__, self.obj_id) - - class RemoteConnection(RemoteWrapper): - def __init__(self, *args, **kwargs): - cnx = dbapimodule.connect(*args, **kwargs) - super(RemoteConnection, self).__init__(cnx) - - def cursor(self): - return RemoteCursor(self.wrapped.cursor()) - - - - class RemoteCursor(RemoteWrapper): - def __init__(self, cursor): - super(RemoteCursor, self).__init__(cursor) - - def _require_row_wrap(self): - return True - for col_desc in self.wrapped.description: - if col_desc[1] == dbapimodule.BINARY: - #print >> log, 'require wrap' - return True - return False - - def execute(self, sql, params): - #print >>log, sql, params - if isinstance(params, (tuple, list)): - tmp_params = [] - for p in params: - if isinstance(p, Binary): - p = dbapimodule.Binary(p.value) - tmp_params.append(p) - params = tmp_params - elif isinstance(params, dict): - for k,v in params.iteritems(): - #print >> log, k, v, Binary, isinstance(v, Binary) - if isinstance(v, Binary): - params[k] = dbapimodule.Binary(v.value) - #print >>log, sql, params - self.wrapped.execute(sql, params) - - def fetchone(self): - row = self.wrapped.fetchone() - if self._require_row_wrap(): - return RemoteRow(row) - else: - return row - - def fetchall(self): - rows = self.wrapped.fetchall() - #print >> log, 'fetchall', rows - if self._require_row_wrap(): - return RemoteRowList(rows) - else: - return rows - - def fetchmany(self, size): - rows = self.wrapped.fetchmany(size) - if self._require_row_wrap(): - return RemoteRowList(rows) - else: - return rows - - def tables(self): - self.wrapped.tables() - - - class RemoteRow(RemoteWrapper): - def __init__(self, row): - super(RemoteRow, self).__init__(row) - - def __getitem__(self, index): - #print >>log, 'getitem', self, self.wrapped, index - data = self.wrapped[index] - if isinstance(data, buffer): - data = RemoteBinary(data) - return data - - class RemoteRowList(RemoteWrapper): - def __init__(self, rowlist): - rows = [RemoteRow(r) for r in rowlist] - super(RemoteRowList, self).__init__(rows) - - def __getitem__(self, index): - #print >>log, 'getitem', self, self.wrapped, index - return self.wrapped[index] - - class RemoteBinary(RemoteWrapper): - def __init__(self, binary): - super(RemoteBinary, self).__init__(binary) - - def getvalue(self): - #print >>log, 'getvalue', self - return self.wrapped[:] - - REMOTE_CONTROLER = RemoteControler() - REMOTE_CONTROLER.control_loop() @@ -313,32 +313,6 @@ class _Psycopg2Adapter(_PsycopgAdapter): #import cStringIO #extensions.register_adapter(cStringIO.StringIO, adapt_stringio) -class _pyodbcwrappedPsycoPg2Adapter(_Psycopg2Adapter): - """ - used to test and debug _pyodbcwrap under Linux. - No sense in using this class in production. - """ - def process_value(self, value, description, encoding='utf-8', binarywrap=None): - #return _Psycopg2Adapter.process_value(self, value, description, encoding, binarywrap) - # if the dbapi module isn't supporting type codes, override to return value directly - typecode = description[1] - assert typecode is not None, self - if typecode == self.STRING: - if isinstance(value, str): - return unicode(value, encoding, 'replace') - elif typecode == self.BOOLEAN: - return bool(value) - elif typecode == self.BINARY and not binarywrap is None: - #print "*"*500 - #print 'binary', binarywrap(value.getbinary()) - return binarywrap(value.getbinary()) - elif typecode == self.UNKNOWN: - # may occurs on constant selection for instance (e.g. SELECT 'hop') - # with postgresql at least - if isinstance(value, str): - return unicode(value, encoding, 'replace') - return value - class _PgsqlAdapter(DBAPIAdapter): """Simple pyPgSQL Adapter to DBAPI @@ -792,27 +766,6 @@ class _PyodbcAdapter(_BaseSqlServerAdapter): variables['autocommit'] = True return self._native_module.connect(**variables) -class _PyodbcAdapterMT(_PyodbcAdapter): - def process_value(self, value, description, encoding='utf-8', binarywrap=None): - # if the dbapi module isn't supporting type codes, override to return value directly - typecode = description[1] - assert typecode is not None, self - if typecode == self.STRING: - if isinstance(value, str): - return unicode(value, encoding, 'replace') - elif typecode == self.BINARY: # value is a python buffer - if binarywrap is not None: - return binarywrap(value.getbinary()) - else: - return value.getbinary() - elif typecode == self.UNKNOWN: - # may occurs on constant selection for instance (e.g. SELECT 'hop') - # with postgresql at least - if isinstance(value, str): - return unicode(value, encoding, 'replace') - - return value - class _PyodbcSqlServer2000Adapter(_PyodbcAdapter): driver = "SQL Server" @@ -822,15 +775,6 @@ class _PyodbcSqlServer2005Adapter(_PyodbcAdapter): class _PyodbcSqlServer2008Adapter(_PyodbcAdapter): driver = "SQL Server Native Client 10.0" -class _PyodbcSqlServer2000AdapterMT(_PyodbcAdapterMT): - driver = "SQL Server" - -class _PyodbcSqlServer2005AdapterMT(_PyodbcAdapterMT): - driver = "SQL Server Native Client 10.0" - -class _PyodbcSqlServer2008AdapterMT(_PyodbcAdapterMT): - driver = "SQL Server Native Client 10.0" - class _AdodbapiAdapter(_BaseSqlServerAdapter): def _connect(self, host='', database='', user='', password='', port=None, extra_args=None): @@ -859,25 +803,19 @@ class _AdodbapiSqlServer2008Adapter(_AdodbapiAdapter): PREFERED_DRIVERS = { - "postgres" : [ #'logilab.common._pyodbcwrap', - 'psycopg2', 'psycopg', 'pgdb', 'pyPgSQL.PgSQL', + "postgres" : [ 'psycopg2', 'psycopg', 'pgdb', 'pyPgSQL.PgSQL', ], "mysql" : [ 'MySQLdb', ], # 'pyMySQL.MySQL, ], "sqlite" : ['pysqlite2.dbapi2', 'sqlite', 'sqlite3',], "sqlserver2000" : ['pyodbc', 'adodbapi', ], "sqlserver2005" : ['pyodbc', 'adodbapi', ], "sqlserver2008" : ['pyodbc', 'adodbapi', ], - # for use in multithreaded applications, e.g. CubicWeb - "sqlserver2000_mt" : ['logilab.common._pyodbcwrap'], - "sqlserver2005_mt" : ['logilab.common._pyodbcwrap'], - "sqlserver2008_mt" : ['logilab.common._pyodbcwrap'], } _ADAPTERS = { 'postgres' : { 'pgdb' : _PgdbAdapter, 'psycopg' : _PsycopgAdapter, 'psycopg2' : _Psycopg2Adapter, - #'logilab.common._pyodbcwrap': _pyodbcwrappedPsycoPg2Adapter, 'pyPgSQL.PgSQL' : _PgsqlAdapter, }, 'mysql' : { 'MySQLdb' : _MySqlDBAdapter, }, @@ -890,9 +828,6 @@ _ADAPTERS = { 'pyodbc': _PyodbcSqlServer2005Adapter}, "sqlserver2008" : {'adodbapi': _AdodbapiSqlServer2008Adapter, 'pyodbc': _PyodbcSqlServer2008Adapter}, - "sqlserver2000_mt" : {'logilab.common._pyodbcwrap': _PyodbcSqlServer2000AdapterMT}, - "sqlserver2005_mt" : {'logilab.common._pyodbcwrap': _PyodbcSqlServer2005AdapterMT}, - "sqlserver2008_mt" : {'logilab.common._pyodbcwrap': _PyodbcSqlServer2008AdapterMT}, } # _AdapterDirectory could be more generic by adding a 'protocol' parameter |