summaryrefslogtreecommitdiff
path: root/db.py
diff options
context:
space:
mode:
authorSylvain Th?nault <sylvain.thenault@logilab.fr>2010-03-08 18:04:02 +0100
committerSylvain Th?nault <sylvain.thenault@logilab.fr>2010-03-08 18:04:02 +0100
commit42d891ec4457a83e6422dfd2bf6eb391fc029010 (patch)
treeef2fa62823fb07801060d98a5a58006ac007f749 /db.py
parent49e8fa0d2010387ac4a41a43292038c17eab62be (diff)
downloadlogilab-common-42d891ec4457a83e6422dfd2bf6eb391fc029010.tar.gz
adbh, db, sqlgen modules moved to the new logilab-database package
Diffstat (limited to 'db.py')
-rw-r--r--db.py903
1 files changed, 4 insertions, 899 deletions
diff --git a/db.py b/db.py
index 3d31444..2456292 100644
--- a/db.py
+++ b/db.py
@@ -22,904 +22,9 @@ helper for your database using the `get_adv_func_helper` function.
"""
__docformat__ = "restructuredtext en"
-import sys
-import re
from warnings import warn
-import threading
-import datetime
+warn('this module is deprecated, use logilab.db instead',
+ DeprecationWarning, stacklevel=1)
-import logilab.common as lgc
-from logilab.common.deprecation import obsolete
-
-try:
- from mx.DateTime import DateTimeType, DateTimeDeltaType, strptime
- HAS_MX_DATETIME = True
-except:
- HAS_MX_DATETIME = False
-
-__all__ = ['get_dbapi_compliant_module',
- 'get_connection', 'set_prefered_driver',
- 'PyConnection', 'PyCursor',
- 'UnknownDriver', 'NoAdapterFound',
- ]
-
-class UnknownDriver(Exception):
- """raised when a unknown driver is given to get connection"""
-
-class NoAdapterFound(Exception):
- """Raised when no Adapter to DBAPI was found"""
- def __init__(self, obj, objname=None, protocol='DBAPI'):
- if objname is None:
- objname = obj.__name__
- Exception.__init__(self, "Could not adapt %s to protocol %s" %
- (objname, protocol))
- self.adapted_obj = obj
- self.objname = objname
- self._protocol = protocol
-
-
-def _import_driver_module(driver, drivers, imported_elements=None, quiet=True):
- """Imports the first module found in 'drivers' for 'driver'
-
- :rtype: tuple
- :returns: the tuple module_object, module_name where module_object
- is the dbapi module, and modname the module's name
- """
- if not driver in drivers:
- raise UnknownDriver(driver)
- imported_elements = imported_elements or []
- for modname in drivers[driver]:
- try:
- if not quiet:
- print >> sys.stderr, 'Trying %s' % modname
- module = __import__(modname, globals(), locals(), imported_elements)
- break
- except ImportError:
- if not quiet:
- print >> sys.stderr, '%s is not available' % modname
- continue
- else:
- raise ImportError('Unable to import a %s module' % driver)
- if not imported_elements:
- for part in modname.split('.')[1:]:
- module = getattr(module, part)
- return module, modname
-
-
-## Connection and cursor wrappers #############################################
-
-class SimpleConnectionWrapper:
- """A simple connection wrapper in python to decorated C-level connections
- with additional attributes
- """
- def __init__(self, cnx):
- """Wraps the original connection object"""
- self._cnx = cnx
-
- # XXX : Would it work if only __getattr__ was defined
- def cursor(self):
- """Wraps cursor()"""
- return self._cnx.cursor()
-
- def commit(self):
- """Wraps commit()"""
- return self._cnx.commit()
-
- def rollback(self):
- """Wraps rollback()"""
- return self._cnx.rollback()
-
- def close(self):
- """Wraps close()"""
- return self._cnx.close()
-
- def __getattr__(self, attrname):
- return getattr(self._cnx, attrname)
-
-class PyConnection(SimpleConnectionWrapper):
- """A simple connection wrapper in python, generating wrapper for cursors as
- well (useful for profiling)
- """
- def __init__(self, cnx):
- """Wraps the original connection object"""
- self._cnx = cnx
-
- def cursor(self):
- """Wraps cursor()"""
- return PyCursor(self._cnx.cursor())
-
-
-
-class PyCursor:
- """A simple cursor wrapper in python (useful for profiling)"""
- def __init__(self, cursor):
- self._cursor = cursor
-
- def close(self):
- """Wraps close()"""
- return self._cursor.close()
-
- def execute(self, *args, **kwargs):
- """Wraps execute()"""
- return self._cursor.execute(*args, **kwargs)
-
- def executemany(self, *args, **kwargs):
- """Wraps executemany()"""
- return self._cursor.executemany(*args, **kwargs)
-
- def fetchone(self, *args, **kwargs):
- """Wraps fetchone()"""
- return self._cursor.fetchone(*args, **kwargs)
-
- def fetchmany(self, *args, **kwargs):
- """Wraps execute()"""
- return self._cursor.fetchmany(*args, **kwargs)
-
- def fetchall(self, *args, **kwargs):
- """Wraps fetchall()"""
- return self._cursor.fetchall(*args, **kwargs)
-
- def __getattr__(self, attrname):
- return getattr(self._cursor, attrname)
-
-
-## Adapters list ##############################################################
-
-class DBAPIAdapter:
- """Base class for all DBAPI adapters"""
- UNKNOWN = None
-
- def __init__(self, native_module, pywrap=False):
- """
- :type native_module: module
- :param native_module: the database's driver adapted module
- """
- self._native_module = native_module
- self._pywrap = pywrap
- # optimization: copy type codes from the native module to this instance
- # since the .process_value method may be heavily used
- for typecode in ('STRING', 'BOOLEAN', 'BINARY', 'DATETIME', 'NUMBER',
- 'UNKNOWN'):
- try:
- setattr(self, typecode, getattr(self, typecode))
- except AttributeError:
- print 'WARNING: %s adapter has no %s type code' % (self, typecode)
-
- def connect(self, host='', database='', user='', password='', port='', extra_args=None):
- """Wraps the native module connect method"""
- kwargs = {'host' : host, 'port' : port, 'database' : database,
- 'user' : user, 'password' : password}
- cnx = self._native_module.connect(**kwargs)
- return self._wrap_if_needed(cnx, user)
-
- def _wrap_if_needed(self, cnx, user):
- """Wraps the connection object if self._pywrap is True, and returns it
- If false, returns the original cnx object
- """
- if self._pywrap:
- cnx = PyConnection(cnx)
- try:
- cnx.logged_user = user
- except AttributeError:
- # C or __slots__ object
- cnx = SimpleConnectionWrapper(cnx)
- cnx.logged_user = user
- return cnx
-
- def __getattr__(self, attrname):
- return getattr(self._native_module, attrname)
-
- def process_value(self, value, description, encoding='utf-8', binarywrap=None):
- # if the dbapi module isn't supporting type codes, override to return value directly
- typecode = description[1]
- assert typecode is not None, self
- if typecode == self.STRING:
- if isinstance(value, str):
- return unicode(value, encoding, 'replace')
- elif typecode == self.BOOLEAN:
- return bool(value)
- elif typecode == self.BINARY and not binarywrap is None:
- return binarywrap(value)
- elif typecode == self.UNKNOWN:
- # may occurs on constant selection for instance (e.g. SELECT 'hop')
- # with postgresql at least
- if isinstance(value, str):
- return unicode(value, encoding, 'replace')
-
-## elif typecode == dbapimod.DATETIME:
-## pass
-## elif typecode == dbapimod.NUMBER:
-## pass
-## else:
-## self.warning("type -%s- unknown for %r (%s) ",
-## typecode, value, type(value))
- return value
-
-
-# Postgresql #########################################################
-
-class _PgdbAdapter(DBAPIAdapter):
- """Simple PGDB Adapter to DBAPI (pgdb modules lacks Binary() and NUMBER)
- """
- def __init__(self, native_module, pywrap=False):
- DBAPIAdapter.__init__(self, native_module, pywrap)
- self.NUMBER = native_module.pgdbType('int2', 'int4', 'serial',
- 'int8', 'float4', 'float8',
- 'numeric', 'bool', 'money')
-
- def connect(self, host='', database='', user='', password='', port='', extra_args=None):
- """Wraps the native module connect method"""
- if port:
- warn("pgdb doesn't support 'port' parameter in connect()", UserWarning)
- kwargs = {'host' : host, 'database' : database,
- 'user' : user, 'password' : password}
- cnx = self._native_module.connect(**kwargs)
- return self._wrap_if_needed(cnx, user)
-
-
-class _PsycopgAdapter(DBAPIAdapter):
- """Simple Psycopg Adapter to DBAPI (cnx_string differs from classical ones)
- """
- def connect(self, host='', database='', user='', password='', port='', extra_args=None):
- """Handles psycopg connection format"""
- if host:
- cnx_string = 'host=%s dbname=%s user=%s' % (host, database, user)
- else:
- cnx_string = 'dbname=%s user=%s' % (database, user)
- if port:
- cnx_string += ' port=%s' % port
- if password:
- cnx_string = '%s password=%s' % (cnx_string, password)
- cnx = self._native_module.connect(cnx_string)
- cnx.set_isolation_level(1)
- return self._wrap_if_needed(cnx, user)
-
-
-class _Psycopg2Adapter(_PsycopgAdapter):
- """Simple Psycopg2 Adapter to DBAPI (cnx_string differs from classical ones)
- """
- # not defined in psycopg2.extensions
- # "select typname from pg_type where oid=705";
- UNKNOWN = 705
-
- def __init__(self, native_module, pywrap=False):
- from psycopg2 import extensions
- self.BOOLEAN = extensions.BOOLEAN
- DBAPIAdapter.__init__(self, native_module, pywrap)
- self._init_psycopg2()
-
- def _init_psycopg2(self):
- """initialize psycopg2 to use mx.DateTime for date and timestamps
- instead for datetime.datetime"""
- psycopg2 = self._native_module
- if hasattr(psycopg2, '_lc_initialized'):
- return
- psycopg2._lc_initialized = 1
- # use mxDateTime instead of datetime if available
- if HAS_MX_DATETIME and lgc.USE_MX_DATETIME:
- from psycopg2 import extensions
- extensions.register_type(psycopg2._psycopg.MXDATETIME)
- extensions.register_type(psycopg2._psycopg.MXINTERVAL)
- extensions.register_type(psycopg2._psycopg.MXDATE)
- extensions.register_type(psycopg2._psycopg.MXTIME)
- # StringIO/cStringIO adaptation
- # XXX (syt) todo, see my december discussion on the psycopg2 list
- # for a working solution
- #def adapt_stringio(stringio):
- # print 'ADAPTING', stringio
- # return psycopg2.Binary(stringio.getvalue())
- #import StringIO
- #extensions.register_adapter(StringIO.StringIO, adapt_stringio)
- #import cStringIO
- #extensions.register_adapter(cStringIO.StringIO, adapt_stringio)
-
-
-class _PgsqlAdapter(DBAPIAdapter):
- """Simple pyPgSQL Adapter to DBAPI
- """
- def connect(self, host='', database='', user='', password='', port='', extra_args=None):
- """Handles psycopg connection format"""
- kwargs = {'host' : host, 'port': port or None,
- 'database' : database,
- 'user' : user, 'password' : password or None}
- cnx = self._native_module.connect(**kwargs)
- return self._wrap_if_needed(cnx, user)
-
-
- def Binary(self, string):
- """Emulates the Binary (cf. DB-API) function"""
- return str
-
- def __getattr__(self, attrname):
- # __import__('pyPgSQL.PgSQL', ...) imports the toplevel package
- return getattr(self._native_module, attrname)
-
-
-# Sqlite #############################################################
-
-class _PySqlite2Adapter(DBAPIAdapter):
- """Simple pysqlite2 Adapter to DBAPI
- """
- # no type code in pysqlite2
- BINARY = 'XXX'
- STRING = 'XXX'
- DATETIME = 'XXX'
- NUMBER = 'XXX'
- BOOLEAN = 'XXX'
-
- def __init__(self, native_module, pywrap=False):
- DBAPIAdapter.__init__(self, native_module, pywrap)
- self._init_pysqlite2()
-
- def _init_pysqlite2(self):
- """initialize pysqlite2 to use mx.DateTime for date and timestamps"""
- sqlite = self._native_module
- if hasattr(sqlite, '_lc_initialized'):
- return
- sqlite._lc_initialized = 1
-
- # bytea type handling
- from StringIO import StringIO
- def adapt_bytea(data):
- return data.getvalue()
- sqlite.register_adapter(StringIO, adapt_bytea)
- def convert_bytea(data, Binary=sqlite.Binary):
- return Binary(data)
- sqlite.register_converter('bytea', convert_bytea)
-
- # boolean type handling
- def adapt_boolean(bval):
- return str(bval).upper()
- sqlite.register_adapter(bool, adapt_boolean)
- def convert_boolean(ustr):
- if ustr.upper() in ('F', 'FALSE'):
- return False
- return True
- sqlite.register_converter('boolean', convert_boolean)
-
-
- # decimal type handling
- from decimal import Decimal
- def adapt_decimal(data):
- return str(data)
- sqlite.register_adapter(Decimal, adapt_decimal)
-
- def convert_decimal(data):
- return Decimal(data)
- sqlite.register_converter('decimal', convert_decimal)
-
- # date/time types handling
- if HAS_MX_DATETIME and lgc.USE_MX_DATETIME:
- def adapt_mxdatetime(mxd):
- return mxd.strftime('%Y-%m-%d %H:%M:%S')
- sqlite.register_adapter(DateTimeType, adapt_mxdatetime)
- def adapt_mxdatetimedelta(mxd):
- return mxd.strftime('%H:%M:%S')
- sqlite.register_adapter(DateTimeDeltaType, adapt_mxdatetimedelta)
-
- def convert_mxdate(ustr):
- return strptime(ustr, '%Y-%m-%d %H:%M:%S')
- sqlite.register_converter('date', convert_mxdate)
- def convert_mxdatetime(ustr):
- return strptime(ustr, '%Y-%m-%d %H:%M:%S')
- sqlite.register_converter('timestamp', convert_mxdatetime)
- def convert_mxtime(ustr):
- try:
- return strptime(ustr, '%H:%M:%S')
- except:
- # DateTime used as Time?
- return strptime(ustr, '%Y-%m-%d %H:%M:%S')
- sqlite.register_converter('time', convert_mxtime)
- # else use datetime.datetime
- else:
- from datetime import time, timedelta
- # datetime.time
- def adapt_time(data):
- return data.strftime('%H:%M:%S')
- sqlite.register_adapter(time, adapt_time)
- def convert_time(data):
- return time(*[int(i) for i in data.split(':')])
- sqlite.register_converter('time', convert_time)
- # datetime.timedelta
- def adapt_timedelta(data):
- '''the sign in the result only refers to the number of days. day
- fractions always indicate a positive offset. this may seem strange,
- but it is the same that is done by the default __str__ method. we
- redefine it here anyways (instead of simply doing "str") because we
- do not want any "days," string within the representation.
- '''
- days = data.days
- frac = data - timedelta(days)
- return "%d %s" % (data.days, frac)
- sqlite.register_adapter(timedelta, adapt_timedelta)
- def convert_timedelta(data):
- parts = data.split(" ")
- if len(parts) == 2:
- daypart, timepart = parts
- days = int(daypart)
- else:
- days = 0
- timepart = parts[-1]
- timepart_full = timepart.split(".")
- hours, minutes, seconds = map(int, timepart_full[0].split(":"))
- if len(timepart_full) == 2:
- microseconds = int(float("0." + timepart_full[1]) * 1000000)
- else:
- microseconds = 0
- data = timedelta(days,
- hours*3600 + minutes*60 + seconds,
- microseconds)
- return data
- sqlite.register_converter('interval', convert_timedelta)
-
-
- def connect(self, host='', database='', user='', password='', port=None, extra_args=None):
- """Handles sqlite connection format"""
- sqlite = self._native_module
-
- class PySqlite2Cursor(sqlite.Cursor):
- """cursor adapting usual dict format to pysqlite named format
- in SQL queries
- """
- def _replace_parameters(self, sql, kwargs):
- if isinstance(kwargs, dict):
- return re.sub(r'%\(([^\)]+)\)s', r':\1', sql)
- # XXX dumb
- return re.sub(r'%s', r'?', sql)
-
- def execute(self, sql, kwargs=None):
- if kwargs is None:
- self.__class__.__bases__[0].execute(self, sql)
- else:
- final_sql = self._replace_parameters(sql, kwargs)
- self.__class__.__bases__[0].execute(self, final_sql , kwargs)
-
- def executemany(self, sql, kwargss):
- if not isinstance(kwargss, (list, tuple)):
- kwargss = tuple(kwargss)
- self.__class__.__bases__[0].executemany(self, self._replace_parameters(sql, kwargss[0]), kwargss)
-
- class PySqlite2CnxWrapper:
- def __init__(self, cnx):
- self._cnx = cnx
-
- def cursor(self):
- return self._cnx.cursor(PySqlite2Cursor)
- def __getattr__(self, attrname):
- return getattr(self._cnx, attrname)
- cnx = sqlite.connect(database, detect_types=sqlite.PARSE_DECLTYPES)
- return self._wrap_if_needed(PySqlite2CnxWrapper(cnx), user)
-
- def process_value(self, value, description, encoding='utf-8', binarywrap=None):
- if binarywrap is not None and isinstance(value, self._native_module.Binary):
- return binarywrap(value)
- return value # no type code support, can't do anything
-
-
-class _SqliteAdapter(DBAPIAdapter):
- """Simple sqlite Adapter to DBAPI
- """
- def __init__(self, native_module, pywrap=False):
- DBAPIAdapter.__init__(self, native_module, pywrap)
- self.DATETIME = native_module.TIMESTAMP
-
- def connect(self, host='', database='', user='', password='', port='', extra_args=None):
- """Handles sqlite connection format"""
- cnx = self._native_module.connect(database)
- return self._wrap_if_needed(cnx, user)
-
-
-# Mysql ##############################################################
-
-class _MySqlDBAdapter(DBAPIAdapter):
- """Simple mysql Adapter to DBAPI
- """
- BOOLEAN = 'XXX' # no specific type code for boolean
-
- def __init__(self, native_module, pywrap=False):
- DBAPIAdapter.__init__(self, native_module, pywrap)
- self._init_module()
-
- def _init_module(self):
- """initialize mysqldb to use mx.DateTime for date and timestamps"""
- natmod = self._native_module
- if hasattr(natmod, '_lc_initialized'):
- return
- natmod._lc_initialized = 1
- # date/time types handling
- if HAS_MX_DATETIME and lgc.USE_MX_DATETIME:
- from MySQLdb import times
- from mx import DateTime as mxdt
- times.Date = times.date = mxdt.Date
- times.Time = times.time = mxdt.Time
- times.Timestamp = times.datetime = mxdt.DateTime
- times.TimeDelta = times.timedelta = mxdt.TimeDelta
- times.DateTimeType = mxdt.DateTimeType
- times.DateTimeDeltaType = mxdt.DateTimeDeltaType
-
- def connect(self, host='', database='', user='', password='', port=None,
- unicode=True, charset='utf8', extra_args=None):
- """Handles mysqldb connection format
- the unicode named argument asks to use Unicode objects for strings
- in result sets and query parameters
- """
- kwargs = {'host' : host or '', 'db' : database,
- 'user' : user, 'passwd' : password,
- 'use_unicode' : unicode}
- # MySQLdb doesn't support None port
- if port:
- kwargs['port'] = int(port)
- cnx = self._native_module.connect(**kwargs)
- if unicode:
- if charset.lower() == 'utf-8':
- charset = 'utf8'
- cnx.set_character_set(charset)
- return self._wrap_if_needed(cnx, user)
-
- def process_value(self, value, description, encoding='utf-8', binarywrap=None):
- typecode = description[1]
- # hack to differentiate mediumtext (String) and tinyblob/longblog
- # (Password/Bytes) which are all sharing the same type code :(
- if typecode == self.BINARY:
- if hasattr(value, 'tostring'): # may be an array
- value = value.tostring()
- maxsize = description[3]
- # mediumtext can hold up to (2**24 - 1) characters (16777215)
- # but if utf8 is set, each character is stored on 3 bytes words,
- # so we have to test for 3 * (2**24 - 1) (i.e. 50331645)
- # XXX: what about other encodings ??
- if maxsize in (16777215, 50331645): # mediumtext (2**24 - 1)
- if isinstance(value, str):
- return unicode(value, encoding, 'replace')
- return value
- #if maxsize == 255: # tinyblob (2**8 - 1)
- # return value
- if binarywrap is None:
- return value
- return binarywrap(value)
- return DBAPIAdapter.process_value(self, value, description, encoding, binarywrap)
-
- def type_code_test(self, cursor):
- for typename in ('STRING', 'BOOLEAN', 'BINARY', 'DATETIME', 'NUMBER'):
- print typename, getattr(self, typename)
- try:
- cursor.execute("""CREATE TABLE _type_code_test(
- varchar_field varchar(50),
- text_field text unicode,
- mtext_field mediumtext,
- binary_field tinyblob,
- blob_field blob,
- lblob_field longblob
- )""")
- cursor.execute("INSERT INTO _type_code_test VALUES ('1','2','3','4', '5', '6')")
- cursor.execute("SELECT * FROM _type_code_test")
- descr = cursor.description
- print 'db fields type codes'
- for i, name in enumerate(('varchar', 'text', 'mediumtext',
- 'binary', 'blob', 'longblob')):
- print name, descr[i]
- finally:
- cursor.execute("DROP TABLE _type_code_test")
-
-class _BaseSqlServerAdapter(DBAPIAdapter):
- driver = 'Override in subclass'
- _use_trusted_connection = False
- _use_autocommit = False
- _fetch_lock = threading.Lock()
-
- @classmethod
- def use_trusted_connection(klass, use_trusted=False):
- """
- pass True to this class method to enable Windows
- Authentication (i.e. passwordless auth)
- """
- klass._use_trusted_connection = use_trusted
-
- @classmethod
- def use_autocommit(klass, use_autocommit=False):
- """
- pass True to this class method to enable autocommit (required
- for backup and restore)
- """
- klass._use_autocommit = use_autocommit
-
- @classmethod
- def _process_extra_args(klass, arguments):
- arguments = arguments.lower().split(';')
- if 'trusted_connection' in arguments:
- klass.use_trusted_connection(True)
- if 'autocommit' in arguments:
- klass.use_autocommit(True)
-
- def connect(self, host='', database='', user='', password='', port=None, extra_args=None):
- """Handles pyodbc connection format
-
- If extra_args is not None, it is expected to be a string
- containing a list of semicolon separated keywords. The only
- keyword currently supported is Trusted_Connection : if found
- the connection string to the database will include
- Trusted_Connection=yes (which for SqlServer will trigger using
- Windows Authentication, and therefore no login/password is
- required.
- """
- lock = self._fetch_lock
- class SqlServerCursor(object):
- """cursor adapting usual dict format to pyodbc/adobdapi format
- in SQL queries
- """
- def __init__(self, cursor):
- self._cursor = cursor
- self._fetch_lock = lock
- def _replace_parameters(self, sql, kwargs, _date_class=datetime.date):
- if isinstance(kwargs, dict):
- new_sql = re.sub(r'%\(([^\)]+)\)s', r'?', sql)
- key_order = re.findall(r'%\(([^\)]+)\)s', sql)
- args = []
- for key in key_order:
- arg = kwargs[key]
- if arg.__class__ == _date_class:
- arg = datetime.datetime.combine(arg, datetime.time(0))
- args.append(arg)
-
- return new_sql, tuple(args)
-
- # XXX dumb
- return re.sub(r'%s', r'?', sql), kwargs
-
- def execute(self, sql, kwargs=None):
- if kwargs is None:
- self._cursor.execute(sql)
- else:
- final_sql, args = self._replace_parameters(sql, kwargs)
- self._cursor.execute(final_sql , args)
- def executemany(self, sql, kwargss):
- if not isinstance(kwargss, (list, tuple)):
- kwargss = tuple(kwargss)
- self._cursor.executemany(self, self._replace_parameters(sql, kwargss[0]), kwargss)
-
- def _get_smalldate_columns(self):
- cols = []
- for i, coldef in enumerate(self._cursor.description):
- if coldef[1] is datetime.datetime and coldef[3] == 16:
- cols.append(i)
- return cols
-
- def fetchone(self):
- smalldate_cols = self._get_smalldate_columns()
- self._fetch_lock.acquire()
- try:
- row = self._cursor.fetchone()
- finally:
- self._fetch_lock.release()
- return self._replace_smalldate(row, smalldate_cols)
-
- def fetchall (self):
- smalldate_cols = self._get_smalldate_columns()
- rows = []
- while True:
- self._fetch_lock.acquire()
- try:
- batch = self._cursor.fetchmany(1024)
- finally:
- self._fetch_lock.release()
- if not batch:
- break
- for row in batch:
- rows.append(self._replace_smalldate(row, smalldate_cols))
- return rows
-
- def _replace_smalldate(self, row, smalldate_cols):
- if smalldate_cols:
- new_row = row[:]
- for col in smalldate_cols:
- new_row[col] = new_row[col].date()
- return new_row
- else:
- return row
- def __getattr__(self, attrname):
- return getattr(self._cursor, attrname)
-
- class SqlServerCnxWrapper:
- def __init__(self, cnx):
- self._cnx = cnx
- def cursor(self):
- return SqlServerCursor(self._cnx.cursor())
- def __getattr__(self, attrname):
- return getattr(self._cnx, attrname)
- cnx = self._connect(host=host, database=database, user=user, password=password, port=port, extra_args=extra_args)
- return self._wrap_if_needed(SqlServerCnxWrapper(cnx), user)
-
- def process_value(self, value, description, encoding='utf-8', binarywrap=None):
- # if the dbapi module isn't supporting type codes, override to return value directly
- typecode = description[1]
- assert typecode is not None, self
- if typecode == self.STRING:
- if isinstance(value, str):
- return unicode(value, encoding, 'replace')
- elif typecode == self.BINARY: # value is a python buffer
- if binarywrap is not None:
- return binarywrap(value[:])
- else:
- return value[:]
- elif typecode == self.UNKNOWN:
- # may occurs on constant selection for instance (e.g. SELECT 'hop')
- # with postgresql at least
- if isinstance(value, str):
- return unicode(value, encoding, 'replace')
-
- return value
-
-class _PyodbcAdapter(_BaseSqlServerAdapter):
- def _connect(self, host='', database='', user='', password='', port=None, extra_args=None):
- if extra_args is not None:
- self._process_extra_args(extra_args)
- cnx_string_bits = ['DRIVER={%(driver)s}']
- variables = {'host' : host,
- 'database' : database,
- 'user' : user, 'password' : password,
- 'driver': self.driver}
- if self._use_trusted_connection:
- variables['Trusted_Connection'] = 'yes'
- del variables['user']
- del variables['password']
- if self._use_autocommit:
- variables['autocommit'] = True
- return self._native_module.connect(**variables)
-
-class _PyodbcSqlServer2000Adapter(_PyodbcAdapter):
- driver = "SQL Server"
-
-class _PyodbcSqlServer2005Adapter(_PyodbcAdapter):
- driver = "SQL Server Native Client 10.0"
-
-class _PyodbcSqlServer2008Adapter(_PyodbcAdapter):
- driver = "SQL Server Native Client 10.0"
-
-class _AdodbapiAdapter(_BaseSqlServerAdapter):
-
- def _connect(self, host='', database='', user='', password='', port=None, extra_args=None):
- if extra_args is not None:
- self._process_extra_args(extra_args)
- if self._use_trusted_connection:
- # this will open a MS-SQL table with Windows authentication
- auth = 'Integrated Security=SSPI'
- else:
- # this set opens a MS-SQL table with SQL authentication
- auth = 'user ID=%s; Password=%s;' % (user, password)
- constr = r"Initial Catalog=%s; Data Source=%s; Provider=SQLOLEDB.1; %s" %(database, host, auth)
- return self._native_module.connect(constr)
-
-class _AdodbapiSqlServer2000Adapter(_AdodbapiAdapter):
- driver = "SQL Server"
-
-class _AdodbapiSqlServer2005Adapter(_AdodbapiAdapter):
- driver = "SQL Server Native Client 10.0"
-
-class _AdodbapiSqlServer2008Adapter(_AdodbapiAdapter):
- driver = "SQL Server Native Client 10.0"
-
-
-## Drivers, Adapters and helpers registries ###################################
-
-
-PREFERED_DRIVERS = {
- "postgres" : [ 'psycopg2', 'psycopg', 'pgdb', 'pyPgSQL.PgSQL',
- ],
- "mysql" : [ 'MySQLdb', ], # 'pyMySQL.MySQL, ],
- "sqlite" : ['pysqlite2.dbapi2', 'sqlite', 'sqlite3',],
- "sqlserver2000" : ['pyodbc', 'adodbapi', ],
- "sqlserver2005" : ['pyodbc', 'adodbapi', ],
- "sqlserver2008" : ['pyodbc', 'adodbapi', ],
- }
-
-_ADAPTERS = {
- 'postgres' : { 'pgdb' : _PgdbAdapter,
- 'psycopg' : _PsycopgAdapter,
- 'psycopg2' : _Psycopg2Adapter,
- 'pyPgSQL.PgSQL' : _PgsqlAdapter,
- },
- 'mysql' : { 'MySQLdb' : _MySqlDBAdapter, },
- 'sqlite' : { 'pysqlite2.dbapi2' : _PySqlite2Adapter,
- 'sqlite' : _SqliteAdapter,
- 'sqlite3' : _PySqlite2Adapter, },
- "sqlserver2000" : {'adodbapi': _AdodbapiSqlServer2000Adapter,
- 'pyodbc': _PyodbcSqlServer2000Adapter},
- "sqlserver2005" : {'adodbapi': _AdodbapiSqlServer2005Adapter,
- 'pyodbc': _PyodbcSqlServer2005Adapter},
- "sqlserver2008" : {'adodbapi': _AdodbapiSqlServer2008Adapter,
- 'pyodbc': _PyodbcSqlServer2008Adapter},
- }
-
-# _AdapterDirectory could be more generic by adding a 'protocol' parameter
-# This one would become an adapter for 'DBAPI' protocol
-class _AdapterDirectory(dict):
- """A simple dict that registers all adapters"""
- def register_adapter(self, adapter, driver, modname):
- """Registers 'adapter' in directory as adapting 'mod'"""
- try:
- driver_dict = self[driver]
- except KeyError:
- self[driver] = {}
-
- # XXX Should we have a list of adapters ?
- driver_dict[modname] = adapter
-
- def adapt(self, database, prefered_drivers = None, pywrap = False):
- """Returns an dbapi-compliant object based for database"""
- prefered_drivers = prefered_drivers or PREFERED_DRIVERS
- module, modname = _import_driver_module(database, prefered_drivers)
- try:
- return self[database][modname](module, pywrap=pywrap)
- except KeyError:
- raise NoAdapterFound(obj=module)
-
- def get_adapter(self, database, modname):
- try:
- return self[database][modname]
- except KeyError:
- raise NoAdapterFound(None, modname)
-
-ADAPTER_DIRECTORY = _AdapterDirectory(_ADAPTERS)
-del _AdapterDirectory
-
-
-## Main functions #############################################################
-
-def set_prefered_driver(database, module, _drivers=PREFERED_DRIVERS):
- """sets the preferred driver module for database
- database is the name of the db engine (postgresql, mysql...)
- module is the name of the module providing the connect function
- syntax is (params_func, post_process_func_or_None)
- _drivers is a optional dictionary of drivers
- """
- try:
- modules = _drivers[database]
- except KeyError:
- raise UnknownDriver('Unknown database %s' % database)
- # Remove module from modules list, and re-insert it in first position
- try:
- modules.remove(module)
- except ValueError:
- raise UnknownDriver('Unknown module %s for %s' % (module, database))
- modules.insert(0, module)
-
-def get_dbapi_compliant_module(driver, prefered_drivers = None, quiet = False,
- pywrap = False):
- """returns a fully dbapi compliant module"""
- try:
- mod = ADAPTER_DIRECTORY.adapt(driver, prefered_drivers, pywrap=pywrap)
- except NoAdapterFound, err:
- if not quiet:
- msg = 'No Adapter found for %s, returning native module'
- print >> sys.stderr, msg % err.objname
- mod = err.adapted_obj
- from logilab.common.adbh import get_adv_func_helper
- mod.adv_func_helper = get_adv_func_helper(driver)
- return mod
-
-def get_connection(driver='postgres', host='', database='', user='',
- password='', port='', quiet=False, drivers=PREFERED_DRIVERS,
- pywrap=False, extra_args=None):
- """return a db connection according to given arguments"""
- module, modname = _import_driver_module(driver, drivers, ['connect'])
- try:
- adapter = ADAPTER_DIRECTORY.get_adapter(driver, modname)
- except NoAdapterFound, err:
- if not quiet:
- msg = 'No Adapter found for %s, using default one' % err.objname
- print >> sys.stderr, msg
- adapted_module = DBAPIAdapter(module, pywrap)
- else:
- adapted_module = adapter(module, pywrap)
- if host and not port:
- try:
- host, port = host.split(':', 1)
- except ValueError:
- pass
- if port:
- port = int(port)
- return adapted_module.connect(host, database, user, password, port=port, extra_args=extra_args)
-
-
-from logilab.common.deprecation import moved
-get_adv_func_helper = moved('logilab.common.adbh', 'get_adv_func_helper')
+from logilab.common.db import (get_connection, set_prefered_driver,
+ get_dbapi_compliant_module)