diff options
Diffstat (limited to 'sqlplain/connection.py')
-rw-r--r-- | sqlplain/connection.py | 298 |
1 files changed, 0 insertions, 298 deletions
diff --git a/sqlplain/connection.py b/sqlplain/connection.py deleted file mode 100644 index c93d760..0000000 --- a/sqlplain/connection.py +++ /dev/null @@ -1,298 +0,0 @@ -import sys, threading, itertools, string -from operator import attrgetter -try: - from collections import namedtuple -except ImportError: - from sqlplain.namedtuple import namedtuple -from sqlplain.uri import URI -from sqlplain.sql_support import get_args_templ -from decorator import decorator - -class NotFoundError(Exception): - pass - -@decorator -def retry(func, conn, *args, **kw): - """ - A decorator making a function taking a connection as first argument - to retry in case of errors - """ - try: - return func(conn, *args, **kw) - except conn.driver.Error, e: - conn.close() # retry with a fresh connection - return func(conn, *args, **kw) - -Field = namedtuple( - 'Field', - 'name type_code display_size internal_size precision scale null_ok'.split()) - -counter = itertools.count(1) - -def noname(): - return 'noname%d' % counter.next() - -class TupleList(list): - "Used as result of LazyConn.execute" - header = None - rowcount = None - descr = None - -class Transaction(object): - """ - A wrapper around database actions with signature (conn, *args, **kw). - """ - def __init__(self, action, conn, *args, **kw): - self.action = action - self.conn = conn - self.args = args - self.kw = kw - - def run(self, conn=None, args=None, kw=None, commit=True): - "Execute the action in a transaction" - conn = conn or self.conn - args = args or self.args - kw = kw or self.kw - try: - return self.action(conn, *args, **kw) - except Exception, e: - conn.rollback() - raise e.__class__, e, sys.exc_info()[2] - else: - if commit: - conn.commit() - else: - conn.rollback() - -class _Storage(object): - "A place where to store the low level connection and cursor" - - @classmethod - def new(cls, connect, args): - "Make a subclass of _Storage and instantiate it" - subc = type('%sSubclass' % cls.__name__, (cls,), - dict(connect=staticmethod(connect), args=args)) - return subc() - - @property - def conn(self): - "Return the low level connection" - conn = getattr(self, '_conn', None) - if conn is None: - connect, args = self.__class__.connect, self.__class__.args - conn = self._conn = connect(*args) - return conn - - @property - def curs(self): - "Return the low level cursor" - curs = getattr(self, '_curs', None) - if curs is None: - curs = self._curs = self.conn.cursor() - return curs - - def close(self): - """The next time you will call an active method, a fresh new - connection will be instantiated""" - if getattr(self, '_curs', False): - try: - self._curs.close() - except: # ignore if already closed - pass - self._curs = None - if getattr(self, '_conn', False): - try: - self._conn.close() - except: # ignore if already closed - pass - self._conn = None - -class _ThreadLocalStorage(threading.local, _Storage): - "A threadlocal object where to store low level connection and cursor" - -class LazyConnection(object): - """ - A lazy connection object. It is lazy since the database connection - is performed at execution time, not at inizialization time. Notice - that this class does not manage any kind of logging, on purpose. - There is however a chatty method for easy of debugging. - """ - def __init__(self, uri, isolation_level=None, threadlocal=False, - params='SEQUENCE'): - if params not in ('SEQUENCE', 'MAPPING'): - raise TypeError("params must be 'SEQUENCE' or 'MAPPING', you " - "passed %s" % params) - self.params = params - self.uri = URI(uri) - self.name = self.uri['database'] - self.dbtype = self.uri['dbtype'] - self.driver, params = self.uri.get_driver_params() - connect = self.driver.connect - args = params, isolation_level - self.chatty = False - self.isolation_level = isolation_level - self.threadlocal = threadlocal - if threadlocal: - self._storage = _ThreadLocalStorage.new(connect, args) - else: - self._storage = _Storage.new(connect, args) - - def _raw_execute(self, templ, args): - """ - Call a dbapi2 cursor; return the rowcount or a list of tuples, - plus an header (None in the case of the rowcount). - """ - cursor = self._storage.curs - try: - # this special casing is needed for psycopg2 - if args: - cursor.execute(templ, args) - else: - cursor.execute(templ) - except Exception, e: - tb = sys.exc_info()[2] - raise e.__class__, '%s\nQUERY WAS:%s%s' % (e, templ, args), tb - descr = cursor.description - if descr is None: # after an update - return None, cursor.rowcount - else: # after a select - return descr, cursor.fetchall() - - def execute(self, templ, args=(), ntuple=None, scalar=False): - "args must be a sequence, not a dictionary" - if self.dbtype == 'mssql': - # converts unicode arguments to utf8 - lst = [] - for a in args: - if isinstance(a, bool): - lst.append(int(a)) - elif isinstance(a, unicode): - lst.append(a.encode('utf8')) - else: - lst.append(a) - args = tuple(lst) - if self.driver.placeholder: # the template has to be interpolated - argnames, templ = get_args_templ(templ, self.driver.placeholder) - if len(argnames) != len(args): # especially useful for mssql - raise TypeError( - "TypeError when executing %s\nExpected %d arguments (%s), got %d %s" % - (templ, len(argnames), ', '.join(argnames), len(args), args)) - if self.params == 'MAPPING': - # replace the passed dictionary with the corresponding tuple - args = tuple(args[name] for name in argnames) - descr, res = self._raw_execute(templ, args) - cursor = self._storage.curs # needed to make the reset work - if self.chatty: - print(cursor.rowcount, templ, args) - if scalar: # you expected a scalar result - if not res: - raise NotFoundError( - "No result\nQUERY WAS: %s%s\n" % (templ, args)) - elif len(res) > 1: - raise ValueError( - "Expected to get a scalar result, got %s\nQUERY WAS:%s%s\n" - % (res, templ, args)) - return res[0][0] - if descr: # the query was a SELECT - fields = [Field(d) for d in descr] - header = [f.name or noname() for f in fields] - if ntuple is None: - Ntuple = namedtuple('DBTuple', header) - elif isinstance(ntuple, str): - Ntuple = namedtuple(ntuple, header) - else: - Ntuple = ntuple - res = TupleList(Ntuple(row) for row in res) - res.descr = fields - res.header = Ntuple(header) - return res - - def executescript(self, sql, *dicts, **kw): - "A driver-independent method to execute sql templates" - d = {} - for dct in dicts + (kw,): - d.update(dct) - if d: - sql = string.Template(sql).substitute(d) - if self.dbtype == 'sqlite': - self._storage.curs.executescript(sql) - else: # psycopg and pymssql are already able to execute chunks - self.execute(sql) - - def cursor(self): - "Return a new cursor at each call. Here for DB API 2 compatibility" - return self._storage.conn.cursor() - - def open(self): - "Return the low level underlying connection" - return self._storage.conn - - def close(self): - """The next time you will call an active method, a fresh new - connection will be instantiated""" - self._storage.close() - - def rollback(self): - return self._storage.conn.rollback() - - def commit(self): - return self._storage.conn.commit() - - def __enter__(self): - return self - - def __exit__(self, exc_class, exc, tb): - if exc_class: - self.rollback() - raise exc_class, exc, tb - else: - self.commit() - - def __repr__(self): - return "<%s %s, isolation_level=%s>" % ( - self.__class__.__name__, self.uri, self.isolation_level) - - @property - def rowcount(self): - return self._storage.curs.rowcount - -class RetryingConnection(LazyConnection): - """A LazyConnection which retries once in case of execution errors. - It makes sense for long running applications in autocommit mode.""" - _raw_execute = retry(LazyConnection._raw_execute.im_func) - -class NullObject(object): - '''Implements the NullObject pattern. - - >>> n = NullObject() - >>> n.dosomething(1,2,3) - ''' - def __getattr__(self, name): - return lambda *a, **k: None - def __repr__(self): - return 'None' - def __nonzero__(self): - return False - def __iter__(self): - return () - def __call__(self, *a, **k): - return None - -class FakeConnection(object): - def __init__(self, iodict): - self.iodict = iodict - self._storage = NullObject() - def execute(self, templ, args=()): - return self.iodict[(templ,) + args] - def executescript(self, templ, *dicts, **kw): - pass - def commit(self): - pass - def rollback(self): - pass - def close(self): - pass - def __enter__(self): - return self - def __exit_(self, exctype=None, exc=None, tb=None): - pass |