summaryrefslogtreecommitdiff
path: root/django/db/backends/mysql/operations.py
diff options
context:
space:
mode:
authorSimon Charette <charette.s@gmail.com>2022-06-19 23:46:22 -0400
committerMariusz Felisiak <felisiak.mariusz@gmail.com>2022-07-06 07:40:07 +0200
commit877c800f255ccaa7abde1fb944de45d1616f5cc9 (patch)
tree1fd6fa46ea847249eab6339213d4de5ee8f05f65 /django/db/backends/mysql/operations.py
parent73766c118781a7f7052bf0a5fbee38b944964e31 (diff)
downloaddjango-877c800f255ccaa7abde1fb944de45d1616f5cc9.tar.gz
Refs CVE-2022-34265 -- Properly escaped Extract() and Trunc() parameters.
Co-authored-by: Mariusz Felisiak <felisiak.mariusz@gmail.com>
Diffstat (limited to 'django/db/backends/mysql/operations.py')
-rw-r--r--django/db/backends/mysql/operations.py117
1 files changed, 58 insertions, 59 deletions
diff --git a/django/db/backends/mysql/operations.py b/django/db/backends/mysql/operations.py
index 7c4e21671b..34cdfc0292 100644
--- a/django/db/backends/mysql/operations.py
+++ b/django/db/backends/mysql/operations.py
@@ -7,6 +7,7 @@ from django.db.models import Exists, ExpressionWrapper, Lookup
from django.db.models.constants import OnConflict
from django.utils import timezone
from django.utils.encoding import force_str
+from django.utils.regex_helper import _lazy_re_compile
class DatabaseOperations(BaseDatabaseOperations):
@@ -37,117 +38,115 @@ class DatabaseOperations(BaseDatabaseOperations):
cast_char_field_without_max_length = "char"
explain_prefix = "EXPLAIN"
- def date_extract_sql(self, lookup_type, field_name):
+ # EXTRACT format cannot be passed in parameters.
+ _extract_format_re = _lazy_re_compile(r"[A-Z_]+")
+
+ def date_extract_sql(self, lookup_type, sql, params):
# https://dev.mysql.com/doc/mysql/en/date-and-time-functions.html
if lookup_type == "week_day":
# DAYOFWEEK() returns an integer, 1-7, Sunday=1.
- return "DAYOFWEEK(%s)" % field_name
+ return f"DAYOFWEEK({sql})", params
elif lookup_type == "iso_week_day":
# WEEKDAY() returns an integer, 0-6, Monday=0.
- return "WEEKDAY(%s) + 1" % field_name
+ return f"WEEKDAY({sql}) + 1", params
elif lookup_type == "week":
# Override the value of default_week_format for consistency with
# other database backends.
# Mode 3: Monday, 1-53, with 4 or more days this year.
- return "WEEK(%s, 3)" % field_name
+ return f"WEEK({sql}, 3)", params
elif lookup_type == "iso_year":
# Get the year part from the YEARWEEK function, which returns a
# number as year * 100 + week.
- return "TRUNCATE(YEARWEEK(%s, 3), -2) / 100" % field_name
+ return f"TRUNCATE(YEARWEEK({sql}, 3), -2) / 100", params
else:
# EXTRACT returns 1-53 based on ISO-8601 for the week number.
- return "EXTRACT(%s FROM %s)" % (lookup_type.upper(), field_name)
+ lookup_type = lookup_type.upper()
+ if not self._extract_format_re.fullmatch(lookup_type):
+ raise ValueError(f"Invalid loookup type: {lookup_type!r}")
+ return f"EXTRACT({lookup_type} FROM {sql})", params
- def date_trunc_sql(self, lookup_type, field_name, tzname=None):
- field_name = self._convert_field_to_tz(field_name, tzname)
+ def date_trunc_sql(self, lookup_type, sql, params, tzname=None):
+ sql, params = self._convert_field_to_tz(sql, params, tzname)
fields = {
- "year": "%%Y-01-01",
- "month": "%%Y-%%m-01",
- } # Use double percents to escape.
+ "year": "%Y-01-01",
+ "month": "%Y-%m-01",
+ }
if lookup_type in fields:
format_str = fields[lookup_type]
- return "CAST(DATE_FORMAT(%s, '%s') AS DATE)" % (field_name, format_str)
+ return f"CAST(DATE_FORMAT({sql}, %s) AS DATE)", (*params, format_str)
elif lookup_type == "quarter":
return (
- "MAKEDATE(YEAR(%s), 1) + "
- "INTERVAL QUARTER(%s) QUARTER - INTERVAL 1 QUARTER"
- % (field_name, field_name)
+ f"MAKEDATE(YEAR({sql}), 1) + "
+ f"INTERVAL QUARTER({sql}) QUARTER - INTERVAL 1 QUARTER",
+ (*params, *params),
)
elif lookup_type == "week":
- return "DATE_SUB(%s, INTERVAL WEEKDAY(%s) DAY)" % (field_name, field_name)
+ return f"DATE_SUB({sql}, INTERVAL WEEKDAY({sql}) DAY)", (*params, *params)
else:
- return "DATE(%s)" % (field_name)
+ return f"DATE({sql})", params
def _prepare_tzname_delta(self, tzname):
tzname, sign, offset = split_tzname_delta(tzname)
return f"{sign}{offset}" if offset else tzname
- def _convert_field_to_tz(self, field_name, tzname):
+ def _convert_field_to_tz(self, sql, params, tzname):
if tzname and settings.USE_TZ and self.connection.timezone_name != tzname:
- field_name = "CONVERT_TZ(%s, '%s', '%s')" % (
- field_name,
+ return f"CONVERT_TZ({sql}, %s, %s)", (
+ *params,
self.connection.timezone_name,
self._prepare_tzname_delta(tzname),
)
- return field_name
+ return sql, params
- def datetime_cast_date_sql(self, field_name, tzname):
- field_name = self._convert_field_to_tz(field_name, tzname)
- return "DATE(%s)" % field_name
+ def datetime_cast_date_sql(self, sql, params, tzname):
+ sql, params = self._convert_field_to_tz(sql, params, tzname)
+ return f"DATE({sql})", params
- def datetime_cast_time_sql(self, field_name, tzname):
- field_name = self._convert_field_to_tz(field_name, tzname)
- return "TIME(%s)" % field_name
+ def datetime_cast_time_sql(self, sql, params, tzname):
+ sql, params = self._convert_field_to_tz(sql, params, tzname)
+ return f"TIME({sql})", params
- def datetime_extract_sql(self, lookup_type, field_name, tzname):
- field_name = self._convert_field_to_tz(field_name, tzname)
- return self.date_extract_sql(lookup_type, field_name)
+ def datetime_extract_sql(self, lookup_type, sql, params, tzname):
+ sql, params = self._convert_field_to_tz(sql, params, tzname)
+ return self.date_extract_sql(lookup_type, sql, params)
- def datetime_trunc_sql(self, lookup_type, field_name, tzname):
- field_name = self._convert_field_to_tz(field_name, tzname)
+ def datetime_trunc_sql(self, lookup_type, sql, params, tzname):
+ sql, params = self._convert_field_to_tz(sql, params, tzname)
fields = ["year", "month", "day", "hour", "minute", "second"]
- format = (
- "%%Y-",
- "%%m",
- "-%%d",
- " %%H:",
- "%%i",
- ":%%s",
- ) # Use double percents to escape.
+ format = ("%Y-", "%m", "-%d", " %H:", "%i", ":%s")
format_def = ("0000-", "01", "-01", " 00:", "00", ":00")
if lookup_type == "quarter":
return (
- "CAST(DATE_FORMAT(MAKEDATE(YEAR({field_name}), 1) + "
- "INTERVAL QUARTER({field_name}) QUARTER - "
- + "INTERVAL 1 QUARTER, '%%Y-%%m-01 00:00:00') AS DATETIME)"
- ).format(field_name=field_name)
+ f"CAST(DATE_FORMAT(MAKEDATE(YEAR({sql}), 1) + "
+ f"INTERVAL QUARTER({sql}) QUARTER - "
+ f"INTERVAL 1 QUARTER, %s) AS DATETIME)"
+ ), (*params, *params, "%Y-%m-01 00:00:00")
if lookup_type == "week":
return (
- "CAST(DATE_FORMAT(DATE_SUB({field_name}, "
- "INTERVAL WEEKDAY({field_name}) DAY), "
- "'%%Y-%%m-%%d 00:00:00') AS DATETIME)"
- ).format(field_name=field_name)
+ f"CAST(DATE_FORMAT("
+ f"DATE_SUB({sql}, INTERVAL WEEKDAY({sql}) DAY), %s) AS DATETIME)"
+ ), (*params, *params, "%Y-%m-%d 00:00:00")
try:
i = fields.index(lookup_type) + 1
except ValueError:
- sql = field_name
+ pass
else:
format_str = "".join(format[:i] + format_def[i:])
- sql = "CAST(DATE_FORMAT(%s, '%s') AS DATETIME)" % (field_name, format_str)
- return sql
+ return f"CAST(DATE_FORMAT({sql}, %s) AS DATETIME)", (*params, format_str)
+ return sql, params
- def time_trunc_sql(self, lookup_type, field_name, tzname=None):
- field_name = self._convert_field_to_tz(field_name, tzname)
+ def time_trunc_sql(self, lookup_type, sql, params, tzname=None):
+ sql, params = self._convert_field_to_tz(sql, params, tzname)
fields = {
- "hour": "%%H:00:00",
- "minute": "%%H:%%i:00",
- "second": "%%H:%%i:%%s",
- } # Use double percents to escape.
+ "hour": "%H:00:00",
+ "minute": "%H:%i:00",
+ "second": "%H:%i:%s",
+ }
if lookup_type in fields:
format_str = fields[lookup_type]
- return "CAST(DATE_FORMAT(%s, '%s') AS TIME)" % (field_name, format_str)
+ return f"CAST(DATE_FORMAT({sql}, %s) AS TIME)", (*params, format_str)
else:
- return "TIME(%s)" % (field_name)
+ return f"TIME({sql})", params
def fetch_returned_insert_rows(self, cursor):
"""