diff options
Diffstat (limited to 'lib/sqlalchemy/engine/url.py')
-rw-r--r-- | lib/sqlalchemy/engine/url.py | 115 |
1 files changed, 65 insertions, 50 deletions
diff --git a/lib/sqlalchemy/engine/url.py b/lib/sqlalchemy/engine/url.py index 1662efe20..e92e57b8e 100644 --- a/lib/sqlalchemy/engine/url.py +++ b/lib/sqlalchemy/engine/url.py @@ -50,8 +50,16 @@ class URL(object): """ - def __init__(self, drivername, username=None, password=None, - host=None, port=None, database=None, query=None): + def __init__( + self, + drivername, + username=None, + password=None, + host=None, + port=None, + database=None, + query=None, + ): self.drivername = drivername self.username = username self.password_original = password @@ -68,26 +76,26 @@ class URL(object): if self.username is not None: s += _rfc_1738_quote(self.username) if self.password is not None: - s += ':' + ('***' if hide_password - else _rfc_1738_quote(self.password)) + s += ":" + ( + "***" if hide_password else _rfc_1738_quote(self.password) + ) s += "@" if self.host is not None: - if ':' in self.host: + if ":" in self.host: s += "[%s]" % self.host else: s += self.host if self.port is not None: - s += ':' + str(self.port) + s += ":" + str(self.port) if self.database is not None: - s += '/' + self.database + s += "/" + self.database if self.query: keys = list(self.query) keys.sort() - s += '?' + "&".join( - "%s=%s" % ( - k, - element - ) for k in keys for element in util.to_list(self.query[k]) + s += "?" + "&".join( + "%s=%s" % (k, element) + for k in keys + for element in util.to_list(self.query[k]) ) return s @@ -101,14 +109,15 @@ class URL(object): return hash(str(self)) def __eq__(self, other): - return \ - isinstance(other, URL) and \ - self.drivername == other.drivername and \ - self.username == other.username and \ - self.password == other.password and \ - self.host == other.host and \ - self.database == other.database and \ - self.query == other.query + return ( + isinstance(other, URL) + and self.drivername == other.drivername + and self.username == other.username + and self.password == other.password + and self.host == other.host + and self.database == other.database + and self.query == other.query + ) @property def password(self): @@ -122,20 +131,20 @@ class URL(object): self.password_original = password def get_backend_name(self): - if '+' not in self.drivername: + if "+" not in self.drivername: return self.drivername else: - return self.drivername.split('+')[0] + return self.drivername.split("+")[0] def get_driver_name(self): - if '+' not in self.drivername: + if "+" not in self.drivername: return self.get_dialect().driver else: - return self.drivername.split('+')[1] + return self.drivername.split("+")[1] def _instantiate_plugins(self, kwargs): - plugin_names = util.to_list(self.query.get('plugin', ())) - plugin_names += kwargs.get('plugins', []) + plugin_names = util.to_list(self.query.get("plugin", ())) + plugin_names += kwargs.get("plugins", []) return [ plugins.load(plugin_name)(self, kwargs) @@ -149,17 +158,19 @@ class URL(object): returned class implements the get_dialect_cls() method. """ - if '+' not in self.drivername: + if "+" not in self.drivername: name = self.drivername else: - name = self.drivername.replace('+', '.') + name = self.drivername.replace("+", ".") cls = registry.load(name) # check for legacy dialects that # would return a module with 'dialect' as the # actual class - if hasattr(cls, 'dialect') and \ - isinstance(cls.dialect, type) and \ - issubclass(cls.dialect, Dialect): + if ( + hasattr(cls, "dialect") + and isinstance(cls.dialect, type) + and issubclass(cls.dialect, Dialect) + ): return cls.dialect else: return cls @@ -187,7 +198,7 @@ class URL(object): """ translated = {} - attribute_names = ['host', 'database', 'username', 'password', 'port'] + attribute_names = ["host", "database", "username", "password", "port"] for sname in attribute_names: if names: name = names.pop(0) @@ -214,7 +225,8 @@ def make_url(name_or_url): def _parse_rfc1738_args(name): - pattern = re.compile(r''' + pattern = re.compile( + r""" (?P<name>[\w\+]+):// (?: (?P<username>[^:/]*) @@ -228,21 +240,23 @@ def _parse_rfc1738_args(name): (?::(?P<port>[^/]*))? )? (?:/(?P<database>.*))? - ''', re.X) + """, + re.X, + ) m = pattern.match(name) if m is not None: components = m.groupdict() - if components['database'] is not None: - tokens = components['database'].split('?', 2) - components['database'] = tokens[0] + if components["database"] is not None: + tokens = components["database"].split("?", 2) + components["database"] = tokens[0] if len(tokens) > 1: query = {} for key, value in util.parse_qsl(tokens[1]): if util.py2k: - key = key.encode('ascii') + key = key.encode("ascii") if key in query: query[key] = util.to_list(query[key]) query[key].append(value) @@ -252,26 +266,27 @@ def _parse_rfc1738_args(name): query = None else: query = None - components['query'] = query + components["query"] = query - if components['username'] is not None: - components['username'] = _rfc_1738_unquote(components['username']) + if components["username"] is not None: + components["username"] = _rfc_1738_unquote(components["username"]) - if components['password'] is not None: - components['password'] = _rfc_1738_unquote(components['password']) + if components["password"] is not None: + components["password"] = _rfc_1738_unquote(components["password"]) - ipv4host = components.pop('ipv4host') - ipv6host = components.pop('ipv6host') - components['host'] = ipv4host or ipv6host - name = components.pop('name') + ipv4host = components.pop("ipv4host") + ipv6host = components.pop("ipv6host") + components["host"] = ipv4host or ipv6host + name = components.pop("name") return URL(name, **components) else: raise exc.ArgumentError( - "Could not parse rfc1738 URL from string '%s'" % name) + "Could not parse rfc1738 URL from string '%s'" % name + ) def _rfc_1738_quote(text): - return re.sub(r'[:@/]', lambda m: "%%%X" % ord(m.group(0)), text) + return re.sub(r"[:@/]", lambda m: "%%%X" % ord(m.group(0)), text) def _rfc_1738_unquote(text): @@ -279,7 +294,7 @@ def _rfc_1738_unquote(text): def _parse_keyvalue_args(name): - m = re.match(r'(\w+)://(.*)', name) + m = re.match(r"(\w+)://(.*)", name) if m is not None: (name, args) = m.group(1, 2) opts = dict(util.parse_qsl(args)) |