diff options
-rw-r--r-- | doc/build/changelog/unreleased_20/4038.rst | 19 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/mysql/reflection.py | 84 | ||||
-rw-r--r-- | test/dialect/mysql/test_reflection.py | 164 |
3 files changed, 267 insertions, 0 deletions
diff --git a/doc/build/changelog/unreleased_20/4038.rst b/doc/build/changelog/unreleased_20/4038.rst new file mode 100644 index 000000000..8e65aef43 --- /dev/null +++ b/doc/build/changelog/unreleased_20/4038.rst @@ -0,0 +1,19 @@ +.. change:: + :tags: schema, mysql, mariadb + :tickets: 4038 + + Add support for Partitioning and Sample pages on MySQL and MariaDB + reflected options. + The options are stored in the table dialect options dictionary, so + the following keyword need to be prefixed with ``mysql_`` or ``mariadb_`` + depending on the backend. + Supported options are: + + * ``stats_sample_pages`` + * ``partition_by`` + * ``partitions`` + * ``subpartition_by`` + + These options are also reflected when loading a table from database, + and will populate the table :attr:`_schema.Table.dialect_options`. + Pull request courtesy of Ramon Will. diff --git a/lib/sqlalchemy/dialects/mysql/reflection.py b/lib/sqlalchemy/dialects/mysql/reflection.py index 2ce8473d0..e7a6b157f 100644 --- a/lib/sqlalchemy/dialects/mysql/reflection.py +++ b/lib/sqlalchemy/dialects/mysql/reflection.py @@ -54,6 +54,8 @@ class MySQLTableDefinitionParser: pass elif line.startswith("CREATE "): self._parse_table_name(line, state) + elif "PARTITION" in line: + self._parse_partition_options(line, state) # Not present in real reflection, but may be if # loading from a file. elif not line: @@ -162,6 +164,62 @@ class MySQLTableDefinitionParser: for opt, val in options.items(): state.table_options["%s_%s" % (self.dialect.name, opt)] = val + def _parse_partition_options(self, line, state): + options = {} + new_line = line[:] + + while new_line.startswith("(") or new_line.startswith(" "): + new_line = new_line[1:] + + for regex, cleanup in self._pr_options: + m = regex.search(new_line) + if not m or "PARTITION" not in regex.pattern: + continue + + directive = m.group("directive") + directive = directive.lower() + is_subpartition = directive == "subpartition" + + if directive == "partition" or is_subpartition: + new_line = new_line.replace(") */", "") + new_line = new_line.replace(",", "") + if is_subpartition and new_line.endswith(")"): + new_line = new_line[:-1] + if self.dialect.name == "mariadb" and new_line.endswith(")"): + if ( + "MAXVALUE" in new_line + or "MINVALUE" in new_line + or "ENGINE" in new_line + ): + # final line of MariaDB partition endswith ")" + new_line = new_line[:-1] + + defs = "%s_%s_definitions" % (self.dialect.name, directive) + options[defs] = new_line + + else: + directive = directive.replace(" ", "_") + value = m.group("val") + if cleanup: + value = cleanup(value) + options[directive] = value + break + + for opt, val in options.items(): + part_def = "%s_partition_definitions" % (self.dialect.name) + subpart_def = "%s_subpartition_definitions" % (self.dialect.name) + if opt == part_def or opt == subpart_def: + # builds a string of definitions + if opt not in state.table_options: + state.table_options[opt] = val + else: + state.table_options[opt] = "%s, %s" % ( + state.table_options[opt], + val, + ) + else: + state.table_options["%s_%s" % (self.dialect.name, opt)] = val + def _parse_column(self, line, state): """Extract column details. @@ -489,9 +547,20 @@ class MySQLTableDefinitionParser: "PACK_KEYS", "ROW_FORMAT", "KEY_BLOCK_SIZE", + "STATS_SAMPLE_PAGES", ): self._add_option_word(option) + for option in ( + "PARTITION BY", + "SUBPARTITION BY", + "PARTITIONS", + "SUBPARTITIONS", + "PARTITION", + "SUBPARTITION", + ): + self._add_partition_option_word(option) + self._add_option_regex("UNION", r"\([^\)]+\)") self._add_option_regex("TABLESPACE", r".*? STORAGE DISK") self._add_option_regex( @@ -519,6 +588,21 @@ class MySQLTableDefinitionParser: ) self._pr_options.append(_pr_compile(regex)) + def _add_partition_option_word(self, directive): + if directive == "PARTITION BY" or directive == "SUBPARTITION BY": + regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\w+.*)" % ( + re.escape(directive), + self._optional_equals, + ) + elif directive == "SUBPARTITIONS" or directive == "PARTITIONS": + regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\d+)" % ( + re.escape(directive), + self._optional_equals, + ) + else: + regex = r"(?<!\S)(?P<directive>%s)(?!\S)" % (re.escape(directive),) + self._pr_options.append(_pr_compile(regex)) + def _add_option_regex(self, directive, regex): regex = r"(?P<directive>%s)%s" r"(?P<val>%s)" % ( re.escape(directive), diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py index 0a23282bf..f815a7b3c 100644 --- a/test/dialect/mysql/test_reflection.py +++ b/test/dialect/mysql/test_reflection.py @@ -320,6 +320,7 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL): mysql_avg_row_length="3", mysql_password="secret", mysql_connection="fish", + mysql_stats_sample_pages="4", ) def_table = Table( @@ -364,6 +365,7 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL): assert def_table.kwargs["mysql_avg_row_length"] == "3" assert def_table.kwargs["mysql_password"] == "secret" assert def_table.kwargs["mysql_connection"] == "fish" + assert def_table.kwargs["mysql_stats_sample_pages"] == "4" assert reflected.kwargs["mysql_engine"] == "MEMORY" @@ -375,6 +377,7 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL): ) assert reflected.kwargs["mysql_avg_row_length"] == "3" assert reflected.kwargs["mysql_connection"] == "fish" + assert reflected.kwargs["mysql_stats_sample_pages"] == "4" # This field doesn't seem to be returned by mysql itself. # assert reflected.kwargs['mysql_password'] == 'secret' @@ -382,6 +385,167 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL): # This is explicitly ignored when reflecting schema. # assert reflected.kwargs['mysql_auto_increment'] == '5' + def _norm_str(self, value, is_definition=False): + if is_definition: + # partition names on MariaDB contain backticks + return value.replace("`", "") + else: + return value.replace("`", "").replace(" ", "").lower() + + def _norm_reflected_table(self, dialect_name, kwords): + dialect_name += "_" + normalised_table = {} + for k, v in kwords.items(): + option = k.replace(dialect_name, "") + normalised_table[option] = self._norm_str(v) + return normalised_table + + def _get_dialect_key(self): + if testing.against("mariadb"): + return "mariadb" + else: + return "mysql" + + def test_reflection_with_partition_options(self, metadata, connection): + base_kwargs = dict( + engine="InnoDB", + default_charset="utf8", + partition_by="HASH(MONTH(c2))", + partitions="6", + ) + dk = self._get_dialect_key() + + kwargs = {f"{dk}_{key}": v for key, v in base_kwargs.items()} + + def_table = Table( + "mysql_def", + metadata, + Column("c1", Integer()), + Column("c2", DateTime), + **kwargs, + ) + eq_(def_table.kwargs[f"{dk}_partition_by"], "HASH(MONTH(c2))") + eq_(def_table.kwargs[f"{dk}_partitions"], "6") + + metadata.create_all(connection) + reflected = Table("mysql_def", MetaData(), autoload_with=connection) + ref_kw = self._norm_reflected_table(dk, reflected.kwargs) + eq_(ref_kw["partition_by"], "hash(month(c2))") + eq_(ref_kw["partitions"], "6") + + def test_reflection_with_subpartition_options(self, connection, metadata): + + subpartititon_text = """HASH (TO_DAYS (c2)) + SUBPARTITIONS 2( + PARTITION p0 VALUES LESS THAN (1990), + PARTITION p1 VALUES LESS THAN (2000), + PARTITION p2 VALUES LESS THAN MAXVALUE + );""" + + base_kwargs = dict( + engine="InnoDB", + default_charset="utf8", + partition_by="RANGE(YEAR(c2))", + subpartition_by=subpartititon_text, + ) + dk = self._get_dialect_key() + kwargs = {f"{dk}_{key}": v for key, v in base_kwargs.items()} + + def_table = Table( + "mysql_def", + metadata, + Column("c1", Integer()), + Column("c2", DateTime), + **kwargs, + ) + + eq_(def_table.kwargs[f"{dk}_partition_by"], "RANGE(YEAR(c2))") + metadata.create_all(connection) + + reflected = Table("mysql_def", MetaData(), autoload_with=connection) + ref_kw = self._norm_reflected_table(dk, reflected.kwargs) + opts = reflected.dialect_options[dk] + + eq_(ref_kw["partition_by"], "range(year(c2))") + eq_(ref_kw["subpartition_by"], "hash(to_days(c2))") + eq_(ref_kw["subpartitions"], "2") + part_definitions = ( + "PARTITION p0 VALUES LESS THAN (1990) ENGINE = InnoDB," + " PARTITION p1 VALUES LESS THAN (2000) ENGINE = InnoDB," + " PARTITION p2 VALUES LESS THAN MAXVALUE ENGINE = InnoDB" + ) + eq_( + self._norm_str(opts["partition_definitions"], True), + part_definitions, + ) + + def test_reflection_with_subpartition_options_two( + self, connection, metadata + ): + partititon_text = """RANGE (YEAR (c2)) + SUBPARTITION BY HASH( TO_DAYS(c2))( + PARTITION p0 VALUES LESS THAN (1990)( + SUBPARTITION s0, + SUBPARTITION s1 + ), + PARTITION p1 VALUES LESS THAN (2000)( + SUBPARTITION s2, + SUBPARTITION s3 + ), + PARTITION p2 VALUES LESS THAN MAXVALUE( + SUBPARTITION s4, + SUBPARTITION s5 + ) + );""" + + base_kwargs = dict( + engine="InnoDB", + default_charset="utf8", + partition_by=partititon_text, + ) + dk = self._get_dialect_key() + kwargs = {f"{dk}_{key}": v for key, v in base_kwargs.items()} + + def_table = Table( + "mysql_def", + metadata, + Column("c1", Integer()), + Column("c2", DateTime), + **kwargs, + ) + eq_(def_table.kwargs[f"{dk}_partition_by"], partititon_text) + + metadata.create_all(connection) + reflected = Table("mysql_def", MetaData(), autoload_with=connection) + + ref_kw = self._norm_reflected_table(dk, reflected.kwargs) + opts = reflected.dialect_options[dk] + eq_(ref_kw["partition_by"], "range(year(c2))") + eq_(ref_kw["subpartition_by"], "hash(to_days(c2))") + + part_definitions = ( + "PARTITION p0 VALUES LESS THAN (1990)," + " PARTITION p1 VALUES LESS THAN (2000)," + " PARTITION p2 VALUES LESS THAN MAXVALUE" + ) + subpart_definitions = ( + "SUBPARTITION s0 ENGINE = InnoDB," + " SUBPARTITION s1 ENGINE = InnoDB," + " SUBPARTITION s2 ENGINE = InnoDB," + " SUBPARTITION s3 ENGINE = InnoDB," + " SUBPARTITION s4 ENGINE = InnoDB," + " SUBPARTITION s5 ENGINE = InnoDB" + ) + + eq_( + self._norm_str(opts["partition_definitions"], True), + part_definitions, + ) + eq_( + self._norm_str(opts["subpartition_definitions"], True), + subpart_definitions, + ) + def test_reflection_on_include_columns(self, metadata, connection): """Test reflection of include_columns to be sure they respect case.""" |