summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/dialects/postgresql/psycopg2.py
blob: a09697e79061952db281eef0bb72eb6e90d98b37 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Support for the PostgreSQL database via the psycopg2 driver.

Driver
------

The psycopg2 driver is supported, available at http://pypi.python.org/pypi/psycopg2/ .
The dialect has several behaviors  which are specifically tailored towards compatibility 
with this module.

Note that psycopg1 is **not** supported.

Connecting
----------

URLs are of the form `postgresql+psycopg2://user@password@host:port/dbname[?key=value&key=value...]`.

psycopg2-specific keyword arguments which are accepted by :func:`~sqlalchemy.create_engine()` are:

* *server_side_cursors* - Enable the usage of "server side cursors" for SQL statements which support
  this feature.  What this essentially means from a psycopg2 point of view is that the cursor is 
  created using a name, e.g. `connection.cursor('some name')`, which has the effect that result rows
  are not immediately pre-fetched and buffered after statement execution, but are instead left 
  on the server and only retrieved as needed.    SQLAlchemy's :class:`~sqlalchemy.engine.base.ResultProxy`
  uses special row-buffering behavior when this feature is enabled, such that groups of 100 rows 
  at a time are fetched over the wire to reduce conversational overhead.

* *isolation_level* - Sets the transaction isolation level for each transaction
  within the engine. Valid isolation levels are `READ_COMMITTED`,
  `READ_UNCOMMITTED`, `REPEATABLE_READ`, and `SERIALIZABLE`.

Transactions
------------

The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations.


"""

import decimal, random, re
from sqlalchemy import util
from sqlalchemy.engine import base, default
from sqlalchemy.sql import expression
from sqlalchemy.sql import operators as sql_operators
from sqlalchemy import types as sqltypes
from sqlalchemy.dialects.postgresql.base import PGDialect, PGCompiler, PGIdentifierPreparer

class _PGNumeric(sqltypes.Numeric):
    def bind_processor(self, dialect):
        return None

    def result_processor(self, dialect):
        if self.asdecimal:
            return None
        else:
            def process(value):
                if isinstance(value, decimal.Decimal):
                    return float(value)
                else:
                    return value
            return process


# TODO: filter out 'FOR UPDATE' statements
SERVER_SIDE_CURSOR_RE = re.compile(
    r'\s*SELECT',
    re.I | re.UNICODE)

class PostgreSQL_psycopg2ExecutionContext(default.DefaultExecutionContext):
    def create_cursor(self):
        # TODO: coverage for server side cursors + select.for_update()
        is_server_side = \
            self.dialect.server_side_cursors and \
            not self.should_autocommit and \
            ((self.compiled and isinstance(self.compiled.statement, expression.Selectable) 
                and not getattr(self.compiled.statement, 'for_update', False)) \
            or \
            (
                (not self.compiled or isinstance(self.compiled.statement, expression._TextClause)) 
                and self.statement and SERVER_SIDE_CURSOR_RE.match(self.statement))
            )

        self.__is_server_side = is_server_side
        if is_server_side:
            # use server-side cursors:
            # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html
            ident = "c_%s_%s" % (hex(id(self))[2:], hex(random.randint(0, 65535))[2:])
            return self._connection.connection.cursor(ident)
        else:
            return self._connection.connection.cursor()

    def get_result_proxy(self):
        if self.__is_server_side:
            return base.BufferedRowResultProxy(self)
        else:
            return base.ResultProxy(self)


class PostgreSQL_psycopg2Compiler(PGCompiler):
    def visit_mod(self, binary, **kw):
        return self.process(binary.left) + " %% " + self.process(binary.right)
    
    def post_process_text(self, text):
        return text.replace('%', '%%')


class PostgreSQL_psycopg2IdentifierPreparer(PGIdentifierPreparer):
    def _escape_identifier(self, value):
        value = value.replace(self.escape_quote, self.escape_to_quote)
        return value.replace('%', '%%')


class PostgreSQL_psycopg2(PGDialect):
    driver = 'psycopg2'
    supports_unicode_statements = False
    default_paramstyle = 'pyformat'
    supports_sane_multi_rowcount = False
    execution_ctx_cls = PostgreSQL_psycopg2ExecutionContext
    statement_compiler = PostgreSQL_psycopg2Compiler
    preparer = PostgreSQL_psycopg2IdentifierPreparer

    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.Numeric : _PGNumeric,
            sqltypes.Float: sqltypes.Float,  # prevents _PGNumeric from being used
        }
    )

    def __init__(self, server_side_cursors=False, **kwargs):
        PGDialect.__init__(self, **kwargs)
        self.server_side_cursors = server_side_cursors

    @classmethod
    def dbapi(cls):
        psycopg = __import__('psycopg2')
        return psycopg

    def create_connect_args(self, url):
        opts = url.translate_connect_args(username='user')
        if 'port' in opts:
            opts['port'] = int(opts['port'])
        opts.update(url.query)
        return ([], opts)

    def is_disconnect(self, e):
        if isinstance(e, self.dbapi.OperationalError):
            return 'closed the connection' in str(e) or 'connection not open' in str(e)
        elif isinstance(e, self.dbapi.InterfaceError):
            return 'connection already closed' in str(e) or 'cursor already closed' in str(e)
        elif isinstance(e, self.dbapi.ProgrammingError):
            # yes, it really says "losed", not "closed"
            return "losed the connection unexpectedly" in str(e)
        else:
            return False

dialect = PostgreSQL_psycopg2