summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/url.py
blob: 98d4d442e1784fe182390b221a8ab0c6e09878c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""Provide the URL object as well as the make_url parsing function."""

import re, cgi, sys, urllib
from sqlalchemy import exceptions


class URL(object):
    """Represent the components of a URL used to connect to a database.

    This object is suitable to be passed directly to a ``create_engine()``
    call.  The fields of the URL are parsed from a string by the
    ``module-level make_url()`` function.  the string format of the URL is
    an RFC-1738-style string.

    Attributes on URL include:

    drivername
      The name of the database backend.  this name will correspond to a module in sqlalchemy/databases

    username
      The user name for the connection.

    password
      database password.

    host
      The name of the host.

    port
      The port number.

    database
      The database.

    query
      A dictionary containing key/value pairs representing the URL's query string.
    """

    def __init__(self, drivername, username=None, password=None, host=None, port=None, database=None, query=None):
        self.drivername = drivername
        self.username = username
        self.password = password
        self.host = host
        if port is not None:
            self.port = int(port)
        else:
            self.port = None
        self.database= database
        self.query = query or {}

    def __str__(self):
        s = self.drivername + "://"
        if self.username is not None:
            s += self.username
            if self.password is not None:
                s += ':' + urllib.quote_plus(self.password)
            s += "@"
        if self.host is not None:
            s += self.host
        if self.port is not None:
            s += ':' + str(self.port)
        if self.database is not None:
            s += '/' + self.database
        if self.query:
            keys = self.query.keys()
            keys.sort()
            s += '?' + "&".join(["%s=%s" % (k, self.query[k]) for k in keys])
        return s

    def get_dialect(self):
        """Return the SQLAlchemy database dialect class corresponding to this URL's driver name."""
        dialect=None
        if self.drivername == 'ansi':
            import sqlalchemy.ansisql
            return sqlalchemy.ansisql.dialect

        try:
            module=getattr(__import__('sqlalchemy.databases.%s' % self.drivername).databases, self.drivername)
            dialect=module.dialect
        except ImportError:
            if sys.exc_info()[2].tb_next is None:
                import pkg_resources
                for res in pkg_resources.iter_entry_points('sqlalchemy.databases'):
                    if res.name==self.drivername:
                        dialect=res.load()
            else:
               raise
        if dialect is not None:
            return dialect
        raise ImportError('unknown database %r' % self.drivername) 
  
    def translate_connect_args(self, names):
        """Translate this URL's attributes into a dictionary of connection arguments.

        Given a list of argument names corresponding to the URL
        attributes (`host`, `database`, `username`, `password`,
        `port`), will assemble the attribute values of this URL into
        the dictionary using the given names.
        """

        a = {}
        attribute_names = ['host', 'database', 'username', 'password', 'port']
        for n in names:
            sname = attribute_names.pop(0)
            if n is None:
                continue
            if getattr(self, sname, None):
                a[n] = getattr(self, sname)
        return a

def make_url(name_or_url):
    """Given a string or unicode instance, produce a new URL instance.

    The given string is parsed according to the rfc1738 spec.  If an
    existing URL object is passed, just returns the object.
    """

    if isinstance(name_or_url, basestring):
        return _parse_rfc1738_args(name_or_url)
    else:
        return name_or_url

def _parse_rfc1738_args(name):
    pattern = re.compile(r'''
            (\w+)://
            (?:
                ([^:/]*)
                (?::([^/]*))?
            @)?
            (?:
                ([^/:]*)
                (?::([^/]*))?
            )?
            (?:/(.*))?
            '''
            , re.X)

    m = pattern.match(name)
    if m is not None:
        (name, username, password, host, port, database) = m.group(1, 2, 3, 4, 5, 6)
        if database is not None:
            tokens = database.split(r"?", 2)
            database = tokens[0]
            query = (len(tokens) > 1 and dict( cgi.parse_qsl(tokens[1]) ) or None)
            if query is not None:
                query = dict([(k.encode('ascii'), query[k]) for k in query])
        else:
            query = None
        opts = {'username':username,'password':password,'host':host,'port':port,'database':database, 'query':query}
        if opts['password'] is not None:
            opts['password'] = urllib.unquote_plus(opts['password'])
        return URL(name, **opts)
    else:
        raise exceptions.ArgumentError("Could not parse rfc1738 URL from string '%s'" % name)

def _parse_keyvalue_args(name):
    m = re.match( r'(\w+)://(.*)', name)
    if m is not None:
        (name, args) = m.group(1, 2)
        opts = dict( cgi.parse_qsl( args ) )
        return URL(name, *opts)
    else:
        return None