1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
"""Support for the PostgreSQL database via the psycopg2 driver.
Driver
------
The psycopg2 driver is supported, available at http://pypi.python.org/pypi/psycopg2/ .
The dialect has several behaviors which are specifically tailored towards compatibility
with this module.
Note that psycopg1 is **not** supported.
Connecting
----------
URLs are of the form `postgresql+psycopg2://user@password@host:port/dbname[?key=value&key=value...]`.
psycopg2-specific keyword arguments which are accepted by :func:`~sqlalchemy.create_engine()` are:
* *server_side_cursors* - Enable the usage of "server side cursors" for SQL statements which support
this feature. What this essentially means from a psycopg2 point of view is that the cursor is
created using a name, e.g. `connection.cursor('some name')`, which has the effect that result rows
are not immediately pre-fetched and buffered after statement execution, but are instead left
on the server and only retrieved as needed. SQLAlchemy's :class:`~sqlalchemy.engine.base.ResultProxy`
uses special row-buffering behavior when this feature is enabled, such that groups of 100 rows
at a time are fetched over the wire to reduce conversational overhead.
* *isolation_level* - Sets the transaction isolation level for each transaction
within the engine. Valid isolation levels are `READ_COMMITTED`,
`READ_UNCOMMITTED`, `REPEATABLE_READ`, and `SERIALIZABLE`.
Transactions
------------
The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations.
"""
import decimal, random, re
from sqlalchemy import util
from sqlalchemy.engine import base, default
from sqlalchemy.sql import expression
from sqlalchemy.sql import operators as sql_operators
from sqlalchemy import types as sqltypes
from sqlalchemy.dialects.postgresql.base import PGDialect, PGCompiler, PGIdentifierPreparer, PGExecutionContext
class _PGNumeric(sqltypes.Numeric):
def bind_processor(self, dialect):
return None
def result_processor(self, dialect):
if self.asdecimal:
return None
else:
def process(value):
if isinstance(value, decimal.Decimal):
return float(value)
else:
return value
return process
# TODO: filter out 'FOR UPDATE' statements
SERVER_SIDE_CURSOR_RE = re.compile(
r'\s*SELECT',
re.I | re.UNICODE)
class PostgreSQL_psycopg2ExecutionContext(PGExecutionContext):
def create_cursor(self):
# TODO: coverage for server side cursors + select.for_update()
is_server_side = \
self.dialect.server_side_cursors and \
not self.should_autocommit and \
((self.compiled and isinstance(self.compiled.statement, expression.Selectable)
and not getattr(self.compiled.statement, 'for_update', False)) \
or \
(
(not self.compiled or isinstance(self.compiled.statement, expression._TextClause))
and self.statement and SERVER_SIDE_CURSOR_RE.match(self.statement))
)
self.__is_server_side = is_server_side
if is_server_side:
# use server-side cursors:
# http://lists.initd.org/pipermail/psycopg/2007-January/005251.html
ident = "c_%s_%s" % (hex(id(self))[2:], hex(random.randint(0, 65535))[2:])
return self._connection.connection.cursor(ident)
else:
return self._connection.connection.cursor()
def get_result_proxy(self):
if self.__is_server_side:
return base.BufferedRowResultProxy(self)
else:
return base.ResultProxy(self)
class PostgreSQL_psycopg2Compiler(PGCompiler):
def visit_mod(self, binary, **kw):
return self.process(binary.left) + " %% " + self.process(binary.right)
def post_process_text(self, text):
return text.replace('%', '%%')
class PostgreSQL_psycopg2IdentifierPreparer(PGIdentifierPreparer):
def _escape_identifier(self, value):
value = value.replace(self.escape_quote, self.escape_to_quote)
return value.replace('%', '%%')
class PostgreSQL_psycopg2(PGDialect):
driver = 'psycopg2'
supports_unicode_statements = False
default_paramstyle = 'pyformat'
supports_sane_multi_rowcount = False
execution_ctx_cls = PostgreSQL_psycopg2ExecutionContext
statement_compiler = PostgreSQL_psycopg2Compiler
preparer = PostgreSQL_psycopg2IdentifierPreparer
colspecs = util.update_copy(
PGDialect.colspecs,
{
sqltypes.Numeric : _PGNumeric,
sqltypes.Float: sqltypes.Float, # prevents _PGNumeric from being used
}
)
def __init__(self, server_side_cursors=False, **kwargs):
PGDialect.__init__(self, **kwargs)
self.server_side_cursors = server_side_cursors
@classmethod
def dbapi(cls):
psycopg = __import__('psycopg2')
return psycopg
def create_connect_args(self, url):
opts = url.translate_connect_args(username='user')
if 'port' in opts:
opts['port'] = int(opts['port'])
opts.update(url.query)
return ([], opts)
def is_disconnect(self, e):
if isinstance(e, self.dbapi.OperationalError):
return 'closed the connection' in str(e) or 'connection not open' in str(e)
elif isinstance(e, self.dbapi.InterfaceError):
return 'connection already closed' in str(e) or 'cursor already closed' in str(e)
elif isinstance(e, self.dbapi.ProgrammingError):
# yes, it really says "losed", not "closed"
return "losed the connection unexpectedly" in str(e)
else:
return False
dialect = PostgreSQL_psycopg2
|