summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/url.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2019-01-06 01:14:26 -0500
committermike bayer <mike_mp@zzzcomputing.com>2019-01-06 17:34:50 +0000
commit1e1a38e7801f410f244e4bbb44ec795ae152e04e (patch)
tree28e725c5c8188bd0cfd133d1e268dbca9b524978 /lib/sqlalchemy/engine/url.py
parent404e69426b05a82d905cbb3ad33adafccddb00dd (diff)
downloadsqlalchemy-1e1a38e7801f410f244e4bbb44ec795ae152e04e.tar.gz
Run black -l 79 against all source files
This is a straight reformat run using black as is, with no edits applied at all. The black run will format code consistently, however in some cases that are prevalent in SQLAlchemy code it produces too-long lines. The too-long lines will be resolved in the following commit that will resolve all remaining flake8 issues including shadowed builtins, long lines, import order, unused imports, duplicate imports, and docstring issues. Change-Id: I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9
Diffstat (limited to 'lib/sqlalchemy/engine/url.py')
-rw-r--r--lib/sqlalchemy/engine/url.py115
1 files changed, 65 insertions, 50 deletions
diff --git a/lib/sqlalchemy/engine/url.py b/lib/sqlalchemy/engine/url.py
index 1662efe20..e92e57b8e 100644
--- a/lib/sqlalchemy/engine/url.py
+++ b/lib/sqlalchemy/engine/url.py
@@ -50,8 +50,16 @@ class URL(object):
"""
- def __init__(self, drivername, username=None, password=None,
- host=None, port=None, database=None, query=None):
+ def __init__(
+ self,
+ drivername,
+ username=None,
+ password=None,
+ host=None,
+ port=None,
+ database=None,
+ query=None,
+ ):
self.drivername = drivername
self.username = username
self.password_original = password
@@ -68,26 +76,26 @@ class URL(object):
if self.username is not None:
s += _rfc_1738_quote(self.username)
if self.password is not None:
- s += ':' + ('***' if hide_password
- else _rfc_1738_quote(self.password))
+ s += ":" + (
+ "***" if hide_password else _rfc_1738_quote(self.password)
+ )
s += "@"
if self.host is not None:
- if ':' in self.host:
+ if ":" in self.host:
s += "[%s]" % self.host
else:
s += self.host
if self.port is not None:
- s += ':' + str(self.port)
+ s += ":" + str(self.port)
if self.database is not None:
- s += '/' + self.database
+ s += "/" + self.database
if self.query:
keys = list(self.query)
keys.sort()
- s += '?' + "&".join(
- "%s=%s" % (
- k,
- element
- ) for k in keys for element in util.to_list(self.query[k])
+ s += "?" + "&".join(
+ "%s=%s" % (k, element)
+ for k in keys
+ for element in util.to_list(self.query[k])
)
return s
@@ -101,14 +109,15 @@ class URL(object):
return hash(str(self))
def __eq__(self, other):
- return \
- isinstance(other, URL) and \
- self.drivername == other.drivername and \
- self.username == other.username and \
- self.password == other.password and \
- self.host == other.host and \
- self.database == other.database and \
- self.query == other.query
+ return (
+ isinstance(other, URL)
+ and self.drivername == other.drivername
+ and self.username == other.username
+ and self.password == other.password
+ and self.host == other.host
+ and self.database == other.database
+ and self.query == other.query
+ )
@property
def password(self):
@@ -122,20 +131,20 @@ class URL(object):
self.password_original = password
def get_backend_name(self):
- if '+' not in self.drivername:
+ if "+" not in self.drivername:
return self.drivername
else:
- return self.drivername.split('+')[0]
+ return self.drivername.split("+")[0]
def get_driver_name(self):
- if '+' not in self.drivername:
+ if "+" not in self.drivername:
return self.get_dialect().driver
else:
- return self.drivername.split('+')[1]
+ return self.drivername.split("+")[1]
def _instantiate_plugins(self, kwargs):
- plugin_names = util.to_list(self.query.get('plugin', ()))
- plugin_names += kwargs.get('plugins', [])
+ plugin_names = util.to_list(self.query.get("plugin", ()))
+ plugin_names += kwargs.get("plugins", [])
return [
plugins.load(plugin_name)(self, kwargs)
@@ -149,17 +158,19 @@ class URL(object):
returned class implements the get_dialect_cls() method.
"""
- if '+' not in self.drivername:
+ if "+" not in self.drivername:
name = self.drivername
else:
- name = self.drivername.replace('+', '.')
+ name = self.drivername.replace("+", ".")
cls = registry.load(name)
# check for legacy dialects that
# would return a module with 'dialect' as the
# actual class
- if hasattr(cls, 'dialect') and \
- isinstance(cls.dialect, type) and \
- issubclass(cls.dialect, Dialect):
+ if (
+ hasattr(cls, "dialect")
+ and isinstance(cls.dialect, type)
+ and issubclass(cls.dialect, Dialect)
+ ):
return cls.dialect
else:
return cls
@@ -187,7 +198,7 @@ class URL(object):
"""
translated = {}
- attribute_names = ['host', 'database', 'username', 'password', 'port']
+ attribute_names = ["host", "database", "username", "password", "port"]
for sname in attribute_names:
if names:
name = names.pop(0)
@@ -214,7 +225,8 @@ def make_url(name_or_url):
def _parse_rfc1738_args(name):
- pattern = re.compile(r'''
+ pattern = re.compile(
+ r"""
(?P<name>[\w\+]+)://
(?:
(?P<username>[^:/]*)
@@ -228,21 +240,23 @@ def _parse_rfc1738_args(name):
(?::(?P<port>[^/]*))?
)?
(?:/(?P<database>.*))?
- ''', re.X)
+ """,
+ re.X,
+ )
m = pattern.match(name)
if m is not None:
components = m.groupdict()
- if components['database'] is not None:
- tokens = components['database'].split('?', 2)
- components['database'] = tokens[0]
+ if components["database"] is not None:
+ tokens = components["database"].split("?", 2)
+ components["database"] = tokens[0]
if len(tokens) > 1:
query = {}
for key, value in util.parse_qsl(tokens[1]):
if util.py2k:
- key = key.encode('ascii')
+ key = key.encode("ascii")
if key in query:
query[key] = util.to_list(query[key])
query[key].append(value)
@@ -252,26 +266,27 @@ def _parse_rfc1738_args(name):
query = None
else:
query = None
- components['query'] = query
+ components["query"] = query
- if components['username'] is not None:
- components['username'] = _rfc_1738_unquote(components['username'])
+ if components["username"] is not None:
+ components["username"] = _rfc_1738_unquote(components["username"])
- if components['password'] is not None:
- components['password'] = _rfc_1738_unquote(components['password'])
+ if components["password"] is not None:
+ components["password"] = _rfc_1738_unquote(components["password"])
- ipv4host = components.pop('ipv4host')
- ipv6host = components.pop('ipv6host')
- components['host'] = ipv4host or ipv6host
- name = components.pop('name')
+ ipv4host = components.pop("ipv4host")
+ ipv6host = components.pop("ipv6host")
+ components["host"] = ipv4host or ipv6host
+ name = components.pop("name")
return URL(name, **components)
else:
raise exc.ArgumentError(
- "Could not parse rfc1738 URL from string '%s'" % name)
+ "Could not parse rfc1738 URL from string '%s'" % name
+ )
def _rfc_1738_quote(text):
- return re.sub(r'[:@/]', lambda m: "%%%X" % ord(m.group(0)), text)
+ return re.sub(r"[:@/]", lambda m: "%%%X" % ord(m.group(0)), text)
def _rfc_1738_unquote(text):
@@ -279,7 +294,7 @@ def _rfc_1738_unquote(text):
def _parse_keyvalue_args(name):
- m = re.match(r'(\w+)://(.*)', name)
+ m = re.match(r"(\w+)://(.*)", name)
if m is not None:
(name, args) = m.group(1, 2)
opts = dict(util.parse_qsl(args))