diff options
author | Simon Charette <charette.s@gmail.com> | 2022-06-19 23:46:22 -0400 |
---|---|---|
committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2022-07-06 07:40:07 +0200 |
commit | 877c800f255ccaa7abde1fb944de45d1616f5cc9 (patch) | |
tree | 1fd6fa46ea847249eab6339213d4de5ee8f05f65 /django/db/backends/mysql/operations.py | |
parent | 73766c118781a7f7052bf0a5fbee38b944964e31 (diff) | |
download | django-877c800f255ccaa7abde1fb944de45d1616f5cc9.tar.gz |
Refs CVE-2022-34265 -- Properly escaped Extract() and Trunc() parameters.
Co-authored-by: Mariusz Felisiak <felisiak.mariusz@gmail.com>
Diffstat (limited to 'django/db/backends/mysql/operations.py')
-rw-r--r-- | django/db/backends/mysql/operations.py | 117 |
1 files changed, 58 insertions, 59 deletions
diff --git a/django/db/backends/mysql/operations.py b/django/db/backends/mysql/operations.py index 7c4e21671b..34cdfc0292 100644 --- a/django/db/backends/mysql/operations.py +++ b/django/db/backends/mysql/operations.py @@ -7,6 +7,7 @@ from django.db.models import Exists, ExpressionWrapper, Lookup from django.db.models.constants import OnConflict from django.utils import timezone from django.utils.encoding import force_str +from django.utils.regex_helper import _lazy_re_compile class DatabaseOperations(BaseDatabaseOperations): @@ -37,117 +38,115 @@ class DatabaseOperations(BaseDatabaseOperations): cast_char_field_without_max_length = "char" explain_prefix = "EXPLAIN" - def date_extract_sql(self, lookup_type, field_name): + # EXTRACT format cannot be passed in parameters. + _extract_format_re = _lazy_re_compile(r"[A-Z_]+") + + def date_extract_sql(self, lookup_type, sql, params): # https://dev.mysql.com/doc/mysql/en/date-and-time-functions.html if lookup_type == "week_day": # DAYOFWEEK() returns an integer, 1-7, Sunday=1. - return "DAYOFWEEK(%s)" % field_name + return f"DAYOFWEEK({sql})", params elif lookup_type == "iso_week_day": # WEEKDAY() returns an integer, 0-6, Monday=0. - return "WEEKDAY(%s) + 1" % field_name + return f"WEEKDAY({sql}) + 1", params elif lookup_type == "week": # Override the value of default_week_format for consistency with # other database backends. # Mode 3: Monday, 1-53, with 4 or more days this year. - return "WEEK(%s, 3)" % field_name + return f"WEEK({sql}, 3)", params elif lookup_type == "iso_year": # Get the year part from the YEARWEEK function, which returns a # number as year * 100 + week. - return "TRUNCATE(YEARWEEK(%s, 3), -2) / 100" % field_name + return f"TRUNCATE(YEARWEEK({sql}, 3), -2) / 100", params else: # EXTRACT returns 1-53 based on ISO-8601 for the week number. - return "EXTRACT(%s FROM %s)" % (lookup_type.upper(), field_name) + lookup_type = lookup_type.upper() + if not self._extract_format_re.fullmatch(lookup_type): + raise ValueError(f"Invalid loookup type: {lookup_type!r}") + return f"EXTRACT({lookup_type} FROM {sql})", params - def date_trunc_sql(self, lookup_type, field_name, tzname=None): - field_name = self._convert_field_to_tz(field_name, tzname) + def date_trunc_sql(self, lookup_type, sql, params, tzname=None): + sql, params = self._convert_field_to_tz(sql, params, tzname) fields = { - "year": "%%Y-01-01", - "month": "%%Y-%%m-01", - } # Use double percents to escape. + "year": "%Y-01-01", + "month": "%Y-%m-01", + } if lookup_type in fields: format_str = fields[lookup_type] - return "CAST(DATE_FORMAT(%s, '%s') AS DATE)" % (field_name, format_str) + return f"CAST(DATE_FORMAT({sql}, %s) AS DATE)", (*params, format_str) elif lookup_type == "quarter": return ( - "MAKEDATE(YEAR(%s), 1) + " - "INTERVAL QUARTER(%s) QUARTER - INTERVAL 1 QUARTER" - % (field_name, field_name) + f"MAKEDATE(YEAR({sql}), 1) + " + f"INTERVAL QUARTER({sql}) QUARTER - INTERVAL 1 QUARTER", + (*params, *params), ) elif lookup_type == "week": - return "DATE_SUB(%s, INTERVAL WEEKDAY(%s) DAY)" % (field_name, field_name) + return f"DATE_SUB({sql}, INTERVAL WEEKDAY({sql}) DAY)", (*params, *params) else: - return "DATE(%s)" % (field_name) + return f"DATE({sql})", params def _prepare_tzname_delta(self, tzname): tzname, sign, offset = split_tzname_delta(tzname) return f"{sign}{offset}" if offset else tzname - def _convert_field_to_tz(self, field_name, tzname): + def _convert_field_to_tz(self, sql, params, tzname): if tzname and settings.USE_TZ and self.connection.timezone_name != tzname: - field_name = "CONVERT_TZ(%s, '%s', '%s')" % ( - field_name, + return f"CONVERT_TZ({sql}, %s, %s)", ( + *params, self.connection.timezone_name, self._prepare_tzname_delta(tzname), ) - return field_name + return sql, params - def datetime_cast_date_sql(self, field_name, tzname): - field_name = self._convert_field_to_tz(field_name, tzname) - return "DATE(%s)" % field_name + def datetime_cast_date_sql(self, sql, params, tzname): + sql, params = self._convert_field_to_tz(sql, params, tzname) + return f"DATE({sql})", params - def datetime_cast_time_sql(self, field_name, tzname): - field_name = self._convert_field_to_tz(field_name, tzname) - return "TIME(%s)" % field_name + def datetime_cast_time_sql(self, sql, params, tzname): + sql, params = self._convert_field_to_tz(sql, params, tzname) + return f"TIME({sql})", params - def datetime_extract_sql(self, lookup_type, field_name, tzname): - field_name = self._convert_field_to_tz(field_name, tzname) - return self.date_extract_sql(lookup_type, field_name) + def datetime_extract_sql(self, lookup_type, sql, params, tzname): + sql, params = self._convert_field_to_tz(sql, params, tzname) + return self.date_extract_sql(lookup_type, sql, params) - def datetime_trunc_sql(self, lookup_type, field_name, tzname): - field_name = self._convert_field_to_tz(field_name, tzname) + def datetime_trunc_sql(self, lookup_type, sql, params, tzname): + sql, params = self._convert_field_to_tz(sql, params, tzname) fields = ["year", "month", "day", "hour", "minute", "second"] - format = ( - "%%Y-", - "%%m", - "-%%d", - " %%H:", - "%%i", - ":%%s", - ) # Use double percents to escape. + format = ("%Y-", "%m", "-%d", " %H:", "%i", ":%s") format_def = ("0000-", "01", "-01", " 00:", "00", ":00") if lookup_type == "quarter": return ( - "CAST(DATE_FORMAT(MAKEDATE(YEAR({field_name}), 1) + " - "INTERVAL QUARTER({field_name}) QUARTER - " - + "INTERVAL 1 QUARTER, '%%Y-%%m-01 00:00:00') AS DATETIME)" - ).format(field_name=field_name) + f"CAST(DATE_FORMAT(MAKEDATE(YEAR({sql}), 1) + " + f"INTERVAL QUARTER({sql}) QUARTER - " + f"INTERVAL 1 QUARTER, %s) AS DATETIME)" + ), (*params, *params, "%Y-%m-01 00:00:00") if lookup_type == "week": return ( - "CAST(DATE_FORMAT(DATE_SUB({field_name}, " - "INTERVAL WEEKDAY({field_name}) DAY), " - "'%%Y-%%m-%%d 00:00:00') AS DATETIME)" - ).format(field_name=field_name) + f"CAST(DATE_FORMAT(" + f"DATE_SUB({sql}, INTERVAL WEEKDAY({sql}) DAY), %s) AS DATETIME)" + ), (*params, *params, "%Y-%m-%d 00:00:00") try: i = fields.index(lookup_type) + 1 except ValueError: - sql = field_name + pass else: format_str = "".join(format[:i] + format_def[i:]) - sql = "CAST(DATE_FORMAT(%s, '%s') AS DATETIME)" % (field_name, format_str) - return sql + return f"CAST(DATE_FORMAT({sql}, %s) AS DATETIME)", (*params, format_str) + return sql, params - def time_trunc_sql(self, lookup_type, field_name, tzname=None): - field_name = self._convert_field_to_tz(field_name, tzname) + def time_trunc_sql(self, lookup_type, sql, params, tzname=None): + sql, params = self._convert_field_to_tz(sql, params, tzname) fields = { - "hour": "%%H:00:00", - "minute": "%%H:%%i:00", - "second": "%%H:%%i:%%s", - } # Use double percents to escape. + "hour": "%H:00:00", + "minute": "%H:%i:00", + "second": "%H:%i:%s", + } if lookup_type in fields: format_str = fields[lookup_type] - return "CAST(DATE_FORMAT(%s, '%s') AS TIME)" % (field_name, format_str) + return f"CAST(DATE_FORMAT({sql}, %s) AS TIME)", (*params, format_str) else: - return "TIME(%s)" % (field_name) + return f"TIME({sql})", params def fetch_returned_insert_rows(self, cursor): """ |