summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/dialects/mysql/pyodbc.py
blob: 0da5a8eafc0004df7e3e09dbd128d501bf970d48 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# mysql/pyodbc.py
# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""


.. dialect:: mysql+pyodbc
    :name: PyODBC
    :dbapi: pyodbc
    :connectstring: mysql+pyodbc://<username>:<password>@<dsnname>
    :url: http://pypi.python.org/pypi/pyodbc/

    .. note:: The PyODBC for MySQL dialect is not well supported, and
       is subject to unresolved character encoding issues
       which exist within the current ODBC drivers available.
       (see http://code.google.com/p/pyodbc/issues/detail?id=25).
       Other dialects for MySQL are recommended.

"""

import re
import sys

from .base import MySQLDialect
from .base import MySQLExecutionContext
from .types import TIME
from ... import util
from ...connectors.pyodbc import PyODBCConnector
from ...sql.sqltypes import Time


class _pyodbcTIME(TIME):
    def result_processor(self, dialect, coltype):
        def process(value):
            # pyodbc returns a datetime.time object; no need to convert
            return value

        return process


class MySQLExecutionContext_pyodbc(MySQLExecutionContext):
    def get_lastrowid(self):
        cursor = self.create_cursor()
        cursor.execute("SELECT LAST_INSERT_ID()")
        lastrowid = cursor.fetchone()[0]
        cursor.close()
        return lastrowid


class MySQLDialect_pyodbc(PyODBCConnector, MySQLDialect):
    colspecs = util.update_copy(MySQLDialect.colspecs, {Time: _pyodbcTIME})
    supports_unicode_statements = True
    execution_ctx_cls = MySQLExecutionContext_pyodbc

    pyodbc_driver_name = "MySQL"

    def _detect_charset(self, connection):
        """Sniff out the character set in use for connection results."""

        # Prefer 'character_set_results' for the current connection over the
        # value in the driver.  SET NAMES or individual variable SETs will
        # change the charset without updating the driver's view of the world.
        #
        # If it's decided that issuing that sort of SQL leaves you SOL, then
        # this can prefer the driver value.
        rs = connection.execute("SHOW VARIABLES LIKE 'character_set%%'")
        opts = {row[0]: row[1] for row in self._compat_fetchall(rs)}
        for key in ("character_set_connection", "character_set"):
            if opts.get(key, None):
                return opts[key]

        util.warn(
            "Could not detect the connection character set.  "
            "Assuming latin1."
        )
        return "latin1"

    def _extract_error_code(self, exception):
        m = re.compile(r"\((\d+)\)").search(str(exception.args))
        c = m.group(1)
        if c:
            return int(c)
        else:
            return None

    def on_connect(self):
        super_ = super(MySQLDialect_pyodbc, self).on_connect()

        def on_connect(conn):
            if super_ is not None:
                super_(conn)

            # declare Unicode encoding for pyodbc as per
            #   https://github.com/mkleehammer/pyodbc/wiki/Unicode
            pyodbc_SQL_CHAR = 1  # pyodbc.SQL_CHAR
            pyodbc_SQL_WCHAR = -8  # pyodbc.SQL_WCHAR
            if sys.version_info.major > 2:
                conn.setdecoding(pyodbc_SQL_CHAR, encoding='utf-8')
                conn.setdecoding(pyodbc_SQL_WCHAR, encoding='utf-8')
                conn.setencoding(encoding='utf-8')
            else:
                conn.setdecoding(pyodbc_SQL_CHAR, encoding='utf-8')
                conn.setdecoding(pyodbc_SQL_WCHAR, encoding='utf-8')
                conn.setencoding(str, encoding='utf-8')
                conn.setencoding(unicode, encoding='utf-8')

        return on_connect


dialect = MySQLDialect_pyodbc