summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSachin <sachin.setiya@mariadb.com>2019-09-01 13:25:16 +0530
committerSachin <sachin.setiya@mariadb.com>2019-09-11 15:09:35 +0530
commit967c14c04e9e14062bd2b8288e054514af3cd322 (patch)
tree5892bd31febe816c9ec6c6526802fd5db6bd80d5
parent0636645e7e13d07a0262571f72bde11e5680a420 (diff)
downloadmariadb-git-967c14c04e9e14062bd2b8288e054514af3cd322.tar.gz
MDEV-20477 Merge binlog extended metadata support from the upstream
Cherry-pick the commits the mysql and some changes. WL#4618 RBR: extended table metadata in the binary log This patch extends Table Map Event. It appends some new fields for more metadata. The new metadata includes: - Signedness of Numberic Columns - Character Set of Character Columns and Binary Columns - Column Name - String Value of SET Columns - String Value of ENUM Columns - Primary Key - Character Set of SET Columns and ENUM Columns - Geometry Type Some of them are optional, the patch introduces a GLOBAL system variable to control it. It is binlog_row_metadata. - Scope: GLOBAL - Dynamic: Yes - Type: ENUM - Values: {NO_LOG, MINIMAL, FULL} - Default: NO_LOG Only Signedness, character set and geometry type are logged if it is MINIMAL. Otherwise all of them are logged. Also add a binlog_type_info() to field, So that we can have extract relevant binlog info from field.
-rw-r--r--client/client_priv.h1
-rw-r--r--client/mysqlbinlog.cc6
-rw-r--r--mysql-test/include/search_pattern_in_file.inc13
-rw-r--r--mysql-test/main/mysqld--help.result7
-rw-r--r--mysql-test/suite/binlog/include/print_optional_metadata.inc34
-rw-r--r--mysql-test/suite/binlog/r/binlog_table_map_optional_metadata.result312
-rw-r--r--mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_binary.result64
-rw-r--r--mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_ucs2.result64
-rw-r--r--mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_utf32.result64
-rw-r--r--mysql-test/suite/binlog/t/binlog_table_map_optional_metadata.test334
-rw-r--r--mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_binary.test73
-rw-r--r--mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_ucs2.test74
-rw-r--r--mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_utf32.test74
-rw-r--r--mysql-test/suite/sys_vars/r/binlog_row_metadata_basic.result90
-rw-r--r--mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result14
-rw-r--r--mysql-test/suite/sys_vars/t/binlog_row_metadata_basic.test121
-rw-r--r--sql/field.cc115
-rw-r--r--sql/field.h185
-rw-r--r--sql/log_event.cc254
-rw-r--r--sql/log_event.h302
-rw-r--r--sql/log_event_client.cc473
-rw-r--r--sql/log_event_server.cc414
-rw-r--r--sql/mysqld.cc1
-rw-r--r--sql/mysqld.h1
-rw-r--r--sql/sql_type_geom.cc7
-rw-r--r--sql/sql_type_geom.h1
-rw-r--r--sql/sys_vars.cc13
27 files changed, 3000 insertions, 111 deletions
diff --git a/client/client_priv.h b/client/client_priv.h
index ce6070b6149..9729fcf40fc 100644
--- a/client/client_priv.h
+++ b/client/client_priv.h
@@ -102,6 +102,7 @@ enum options_client
OPT_PRINT_ROW_COUNT, OPT_PRINT_ROW_EVENT_POSITIONS,
OPT_SHUTDOWN_WAIT_FOR_SLAVES,
OPT_COPY_S3_TABLES,
+ OPT_PRINT_TABLE_METADATA,
OPT_MAX_CLIENT_OPTION /* should be always the last */
};
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc
index 19973724a82..2d861ce077f 100644
--- a/client/mysqlbinlog.cc
+++ b/client/mysqlbinlog.cc
@@ -144,6 +144,7 @@ static const char* dirname_for_local_load= 0;
static bool opt_skip_annotate_row_events= 0;
static my_bool opt_flashback;
+static bool opt_print_table_metadata;
#ifdef WHEN_FLASHBACK_REVIEW_READY
static my_bool opt_flashback_review;
static char *flashback_review_dbname, *flashback_review_tablename;
@@ -1095,6 +1096,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
print_event_info->hexdump_from= pos;
print_event_info->base64_output_mode= opt_base64_output_mode;
+ print_event_info->print_table_metadata= opt_print_table_metadata;
DBUG_PRINT("debug", ("event_type: %s", ev->get_type_str()));
@@ -1788,6 +1790,10 @@ Example: rewrite-db='from->to'.",
(uchar**) &opt_skip_annotate_row_events,
(uchar**) &opt_skip_annotate_row_events,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
+ {"print-table-metadata", OPT_PRINT_TABLE_METADATA,
+ "Print metadata stored in Table_map_log_event",
+ &opt_print_table_metadata, &opt_print_table_metadata, 0,
+ GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
diff --git a/mysql-test/include/search_pattern_in_file.inc b/mysql-test/include/search_pattern_in_file.inc
index 6bead628fb0..6cd725c3698 100644
--- a/mysql-test/include/search_pattern_in_file.inc
+++ b/mysql-test/include/search_pattern_in_file.inc
@@ -9,6 +9,9 @@
#
# The environment variables SEARCH_FILE and SEARCH_PATTERN must be set
# before sourcing this routine.
+# SEARCH_TYPE can also be set to either NULL(default) or _gm_
+# NULL is equivalent of using m/SEARCH_PATTERN/gs
+# _gm_ is equivalent of using m/SEARCH_RANGE/gm
#
# Optionally, SEARCH_RANGE can be set to the max number of bytes of the file
# to search. If negative, it will search that many bytes at the end of the
@@ -77,7 +80,15 @@ perl;
close(FILE);
$content.= $file_content;
}
- my @matches=($content =~ m/$search_pattern/gs);
+ my @matches;
+ if (not defined($ENV{SEARCH_TYPE}))
+ {
+ @matches=($content =~ /$search_pattern/gs);
+ }
+ elsif($ENV{SEARCH_TYPE} == "_gm_")
+ {
+ @matches=($content =~ /$search_pattern/gm);
+ }
my $res=@matches ? "FOUND " . scalar(@matches) : "NOT FOUND";
$ENV{SEARCH_FILE} =~ s{^.*?([^/\\]+)$}{$1};
diff --git a/mysql-test/main/mysqld--help.result b/mysql-test/main/mysqld--help.result
index 2bcabd83d0a..de43900ed76 100644
--- a/mysql-test/main/mysqld--help.result
+++ b/mysql-test/main/mysqld--help.result
@@ -114,6 +114,12 @@ The following specify which files/extra groups are read (specified before remain
the table) is logged in the before image, and only
changed columns are logged in the after image. (Default:
FULL).
+ --binlog-row-metadata=name
+ Controls whether metadata is logged using FULL , MINIMAL
+ format and NO_LOG.FULL causes all metadata to be logged;
+ MINIMAL means that only metadata actually required by
+ slave is logged; NO_LOG NO metadata will be
+ logged.Default: NO_LOG.
--binlog-stmt-cache-size=#
The size of the statement cache for updates to
non-transactional engines for the binary log. If you
@@ -1433,6 +1439,7 @@ binlog-format MIXED
binlog-optimize-thread-scheduling TRUE
binlog-row-event-max-size 8192
binlog-row-image FULL
+binlog-row-metadata NO_LOG
binlog-stmt-cache-size 32768
bulk-insert-buffer-size 8388608
character-set-client-handshake TRUE
diff --git a/mysql-test/suite/binlog/include/print_optional_metadata.inc b/mysql-test/suite/binlog/include/print_optional_metadata.inc
new file mode 100644
index 00000000000..739903ab190
--- /dev/null
+++ b/mysql-test/suite/binlog/include/print_optional_metadata.inc
@@ -0,0 +1,34 @@
+# Auxaliary file for printing optional metadata in table_map_log_event
+# Usage :
+# --let $binlog_file=
+# [--let $stop_position]
+# [--let $print_primary_key]
+# --source extra/binlog_tests/print_optional_metadata.inc
+
+--let $output_file= $MYSQLTEST_VARDIR/tmp/mysqlbinlog.output
+
+--let $_stop_position_opt=
+if ($stop_position)
+{
+ --let $_stop_position_opt=--stop-position=$stop_position
+}
+
+--exec $MYSQL_BINLOG -F --print-table-metadata $_stop_position_opt $binlog_file > $output_file
+
+
+--let SEARCH_PATTERN= # (?:Columns\(| {8}).*
+--let SEARCH_FILE= $output_file
+--let SEARCH_OUTPUT=matches
+--let SEARCH_TYPE="_gm_"
+--source include/search_pattern_in_file.inc
+
+if ($print_primary_key)
+{
+ --let SEARCH_PATTERN= # Primary Key
+ --source include/search_pattern_in_file.inc
+}
+--remove_file $output_file
+--let $stop_position=
+--let $_stop_position_opt=
+
+
diff --git a/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata.result b/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata.result
new file mode 100644
index 00000000000..cb34f48fb69
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata.result
@@ -0,0 +1,312 @@
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = MINIMAL;
+#
+# Temporal types can be printed correctly
+#
+CREATE TABLE t1(c_year YEAR, c_date DATE, c_time TIME, c_time_f TIME(3),
+c_datetime DATETIME, c_datetime_f DATETIME(3),
+c_timestamp TIMESTAMP, c_timestamp_f TIMESTAMP(3) DEFAULT "2017-1-1 10:10:10");
+INSERT INTO t1(c_year) VALUES(2017);
+# Columns(YEAR,
+# DATE,
+# TIME,
+# TIME(3),
+# DATETIME,
+# DATETIME(3),
+# TIMESTAMP NOT NULL,
+# TIMESTAMP(3) NOT NULL)
+DROP TABLE t1;
+RESET MASTER;
+#
+# Geometry types can be printed correctly
+#
+CREATE TABLE t1 (c_geo GEOMETRY, c_point POINT, c_linestring LINESTRING,
+c_polygon POLYGON, c_multi_point MULTIPOINT,
+c_multi_linestring MULTILINESTRING, c_multi_polygon MULTIPOLYGON,
+c_geometrycollection GEOMETRYCOLLECTION, c_char CHAR(100));
+INSERT INTO t1(c_point) VALUES(ST_PointFromText('POINT(10 10)'));
+# Columns(GEOMETRY,
+# POINT,
+# LINESTRING,
+# POLYGON,
+# MULTIPOINT,
+# MULTILINESTRING,
+# MULTIPOLYGON,
+# GEOMETRYCOLLECTION,
+# CHAR(100) CHARSET latin1 COLLATE latin1_swedish_ci)
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1(c_point) VALUES(ST_PointFromText('POINT(10 10)'));
+# Columns(`c_geo` GEOMETRY,
+# `c_point` POINT,
+# `c_linestring` LINESTRING,
+# `c_polygon` POLYGON,
+# `c_multi_point` MULTIPOINT,
+# `c_multi_linestring` MULTILINESTRING,
+# `c_multi_polygon` MULTIPOLYGON,
+# `c_geometrycollection` GEOMETRYCOLLECTION,
+# `c_char` CHAR(100) CHARSET latin1 COLLATE latin1_swedish_ci)
+DROP TABLE t1;
+RESET MASTER;
+#
+# Numeric types can be printed correctly
+#
+CREATE TABLE t1(c_bit BIT(10), c_bool BOOL, c_smallint SMALLINT,
+c_mediumint MEDIUMINT, c_int INT UNSIGNED, c_bigint BIGINT,
+c_float FLOAT UNSIGNED, c_double DOUBLE, c_decimal DECIMAL(10, 2));
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1(c_bool) VALUES(1);
+# UNSIGNED flag should be printed
+# Columns(BIT(10),
+# TINYINT,
+# SMALLINT,
+# MEDIUMINT,
+# INT UNSIGNED,
+# BIGINT,
+# FLOAT UNSIGNED,
+# DOUBLE,
+# DECIMAL(10,2))
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1(c_bool) VALUES(1);
+# Columns(`c_bit` BIT(10),
+# `c_bool` TINYINT,
+# `c_smallint` SMALLINT,
+# `c_mediumint` MEDIUMINT,
+# `c_int` INT UNSIGNED,
+# `c_bigint` BIGINT,
+# `c_float` FLOAT UNSIGNED,
+# `c_double` DOUBLE,
+# `c_decimal` DECIMAL(10,2))
+DROP TABLE t1;
+RESET MASTER;
+#
+# Character types can be printed correctly
+#
+CREATE TABLE t1(c_char CHAR(10), c_varchar VARCHAR(500),
+c_tinytext TINYTEXT, c_text TEXT,
+c_mediumtext MEDIUMTEXT, c_longtext LONGTEXT CHARSET utf8);
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1(c_char) VALUES("1");
+# Columns(CHAR(10) CHARSET latin1 COLLATE latin1_swedish_ci,
+# VARCHAR(500) CHARSET latin1 COLLATE latin1_swedish_ci,
+# TINYTEXT CHARSET latin1 COLLATE latin1_swedish_ci,
+# TEXT CHARSET latin1 COLLATE latin1_swedish_ci,
+# MEDIUMTEXT CHARSET latin1 COLLATE latin1_swedish_ci,
+# LONGTEXT CHARSET utf8 COLLATE utf8_general_ci)
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1(c_char) VALUES("1");
+# Columns(`c_char` CHAR(10) CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_varchar` VARCHAR(500) CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_tinytext` TINYTEXT CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_text` TEXT CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_mediumtext` MEDIUMTEXT CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_longtext` LONGTEXT CHARSET utf8 COLLATE utf8_general_ci)
+DROP TABLE t1;
+RESET MASTER;
+#
+# Column names with non-ascii characters and escape characters can be printed correctly
+#
+set names utf8;
+CREATE TABLE t1(`åäö表\a'``"` INT);
+SHOW CREATE TABLE t1;
+Table Create Table
+t1 CREATE TABLE `t1` (
+ `åäö表\a'``"` int(11) DEFAULT NULL
+) ENGINE=MyISAM DEFAULT CHARSET=latin1
+INSERT INTO t1 VALUES(1);
+# Columns(`åäö表\\a\'`"` INT)
+DROP TABLE t1;
+RESET MASTER;
+#
+# Charsets can be printed correctly
+#
+CREATE TABLE t1(c_char_utf8 CHAR(10) CHARSET utf8,
+c_varchar_utf8 VARCHAR(10) CHARSET utf8,
+c_text_utf8 TEXT CHARSET utf8);
+INSERT INTO t1 VALUES("1", "2", "3");
+# Columns(`c_char_utf8` CHAR(10) CHARSET utf8 COLLATE utf8_general_ci,
+# `c_varchar_utf8` VARCHAR(10) CHARSET utf8 COLLATE utf8_general_ci,
+# `c_text_utf8` TEXT CHARSET utf8 COLLATE utf8_general_ci)
+DROP TABLE t1;
+RESET MASTER;
+CREATE TABLE t1(c_utf8mb4_520 CHAR(10) CHARSET utf8mb4 COLLATE utf8mb4_unicode_ci,
+c_utf8mb4_0900 VARCHAR(10) CHARSET utf8mb4 COLLATE utf8mb4_polish_ci,
+c_utf8mb4_def TEXT CHARSET utf8mb4);
+INSERT INTO t1 VALUES("1", "2", "3");
+# Columns(`c_utf8mb4_520` CHAR(10) CHARSET utf8mb4 COLLATE utf8mb4_unicode_ci,
+# `c_utf8mb4_0900` VARCHAR(10) CHARSET utf8mb4 COLLATE utf8mb4_polish_ci,
+# `c_utf8mb4_def` TEXT CHARSET utf8mb4 COLLATE utf8mb4_general_ci)
+DROP TABLE t1;
+RESET MASTER;
+#
+# Blob and binary columns can be printed correctly
+#
+CREATE TABLE t1(c_binary BINARY(10), c_varbinary VARBINARY(10),
+c_tinyblob TINYBLOB, c_blob BLOB,
+c_mediumblob MEDIUMBLOB, c_longblob LONGBLOB);
+INSERT INTO t1 VALUES("1", "2", "3", "4", "5", "6");
+# Columns(`c_binary` BINARY(10),
+# `c_varbinary` VARBINARY(10),
+# `c_tinyblob` TINYBLOB,
+# `c_blob` BLOB,
+# `c_mediumblob` MEDIUMBLOB,
+# `c_longblob` LONGBLOB)
+DROP TABLE t1;
+RESET MASTER;
+#
+# Verify that SET string values and character sets can be printed correctly
+#
+set names utf8;
+CREATE TABLE t1(
+c_set_1 SET("set1_v1_å", "set1_v2_ä", "set1_v3_ö"),
+c_set_2 SET("set2_v1_å", "set2_v2_ä", "set2_v3_ö") CHARACTER SET latin1,
+c_set_4 SET("set3_v1_å", "set3_v2_ä", "set3_v3_ö") CHARACTER SET swe7 COLLATE swe7_bin);
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v3_ö", "set3_v1_å");
+# Columns(SET,
+# SET,
+# SET)
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v3_ö", "set3_v1_å");
+# Columns(`c_set_1` SET('set1_v1_å','set1_v2_ä','set1_v3_ö') CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_set_2` SET('set2_v1_å','set2_v2_ä','set2_v3_ö') CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_set_4` SET('set3_v1_}','set3_v2_{','set3_v3_|') CHARSET swe7 COLLATE swe7_bin)
+DROP TABLE t1;
+RESET MASTER;
+#
+# Verify that ENUM string values and character sets can be printed correctly
+#
+CREATE TABLE t1(
+c_enum_1 ENUM("enum1_v1_å", "enum1_v2_ä", "enum1_v3_ö"),
+c_enum_3 ENUM("enum2_v1_å", "enum2_v2_ä", "enum2_v3_ö") CHARACTER SET latin1,
+c_enum_4 ENUM("enum3_v1_å", "enum3_v2_ä", "enum3_v3_ö") CHARACTER SET swe7 COLLATE swe7_bin);
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v3_ö", "enum3_v1_å");
+# Columns(ENUM,
+# ENUM,
+# ENUM)
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v3_ö", "enum3_v1_å");
+# Columns(`c_enum_1` ENUM('enum1_v1_å','enum1_v2_ä','enum1_v3_ö') CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_enum_3` ENUM('enum2_v1_å','enum2_v2_ä','enum2_v3_ö') CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_enum_4` ENUM('enum3_v1_}','enum3_v2_{','enum3_v3_|') CHARSET swe7 COLLATE swe7_bin)
+DROP TABLE t1;
+RESET MASTER;
+#
+# Verify that explicit NOT NULL can be printed correctly
+#
+CREATE TABLE t1(c_not_null1 INT NOT NULL, c_null1 INT, c_not_null2 INT NOT NULL,
+c_null2 INT);
+INSERT INTO t1 VALUES(1, 2, 3, 4);
+# Columns(`c_not_null1` INT NOT NULL,
+# `c_null1` INT,
+# `c_not_null2` INT NOT NULL,
+# `c_null2` INT)
+DROP TABLE t1;
+RESET MASTER;
+#
+# Verify that primary key can be printed correctly
+#
+CREATE TABLE t1(c_key1 INT, c_key3 INT, c_not_key INT, c_key2 INT,
+PRIMARY KEY(c_key1, c_key2, c_key3));
+INSERT INTO t1 VALUES(1, 2, 3, 4);
+# Columns(`c_key1` INT NOT NULL,
+# `c_key3` INT NOT NULL,
+# `c_not_key` INT,
+# `c_key2` INT NOT NULL)
+# Primary Key
+DROP TABLE t1;
+RESET MASTER;
+CREATE TABLE t1(c_key1 CHAR(100), c_key3 CHAR(100), c_not_key INT, c_key2 CHAR(10),
+PRIMARY KEY(c_key1(5), c_key2, c_key3(10)));
+INSERT INTO t1 VALUES("1", "2", 3, "4");
+# Columns(`c_key1` CHAR(100) NOT NULL CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_key3` CHAR(100) NOT NULL CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_not_key` INT,
+# `c_key2` CHAR(10) NOT NULL CHARSET latin1 COLLATE latin1_swedish_ci)
+# Primary Key
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("2", "2", 3, "4");
+# Columns(CHAR(100) NOT NULL CHARSET latin1 COLLATE latin1_swedish_ci,
+# CHAR(100) NOT NULL CHARSET latin1 COLLATE latin1_swedish_ci,
+# INT,
+# CHAR(10) NOT NULL CHARSET latin1 COLLATE latin1_swedish_ci)
+RESET MASTER;
+#
+# Coverage test: Print column index instead of column name if column name
+# is not binlogged.
+#
+SET GLOBAL binlog_row_metadata = FULL;
+SET SESSION debug_dbug = 'd, dont_log_column_name';
+INSERT INTO t1 VALUES("3", "2", 3, "4");
+# Columns(`c_key1` CHAR(100) NOT NULL CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_key3` CHAR(100) NOT NULL CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_not_key` INT,
+# `c_key2` CHAR(10) NOT NULL CHARSET latin1 COLLATE latin1_swedish_ci)
+# Primary Key
+DROP TABLE t1;
+RESET MASTER;
+#
+# Coverage test: Inject an invalid column type
+#
+CREATE TABLE t1(c1 int, c2 BLOB);
+SET SESSION debug_dbug = 'd,inject_invalid_column_type';
+INSERT INTO t1 VALUES(1, "a");
+# Columns(`c1` INT,
+# `c2` INVALID_TYPE(230))
+RESET MASTER;
+#
+# Coverage test: Inject an invalid BLOB metadata
+#
+SET SESSION debug_dbug = 'd,inject_invalid_blob_size';
+INSERT INTO t1 VALUES(2, "b");
+# Columns(`c1` INT,
+# `c2` INVALID_BLOB(5))
+#
+# Coverage test: Inject an invalid Geometry type
+#
+DROP TABLE t1;
+CREATE TABLE t1(c_geometry GEOMETRY, c_point POINT, c_multilinestring MULTILINESTRING);
+RESET MASTER;
+SET SESSION debug_dbug = 'd,inject_invalid_geometry_type';
+INSERT INTO t1(c_point) VALUES(ST_PointFromText('POINT(10 10)'));
+# Columns(`c_geometry` INVALID_GEOMETRY_TYPE(100),
+# `c_point` INVALID_GEOMETRY_TYPE(100),
+# `c_multilinestring` INVALID_GEOMETRY_TYPE(100))
+DROP TABLE t1;
+RESET MASTER;
+#
+# Comptibility Test: Verify mysqlbinlog can print OLD table_map_log_event
+# without any optional metadata
+#
+CREATE TABLE t1(c_int INT, c_tiny_int_unsigned TINYINT UNSIGNED,
+c_binary BINARY(10), c_text TEXT, c_point POINT);
+SET session debug_dbug='d,simulate_no_optional_metadata';
+INSERT INTO t1(c_int) VALUES(1);
+# Columns(INT,
+# TINYINT,
+# BINARY(10),
+# BLOB,
+# GEOMETRY)
+DROP TABLE t1;
+RESET MASTER;
+#
+# Simulate error on initializing charset and primary key metadata
+#
+CREATE TABLE t1(c1 char(10) PRIMARY KEY);
+SET session debug_dbug='d,simulate_init_charset_field_error';
+INSERT INTO t1 VALUES("a");
+SET GLOBAL binlog_row_metadata = FULL;
+SET session debug_dbug='d,simulate_init_primary_key_field_error';
+INSERT INTO t1 VALUES("b");
+# Columns(BINARY(10) NOT NULL)
+# Columns(BINARY(10) NOT NULL)
+SET SESSION debug_dbug = '';
+SET GLOBAL binlog_row_metadata = NO_LOG;
+DROP TABLE t1;
+RESET MASTER;
diff --git a/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_binary.result b/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_binary.result
new file mode 100644
index 00000000000..789bc6cd178
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_binary.result
@@ -0,0 +1,64 @@
+#
+# Verify that SET string values and character sets can be printed correctly
+#
+SET NAMES utf8;
+CREATE TABLE t1(
+c_set_1 SET("set1_v1_å", "set1_v2_ä", "set1_v3_ö"),
+c_set_2 SET("set2_v1_å", "set2_v2_ä", "set2_v3_ö") CHARACTER SET binary);
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+# Columns(SET,
+# SET)
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+# Columns(`c_set_1` SET('set1_v1_å','set1_v2_ä','set1_v3_ö') CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_set_2` SET('set2_v1_å','set2_v2_ä','set2_v3_ö') CHARSET binary COLLATE binary)
+INSERT INTO t1 VALUES("set1_v3_ö", "set2_v3_ö");
+INSERT INTO t1 VALUES("set1_v1_Ã¥", "set2_v1_Ã¥");
+SELECT c_set_1, HEX(c_set_1) FROM t1;
+c_set_1 HEX(c_set_1)
+set1_v1_Ã¥ 736574315F76315FE5
+set1_v1_Ã¥ 736574315F76315FE5
+set1_v3_ö 736574315F76335FF6
+set1_v1_Ã¥ 736574315F76315FE5
+SELECT c_set_2, HEX(c_set_2) FROM t1;
+c_set_2 HEX(c_set_2)
+set2_v2_ä 736574325F76325FC3A4
+set2_v2_ä 736574325F76325FC3A4
+set2_v3_ö 736574325F76335FC3B6
+set2_v1_Ã¥ 736574325F76315FC3A5
+DROP TABLE t1;
+RESET MASTER;
+#
+# Verify that ENUM string values and character sets can be printed correctly
+#
+CREATE TABLE t1(
+c_enum_1 ENUM("enum1_v1_å", "enum1_v2_ä", "enum1_v3_ö"),
+c_enum_2 ENUM("enum2_v1_å", "enum2_v2_ä", "enum2_v3_ö") CHARACTER SET binary);
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+# Columns(ENUM,
+# ENUM)
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+# Columns(`c_enum_1` ENUM('enum1_v1_å','enum1_v2_ä','enum1_v3_ö') CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_enum_2` ENUM('enum2_v1_å','enum2_v2_ä','enum2_v3_ö') CHARSET binary COLLATE binary)
+INSERT INTO t1 VALUES("enum1_v3_ö", "enum2_v3_ö");
+INSERT INTO t1 VALUES("enum1_v1_Ã¥", "enum2_v1_Ã¥");
+SELECT c_enum_1, HEX(c_enum_1) FROM t1;
+c_enum_1 HEX(c_enum_1)
+enum1_v1_Ã¥ 656E756D315F76315FE5
+enum1_v1_Ã¥ 656E756D315F76315FE5
+enum1_v3_ö 656E756D315F76335FF6
+enum1_v1_Ã¥ 656E756D315F76315FE5
+SELECT c_enum_2, HEX(c_enum_2) FROM t1;
+c_enum_2 HEX(c_enum_2)
+enum2_v2_ä 656E756D325F76325FC3A4
+enum2_v2_ä 656E756D325F76325FC3A4
+enum2_v3_ö 656E756D325F76335FC3B6
+enum2_v1_Ã¥ 656E756D325F76315FC3A5
+DROP TABLE t1;
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = NO_LOG;
diff --git a/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_ucs2.result b/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_ucs2.result
new file mode 100644
index 00000000000..1b1d2a79725
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_ucs2.result
@@ -0,0 +1,64 @@
+#
+# Verify that SET string values and character sets can be printed correctly
+#
+SET NAMES utf8;
+CREATE TABLE t1(
+c_set_1 SET("set1_v1_å", "set1_v2_ä", "set1_v3_ö"),
+c_set_2 SET("set2_v1_å", "set2_v2_ä", "set2_v3_ö") CHARACTER SET ucs2);
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+# Columns(SET,
+# SET)
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+# Columns(`c_set_1` SET('set1_v1_å','set1_v2_ä','set1_v3_ö') CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_set_2` SET('\0s\0e\0t\02\0_\0v\01\0_\0å','\0s\0e\0t\02\0_\0v\02\0_\0ä','\0s\0e\0t\02\0_\0v\03\0_\0ö') CHARSET ucs2 COLLATE ucs2_general_ci)
+INSERT INTO t1 VALUES("set1_v3_ö", "set2_v3_ö");
+INSERT INTO t1 VALUES("set1_v1_Ã¥", "set2_v1_Ã¥");
+SELECT c_set_1, HEX(c_set_1) FROM t1;
+c_set_1 HEX(c_set_1)
+set1_v1_Ã¥ 736574315F76315FE5
+set1_v1_Ã¥ 736574315F76315FE5
+set1_v3_ö 736574315F76335FF6
+set1_v1_Ã¥ 736574315F76315FE5
+SELECT c_set_2, HEX(c_set_2) FROM t1;
+c_set_2 HEX(c_set_2)
+set2_v2_ä 0073006500740032005F00760032005F00E4
+set2_v2_ä 0073006500740032005F00760032005F00E4
+set2_v3_ö 0073006500740032005F00760033005F00F6
+set2_v1_Ã¥ 0073006500740032005F00760031005F00E5
+DROP TABLE t1;
+RESET MASTER;
+#
+# Verify that ENUM string values and character sets can be printed correctly
+#
+CREATE TABLE t1(
+c_enum_1 ENUM("enum1_v1_å", "enum1_v2_ä", "enum1_v3_ö"),
+c_enum_2 ENUM("enum2_v1_å", "enum2_v2_ä", "enum2_v3_ö") CHARACTER SET ucs2);
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+# Columns(ENUM,
+# ENUM)
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+# Columns(`c_enum_1` ENUM('enum1_v1_å','enum1_v2_ä','enum1_v3_ö') CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_enum_2` ENUM('\0e\0n\0u\0m\02\0_\0v\01\0_\0å','\0e\0n\0u\0m\02\0_\0v\02\0_\0ä','\0e\0n\0u\0m\02\0_\0v\03\0_\0ö') CHARSET ucs2 COLLATE ucs2_general_ci)
+INSERT INTO t1 VALUES("enum1_v3_ö", "enum2_v3_ö");
+INSERT INTO t1 VALUES("enum1_v1_Ã¥", "enum2_v1_Ã¥");
+SELECT c_enum_1, HEX(c_enum_1) FROM t1;
+c_enum_1 HEX(c_enum_1)
+enum1_v1_Ã¥ 656E756D315F76315FE5
+enum1_v1_Ã¥ 656E756D315F76315FE5
+enum1_v3_ö 656E756D315F76335FF6
+enum1_v1_Ã¥ 656E756D315F76315FE5
+SELECT c_enum_2, HEX(c_enum_2) FROM t1;
+c_enum_2 HEX(c_enum_2)
+enum2_v2_ä 0065006E0075006D0032005F00760032005F00E4
+enum2_v2_ä 0065006E0075006D0032005F00760032005F00E4
+enum2_v3_ö 0065006E0075006D0032005F00760033005F00F6
+enum2_v1_Ã¥ 0065006E0075006D0032005F00760031005F00E5
+DROP TABLE t1;
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = NO_LOG;
diff --git a/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_utf32.result b/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_utf32.result
new file mode 100644
index 00000000000..6fdda842bac
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_table_map_optional_metadata_utf32.result
@@ -0,0 +1,64 @@
+#
+# Verify that SET string values and character sets can be printed correctly
+#
+SET NAMES utf8;
+CREATE TABLE t1(
+c_set_1 SET("set1_v1_å", "set1_v2_ä", "set1_v3_ö"),
+c_set_2 SET("set2_v1_å", "set2_v2_ä", "set2_v3_ö") CHARACTER SET utf32);
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+# Columns(SET,
+# SET)
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+# Columns(`c_set_1` SET('set1_v1_å','set1_v2_ä','set1_v3_ö') CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_set_2` SET('\0\0\0s\0\0\0e\0\0\0t\0\0\02\0\0\0_\0\0\0v\0\0\01\0\0\0_\0\0\0å','\0\0\0s\0\0\0e\0\0\0t\0\0\02\0\0\0_\0\0\0v\0\0\02\0\0\0_\0\0\0ä','\0\0\0s\0\0\0e\0\0\0t\0\0\02\0\0\0_\0\0\0v\0\0\03\0\0\0_\0\0\0ö') CHARSET utf32 COLLATE utf32_general_ci)
+INSERT INTO t1 VALUES("set1_v3_ö", "set2_v3_ö");
+INSERT INTO t1 VALUES("set1_v1_Ã¥", "set2_v1_Ã¥");
+SELECT c_set_1, HEX(c_set_1) FROM t1;
+c_set_1 HEX(c_set_1)
+set1_v1_Ã¥ 736574315F76315FE5
+set1_v1_Ã¥ 736574315F76315FE5
+set1_v3_ö 736574315F76335FF6
+set1_v1_Ã¥ 736574315F76315FE5
+SELECT c_set_2, HEX(c_set_2) FROM t1;
+c_set_2 HEX(c_set_2)
+set2_v2_ä 000000730000006500000074000000320000005F00000076000000320000005F000000E4
+set2_v2_ä 000000730000006500000074000000320000005F00000076000000320000005F000000E4
+set2_v3_ö 000000730000006500000074000000320000005F00000076000000330000005F000000F6
+set2_v1_Ã¥ 000000730000006500000074000000320000005F00000076000000310000005F000000E5
+DROP TABLE t1;
+RESET MASTER;
+#
+# Verify that ENUM string values and character sets can be printed correctly
+#
+CREATE TABLE t1(
+c_enum_1 ENUM("enum1_v1_å", "enum1_v2_ä", "enum1_v3_ö"),
+c_enum_2 ENUM("enum2_v1_å", "enum2_v2_ä", "enum2_v3_ö") CHARACTER SET utf32);
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+# Columns(ENUM,
+# ENUM)
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+# Columns(`c_enum_1` ENUM('enum1_v1_å','enum1_v2_ä','enum1_v3_ö') CHARSET latin1 COLLATE latin1_swedish_ci,
+# `c_enum_2` ENUM('\0\0\0e\0\0\0n\0\0\0u\0\0\0m\0\0\02\0\0\0_\0\0\0v\0\0\01\0\0\0_\0\0\0å','\0\0\0e\0\0\0n\0\0\0u\0\0\0m\0\0\02\0\0\0_\0\0\0v\0\0\02\0\0\0_\0\0\0ä','\0\0\0e\0\0\0n\0\0\0u\0\0\0m\0\0\02\0\0\0_\0\0\0v\0\0\03\0\0\0_\0\0\0ö') CHARSET utf32 COLLATE utf32_general_ci)
+INSERT INTO t1 VALUES("enum1_v3_ö", "enum2_v3_ö");
+INSERT INTO t1 VALUES("enum1_v1_Ã¥", "enum2_v1_Ã¥");
+SELECT c_enum_1, HEX(c_enum_1) FROM t1;
+c_enum_1 HEX(c_enum_1)
+enum1_v1_Ã¥ 656E756D315F76315FE5
+enum1_v1_Ã¥ 656E756D315F76315FE5
+enum1_v3_ö 656E756D315F76335FF6
+enum1_v1_Ã¥ 656E756D315F76315FE5
+SELECT c_enum_2, HEX(c_enum_2) FROM t1;
+c_enum_2 HEX(c_enum_2)
+enum2_v2_ä 000000650000006E000000750000006D000000320000005F00000076000000320000005F000000E4
+enum2_v2_ä 000000650000006E000000750000006D000000320000005F00000076000000320000005F000000E4
+enum2_v3_ö 000000650000006E000000750000006D000000320000005F00000076000000330000005F000000F6
+enum2_v1_Ã¥ 000000650000006E000000750000006D000000320000005F00000076000000310000005F000000E5
+DROP TABLE t1;
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = NO_LOG;
diff --git a/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata.test b/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata.test
new file mode 100644
index 00000000000..b55077b0dff
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata.test
@@ -0,0 +1,334 @@
+################################################################################
+# WL#4618 RBR: extended table metadata in the binary log
+#
+# Below metadata is logged into Table_map_log_event
+# - signedness of numeric columns
+# - charsets of character columns
+# - column names
+# - set/enum character sets and string values
+# - primary key
+#
+# The first two are always logged. The others are controlled by system
+# variable --binlog-row-metadata
+#
+# The test will verify if the metadata can be logged and printed by mysqlbinlog
+# correctly.
+# mysqlbinlog --print-table-metadata will print the extra metadata
+################################################################################
+--source include/have_debug.inc
+--source include/have_binlog_format_row.inc
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = MINIMAL;
+
+--let $MYSQLD_DATADIR= `select @@datadir`
+--let $binlog_file= $MYSQLD_DATADIR/master-bin.000001
+
+--echo #
+--echo # Temporal types can be printed correctly
+--echo #
+CREATE TABLE t1(c_year YEAR, c_date DATE, c_time TIME, c_time_f TIME(3),
+ c_datetime DATETIME, c_datetime_f DATETIME(3),
+ c_timestamp TIMESTAMP, c_timestamp_f TIMESTAMP(3) DEFAULT "2017-1-1 10:10:10");
+
+INSERT INTO t1(c_year) VALUES(2017);
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Geometry types can be printed correctly
+--echo #
+CREATE TABLE t1 (c_geo GEOMETRY, c_point POINT, c_linestring LINESTRING,
+ c_polygon POLYGON, c_multi_point MULTIPOINT,
+ c_multi_linestring MULTILINESTRING, c_multi_polygon MULTIPOLYGON,
+ c_geometrycollection GEOMETRYCOLLECTION, c_char CHAR(100));
+
+INSERT INTO t1(c_point) VALUES(ST_PointFromText('POINT(10 10)'));
+--source include/print_optional_metadata.inc
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+
+# geometry type is binlogged, the real geometry types are printed
+INSERT INTO t1(c_point) VALUES(ST_PointFromText('POINT(10 10)'));
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Numeric types can be printed correctly
+--echo #
+CREATE TABLE t1(c_bit BIT(10), c_bool BOOL, c_smallint SMALLINT,
+ c_mediumint MEDIUMINT, c_int INT UNSIGNED, c_bigint BIGINT,
+ c_float FLOAT UNSIGNED, c_double DOUBLE, c_decimal DECIMAL(10, 2));
+
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1(c_bool) VALUES(1);
+
+--echo # UNSIGNED flag should be printed
+--source include/print_optional_metadata.inc
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1(c_bool) VALUES(1);
+
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Character types can be printed correctly
+--echo #
+CREATE TABLE t1(c_char CHAR(10), c_varchar VARCHAR(500),
+ c_tinytext TINYTEXT, c_text TEXT,
+ c_mediumtext MEDIUMTEXT, c_longtext LONGTEXT CHARSET utf8);
+
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1(c_char) VALUES("1");
+
+# Charset set is printed with default charset
+--source include/print_optional_metadata.inc
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1(c_char) VALUES("1");
+
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Column names with non-ascii characters and escape characters can be printed correctly
+--echo #
+set names utf8;
+CREATE TABLE t1(`åäö表\a'``"` INT);
+
+SHOW CREATE TABLE t1;
+
+INSERT INTO t1 VALUES(1);
+--source include/print_optional_metadata.inc
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Charsets can be printed correctly
+--echo #
+CREATE TABLE t1(c_char_utf8 CHAR(10) CHARSET utf8,
+ c_varchar_utf8 VARCHAR(10) CHARSET utf8,
+ c_text_utf8 TEXT CHARSET utf8);
+
+INSERT INTO t1 VALUES("1", "2", "3");
+
+# Charset set is printed with Default charset
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+
+# Test collation number less than 250 and collation number greater than 250
+CREATE TABLE t1(c_utf8mb4_520 CHAR(10) CHARSET utf8mb4 COLLATE utf8mb4_unicode_ci,
+ c_utf8mb4_0900 VARCHAR(10) CHARSET utf8mb4 COLLATE utf8mb4_polish_ci,
+ c_utf8mb4_def TEXT CHARSET utf8mb4);
+
+INSERT INTO t1 VALUES("1", "2", "3");
+
+# Charset set is printed without default charset
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Blob and binary columns can be printed correctly
+--echo #
+CREATE TABLE t1(c_binary BINARY(10), c_varbinary VARBINARY(10),
+ c_tinyblob TINYBLOB, c_blob BLOB,
+ c_mediumblob MEDIUMBLOB, c_longblob LONGBLOB);
+
+INSERT INTO t1 VALUES("1", "2", "3", "4", "5", "6");
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Verify that SET string values and character sets can be printed correctly
+--echo #
+
+set names utf8;
+CREATE TABLE t1(
+ c_set_1 SET("set1_v1_å", "set1_v2_ä", "set1_v3_ö"),
+ c_set_2 SET("set2_v1_å", "set2_v2_ä", "set2_v3_ö") CHARACTER SET latin1,
+ c_set_4 SET("set3_v1_å", "set3_v2_ä", "set3_v3_ö") CHARACTER SET swe7 COLLATE swe7_bin);
+
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v3_ö", "set3_v1_å");
+--source include/print_optional_metadata.inc
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v3_ö", "set3_v1_å");
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Verify that ENUM string values and character sets can be printed correctly
+--echo #
+
+CREATE TABLE t1(
+ c_enum_1 ENUM("enum1_v1_å", "enum1_v2_ä", "enum1_v3_ö"),
+ c_enum_3 ENUM("enum2_v1_å", "enum2_v2_ä", "enum2_v3_ö") CHARACTER SET latin1,
+ c_enum_4 ENUM("enum3_v1_å", "enum3_v2_ä", "enum3_v3_ö") CHARACTER SET swe7 COLLATE swe7_bin);
+
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v3_ö", "enum3_v1_å");
+--source include/print_optional_metadata.inc
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v3_ö", "enum3_v1_å");
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Verify that explicit NOT NULL can be printed correctly
+--echo #
+CREATE TABLE t1(c_not_null1 INT NOT NULL, c_null1 INT, c_not_null2 INT NOT NULL,
+ c_null2 INT);
+
+INSERT INTO t1 VALUES(1, 2, 3, 4);
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Verify that primary key can be printed correctly
+--echo #
+CREATE TABLE t1(c_key1 INT, c_key3 INT, c_not_key INT, c_key2 INT,
+PRIMARY KEY(c_key1, c_key2, c_key3));
+
+INSERT INTO t1 VALUES(1, 2, 3, 4);
+--let $print_primary_key= 1
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+
+# Key has prefix
+CREATE TABLE t1(c_key1 CHAR(100), c_key3 CHAR(100), c_not_key INT, c_key2 CHAR(10),
+PRIMARY KEY(c_key1(5), c_key2, c_key3(10)));
+
+INSERT INTO t1 VALUES("1", "2", 3, "4");
+--source include/print_optional_metadata.inc
+
+RESET MASTER;
+# Primary key should not be printed
+SET GLOBAL binlog_row_metadata = MINIMAL;
+
+INSERT INTO t1 VALUES("2", "2", 3, "4");
+--source include/print_optional_metadata.inc
+
+RESET MASTER;
+--echo #
+--echo # Coverage test: Print column index instead of column name if column name
+--echo # is not binlogged.
+--echo #
+SET GLOBAL binlog_row_metadata = FULL;
+
+SET SESSION debug_dbug = 'd, dont_log_column_name';
+INSERT INTO t1 VALUES("3", "2", 3, "4");
+--source include/print_optional_metadata.inc
+
+--let $print_primary_key=
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Coverage test: Inject an invalid column type
+--echo #
+CREATE TABLE t1(c1 int, c2 BLOB);
+
+SET SESSION debug_dbug = 'd,inject_invalid_column_type';
+INSERT INTO t1 VALUES(1, "a");
+# It prints an error
+--source include/print_optional_metadata.inc
+
+RESET MASTER;
+
+--echo #
+--echo # Coverage test: Inject an invalid BLOB metadata
+--echo #
+--let $start_pos= query_get_value(SHOW MASTER STATUS, Position, 1)
+
+SET SESSION debug_dbug = 'd,inject_invalid_blob_size';
+INSERT INTO t1 VALUES(2, "b");
+
+# The invalid metadata will case assertion failure on Write_rows_log_event
+# So we need to stop mysqlbinlog before reading Write_rows_log_event.
+--let $stop_position= query_get_value(SHOW BINLOG EVENTS FROM $start_pos LIMIT 3, End_log_pos, 3)
+--source include/print_optional_metadata.inc
+
+--echo #
+--echo # Coverage test: Inject an invalid Geometry type
+--echo #
+DROP TABLE t1;
+CREATE TABLE t1(c_geometry GEOMETRY, c_point POINT, c_multilinestring MULTILINESTRING);
+RESET MASTER;
+--let $start_pos= query_get_value(SHOW MASTER STATUS, Position, 1)
+
+SET SESSION debug_dbug = 'd,inject_invalid_geometry_type';
+INSERT INTO t1(c_point) VALUES(ST_PointFromText('POINT(10 10)'));
+
+# The invalid metadata will case assertion failure on Write_rows_log_event
+# So we need to stop mysqlbinlog before reading Write_rows_log_event.
+--let $stop_position= query_get_value(SHOW BINLOG EVENTS FROM $start_pos LIMIT 3, End_log_pos, 3)
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+--echo #
+--echo # Comptibility Test: Verify mysqlbinlog can print OLD table_map_log_event
+--echo # without any optional metadata
+--echo #
+CREATE TABLE t1(c_int INT, c_tiny_int_unsigned TINYINT UNSIGNED,
+ c_binary BINARY(10), c_text TEXT, c_point POINT);
+
+SET session debug_dbug='d,simulate_no_optional_metadata';
+INSERT INTO t1(c_int) VALUES(1);
+# TINYINT will be printed without UNSIGNED flag,
+# CHAR will be printed as BINARY(10)
+# POINT will be printed as GEOMETRY
+--let $stop_position=
+--source include/print_optional_metadata.inc
+
+DROP TABLE t1;
+RESET MASTER;
+--echo #
+--echo # Simulate error on initializing charset and primary key metadata
+--echo #
+CREATE TABLE t1(c1 char(10) PRIMARY KEY);
+
+SET session debug_dbug='d,simulate_init_charset_field_error';
+INSERT INTO t1 VALUES("a");
+
+SET GLOBAL binlog_row_metadata = FULL;
+SET session debug_dbug='d,simulate_init_primary_key_field_error';
+INSERT INTO t1 VALUES("b");
+
+--let $print_primary_key= 1
+--source include/print_optional_metadata.inc
+
+SET SESSION debug_dbug = '';
+SET GLOBAL binlog_row_metadata = NO_LOG;
+DROP TABLE t1;
+RESET MASTER;
diff --git a/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_binary.test b/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_binary.test
new file mode 100644
index 00000000000..5997cfd5d27
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_binary.test
@@ -0,0 +1,73 @@
+################################################################################
+# WL#4618 RBR: extended table metadata in the binary log
+#
+# Below metadata is logged into Table_map_log_event
+# - signedness of numeric columns
+# - charsets of character columns
+# - column names
+# - set/enum character sets and string values
+# - primary key
+#
+# The first two are always logged. The others are controlled by system
+# variable --binlog-row-metadata
+#
+# The test will verify if the metadata can be logged and printed by mysqlbinlog
+# correctly.
+# mysqlbinlog --print-table-metadata will print the extra metadata
+################################################################################
+--source include/have_debug.inc
+--source include/have_binlog_format_row.inc
+--let $MYSQLD_DATADIR= `select @@datadir`
+--let $binlog_file= $MYSQLD_DATADIR/master-bin.000001
+
+--echo #
+--echo # Verify that SET string values and character sets can be printed correctly
+--echo #
+
+SET NAMES utf8;
+CREATE TABLE t1(
+ c_set_1 SET("set1_v1_å", "set1_v2_ä", "set1_v3_ö"),
+ c_set_2 SET("set2_v1_å", "set2_v2_ä", "set2_v3_ö") CHARACTER SET binary);
+
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+--source include/print_optional_metadata.inc
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+--source include/print_optional_metadata.inc
+INSERT INTO t1 VALUES("set1_v3_ö", "set2_v3_ö");
+INSERT INTO t1 VALUES("set1_v1_Ã¥", "set2_v1_Ã¥");
+SELECT c_set_1, HEX(c_set_1) FROM t1;
+SELECT c_set_2, HEX(c_set_2) FROM t1;
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Verify that ENUM string values and character sets can be printed correctly
+--echo #
+
+CREATE TABLE t1(
+ c_enum_1 ENUM("enum1_v1_å", "enum1_v2_ä", "enum1_v3_ö"),
+ c_enum_2 ENUM("enum2_v1_å", "enum2_v2_ä", "enum2_v3_ö") CHARACTER SET binary);
+
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+--source include/print_optional_metadata.inc
+
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+--source include/print_optional_metadata.inc
+INSERT INTO t1 VALUES("enum1_v3_ö", "enum2_v3_ö");
+INSERT INTO t1 VALUES("enum1_v1_Ã¥", "enum2_v1_Ã¥");
+SELECT c_enum_1, HEX(c_enum_1) FROM t1;
+SELECT c_enum_2, HEX(c_enum_2) FROM t1;
+
+DROP TABLE t1;
+RESET MASTER;
+
+SET GLOBAL binlog_row_metadata = NO_LOG;
diff --git a/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_ucs2.test b/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_ucs2.test
new file mode 100644
index 00000000000..1e218acdfea
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_ucs2.test
@@ -0,0 +1,74 @@
+################################################################################
+# WL#4618 RBR: extended table metadata in the binary log
+#
+# Below metadata is logged into Table_map_log_event
+# - signedness of numeric columns
+# - charsets of character columns
+# - column names
+# - set/enum character sets and string values
+# - primary key
+#
+# The first two are always logged. The others are controlled by system
+# variable --binlog-row-metadata
+#
+# The test will verify if the metadata can be logged and printed by mysqlbinlog
+# correctly.
+# mysqlbinlog --print-table-metadata will print the extra metadata
+################################################################################
+--source include/have_debug.inc
+--source include/have_binlog_format_row.inc
+--source include/have_ucs2.inc
+--let $MYSQLD_DATADIR= `select @@datadir`
+--let $binlog_file= $MYSQLD_DATADIR/master-bin.000001
+
+--echo #
+--echo # Verify that SET string values and character sets can be printed correctly
+--echo #
+
+SET NAMES utf8;
+CREATE TABLE t1(
+ c_set_1 SET("set1_v1_å", "set1_v2_ä", "set1_v3_ö"),
+ c_set_2 SET("set2_v1_å", "set2_v2_ä", "set2_v3_ö") CHARACTER SET ucs2);
+
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+--source include/print_optional_metadata.inc
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+--source include/print_optional_metadata.inc
+INSERT INTO t1 VALUES("set1_v3_ö", "set2_v3_ö");
+INSERT INTO t1 VALUES("set1_v1_Ã¥", "set2_v1_Ã¥");
+SELECT c_set_1, HEX(c_set_1) FROM t1;
+SELECT c_set_2, HEX(c_set_2) FROM t1;
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Verify that ENUM string values and character sets can be printed correctly
+--echo #
+
+CREATE TABLE t1(
+ c_enum_1 ENUM("enum1_v1_å", "enum1_v2_ä", "enum1_v3_ö"),
+ c_enum_2 ENUM("enum2_v1_å", "enum2_v2_ä", "enum2_v3_ö") CHARACTER SET ucs2);
+
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+--source include/print_optional_metadata.inc
+
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+--source include/print_optional_metadata.inc
+INSERT INTO t1 VALUES("enum1_v3_ö", "enum2_v3_ö");
+INSERT INTO t1 VALUES("enum1_v1_Ã¥", "enum2_v1_Ã¥");
+SELECT c_enum_1, HEX(c_enum_1) FROM t1;
+SELECT c_enum_2, HEX(c_enum_2) FROM t1;
+
+DROP TABLE t1;
+RESET MASTER;
+
+SET GLOBAL binlog_row_metadata = NO_LOG;
diff --git a/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_utf32.test b/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_utf32.test
new file mode 100644
index 00000000000..c1d449abf2f
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_table_map_optional_metadata_utf32.test
@@ -0,0 +1,74 @@
+################################################################################
+# WL#4618 RBR: extended table metadata in the binary log
+#
+# Below metadata is logged into Table_map_log_event
+# - signedness of numeric columns
+# - charsets of character columns
+# - column names
+# - set/enum character sets and string values
+# - primary key
+#
+# The first two are always logged. The others are controlled by system
+# variable --binlog-row-metadata
+#
+# The test will verify if the metadata can be logged and printed by mysqlbinlog
+# correctly.
+# mysqlbinlog --print-table-metadata will print the extra metadata
+################################################################################
+--source include/have_debug.inc
+--source include/have_binlog_format_row.inc
+--source include/have_utf32.inc
+--let $MYSQLD_DATADIR= `select @@datadir`
+--let $binlog_file= $MYSQLD_DATADIR/master-bin.000001
+
+--echo #
+--echo # Verify that SET string values and character sets can be printed correctly
+--echo #
+
+SET NAMES utf8;
+CREATE TABLE t1(
+ c_set_1 SET("set1_v1_å", "set1_v2_ä", "set1_v3_ö"),
+ c_set_2 SET("set2_v1_å", "set2_v2_ä", "set2_v3_ö") CHARACTER SET utf32);
+
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+--source include/print_optional_metadata.inc
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("set1_v1_å", "set2_v2_ä");
+--source include/print_optional_metadata.inc
+INSERT INTO t1 VALUES("set1_v3_ö", "set2_v3_ö");
+INSERT INTO t1 VALUES("set1_v1_Ã¥", "set2_v1_Ã¥");
+SELECT c_set_1, HEX(c_set_1) FROM t1;
+SELECT c_set_2, HEX(c_set_2) FROM t1;
+
+DROP TABLE t1;
+RESET MASTER;
+
+--echo #
+--echo # Verify that ENUM string values and character sets can be printed correctly
+--echo #
+
+CREATE TABLE t1(
+ c_enum_1 ENUM("enum1_v1_å", "enum1_v2_ä", "enum1_v3_ö"),
+ c_enum_2 ENUM("enum2_v1_å", "enum2_v2_ä", "enum2_v3_ö") CHARACTER SET utf32);
+
+SET GLOBAL binlog_row_metadata = MINIMAL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+--source include/print_optional_metadata.inc
+
+
+RESET MASTER;
+SET GLOBAL binlog_row_metadata = FULL;
+INSERT INTO t1 VALUES("enum1_v1_å", "enum2_v2_ä");
+--source include/print_optional_metadata.inc
+INSERT INTO t1 VALUES("enum1_v3_ö", "enum2_v3_ö");
+INSERT INTO t1 VALUES("enum1_v1_Ã¥", "enum2_v1_Ã¥");
+SELECT c_enum_1, HEX(c_enum_1) FROM t1;
+SELECT c_enum_2, HEX(c_enum_2) FROM t1;
+
+DROP TABLE t1;
+RESET MASTER;
+
+SET GLOBAL binlog_row_metadata = NO_LOG;
diff --git a/mysql-test/suite/sys_vars/r/binlog_row_metadata_basic.result b/mysql-test/suite/sys_vars/r/binlog_row_metadata_basic.result
new file mode 100644
index 00000000000..09287cff7fd
--- /dev/null
+++ b/mysql-test/suite/sys_vars/r/binlog_row_metadata_basic.result
@@ -0,0 +1,90 @@
+NO_LOG Expected
+SELECT @@GLOBAL.binlog_row_metadata;
+@@GLOBAL.binlog_row_metadata
+NO_LOG
+SELECT @@SESSION.binlog_row_metadata;
+ERROR HY000: Variable 'binlog_row_metadata' is a GLOBAL variable
+'#---------------------BS_STVARS_002_01----------------------#'
+SET @start_value= @@global.binlog_row_metadata;
+SELECT COUNT(@@GLOBAL.binlog_row_metadata);
+COUNT(@@GLOBAL.binlog_row_metadata)
+1
+1 Expected
+'#---------------------BS_STVARS_002_02----------------------#'
+SET @@GLOBAL.binlog_row_metadata=0;
+SELECT @@GLOBAL.binlog_row_metadata;
+@@GLOBAL.binlog_row_metadata
+NO_LOG
+NO_LOG Expected
+SET @@GLOBAL.binlog_row_metadata=1;
+SELECT @@GLOBAL.binlog_row_metadata;
+@@GLOBAL.binlog_row_metadata
+MINIMAL
+MINIMAL Expected
+SET @@GLOBAL.binlog_row_metadata=2;
+SELECT @@GLOBAL.binlog_row_metadata;
+@@GLOBAL.binlog_row_metadata
+FULL
+FULL Expected
+SET @@GLOBAL.binlog_row_metadata=NO_LOG;
+SELECT @@GLOBAL.binlog_row_metadata;
+@@GLOBAL.binlog_row_metadata
+NO_LOG
+NO_LOG Expected
+SET @@GLOBAL.binlog_row_metadata=MINIMAL;
+SELECT @@GLOBAL.binlog_row_metadata;
+@@GLOBAL.binlog_row_metadata
+MINIMAL
+MINIMAL Expected
+SET @@GLOBAL.binlog_row_metadata=FULL;
+SELECT @@GLOBAL.binlog_row_metadata;
+@@GLOBAL.binlog_row_metadata
+FULL
+FULL Expected
+SET @@GLOBAL.binlog_row_metadata='NO_LOG';
+SELECT @@GLOBAL.binlog_row_metadata;
+@@GLOBAL.binlog_row_metadata
+NO_LOG
+NO_LOG Expected
+SET @@GLOBAL.binlog_row_metadata='MINIMAL';
+SELECT @@GLOBAL.binlog_row_metadata;
+@@GLOBAL.binlog_row_metadata
+MINIMAL
+MINIMAL Expected
+SET @@GLOBAL.binlog_row_metadata='FULL';
+SELECT @@GLOBAL.binlog_row_metadata;
+@@GLOBAL.binlog_row_metadata
+FULL
+FULL Expected
+'#---------------------BS_STVARS_002_03----------------------#'
+SET @@GLOBAL.binlog_row_metadata='MINIMAl';
+SELECT *
+FROM information_schema.global_variables
+WHERE VARIABLE_NAME='binlog_row_metadata';
+VARIABLE_NAME VARIABLE_VALUE
+BINLOG_ROW_METADATA MINIMAL
+'#---------------------BS_STVARS_002_04----------------------#'
+SELECT *
+FROM information_schema.session_variables
+WHERE VARIABLE_NAME LIKE 'binlog_row_metadata';
+VARIABLE_NAME VARIABLE_VALUE
+BINLOG_ROW_METADATA MINIMAL
+'#---------------------BS_STVARS_002_05----------------------#'
+SELECT COUNT(@@binlog_row_metadata);
+COUNT(@@binlog_row_metadata)
+1
+1 Expected
+SELECT COUNT(@@GLOBAL.binlog_row_metadata);
+COUNT(@@GLOBAL.binlog_row_metadata)
+1
+1 Expected
+'#---------------------BS_STVARS_002_06----------------------#'
+SET GLOBAL binlog_row_metadata = full1;
+ERROR 42000: Variable 'binlog_row_metadata' can't be set to the value of 'full1'
+SET GLOBAL binlog_row_metadata = "full1";
+ERROR 42000: Variable 'binlog_row_metadata' can't be set to the value of 'full1'
+SET GLOBAL binlog_row_metadata = 3;
+ERROR 42000: Variable 'binlog_row_metadata' can't be set to the value of '3'
+SET GLOBAL binlog_row_metadata = -1;
+ERROR 42000: Variable 'binlog_row_metadata' can't be set to the value of '-1'
+SET @@global.binlog_row_metadata= @start_value;
diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
index f85f3cf92f5..08b803b379b 100644
--- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
+++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
@@ -423,6 +423,20 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST MINIMAL,NOBLOB,FULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED
+VARIABLE_NAME BINLOG_ROW_METADATA
+SESSION_VALUE NULL
+GLOBAL_VALUE NO_LOG
+GLOBAL_VALUE_ORIGIN COMPILE-TIME
+DEFAULT_VALUE NO_LOG
+VARIABLE_SCOPE GLOBAL
+VARIABLE_TYPE ENUM
+VARIABLE_COMMENT Controls whether metadata is logged using FULL , MINIMAL format and NO_LOG.FULL causes all metadata to be logged; MINIMAL means that only metadata actually required by slave is logged; NO_LOG NO metadata will be logged.Default: NO_LOG.
+NUMERIC_MIN_VALUE NULL
+NUMERIC_MAX_VALUE NULL
+NUMERIC_BLOCK_SIZE NULL
+ENUM_VALUE_LIST NO_LOG,MINIMAL,FULL
+READ_ONLY NO
+COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME BINLOG_STMT_CACHE_SIZE
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BIGINT UNSIGNED
diff --git a/mysql-test/suite/sys_vars/t/binlog_row_metadata_basic.test b/mysql-test/suite/sys_vars/t/binlog_row_metadata_basic.test
new file mode 100644
index 00000000000..d08acd5a348
--- /dev/null
+++ b/mysql-test/suite/sys_vars/t/binlog_row_metadata_basic.test
@@ -0,0 +1,121 @@
+################## mysql-test\t\binlog_row_metadata_basic.test ################
+# #
+# Variable Name: binlog_row_metadata #
+# Scope: Global #
+# Data Type: enumeration #
+# #
+# Creation Date: 2017-01-23 #
+# Author : Libing Song #
+# #
+# #
+# Description:Test Cases of Dynamic System Variable binlog_row_metadata #
+# that checks the behavior of this variable in the following ways #
+# * Value Check #
+# * Scope Check #
+# #
+# Reference: #
+# http://dev.mysql.com/doc/refman/8.X/en/server-system-variables.html #
+# #
+###############################################################################
+
+
+--echo NO_LOG Expected
+SELECT @@GLOBAL.binlog_row_metadata;
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT @@SESSION.binlog_row_metadata;
+
+--echo '#---------------------BS_STVARS_002_01----------------------#'
+####################################################################
+# Displaying default value #
+####################################################################
+SET @start_value= @@global.binlog_row_metadata;
+
+SELECT COUNT(@@GLOBAL.binlog_row_metadata);
+--echo 1 Expected
+
+--echo '#---------------------BS_STVARS_002_02----------------------#'
+####################################################################
+# Check if Value can set #
+####################################################################
+SET @@GLOBAL.binlog_row_metadata=0;
+SELECT @@GLOBAL.binlog_row_metadata;
+--echo NO_LOG Expected
+
+SET @@GLOBAL.binlog_row_metadata=1;
+SELECT @@GLOBAL.binlog_row_metadata;
+--echo MINIMAL Expected
+
+SET @@GLOBAL.binlog_row_metadata=2;
+SELECT @@GLOBAL.binlog_row_metadata;
+--echo FULL Expected
+
+SET @@GLOBAL.binlog_row_metadata=NO_LOG;
+SELECT @@GLOBAL.binlog_row_metadata;
+--echo NO_LOG Expected
+
+SET @@GLOBAL.binlog_row_metadata=MINIMAL;
+SELECT @@GLOBAL.binlog_row_metadata;
+--echo MINIMAL Expected
+
+SET @@GLOBAL.binlog_row_metadata=FULL;
+SELECT @@GLOBAL.binlog_row_metadata;
+--echo FULL Expected
+
+SET @@GLOBAL.binlog_row_metadata='NO_LOG';
+SELECT @@GLOBAL.binlog_row_metadata;
+--echo NO_LOG Expected
+
+SET @@GLOBAL.binlog_row_metadata='MINIMAL';
+SELECT @@GLOBAL.binlog_row_metadata;
+--echo MINIMAL Expected
+
+SET @@GLOBAL.binlog_row_metadata='FULL';
+SELECT @@GLOBAL.binlog_row_metadata;
+--echo FULL Expected
+
+--echo '#---------------------BS_STVARS_002_03----------------------#'
+#################################################################
+# Check if the value in GLOBAL Table matches value in variable #
+#################################################################
+
+SET @@GLOBAL.binlog_row_metadata='MINIMAl';
+SELECT *
+FROM information_schema.global_variables
+WHERE VARIABLE_NAME='binlog_row_metadata';
+
+--echo '#---------------------BS_STVARS_002_04----------------------#'
+#################################################################
+# Check if the value in SESSION Table matches value in variable #
+#################################################################
+
+SELECT *
+FROM information_schema.session_variables
+WHERE VARIABLE_NAME LIKE 'binlog_row_metadata';
+
+--echo '#---------------------BS_STVARS_002_05----------------------#'
+################################################################################
+# Check if binlog_row_metadata can be accessed with and without @@ sign #
+################################################################################
+
+SELECT COUNT(@@binlog_row_metadata);
+--echo 1 Expected
+SELECT COUNT(@@GLOBAL.binlog_row_metadata);
+--echo 1 Expected
+
+--echo '#---------------------BS_STVARS_002_06----------------------#'
+################################################################################
+# Check if binlog_row_metadata can handle invalid values correctly #
+################################################################################
+--error ER_WRONG_VALUE_FOR_VAR
+SET GLOBAL binlog_row_metadata = full1;
+
+--error ER_WRONG_VALUE_FOR_VAR
+SET GLOBAL binlog_row_metadata = "full1";
+
+--error ER_WRONG_VALUE_FOR_VAR
+SET GLOBAL binlog_row_metadata = 3;
+
+--error ER_WRONG_VALUE_FOR_VAR
+SET GLOBAL binlog_row_metadata = -1;
+
+SET @@global.binlog_row_metadata= @start_value;
diff --git a/sql/field.cc b/sql/field.cc
index 1ab02f61e46..2ec164c11de 100644
--- a/sql/field.cc
+++ b/sql/field.cc
@@ -3493,11 +3493,12 @@ void Field_new_decimal::sql_type(String &str) const
@returns number of bytes written to metadata_ptr
*/
-int Field_new_decimal::save_field_metadata(uchar *metadata_ptr)
+
+Binlog_type_info Field_new_decimal::binlog_type_info() const
{
- *metadata_ptr= precision;
- *(metadata_ptr + 1)= decimals();
- return 2;
+ DBUG_ASSERT(Field_new_decimal::type() == binlog_type());
+ return Binlog_type_info(Field_new_decimal::type(), precision +
+ (decimals() << 8), 2, binlog_signess());
}
@@ -4665,10 +4666,11 @@ bool Field_float::send_binary(Protocol *protocol)
@returns number of bytes written to metadata_ptr
*/
-int Field_float::save_field_metadata(uchar *metadata_ptr)
+Binlog_type_info Field_float::binlog_type_info() const
{
- *metadata_ptr= pack_length();
- return 1;
+ DBUG_ASSERT(Field_float::type() == binlog_type());
+ return Binlog_type_info(Field_float::type(), pack_length(), 1,
+ binlog_signess());
}
@@ -4976,10 +4978,11 @@ void Field_double::sort_string(uchar *to,uint length __attribute__((unused)))
@returns number of bytes written to metadata_ptr
*/
-int Field_double::save_field_metadata(uchar *metadata_ptr)
+Binlog_type_info Field_double::binlog_type_info() const
{
- *metadata_ptr= pack_length();
- return 1;
+ DBUG_ASSERT(Field_double::type() == binlog_type());
+ return Binlog_type_info(Field_double::type(), pack_length(), 1,
+ binlog_signess());
}
@@ -5627,6 +5630,11 @@ bool Field_timestampf::val_native(Native *to)
return Field::val_native(to);
}
+Binlog_type_info Field_timestampf::binlog_type_info() const
+{
+ return Binlog_type_info(Field_timestampf::binlog_type(), decimals(), 1);
+}
+
/*************************************************************/
sql_mode_t Field_temporal::can_handle_sql_mode_dependency_on_store() const
@@ -6244,6 +6252,10 @@ bool Field_timef::get_date(MYSQL_TIME *ltime, date_mode_t fuzzydate)
TIME_from_longlong_time_packed(ltime, tmp);
return false;
}
+Binlog_type_info Field_timef::binlog_type_info() const
+{
+ return Binlog_type_info(Field_timef::binlog_type(), decimals(), 1);
+}
/****************************************************************************
** year type
@@ -6955,6 +6967,10 @@ bool Field_datetimef::get_TIME(MYSQL_TIME *ltime, const uchar *pos,
TIME_from_longlong_datetime_packed(ltime, tmp);
return validate_MMDD(tmp, ltime->month, ltime->day, fuzzydate);
}
+Binlog_type_info Field_datetimef::binlog_type_info() const
+{
+ return Binlog_type_info(Field_datetimef::binlog_type(), decimals(), 1);
+}
/****************************************************************************
** string type
@@ -7536,15 +7552,16 @@ Field_string::unpack(uchar *to, const uchar *from, const uchar *from_end,
@returns number of bytes written to metadata_ptr
*/
-int Field_string::save_field_metadata(uchar *metadata_ptr)
+Binlog_type_info Field_string::binlog_type_info() const
{
+ uint16 a;
DBUG_ASSERT(field_length < 1024);
DBUG_ASSERT((real_type() & 0xF0) == 0xF0);
DBUG_PRINT("debug", ("field_length: %u, real_type: %u",
- field_length, real_type()));
- *metadata_ptr= (real_type() ^ ((field_length & 0x300) >> 4));
- *(metadata_ptr + 1)= field_length & 0xFF;
- return 2;
+ field_length, real_type()));
+ a= (real_type() ^ ((field_length & 0x300) >> 4)) + (((uint)(field_length & 0xFF)) << 8);
+ DBUG_ASSERT(Field_string::type() == binlog_type());
+ return Binlog_type_info(Field_string::type(), a, 2, charset());
}
@@ -7633,11 +7650,10 @@ const uint Field_varstring::MAX_SIZE= UINT_MAX16;
@returns number of bytes written to metadata_ptr
*/
-int Field_varstring::save_field_metadata(uchar *metadata_ptr)
+Binlog_type_info Field_varstring::binlog_type_info() const
{
- DBUG_ASSERT(field_length <= 65535);
- int2store((char*)metadata_ptr, field_length);
- return 2;
+ DBUG_ASSERT(Field_varstring::type() == binlog_type());
+ return Binlog_type_info(Field_varstring::type(), field_length, 2, charset());
}
@@ -8270,6 +8286,11 @@ int Field_varstring_compressed::cmp_max(const uchar *a_ptr, const uchar *b_ptr,
return sortcmp(&a, &b, field_charset());
}
+Binlog_type_info Field_varstring_compressed::binlog_type_info() const
+{
+ return Binlog_type_info(Field_varstring_compressed::binlog_type(),
+ field_length, 2, charset());
+}
/****************************************************************************
@@ -8614,12 +8635,11 @@ Field *Field_blob::new_key_field(MEM_ROOT *root, TABLE *new_table,
@returns number of bytes written to metadata_ptr
*/
-int Field_blob::save_field_metadata(uchar *metadata_ptr)
+Binlog_type_info Field_blob::binlog_type_info() const
{
- DBUG_ENTER("Field_blob::save_field_metadata");
- *metadata_ptr= pack_length_no_ptr();
- DBUG_PRINT("debug", ("metadata: %u (pack_length_no_ptr)", *metadata_ptr));
- DBUG_RETURN(1);
+ DBUG_ASSERT(Field_blob::type() == binlog_type());
+ return Binlog_type_info(Field_blob::type(), pack_length_no_ptr(), 1,
+ charset());
}
@@ -8874,6 +8894,11 @@ longlong Field_blob_compressed::val_int(void)
buf.ptr(), buf.length()).result();
}
+Binlog_type_info Field_blob_compressed::binlog_type_info() const
+{
+ return Binlog_type_info(Field_blob_compressed::binlog_type(),
+ pack_length_no_ptr(), 1, charset());
+}
/****************************************************************************
** enum type.
@@ -9008,11 +9033,11 @@ longlong Field_enum::val_int(const uchar *real_ptr) const
@returns number of bytes written to metadata_ptr
*/
-int Field_enum::save_field_metadata(uchar *metadata_ptr)
+Binlog_type_info Field_enum::binlog_type_info() const
{
- *metadata_ptr= real_type();
- *(metadata_ptr + 1)= pack_length();
- return 2;
+ DBUG_ASSERT(Field_enum::type() == binlog_type());
+ return Binlog_type_info(Field_enum::type(), real_type() + (pack_length() << 8),
+ 2, charset(), (TYPELIB *)get_typelib(), NULL);
}
@@ -9215,6 +9240,13 @@ void Field_set::sql_type(String &res) const
res.append(')');
}
+Binlog_type_info Field_set::binlog_type_info() const
+{
+ DBUG_ASSERT(Field_set::type() == binlog_type());
+ return Binlog_type_info(Field_set::type(), real_type()
+ + (pack_length() << 8), 2, charset(), NULL, (TYPELIB *)get_typelib());
+}
+
/**
@retval
1 if the fields are equally defined
@@ -9743,33 +9775,6 @@ uint Field_bit::get_key_image(uchar *buff, uint length, imagetype type_arg)
/**
- Save the field metadata for bit fields.
-
- Saves the bit length in the first byte and bytes in record in the
- second byte of the field metadata array at index of *metadata_ptr and
- *(metadata_ptr + 1).
-
- @param metadata_ptr First byte of field metadata
-
- @returns number of bytes written to metadata_ptr
-*/
-int Field_bit::save_field_metadata(uchar *metadata_ptr)
-{
- DBUG_ENTER("Field_bit::save_field_metadata");
- DBUG_PRINT("debug", ("bit_len: %d, bytes_in_rec: %d",
- bit_len, bytes_in_rec));
- /*
- Since this class and Field_bit_as_char have different ideas of
- what should be stored here, we compute the values of the metadata
- explicitly using the field_length.
- */
- metadata_ptr[0]= field_length % 8;
- metadata_ptr[1]= field_length / 8;
- DBUG_RETURN(2);
-}
-
-
-/**
Returns the number of bytes field uses in row-based replication
row packed size.
diff --git a/sql/field.h b/sql/field.h
index bc413865e7e..81057594efe 100644
--- a/sql/field.h
+++ b/sql/field.h
@@ -634,6 +634,91 @@ public:
inline void print(String*);
};
+class Binlog_type_info
+{
+public:
+ enum binlog_signess_t
+ {
+ SIGNED,
+ UNSIGNED,
+ SIGNESS_NOT_RELEVANT // for non-numeric types
+ };
+ uchar m_type_code; // according to Field::binlog_type()
+ /**
+ Retrieve the field metadata for fields.
+ */
+ uint16 m_metadata;
+ uint8 m_metadata_size;
+ binlog_signess_t m_signess;
+ CHARSET_INFO *m_cs; // NULL if not relevant
+ TYPELIB *m_enum_typelib; // NULL if not relevant
+ TYPELIB *m_set_typelib; // NULL if not relevant
+ uchar m_geom_type; // Non-geometry fields can return 0
+ Binlog_type_info(uchar type_code,
+ uint16 metadata,
+ uint8 metadata_size)
+ :m_type_code(type_code),
+ m_metadata(metadata),
+ m_metadata_size(metadata_size),
+ m_signess(SIGNESS_NOT_RELEVANT),
+ m_cs(NULL),
+ m_enum_typelib(NULL),
+ m_set_typelib(NULL),
+ m_geom_type(0)
+ {};
+ Binlog_type_info(uchar type_code, uint16 metadata,
+ uint8 metadata_size,
+ binlog_signess_t signess)
+ :m_type_code(type_code),
+ m_metadata(metadata),
+ m_metadata_size(metadata_size),
+ m_signess(signess),
+ m_cs(NULL),
+ m_enum_typelib(NULL),
+ m_set_typelib(NULL),
+ m_geom_type(0)
+ {};
+ Binlog_type_info(uchar type_code, uint16 metadata,
+ uint8 metadata_size,
+ CHARSET_INFO *cs)
+ :m_type_code(type_code),
+ m_metadata(metadata),
+ m_metadata_size(metadata_size),
+ m_signess(SIGNESS_NOT_RELEVANT),
+ m_cs(cs),
+ m_enum_typelib(NULL),
+ m_set_typelib(NULL),
+ m_geom_type(0)
+ {};
+ Binlog_type_info(uchar type_code, uint16 metadata,
+ uint8 metadata_size,
+ CHARSET_INFO *cs,
+ TYPELIB *t_enum, TYPELIB *t_set)
+ :m_type_code(type_code),
+ m_metadata(metadata),
+ m_metadata_size(metadata_size),
+ m_signess(SIGNESS_NOT_RELEVANT),
+ m_cs(cs),
+ m_enum_typelib(t_enum),
+ m_set_typelib(t_set),
+ m_geom_type(0)
+ {};
+ Binlog_type_info(uchar type_code, uint16 metadata,
+ uint8 metadata_size, CHARSET_INFO *cs,
+ uchar geom_type)
+ :m_type_code(type_code),
+ m_metadata(metadata),
+ m_metadata_size(metadata_size),
+ m_signess(SIGNESS_NOT_RELEVANT),
+ m_cs(cs),
+ m_enum_typelib(NULL),
+ m_set_typelib(NULL),
+ m_geom_type(geom_type)
+ {};
+ static void *operator new(size_t size, MEM_ROOT *mem_root) throw ()
+ { return alloc_root(mem_root, size); }
+};
+
class Field: public Value_source
{
Field(const Item &); /* Prevent use of these */
@@ -953,21 +1038,6 @@ public:
}
virtual uint row_pack_length() const { return 0; }
-
- /**
- Retrieve the field metadata for fields.
-
- This default implementation returns 0 and saves 0 in the first_byte value.
-
- @param first_byte First byte of field metadata
-
- @returns 0 no bytes written.
- */
-
- virtual int save_field_metadata(uchar *first_byte)
- { return 0; }
-
-
/*
data_length() return the "real size" of the data in memory.
*/
@@ -1110,6 +1180,11 @@ public:
*/
return type();
}
+ virtual Binlog_type_info binlog_type_info() const
+ {
+ DBUG_ASSERT(Field::type() == binlog_type());
+ return Binlog_type_info(Field::type(), 0, 0);
+ }
virtual en_fieldtype tmp_engine_column_type(bool use_packed_rows) const
{
return FIELD_NORMAL;
@@ -1820,6 +1895,11 @@ protected:
void prepend_zeros(String *value) const;
Item *get_equal_zerofill_const_item(THD *thd, const Context &ctx,
Item *const_item);
+ Binlog_type_info::binlog_signess_t binlog_signess() const
+ {
+ return (flags & UNSIGNED_FLAG) ? Binlog_type_info::UNSIGNED :
+ Binlog_type_info::SIGNED;
+ }
public:
const uint8 dec;
bool zerofill,unsigned_flag; // Purify cannot handle bit fields
@@ -1874,6 +1954,11 @@ public:
SEL_ARG *get_mm_leaf(RANGE_OPT_PARAM *param, KEY_PART *key_part,
const Item_bool_func *cond,
scalar_comparison_op op, Item *value);
+ Binlog_type_info binlog_type_info() const
+ {
+ DBUG_ASSERT(Field_num::type() == binlog_type());
+ return Binlog_type_info(Field_num::type(), 0, 0, binlog_signess());
+ }
};
@@ -1938,6 +2023,11 @@ public:
SEL_ARG *get_mm_leaf(RANGE_OPT_PARAM *param, KEY_PART *key_part,
const Item_bool_func *cond,
scalar_comparison_op op, Item *value);
+ Binlog_type_info binlog_type_info() const
+ {
+ DBUG_ASSERT(Field_str::type() == binlog_type());
+ return Binlog_type_info(Field_str::type(), 0, 0, charset());
+ }
};
/* base class for Field_string, Field_varstring and Field_blob */
@@ -2114,8 +2204,6 @@ public:
/* New decimal/numeric field which use fixed point arithmetic */
class Field_new_decimal :public Field_num {
-private:
- int save_field_metadata(uchar *first_byte);
public:
/* The maximum number of decimal digits can be stored */
uint precision;
@@ -2212,6 +2300,7 @@ public:
bool is_equal(const Column_definition &new_field) const;
virtual const uchar *unpack(uchar* to, const uchar *from, const uchar *from_end, uint param_data);
Item *get_equal_const_item(THD *thd, const Context &ctx, Item *const_item);
+ Binlog_type_info binlog_type_info() const;
};
@@ -2638,8 +2727,7 @@ public:
*/
return 0x1000000ULL;
}
-private:
- int save_field_metadata(uchar *first_byte);
+ Binlog_type_info binlog_type_info() const;
};
@@ -2703,8 +2791,7 @@ public:
*/
return 0x20000000000000ULL;
}
-private:
- int save_field_metadata(uchar *first_byte);
+ Binlog_type_info binlog_type_info() const;
};
@@ -3048,11 +3135,6 @@ public:
TIMESTAMP(0..6) - MySQL56 version
*/
class Field_timestampf :public Field_timestamp_with_dec {
- int save_field_metadata(uchar *metadata_ptr)
- {
- *metadata_ptr= (uchar) decimals();
- return 1;
- }
void store_TIMEVAL(const timeval &tv);
public:
Field_timestampf(uchar *ptr_arg,
@@ -3092,6 +3174,7 @@ public:
}
bool val_native(Native *to);
uint size_of() const { return sizeof(*this); }
+ Binlog_type_info binlog_type_info() const;
};
@@ -3376,11 +3459,6 @@ public:
*/
class Field_timef :public Field_time_with_dec {
void store_TIME(const MYSQL_TIME *ltime);
- int save_field_metadata(uchar *metadata_ptr)
- {
- *metadata_ptr= (uchar) decimals();
- return 1;
- }
public:
Field_timef(uchar *ptr_arg, uchar *null_ptr_arg, uchar null_bit_arg,
enum utype unireg_check_arg, const LEX_CSTRING *field_name_arg,
@@ -3419,6 +3497,7 @@ public:
int reset();
bool get_date(MYSQL_TIME *ltime, date_mode_t fuzzydate);
uint size_of() const { return sizeof(*this); }
+ Binlog_type_info binlog_type_info() const;
};
@@ -3545,11 +3624,6 @@ public:
class Field_datetimef :public Field_datetime_with_dec {
void store_TIME(const MYSQL_TIME *ltime);
bool get_TIME(MYSQL_TIME *ltime, const uchar *pos, date_mode_t fuzzydate) const;
- int save_field_metadata(uchar *metadata_ptr)
- {
- *metadata_ptr= (uchar) decimals();
- return 1;
- }
public:
Field_datetimef(uchar *ptr_arg, uchar *null_ptr_arg,
uchar null_bit_arg, enum utype unireg_check_arg,
@@ -3581,6 +3655,7 @@ public:
bool get_date(MYSQL_TIME *ltime, date_mode_t fuzzydate)
{ return Field_datetimef::get_TIME(ltime, ptr, fuzzydate); }
uint size_of() const { return sizeof(*this); }
+ Binlog_type_info binlog_type_info() const;
};
@@ -3726,8 +3801,7 @@ public:
sql_mode_t value_depends_on_sql_mode() const;
sql_mode_t can_handle_sql_mode_dependency_on_store() const;
void print_key_value(String *out, uint32 length);
-private:
- int save_field_metadata(uchar *first_byte);
+ Binlog_type_info binlog_type_info() const;
};
@@ -3843,8 +3917,7 @@ public:
void hash(ulong *nr, ulong *nr2);
uint length_size() const { return length_bytes; }
void print_key_value(String *out, uint32 length);
-private:
- int save_field_metadata(uchar *first_byte);
+ Binlog_type_info binlog_type_info() const;
};
@@ -3893,6 +3966,7 @@ private:
int key_cmp(const uchar *str, uint length) const
{ DBUG_ASSERT(0); return 0; }
using Field_varstring::key_cmp;
+ Binlog_type_info binlog_type_info() const;
};
@@ -4200,12 +4274,10 @@ public:
return table->file->can_convert_blob(this, new_type);
}
void print_key_value(String *out, uint32 length);
+ Binlog_type_info binlog_type_info() const;
friend void TABLE::remember_blob_values(String *blob_storage);
friend void TABLE::restore_blob_values(String *blob_storage);
-
-private:
- int save_field_metadata(uchar *first_byte);
};
@@ -4253,6 +4325,7 @@ private:
uchar *new_ptr, uint32 length,
uchar *new_null_ptr, uint new_null_bit)
{ DBUG_ASSERT(0); return 0; }
+ Binlog_type_info binlog_type_info() const;
};
@@ -4364,8 +4437,8 @@ public:
bool can_optimize_range(const Item_bool_func *cond,
const Item *item,
bool is_eq_func) const;
+ Binlog_type_info binlog_type_info() const;
private:
- int save_field_metadata(uchar *first_byte);
bool is_equal(const Column_definition &new_field) const;
};
@@ -4401,6 +4474,7 @@ public:
uint size_of() const { return sizeof(*this); }
const Type_handler *type_handler() const { return &type_handler_set; }
bool has_charset(void) const { return TRUE; }
+ Binlog_type_info binlog_type_info() const;
private:
const String empty_set_string;
};
@@ -4573,10 +4647,31 @@ public:
{
val_int_as_str(out, 1);
}
+ /**
+ Save the field metadata for bit fields.
+ Saves the bit length in the first byte and bytes in record in the
+ second byte of the field metadata array at index of *metadata_ptr and
+ *(metadata_ptr + 1).
+
+ @param metadata_ptr First byte of field metadata
+
+ @returns number of bytes written to metadata_ptr
+ */
+ Binlog_type_info binlog_type_info() const
+ {
+ DBUG_PRINT("debug", ("bit_len: %d, bytes_in_rec: %d",
+ bit_len, bytes_in_rec));
+ /*
+ Since this class and Field_bit_as_char have different ideas of
+ what should be stored here, we compute the values of the metadata
+ explicitly using the field_length.
+ */
+ return Binlog_type_info(type(),
+ field_length % 8 + ((field_length / 8) << 8), 2);
+ }
private:
virtual size_t do_last_null_byte() const;
- int save_field_metadata(uchar *first_byte);
};
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 351caa96446..560fec81f9e 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -3428,7 +3428,8 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
m_colcnt(0), m_coltype(0),
m_memory(NULL), m_table_id(ULONGLONG_MAX), m_flags(0),
m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
- m_null_bits(0), m_meta_memory(NULL)
+ m_null_bits(0), m_meta_memory(NULL),
+ m_optional_metadata_len(0), m_optional_metadata(NULL)
{
unsigned int bytes_read= 0;
DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
@@ -3517,8 +3518,28 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
ptr_after_colcnt= (uchar*)ptr_after_colcnt + m_field_metadata_size;
memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
+ ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + num_null_bytes;
+ }
+
+ bytes_read= (uint) (ptr_after_colcnt - (uchar *)buf);
+
+ /* After null_bits field, there are some new fields for extra metadata. */
+ if (bytes_read < event_len)
+ {
+ m_optional_metadata_len= event_len - bytes_read;
+ m_optional_metadata=
+ static_cast<unsigned char*>(my_malloc(m_optional_metadata_len, MYF(MY_WME)));
+ memcpy(m_optional_metadata, ptr_after_colcnt, m_optional_metadata_len);
}
}
+#ifdef MYSQL_SERVER
+ if (!m_table)
+ DBUG_VOID_RETURN;
+ binlog_type_info_array= (Binlog_type_info *)thd->alloc(m_table->s->fields *
+ sizeof(Binlog_type_info));
+ for (uint i= 0; i < m_table->s->fields; i++)
+ binlog_type_info_array[i]= m_table->field[i]->binlog_type_info();
+#endif
DBUG_VOID_RETURN;
}
@@ -3528,6 +3549,237 @@ Table_map_log_event::~Table_map_log_event()
{
my_free(m_meta_memory);
my_free(m_memory);
+ my_free(m_optional_metadata);
+ m_optional_metadata= NULL;
+}
+
+/**
+ Parses SIGNEDNESS field.
+
+ @param[out] vec stores the signedness flags extracted from field.
+ @param[in] field SIGNEDNESS field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_signedness(std::vector<bool> &vec,
+ unsigned char *field, unsigned int length)
+{
+ for (unsigned int i= 0; i < length; i++)
+ {
+ for (unsigned char c= 0x80; c != 0; c>>= 1)
+ vec.push_back(field[i] & c);
+ }
+}
+
+/**
+ Parses DEFAULT_CHARSET field.
+
+ @param[out] default_charset stores collation numbers extracted from field.
+ @param[in] field DEFAULT_CHARSET field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_default_charset(Table_map_log_event::Optional_metadata_fields::
+ Default_charset &default_charset,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ default_charset.default_charset= net_field_length(&p);
+ while (p < field + length)
+ {
+ unsigned int col_index= net_field_length(&p);
+ unsigned int col_charset= net_field_length(&p);
+
+ default_charset.charset_pairs.push_back(std::make_pair(col_index,
+ col_charset));
+ }
+}
+
+/**
+ Parses COLUMN_CHARSET field.
+
+ @param[out] vec stores collation numbers extracted from field.
+ @param[in] field COLUMN_CHARSET field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_column_charset(std::vector<unsigned int> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ vec.push_back(net_field_length(&p));
+}
+
+/**
+ Parses COLUMN_NAME field.
+
+ @param[out] vec stores column names extracted from field.
+ @param[in] field COLUMN_NAME field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_column_name(std::vector<std::string> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ {
+ unsigned len= net_field_length(&p);
+ vec.push_back(std::string(reinterpret_cast<char *>(p), len));
+ p+= len;
+ }
+}
+
+/**
+ Parses SET_STR_VALUE/ENUM_STR_VALUE field.
+
+ @param[out] vec stores SET/ENUM column's string values extracted from
+ field. Each SET/ENUM column's string values are stored
+ into a string separate vector. All of them are stored
+ in 'vec'.
+ @param[in] field COLUMN_NAME field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_set_str_value(std::vector<Table_map_log_event::
+ Optional_metadata_fields::str_vector> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ {
+ unsigned int count= net_field_length(&p);
+
+ vec.push_back(std::vector<std::string>());
+ for (unsigned int i= 0; i < count; i++)
+ {
+ unsigned len1= net_field_length(&p);
+ vec.back().push_back(std::string(reinterpret_cast<char *>(p), len1));
+ p+= len1;
+ }
+ }
+}
+
+/**
+ Parses GEOMETRY_TYPE field.
+
+ @param[out] vec stores geometry column's types extracted from field.
+ @param[in] field GEOMETRY_TYPE field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_geometry_type(std::vector<unsigned int> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ vec.push_back(net_field_length(&p));
+}
+
+/**
+ Parses SIMPLE_PRIMARY_KEY field.
+
+ @param[out] vec stores primary key's column information extracted from
+ field. Each column has an index and a prefix which are
+ stored as a unit_pair. prefix is always 0 for
+ SIMPLE_PRIMARY_KEY field.
+ @param[in] field SIMPLE_PRIMARY_KEY field in table_map_event.
+ @param[in] length length of the field
+ */
+static void parse_simple_pk(std::vector<Table_map_log_event::
+ Optional_metadata_fields::uint_pair> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ vec.push_back(std::make_pair(net_field_length(&p), 0));
+}
+
+/**
+ Parses PRIMARY_KEY_WITH_PREFIX field.
+
+ @param[out] vec stores primary key's column information extracted from
+ field. Each column has an index and a prefix which are
+ stored as a unit_pair.
+ @param[in] field PRIMARY_KEY_WITH_PREFIX field in table_map_event.
+ @param[in] length length of the field
+ */
+
+static void parse_pk_with_prefix(std::vector<Table_map_log_event::
+ Optional_metadata_fields::uint_pair> &vec,
+ unsigned char *field, unsigned int length)
+{
+ unsigned char* p= field;
+
+ while (p < field + length)
+ {
+ unsigned int col_index= net_field_length(&p);
+ unsigned int col_prefix= net_field_length(&p);
+ vec.push_back(std::make_pair(col_index, col_prefix));
+ }
+}
+
+Table_map_log_event::Optional_metadata_fields::
+Optional_metadata_fields(unsigned char* optional_metadata,
+ unsigned int optional_metadata_len)
+{
+ unsigned char* field= optional_metadata;
+
+ if (optional_metadata == NULL)
+ return;
+
+ while (field < optional_metadata + optional_metadata_len)
+ {
+ unsigned int len;
+ Optional_metadata_field_type type=
+ static_cast<Optional_metadata_field_type>(field[0]);
+
+ // Get length and move field to the value.
+ field++;
+ len= net_field_length(&field);
+
+ switch(type)
+ {
+ case SIGNEDNESS:
+ parse_signedness(m_signedness, field, len);
+ break;
+ case DEFAULT_CHARSET:
+ parse_default_charset(m_default_charset, field, len);
+ break;
+ case COLUMN_CHARSET:
+ parse_column_charset(m_column_charset, field, len);
+ break;
+ case COLUMN_NAME:
+ parse_column_name(m_column_name, field, len);
+ break;
+ case SET_STR_VALUE:
+ parse_set_str_value(m_set_str_value, field, len);
+ break;
+ case ENUM_STR_VALUE:
+ parse_set_str_value(m_enum_str_value, field, len);
+ break;
+ case GEOMETRY_TYPE:
+ parse_geometry_type(m_geometry_type, field, len);
+ break;
+ case SIMPLE_PRIMARY_KEY:
+ parse_simple_pk(m_primary_key, field, len);
+ break;
+ case PRIMARY_KEY_WITH_PREFIX:
+ parse_pk_with_prefix(m_primary_key, field, len);
+ break;
+ case ENUM_AND_SET_DEFAULT_CHARSET:
+ parse_default_charset(m_enum_and_set_default_charset, field, len);
+ break;
+ case ENUM_AND_SET_COLUMN_CHARSET:
+ parse_column_charset(m_enum_and_set_column_charset, field, len);
+ break;
+ default:
+ DBUG_ASSERT(0);
+ }
+ // next field
+ field+= len;
+ }
}
diff --git a/sql/log_event.h b/sql/log_event.h
index 274182af841..88a6e06c506 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -35,6 +35,11 @@
#include <my_bitmap.h>
#include "rpl_constants.h"
+#include <vector>
+#include <string>
+#include <functional>
+#include <memory>
+#include <map>
#ifdef MYSQL_CLIENT
#include "sql_const.h"
@@ -791,7 +796,6 @@ enum Int_event_type
INVALID_INT_EVENT = 0, LAST_INSERT_ID_EVENT = 1, INSERT_ID_EVENT = 2
};
-
#ifdef MYSQL_SERVER
class String;
class MYSQL_BIN_LOG;
@@ -881,6 +885,7 @@ typedef struct st_print_event_info
statement for it.
*/
bool skip_replication;
+ bool print_table_metadata;
/*
These two caches are used by the row-based replication events to
@@ -4063,6 +4068,18 @@ private:
ninth is in the least significant bit of the second byte, and so
on. </td>
</tr>
+ <tr>
+ <td>optional metadata fields</td>
+ <td>optional metadata fields are stored in Type, Length, Value(TLV) format.
+ Type takes 1 byte. Length is a packed integer value. Values takes
+ Length bytes.
+ </td>
+ <td>There are some optional metadata defined. They are listed in the table
+ @ref Table_table_map_event_optional_metadata. Optional metadata fields
+ follow null_bits. Whether binlogging an optional metadata is decided by the
+ server. The order is not defined, so they can be binlogged in any order.
+ </td>
+ </tr>
</table>
@@ -4268,6 +4285,123 @@ private:
</tr>
</table>
+ The table below lists all optional metadata types, along with the numerical
+ identifier for it and the size and interpretation of meta-data used
+ to describe the type.
+
+ @anchor Table_table_map_event_optional_metadata
+ <table>
+ <caption>Table_map_event optional metadata types: numerical identifier and
+ metadata. Optional metadata fields are stored in TLV fields.
+ Format of values are described in this table. </caption>
+ <tr>
+ <th>Type</th>
+ <th>Description</th>
+ <th>Format</th>
+ </tr>
+ <tr>
+ <td>SIGNEDNESS</td>
+ <td>signedness of numeric colums. This is included for all values of
+ binlog_row_metadata.</td>
+ <td>For each numeric column, a bit indicates whether the numeric
+ colunm has unsigned flag. 1 means it is unsigned. The number of
+ bytes needed for this is int((column_count + 7) / 8). The order is
+ the same as the order of column_type field.</td>
+ </tr>
+ <tr>
+ <td>DEFAULT_CHARSET</td>
+ <td>Charsets of character columns. It has a default charset for
+ the case that most of character columns have same charset and the
+ most used charset is binlogged as default charset.Collation
+ numbers are binlogged for identifying charsets. They are stored in
+ packed length format. Either DEFAULT_CHARSET or COLUMN_CHARSET is
+ included for all values of binlog_row_metadata.</td>
+ <td>Default charset's collation is logged first. The charsets which are not
+ same to default charset are logged following default charset. They are
+ logged as column index and charset collation number pair sequence. The
+ column index is counted only in all character columns. The order is same to
+ the order of column_type
+ field. </td>
+ </tr>
+ <tr>
+ <td>COLUMN_CHARSET</td>
+ <td>Charsets of character columns. For the case that most of columns have
+ different charsets, this field is logged. It is never logged with
+ DEFAULT_CHARSET together. Either DEFAULT_CHARSET or COLUMN_CHARSET is
+ included for all values of binlog_row_metadata.</td>
+ <td>It is a collation number sequence for all character columns.</td>
+ </tr>
+ <tr>
+ <td>COLUMN_NAME</td>
+ <td>Names of columns. This is only included if
+ binlog_row_metadata=FULL.</td>
+ <td>A sequence of column names. For each column name, 1 byte for
+ the string length in bytes is followed by a string without null
+ terminator.</td>
+ </tr>
+ <tr>
+ <td>SET_STR_VALUE</td>
+ <td>The string values of SET columns. This is only included if
+ binlog_row_metadata=FULL.</td>
+ <td>For each SET column, a pack_length representing the value
+ count is followed by a sequence of length and string pairs. length
+ is the byte count in pack_length format. The string has no null
+ terminator.</td>
+ </tr>
+ <tr>
+ <td>ENUM_STR_VALUE</td>
+ <td>The string values is ENUM columns. This is only included
+ if binlog_row_metadata=FULL.</td>
+ <td>The format is the same as SET_STR_VALUE.</td>
+ </tr>
+ <tr>
+ <td>GEOMETRY_TYPE</td>
+ <td>The real type of geometry columns. This is only included
+ if binlog_row_metadata=FULL.</td>
+ <td>A sequence of real type of geometry columns are stored in pack_length
+ format. </td>
+ </tr>
+ <tr>
+ <td>SIMPLE_PRIMARY_KEY</td>
+ <td>The primary key without any prefix. This is only included
+ if binlog_row_metadata=FULL and there is a primary key where every
+ key part covers an entire column.</td>
+ <td>A sequence of column indexes. The indexes are stored in pack_length
+ format.</td>
+ </tr>
+ <tr>
+ <td>PRIMARY_KEY_WITH_PREFIX</td>
+ <td>The primary key with some prefix. It doesn't appear together with
+ SIMPLE_PRIMARY_KEY. This is only included if
+ binlog_row_metadata=FULL and there is a primary key where some key
+ part covers a prefix of the column.</td>
+ <td>A sequence of column index and prefix length pairs. Both
+ column index and prefix length are in pack_length format. Prefix length
+ 0 means that the whole column value is used.</td>
+ </tr>
+ <tr>
+ <td>ENUM_AND_SET_DEFAULT_CHARSET</td>
+ <td>Charsets of ENUM and SET columns. It has the same layout as
+ DEFAULT_CHARSET. If there are SET or ENUM columns and
+ binlog_row_metadata=FULL, exactly one of
+ ENUM_AND_SET_DEFAULT_CHARSET and ENUM_AND_SET_COLUMN_CHARSET
+ appears (the encoder chooses the representation that uses the
+ least amount of space). Otherwise, none of them appears.</td>
+ <td>The same format as for DEFAULT_CHARSET, except it counts ENUM
+ and SET columns rather than character columns.</td>
+ </tr>
+ <tr>
+ <td>ENUM_AND_SET_COLUMN_CHARSET</td>
+ <td>Charsets of ENUM and SET columns. It has the same layout as
+ COLUMN_CHARSET. If there are SET or ENUM columns and
+ binlog_row_metadata=FULL, exactly one of
+ ENUM_AND_SET_DEFAULT_CHARSET and ENUM_AND_SET_COLUMN_CHARSET
+ appears (the encoder chooses the representation that uses the
+ least amount of space). Otherwise, none of them appears.</td>
+ <td>The same format as for COLUMN_CHARSET, except it counts ENUM
+ and SET columns rather than character columns.</td>
+ </tr>
+ </table>
*/
class Table_map_log_event : public Log_event
{
@@ -4303,6 +4437,124 @@ public:
};
typedef uint16 flag_set;
+ /**
+ DEFAULT_CHARSET and COLUMN_CHARSET don't appear together, and
+ ENUM_AND_SET_DEFAULT_CHARSET and ENUM_AND_SET_COLUMN_CHARSET don't
+ appear together. They are just alternative ways to pack character
+ set information. When binlogging, it logs character sets in the
+ way that occupies least storage.
+
+ SIMPLE_PRIMARY_KEY and PRIMARY_KEY_WITH_PREFIX don't appear together.
+ SIMPLE_PRIMARY_KEY is for the primary keys which only use whole values of
+ pk columns. PRIMARY_KEY_WITH_PREFIX is
+ for the primary keys which just use part value of pk columns.
+ */
+ enum Optional_metadata_field_type
+ {
+ SIGNEDNESS = 1, // UNSIGNED flag of numeric columns
+ DEFAULT_CHARSET, /* Character set of string columns, optimized to
+ minimize space when many columns have the
+ same charset. */
+ COLUMN_CHARSET, /* Character set of string columns, optimized to
+ minimize space when columns have many
+ different charsets. */
+ COLUMN_NAME,
+ SET_STR_VALUE, // String value of SET columns
+ ENUM_STR_VALUE, // String value of ENUM columns
+ GEOMETRY_TYPE, // Real type of geometry columns
+ SIMPLE_PRIMARY_KEY, // Primary key without prefix
+ PRIMARY_KEY_WITH_PREFIX, // Primary key with prefix
+ ENUM_AND_SET_DEFAULT_CHARSET, /* Character set of enum and set
+ columns, optimized to minimize
+ space when many columns have the
+ same charset. */
+ ENUM_AND_SET_COLUMN_CHARSET, /* Character set of enum and set
+ columns, optimized to minimize
+ space when many columns have the
+ same charset. */
+ };
+ /**
+ Metadata_fields organizes m_optional_metadata into a structured format which
+ is easy to access.
+ */
+ // Values for binlog_row_metadata sysvar
+ enum enum_binlog_row_metadata
+ {
+ BINLOG_ROW_METADATA_NO_LOG= 0,
+ BINLOG_ROW_METADATA_MINIMAL= 1,
+ BINLOG_ROW_METADATA_FULL= 2
+ };
+ struct Optional_metadata_fields
+ {
+ typedef std::pair<unsigned int, unsigned int> uint_pair;
+ typedef std::vector<std::string> str_vector;
+
+ struct Default_charset
+ {
+ Default_charset() : default_charset(0) {}
+ bool empty() const { return default_charset == 0; }
+
+ // Default charset for the columns which are not in charset_pairs.
+ unsigned int default_charset;
+
+ /* The uint_pair means <column index, column charset number>. */
+ std::vector<uint_pair> charset_pairs;
+ };
+
+ // Contents of DEFAULT_CHARSET field is converted into Default_charset.
+ Default_charset m_default_charset;
+ // Contents of ENUM_AND_SET_DEFAULT_CHARSET are converted into
+ // Default_charset.
+ Default_charset m_enum_and_set_default_charset;
+ std::vector<bool> m_signedness;
+ // Character set number of every string column
+ std::vector<unsigned int> m_column_charset;
+ // Character set number of every ENUM or SET column.
+ std::vector<unsigned int> m_enum_and_set_column_charset;
+ std::vector<std::string> m_column_name;
+ // each str_vector stores values of one enum/set column
+ std::vector<str_vector> m_enum_str_value;
+ std::vector<str_vector> m_set_str_value;
+ std::vector<unsigned int> m_geometry_type;
+ /*
+ The uint_pair means <column index, prefix length>. Prefix length is 0 if
+ whole column value is used.
+ */
+ std::vector<uint_pair> m_primary_key;
+
+ /*
+ It parses m_optional_metadata and populates into above variables.
+
+ @param[in] optional_metadata points to the begin of optional metadata
+ fields in table_map_event.
+ @param[in] optional_metadata_len length of optional_metadata field.
+ */
+ Optional_metadata_fields(unsigned char* optional_metadata,
+ unsigned int optional_metadata_len);
+ };
+
+ /**
+ Print column metadata. Its format looks like:
+ # Columns(colume_name type, colume_name type, ...)
+ if colume_name field is not logged into table_map_log_event, then
+ only type is printed.
+
+ @@param[out] file the place where colume metadata is printed
+ @@param[in] The metadata extracted from optional metadata fields
+ */
+ void print_columns(IO_CACHE *file,
+ const Optional_metadata_fields &fields);
+ /**
+ Print primary information. Its format looks like:
+ # Primary Key(colume_name, column_name(prifix), ...)
+ if colume_name field is not logged into table_map_log_event, then
+ colume index is printed.
+
+ @@param[out] file the place where primary key is printed
+ @@param[in] The metadata extracted from optional metadata fields
+ */
+ void print_primary_key(IO_CACHE *file,
+ const Optional_metadata_fields &fields);
/* Special constants representing sets of flags */
enum
@@ -4369,6 +4621,51 @@ private:
#ifdef MYSQL_SERVER
TABLE *m_table;
+ Binlog_type_info *binlog_type_info_array;
+
+
+ // Metadata fields buffer
+ StringBuffer<1024> m_metadata_buf;
+
+ /**
+ Capture the optional metadata fields which should be logged into
+ table_map_log_event and serialize them into m_metadata_buf.
+ */
+ void init_metadata_fields();
+ bool init_signedness_field();
+ /**
+ Capture and serialize character sets. Character sets for
+ character columns (TEXT etc) and character sets for ENUM and SET
+ columns are stored in different metadata fields. The reason is
+ that TEXT character sets are included even when
+ binlog_row_metadata=MINIMAL, whereas ENUM and SET character sets
+ are included only when binlog_row_metadata=FULL.
+
+ @param include_type Predicate to determine if a given Field object
+ is to be included in the metadata field.
+
+ @param default_charset_type Type code when storing in "default
+ charset" format. (See comment above Table_maps_log_event in
+ libbinlogevents/include/rows_event.h)
+
+ @param column_charset_type Type code when storing in "column
+ charset" format. (See comment above Table_maps_log_event in
+ libbinlogevents/include/rows_event.h)
+ */
+ bool init_charset_field(bool(* include_type)(Binlog_type_info *, Field *),
+ Optional_metadata_field_type default_charset_type,
+ Optional_metadata_field_type column_charset_type);
+ bool init_column_name_field();
+ bool init_set_str_value_field();
+ bool init_enum_str_value_field();
+ bool init_geometry_type_field();
+ bool init_primary_key_field();
+#endif
+
+#ifdef MYSQL_CLIENT
+ class Charset_iterator;
+ class Default_charset_iterator;
+ class Column_charset_iterator;
#endif
char const *m_dbnam;
size_t m_dblen;
@@ -4390,6 +4687,8 @@ private:
ulong m_field_metadata_size;
uchar *m_null_bits;
uchar *m_meta_memory;
+ unsigned int m_optional_metadata_len;
+ unsigned char *m_optional_metadata;
};
@@ -5261,5 +5560,4 @@ int row_log_event_uncompress(const Format_description_log_event *description_eve
const char *src, ulong src_len, char* buf, ulong buf_size, bool* is_malloc,
char **dst, ulong *newlen);
-
#endif /* _log_event_h */
diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc
index 57b217813e2..fef7add140c 100644
--- a/sql/log_event_client.cc
+++ b/sql/log_event_client.cc
@@ -26,10 +26,11 @@
#endif
-static bool pretty_print_str(IO_CACHE* cache, const char* str, int len)
+static bool pretty_print_str(IO_CACHE* cache, const char* str,
+ size_t len, bool identifier)
{
const char* end = str + len;
- if (my_b_write_byte(cache, '\''))
+ if (my_b_write_byte(cache, identifier ? '`' : '\''))
goto err;
while (str < end)
@@ -52,12 +53,39 @@ static bool pretty_print_str(IO_CACHE* cache, const char* str, int len)
if (unlikely(error))
goto err;
}
- return my_b_write_byte(cache, '\'');
+ return my_b_write_byte(cache, identifier ? '`' : '\'');
err:
return 1;
}
+/**
+ Print src as an string enclosed with "'"
+
+ @param[out] cache IO_CACHE where the string will be printed.
+ @param[in] str the string will be printed.
+ @param[in] len length of the string.
+*/
+static inline bool pretty_print_str(IO_CACHE* cache, const char* str,
+ size_t len)
+{
+ return pretty_print_str(cache, str, len, false);
+}
+
+/**
+ Print src as an identifier enclosed with "`"
+
+ @param[out] cache IO_CACHE where the identifier will be printed.
+ @param[in] str the string will be printed.
+ @param[in] len length of the string.
+ */
+static inline bool pretty_print_identifier(IO_CACHE* cache, const char* str,
+ size_t len)
+{
+ return pretty_print_str(cache, str, len, true);
+}
+
+
/**
Prints a "session_var=value" string. Used by mysqlbinlog to print some SET
@@ -257,6 +285,46 @@ err:
return 1;
}
+static inline bool is_numeric_type(uint type)
+{
+ switch (type)
+ {
+ case MYSQL_TYPE_TINY:
+ case MYSQL_TYPE_SHORT:
+ case MYSQL_TYPE_INT24:
+ case MYSQL_TYPE_LONG:
+ case MYSQL_TYPE_LONGLONG:
+ case MYSQL_TYPE_NEWDECIMAL:
+ case MYSQL_TYPE_FLOAT:
+ case MYSQL_TYPE_DOUBLE:
+ return true;
+ default:
+ return false;
+ }
+ return false;
+}
+
+static inline bool is_character_type(uint type)
+{
+ switch (type)
+ {
+ case MYSQL_TYPE_STRING:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_VARCHAR:
+ case MYSQL_TYPE_BLOB:
+ // Base class is blob for geom type
+ case MYSQL_TYPE_GEOMETRY:
+ return true;
+ default:
+ return false;
+ }
+}
+
+static inline bool is_enum_or_set_type(uint type) {
+ return type == MYSQL_TYPE_ENUM || type == MYSQL_TYPE_SET;
+}
+
+
/*
Log_event::print_header()
*/
@@ -3110,6 +3178,15 @@ bool Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
}
if (!print_event_info->short_form || print_event_info->print_row_count)
{
+
+ if (print_event_info->print_table_metadata)
+ {
+ Optional_metadata_fields fields(m_optional_metadata,
+ m_optional_metadata_len);
+
+ print_columns(&print_event_info->head_cache, fields);
+ print_primary_key(&print_event_info->head_cache, fields);
+ }
bool do_print_encoded=
print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER &&
print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
@@ -3127,6 +3204,396 @@ err:
return 1;
}
+/**
+ Interface for iterator over charset columns.
+*/
+class Table_map_log_event::Charset_iterator
+{
+ public:
+ typedef Table_map_log_event::Optional_metadata_fields::Default_charset
+ Default_charset;
+ virtual const CHARSET_INFO *next()= 0;
+ virtual ~Charset_iterator(){};
+ /**
+ Factory method to create an instance of the appropriate subclass.
+ */
+ static std::unique_ptr<Charset_iterator> create_charset_iterator(
+ const Default_charset &default_charset,
+ const std::vector<uint> &column_charset);
+};
+
+/**
+ Implementation of charset iterator for the DEFAULT_CHARSET type.
+*/
+class Table_map_log_event::Default_charset_iterator : public Charset_iterator
+{
+ public:
+ Default_charset_iterator(const Default_charset &default_charset)
+ : m_iterator(default_charset.charset_pairs.begin()),
+ m_end(default_charset.charset_pairs.end()),
+ m_column_index(0),
+ m_default_charset_info(
+ get_charset(default_charset.default_charset, 0)) {}
+
+ const CHARSET_INFO *next() override {
+ const CHARSET_INFO *ret;
+ if (m_iterator != m_end && m_iterator->first == m_column_index) {
+ ret = get_charset(m_iterator->second, 0);
+ m_iterator++;
+ } else
+ ret = m_default_charset_info;
+ m_column_index++;
+ return ret;
+ }
+ ~Default_charset_iterator(){};
+
+ private:
+ std::vector<Optional_metadata_fields::uint_pair>::const_iterator m_iterator,
+ m_end;
+ uint m_column_index;
+ const CHARSET_INFO *m_default_charset_info;
+};
+//Table_map_log_event::Default_charset_iterator::~Default_charset_iterator(){int a=8;a++; a--;};
+/**
+ Implementation of charset iterator for the COLUMNT_CHARSET type.
+*/
+class Table_map_log_event::Column_charset_iterator : public Charset_iterator
+{
+ public:
+ Column_charset_iterator(const std::vector<uint> &column_charset)
+ : m_iterator(column_charset.begin()), m_end(column_charset.end()) {}
+
+ const CHARSET_INFO *next() override {
+ const CHARSET_INFO *ret = nullptr;
+ if (m_iterator != m_end) {
+ ret = get_charset(*m_iterator, 0);
+ m_iterator++;
+ }
+ return ret;
+ }
+
+ ~Column_charset_iterator(){};
+ private:
+ std::vector<uint>::const_iterator m_iterator;
+ std::vector<uint>::const_iterator m_end;
+};
+//Table_map_log_event::Column_charset_iterator::~Column_charset_iterator(){int a=8;a++; a--;};
+
+std::unique_ptr<Table_map_log_event::Charset_iterator>
+Table_map_log_event::Charset_iterator::create_charset_iterator(
+ const Default_charset &default_charset,
+ const std::vector<uint> &column_charset)
+{
+ if (!default_charset.empty())
+ return std::unique_ptr<Charset_iterator>(
+ new Default_charset_iterator(default_charset));
+ else
+ return std::unique_ptr<Charset_iterator>(
+ new Column_charset_iterator(column_charset));
+}
+/**
+ return the string name of a type.
+
+ @param[in] type type of a column
+ @param[in|out] meta_ptr the meta_ptr of the column. If the type doesn't have
+ metadata, it will not change meta_ptr, otherwise
+ meta_ptr will be moved to the end of the column's
+ metadat.
+ @param[in] cs charset of the column if it is a character column.
+ @param[out] typestr buffer to storing the string name of the type
+ @param[in] typestr_length length of typestr
+ @param[in] geometry_type internal geometry_type
+ */
+static void get_type_name(uint type, unsigned char** meta_ptr,
+ const CHARSET_INFO *cs, char *typestr,
+ uint typestr_length, unsigned int geometry_type)
+{
+ switch (type) {
+ case MYSQL_TYPE_LONG:
+ my_snprintf(typestr, typestr_length, "%s", "INT");
+ break;
+ case MYSQL_TYPE_TINY:
+ my_snprintf(typestr, typestr_length, "TINYINT");
+ break;
+ case MYSQL_TYPE_SHORT:
+ my_snprintf(typestr, typestr_length, "SMALLINT");
+ break;
+ case MYSQL_TYPE_INT24:
+ my_snprintf(typestr, typestr_length, "MEDIUMINT");
+ break;
+ case MYSQL_TYPE_LONGLONG:
+ my_snprintf(typestr, typestr_length, "BIGINT");
+ break;
+ case MYSQL_TYPE_NEWDECIMAL:
+ my_snprintf(typestr, typestr_length, "DECIMAL(%d,%d)",
+ (*meta_ptr)[0], (*meta_ptr)[1]);
+ (*meta_ptr)+= 2;
+ break;
+ case MYSQL_TYPE_FLOAT:
+ my_snprintf(typestr, typestr_length, "FLOAT");
+ (*meta_ptr)++;
+ break;
+ case MYSQL_TYPE_DOUBLE:
+ my_snprintf(typestr, typestr_length, "DOUBLE");
+ (*meta_ptr)++;
+ break;
+ case MYSQL_TYPE_BIT:
+ my_snprintf(typestr, typestr_length, "BIT(%d)",
+ (((*meta_ptr)[0])) + (*meta_ptr)[1]*8);
+ (*meta_ptr)+= 2;
+ break;
+ case MYSQL_TYPE_TIMESTAMP2:
+ if (**meta_ptr != 0)
+ my_snprintf(typestr, typestr_length, "TIMESTAMP(%d)", **meta_ptr);
+ else
+ my_snprintf(typestr, typestr_length, "TIMESTAMP");
+ (*meta_ptr)++;
+ break;
+ case MYSQL_TYPE_DATETIME2:
+ if (**meta_ptr != 0)
+ my_snprintf(typestr, typestr_length, "DATETIME(%d)", **meta_ptr);
+ else
+ my_snprintf(typestr, typestr_length, "DATETIME");
+ (*meta_ptr)++;
+ break;
+ case MYSQL_TYPE_TIME2:
+ if (**meta_ptr != 0)
+ my_snprintf(typestr, typestr_length, "TIME(%d)", **meta_ptr);
+ else
+ my_snprintf(typestr, typestr_length, "TIME");
+ (*meta_ptr)++;
+ break;
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_DATE:
+ my_snprintf(typestr, typestr_length, "DATE");
+ break;
+ case MYSQL_TYPE_YEAR:
+ my_snprintf(typestr, typestr_length, "YEAR");
+ break;
+ case MYSQL_TYPE_ENUM:
+ my_snprintf(typestr, typestr_length, "ENUM");
+ (*meta_ptr)+= 2;
+ break;
+ case MYSQL_TYPE_SET:
+ my_snprintf(typestr, typestr_length, "SET");
+ (*meta_ptr)+= 2;
+ break;
+ case MYSQL_TYPE_BLOB:
+ {
+ bool is_text= (cs && cs->number != my_charset_bin.number);
+ const char *names[5][2] = {
+ {"INVALID_BLOB(%d)", "INVALID_TEXT(%d)"},
+ {"TINYBLOB", "TINYTEXT"},
+ {"BLOB", "TEXT"},
+ {"MEDIUMBLOB", "MEDIUMTEXT"},
+ {"LONGBLOB", "LONGTEXT"}
+ };
+ unsigned char size= **meta_ptr;
+
+ if (size == 0 || size > 4)
+ my_snprintf(typestr, typestr_length, names[0][is_text], size);
+ else
+ my_snprintf(typestr, typestr_length, names[**meta_ptr][is_text]);
+
+ (*meta_ptr)++;
+ }
+ break;
+ case MYSQL_TYPE_VARCHAR:
+ case MYSQL_TYPE_VAR_STRING:
+ if (cs && cs->number != my_charset_bin.number)
+ my_snprintf(typestr, typestr_length, "VARCHAR(%d)",
+ uint2korr(*meta_ptr)/cs->mbmaxlen);
+ else
+ my_snprintf(typestr, typestr_length, "VARBINARY(%d)",
+ uint2korr(*meta_ptr));
+
+ (*meta_ptr)+= 2;
+ break;
+ case MYSQL_TYPE_STRING:
+ {
+ uint byte0= (*meta_ptr)[0];
+ uint byte1= (*meta_ptr)[1];
+ uint len= (((byte0 & 0x30) ^ 0x30) << 4) | byte1;
+
+ if (cs && cs->number != my_charset_bin.number)
+ my_snprintf(typestr, typestr_length, "CHAR(%d)", len/cs->mbmaxlen);
+ else
+ my_snprintf(typestr, typestr_length, "BINARY(%d)", len);
+
+ (*meta_ptr)+= 2;
+ }
+ break;
+ case MYSQL_TYPE_GEOMETRY:
+ {
+ const char* names[8] = {
+ "GEOMETRY", "POINT", "LINESTRING", "POLYGON", "MULTIPOINT",
+ "MULTILINESTRING", "MULTIPOLYGON", "GEOMETRYCOLLECTION"
+ };
+ if (geometry_type < 8)
+ my_snprintf(typestr, typestr_length, names[geometry_type]);
+ else
+ my_snprintf(typestr, typestr_length, "INVALID_GEOMETRY_TYPE(%u)",
+ geometry_type);
+ (*meta_ptr)++;
+ }
+ break;
+ default:
+ *typestr= 0;
+ break;
+ }
+}
+
+void Table_map_log_event::print_columns(IO_CACHE *file,
+ const Optional_metadata_fields &fields)
+{
+ unsigned char* field_metadata_ptr= m_field_metadata;
+ std::vector<bool>::const_iterator signedness_it= fields.m_signedness.begin();
+
+ std::unique_ptr<Charset_iterator> charset_it =
+ Charset_iterator::create_charset_iterator(fields.m_default_charset,
+ fields.m_column_charset);
+ std::unique_ptr<Charset_iterator> enum_and_set_charset_it =
+ Charset_iterator::create_charset_iterator(
+ fields.m_enum_and_set_default_charset,
+ fields.m_enum_and_set_column_charset);
+ std::vector<std::string>::const_iterator col_names_it=
+ fields.m_column_name.begin();
+ std::vector<Optional_metadata_fields::str_vector>::const_iterator
+ set_str_values_it= fields.m_set_str_value.begin();
+ std::vector<Optional_metadata_fields::str_vector>::const_iterator
+ enum_str_values_it= fields.m_enum_str_value.begin();
+ std::vector<unsigned int>::const_iterator geometry_type_it=
+ fields.m_geometry_type.begin();
+
+ uint geometry_type= 0;
+
+ my_b_printf(file, "# Columns(");
+
+ for (unsigned long i= 0; i < m_colcnt; i++)
+ {
+ uint real_type = m_coltype[i];
+ if (real_type == MYSQL_TYPE_STRING &&
+ (*field_metadata_ptr == MYSQL_TYPE_ENUM ||
+ *field_metadata_ptr == MYSQL_TYPE_SET))
+ real_type= *field_metadata_ptr;
+
+ // Get current column's collation id if it is a character, enum,
+ // or set column
+ const CHARSET_INFO *cs = NULL;
+ if (is_character_type(real_type))
+ cs = charset_it->next();
+ else if (is_enum_or_set_type(real_type))
+ cs = enum_and_set_charset_it->next();
+
+ // Print column name
+ if (col_names_it != fields.m_column_name.end())
+ {
+ pretty_print_identifier(file, col_names_it->c_str(), col_names_it->size());
+ my_b_printf(file, " ");
+ col_names_it++;
+ }
+
+
+ // update geometry_type for geometry columns
+ if (real_type == MYSQL_TYPE_GEOMETRY)
+ {
+ geometry_type= (geometry_type_it != fields.m_geometry_type.end()) ?
+ *geometry_type_it++ : 0;
+ }
+
+ // print column type
+ const uint TYPE_NAME_LEN = 100;
+ char type_name[TYPE_NAME_LEN];
+ get_type_name(real_type, &field_metadata_ptr, cs, type_name,
+ TYPE_NAME_LEN, geometry_type);
+
+ if (type_name[0] == '\0')
+ {
+ my_b_printf(file, "INVALID_TYPE(%d)", real_type);
+ continue;
+ }
+ my_b_printf(file, "%s", type_name);
+
+ // Print UNSIGNED for numeric column
+ if (is_numeric_type(real_type) &&
+ signedness_it != fields.m_signedness.end())
+ {
+ if (*signedness_it == true)
+ my_b_printf(file, " UNSIGNED");
+ signedness_it++;
+ }
+
+ // if the column is not marked as 'null', print 'not null'
+ if (!(m_null_bits[(i / 8)] & (1 << (i % 8))))
+ my_b_printf(file, " NOT NULL");
+
+ // Print string values of SET and ENUM column
+ const Optional_metadata_fields::str_vector *str_values= NULL;
+ if (real_type == MYSQL_TYPE_ENUM &&
+ enum_str_values_it != fields.m_enum_str_value.end())
+ {
+ str_values= &(*enum_str_values_it);
+ enum_str_values_it++;
+ }
+ else if (real_type == MYSQL_TYPE_SET &&
+ set_str_values_it != fields.m_set_str_value.end())
+ {
+ str_values= &(*set_str_values_it);
+ set_str_values_it++;
+ }
+
+ if (str_values != NULL)
+ {
+ const char *separator= "(";
+ for (Optional_metadata_fields::str_vector::const_iterator it=
+ str_values->begin(); it != str_values->end(); it++)
+ {
+ my_b_printf(file, "%s", separator);
+ pretty_print_str(file, it->c_str(), it->size());
+ separator= ",";
+ }
+ my_b_printf(file, ")");
+ }
+ // Print column character set, except in text columns with binary collation
+ if (cs != NULL &&
+ (is_enum_or_set_type(real_type) || cs->number != my_charset_bin.number))
+ my_b_printf(file, " CHARSET %s COLLATE %s", cs->csname, cs->name);
+ if (i != m_colcnt - 1) my_b_printf(file, ",\n# ");
+ }
+ my_b_printf(file, ")");
+ my_b_printf(file, "\n");
+}
+
+void Table_map_log_event::print_primary_key
+ (IO_CACHE *file,const Optional_metadata_fields &fields)
+{
+ if (!fields.m_primary_key.empty())
+ {
+ my_b_printf(file, "# Primary Key(");
+
+ std::vector<Optional_metadata_fields::uint_pair>::const_iterator it=
+ fields.m_primary_key.begin();
+
+ for (; it != fields.m_primary_key.end(); it++)
+ {
+ if (it != fields.m_primary_key.begin())
+ my_b_printf(file, ", ");
+
+ // Print column name or column index
+ if (it->first >= fields.m_column_name.size())
+ my_b_printf(file, "%u", it->first);
+ else
+ my_b_printf(file, "%s", fields.m_column_name[it->first].c_str());
+
+ // Print prefix length
+ if (it->second != 0)
+ my_b_printf(file, "(%u)", it->second);
+ }
+
+ my_b_printf(file, ")\n");
+ }
+}
+
bool Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
{
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index ba7fb9b8267..1fd329b3427 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -5914,10 +5914,19 @@ int Table_map_log_event::save_field_metadata()
{
DBUG_ENTER("Table_map_log_event::save_field_metadata");
int index= 0;
+ Binlog_type_info *info;
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
{
DBUG_PRINT("debug", ("field_type: %d", m_coltype[i]));
- index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
+ //index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
+ info= binlog_type_info_array + i;
+ memcpy(&m_field_metadata[index], (uchar *)&info->m_metadata, info->m_metadata_size);
+ index+= info->m_metadata_size;
+ DBUG_EXECUTE_IF("inject_invalid_blob_size",
+ {
+ if (m_coltype[i] == MYSQL_TYPE_BLOB)
+ m_field_metadata[index-1] = 5;
+ });
}
DBUG_RETURN(index);
}
@@ -5944,7 +5953,9 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
m_field_metadata(0),
m_field_metadata_size(0),
m_null_bits(0),
- m_meta_memory(NULL)
+ m_meta_memory(NULL),
+ m_optional_metadata_len(0),
+ m_optional_metadata(NULL)
{
uchar cbuf[MAX_INT_WIDTH];
uchar *cbuf_end;
@@ -5960,6 +5971,13 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
(tbl->s->db.str[tbl->s->db.length] == 0));
DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
+#ifdef MYSQL_SERVER
+ binlog_type_info_array= (Binlog_type_info *)thd->alloc(m_table->s->fields *
+ sizeof(Binlog_type_info));
+ for (uint i= 0; i < m_table->s->fields; i++)
+ binlog_type_info_array[i]= m_table->field[i]->binlog_type_info();
+#endif
+
m_data_size= TABLE_MAP_HEADER_LEN;
DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;);
@@ -5977,7 +5995,8 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
{
m_coltype= reinterpret_cast<uchar*>(m_memory);
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
- m_coltype[i]= m_table->field[i]->binlog_type();
+ m_coltype[i]= binlog_type_info_array[i].m_type_code;
+ DBUG_EXECUTE_IF("inject_invalid_column_type", m_coltype[1]= 230;);
}
/*
@@ -6016,6 +6035,9 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
if (m_table->field[i]->maybe_null())
m_null_bits[(i / 8)]+= 1 << (i % 8);
+ init_metadata_fields();
+ m_data_size+= m_metadata_buf.length();
+
DBUG_VOID_RETURN;
}
@@ -6310,9 +6332,393 @@ bool Table_map_log_event::write_data_body()
write_data(m_coltype, m_colcnt) ||
write_data(mbuf, (size_t) (mbuf_end - mbuf)) ||
write_data(m_field_metadata, m_field_metadata_size),
- write_data(m_null_bits, (m_colcnt + 7) / 8);
+ write_data(m_null_bits, (m_colcnt + 7) / 8) ||
+ write_data((const uchar*) m_metadata_buf.ptr(),
+ m_metadata_buf.length());
}
+/**
+ stores an integer into packed format.
+
+ @param[out] str_buf a buffer where the packed integer will be stored.
+ @param[in] length the integer will be packed.
+ */
+static inline
+void store_compressed_length(String &str_buf, ulonglong length)
+{
+ // Store Type and packed length
+ uchar buf[4];
+ uchar *buf_ptr = net_store_length(buf, length);
+
+ str_buf.append(reinterpret_cast<char *>(buf), buf_ptr-buf);
+}
+
+/**
+ Write data into str_buf with Type|Length|Value(TLV) format.
+
+ @param[out] str_buf a buffer where the field is stored.
+ @param[in] type type of the field
+ @param[in] length length of the field value
+ @param[in] value value of the field
+*/
+static inline
+bool write_tlv_field(String &str_buf,
+ enum Table_map_log_event::Optional_metadata_field_type
+ type, uint length, const uchar *value)
+{
+ /* type is stored in one byte, so it should never bigger than 255. */
+ DBUG_ASSERT(static_cast<int>(type) <= 255);
+ str_buf.append((char) type);
+ store_compressed_length(str_buf, length);
+ return str_buf.append(reinterpret_cast<const char *>(value), length);
+}
+
+/**
+ Write data into str_buf with Type|Length|Value(TLV) format.
+
+ @param[out] str_buf a buffer where the field is stored.
+ @param[in] type type of the field
+ @param[in] value value of the field
+*/
+static inline
+bool write_tlv_field(String &str_buf,
+ enum Table_map_log_event::Optional_metadata_field_type
+ type, const String &value)
+{
+ return write_tlv_field(str_buf, type, value.length(),
+ reinterpret_cast<const uchar *>(value.ptr()));
+}
+
+static inline bool is_character_field(Binlog_type_info *info_array, Field *field)
+{
+ Binlog_type_info *info= info_array + field->field_index;
+ if (!info->m_cs)
+ return 0;
+ if (info->m_set_typelib || info->m_enum_typelib)
+ return 0;
+ return 1;
+}
+
+static inline bool is_enum_or_set_field(Binlog_type_info *info_array, Field *field) {
+ Binlog_type_info *info= info_array + field->field_index;
+ if (info->m_set_typelib || info->m_enum_typelib)
+ return 1;
+ return 0;
+}
+
+
+void Table_map_log_event::init_metadata_fields()
+{
+ DBUG_ENTER("init_metadata_fields");
+ DBUG_EXECUTE_IF("simulate_no_optional_metadata", DBUG_VOID_RETURN;);
+
+ if (binlog_row_metadata == BINLOG_ROW_METADATA_NO_LOG)
+ DBUG_VOID_RETURN;
+ if (init_signedness_field() ||
+ init_charset_field(&is_character_field, DEFAULT_CHARSET,
+ COLUMN_CHARSET) ||
+ init_geometry_type_field())
+ {
+ m_metadata_buf.length(0);
+ DBUG_VOID_RETURN;
+ }
+
+ if (binlog_row_metadata == BINLOG_ROW_METADATA_FULL)
+ {
+ if (DBUG_EVALUATE_IF("dont_log_column_name", 0, init_column_name_field()) ||
+ init_charset_field(&is_enum_or_set_field, ENUM_AND_SET_DEFAULT_CHARSET,
+ ENUM_AND_SET_COLUMN_CHARSET) ||
+ init_set_str_value_field() ||
+ init_enum_str_value_field() ||
+ init_primary_key_field())
+ m_metadata_buf.length(0);
+ }
+ DBUG_VOID_RETURN;
+}
+
+bool Table_map_log_event::init_signedness_field()
+{
+ /* use it to store signed flags, each numeric column take a bit. */
+ StringBuffer<128> buf;
+ unsigned char flag= 0;
+ unsigned char mask= 0x80;
+ Binlog_type_info *info;
+
+ for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+ {
+ info= binlog_type_info_array + i;
+ if (info->m_signess != Binlog_type_info::SIGNESS_NOT_RELEVANT)
+ {
+ if (info->m_signess == Binlog_type_info::UNSIGNED)
+ flag|= mask;
+ mask >>= 1;
+
+ // 8 fields are tested, store the result and clear the flag.
+ if (mask == 0)
+ {
+ buf.append(flag);
+ flag= 0;
+ mask= 0x80;
+ }
+ }
+ }
+
+ // Stores the signedness flags of last few columns
+ if (mask != 0x80)
+ buf.append(flag);
+
+ // The table has no numeric column, so don't log SIGNEDNESS field
+ if (buf.is_empty())
+ return false;
+
+ return write_tlv_field(m_metadata_buf, SIGNEDNESS, buf);
+}
+
+bool Table_map_log_event::init_charset_field(
+ bool (* include_type)(Binlog_type_info *, Field *),
+ Optional_metadata_field_type default_charset_type,
+ Optional_metadata_field_type column_charset_type)
+{
+ DBUG_EXECUTE_IF("simulate_init_charset_field_error", return true;);
+
+ std::map<uint, uint> collation_map;
+ // For counting characters columns
+ uint char_col_cnt= 0;
+
+ /* Find the collation number used by most fields */
+ for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+ {
+ if ((*include_type)(binlog_type_info_array, m_table->field[i]))
+ {
+ collation_map[binlog_type_info_array[i].m_cs->number]++;
+ char_col_cnt++;
+ }
+ }
+
+ if (char_col_cnt == 0)
+ return false;
+
+ /* Find the most used collation */
+ uint most_used_collation= 0;
+ uint most_used_count= 0;
+ for (std::map<uint, uint>::iterator it= collation_map.begin();
+ it != collation_map.end(); it++)
+ {
+ if (it->second > most_used_count)
+ {
+ most_used_count= it->second;
+ most_used_collation= it->first;
+ }
+ }
+
+ /*
+ Comparing length of COLUMN_CHARSET field and COLUMN_CHARSET_WITH_DEFAULT
+ field to decide which field should be logged.
+
+ Length of COLUMN_CHARSET = character column count * collation id size.
+ Length of COLUMN_CHARSET_WITH_DEFAULT =
+ default collation_id size + count of columns not use default charset *
+ (column index size + collation id size)
+
+ Assume column index just uses 1 byte and collation number also uses 1 byte.
+ */
+ if (char_col_cnt * 1 < (1 + (char_col_cnt - most_used_count) * 2))
+ {
+ StringBuffer<512> buf;
+
+ /*
+ Stores character set information into COLUMN_CHARSET format,
+ character sets of all columns are stored one by one.
+ -----------------------------------------
+ | Charset number | .... |Charset number |
+ -----------------------------------------
+ */
+ for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+ {
+ if (include_type(binlog_type_info_array, m_table->field[i]))
+ store_compressed_length(buf, binlog_type_info_array[i].m_cs->number);
+ }
+ return write_tlv_field(m_metadata_buf, column_charset_type, buf);
+ }
+ else
+ {
+ StringBuffer<512> buf;
+ uint char_column_index= 0;
+ uint default_collation= most_used_collation;
+
+ /*
+ Stores character set information into DEFAULT_CHARSET format,
+ First stores the default character set, and then stores the character
+ sets different to default character with their column index one by one.
+ --------------------------------------------------------
+ | Default Charset | Col Index | Charset number | ... |
+ --------------------------------------------------------
+ */
+
+ // Store the default collation number
+ store_compressed_length(buf, default_collation);
+
+ for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+ {
+ if (include_type(binlog_type_info_array, m_table->field[i]))
+ {
+ Field_str *field= dynamic_cast<Field_str *>(m_table->field[i]);
+
+ if (field->charset()->number != default_collation)
+ {
+ store_compressed_length(buf, char_column_index);
+ store_compressed_length(buf, field->charset()->number);
+ }
+ char_column_index++;
+ }
+ }
+ return write_tlv_field(m_metadata_buf, default_charset_type, buf);
+ }
+}
+
+bool Table_map_log_event::init_column_name_field()
+{
+ StringBuffer<2048> buf;
+
+ for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+ {
+ size_t len= m_table->field[i]->field_name.length;
+
+ store_compressed_length(buf, len);
+ buf.append(m_table->field[i]->field_name.str, len);
+ }
+ return write_tlv_field(m_metadata_buf, COLUMN_NAME, buf);
+}
+
+bool Table_map_log_event::init_set_str_value_field()
+{
+ StringBuffer<1024> buf;
+ TYPELIB *typelib;
+
+ /*
+ SET string values are stored in the same format:
+ ----------------------------------------------
+ | Value number | value1 len | value 1| .... | // first SET column
+ ----------------------------------------------
+ | Value number | value1 len | value 1| .... | // second SET column
+ ----------------------------------------------
+ */
+ for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+ {
+ if ((typelib= binlog_type_info_array[i].m_set_typelib))
+ {
+ store_compressed_length(buf, typelib->count);
+ for (unsigned int i= 0; i < typelib->count; i++)
+ {
+ store_compressed_length(buf, typelib->type_lengths[i]);
+ buf.append(typelib->type_names[i], typelib->type_lengths[i]);
+ }
+ }
+ }
+ if (buf.length() > 0)
+ return write_tlv_field(m_metadata_buf, SET_STR_VALUE, buf);
+ return false;
+}
+
+bool Table_map_log_event::init_enum_str_value_field()
+{
+ StringBuffer<1024> buf;
+ TYPELIB *typelib;
+
+ /* ENUM is same to SET columns, see comment in init_set_str_value_field */
+ for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+ {
+ if ((typelib= binlog_type_info_array[i].m_enum_typelib))
+ {
+ store_compressed_length(buf, typelib->count);
+ for (unsigned int i= 0; i < typelib->count; i++)
+ {
+ store_compressed_length(buf, typelib->type_lengths[i]);
+ buf.append(typelib->type_names[i], typelib->type_lengths[i]);
+ }
+ }
+ }
+
+ if (buf.length() > 0)
+ return write_tlv_field(m_metadata_buf, ENUM_STR_VALUE, buf);
+ return false;
+}
+
+bool Table_map_log_event::init_geometry_type_field()
+{
+ StringBuffer<256> buf;
+ uint geom_type;
+
+ /* Geometry type of geometry columns is stored one by one as packed length */
+ for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+ {
+ if (binlog_type_info_array[i].m_type_code == MYSQL_TYPE_GEOMETRY)
+ {
+ geom_type= binlog_type_info_array[i].m_geom_type;
+ DBUG_EXECUTE_IF("inject_invalid_geometry_type", geom_type= 100;);
+ store_compressed_length(buf, geom_type);
+ }
+ }
+
+ if (buf.length() > 0)
+ return write_tlv_field(m_metadata_buf, GEOMETRY_TYPE, buf);
+ return false;
+}
+
+bool Table_map_log_event::init_primary_key_field()
+{
+ DBUG_EXECUTE_IF("simulate_init_primary_key_field_error", return true;);
+
+ if (unlikely(m_table->s->primary_key == MAX_KEY))
+ return false;
+
+ // If any key column uses prefix like KEY(c1(10)) */
+ bool has_prefix= false;
+ KEY *pk= m_table->key_info + m_table->s->primary_key;
+
+ DBUG_ASSERT(pk->user_defined_key_parts > 0);
+
+ /* Check if any key column uses prefix */
+ for (uint i= 0; i < pk->user_defined_key_parts; i++)
+ {
+ KEY_PART_INFO *key_part= pk->key_part+i;
+ if (key_part->length != m_table->field[key_part->fieldnr-1]->key_length())
+ {
+ has_prefix= true;
+ break;
+ }
+ }
+
+ StringBuffer<128> buf;
+
+ if (!has_prefix)
+ {
+ /* Index of PK columns are stored one by one. */
+ for (uint i= 0; i < pk->user_defined_key_parts; i++)
+ {
+ KEY_PART_INFO *key_part= pk->key_part+i;
+ store_compressed_length(buf, key_part->fieldnr-1);
+ }
+ return write_tlv_field(m_metadata_buf, SIMPLE_PRIMARY_KEY, buf);
+ }
+ else
+ {
+ /* Index of PK columns are stored with a prefix length one by one. */
+ for (uint i= 0; i < pk->user_defined_key_parts; i++)
+ {
+ KEY_PART_INFO *key_part= pk->key_part+i;
+ size_t prefix= 0;
+
+ store_compressed_length(buf, key_part->fieldnr-1);
+
+ // Store character length but not octet length
+ if (key_part->length != m_table->field[key_part->fieldnr-1]->key_length())
+ prefix= key_part->length / key_part->field->charset()->mbmaxlen;
+ store_compressed_length(buf, prefix);
+ }
+ return write_tlv_field(m_metadata_buf, PRIMARY_KEY_WITH_PREFIX, buf);
+ }
+}
#if defined(HAVE_REPLICATION)
/*
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 9b972a9af5e..7b36bcf861a 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -450,6 +450,7 @@ my_bool opt_noacl;
my_bool sp_automatic_privileges= 1;
ulong opt_binlog_rows_event_max_size;
+ulong binlog_row_metadata;
my_bool opt_master_verify_checksum= 0;
my_bool opt_slave_sql_verify_checksum= 1;
const char *binlog_format_names[]= {"MIXED", "STATEMENT", "ROW", NullS};
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 5a53faec05f..a5dffe0c222 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -245,6 +245,7 @@ extern ulonglong max_binlog_cache_size, max_binlog_stmt_cache_size;
extern ulong max_binlog_size;
extern ulong slave_max_allowed_packet;
extern ulong opt_binlog_rows_event_max_size;
+extern ulong binlog_row_metadata;
extern ulong thread_cache_size;
extern ulong stored_program_cache_size;
extern ulong opt_slave_parallel_threads;
diff --git a/sql/sql_type_geom.cc b/sql/sql_type_geom.cc
index 9bd8e3935b8..c5a4626c58e 100644
--- a/sql/sql_type_geom.cc
+++ b/sql/sql_type_geom.cc
@@ -912,4 +912,11 @@ uint Field_geom::get_key_image(uchar *buff,uint length, imagetype type_arg)
return Field_blob::get_key_image_itRAW(buff, length);
}
+Binlog_type_info Field_geom::binlog_type_info() const
+{
+ DBUG_ASSERT(Field_geom::type() == binlog_type());
+ return Binlog_type_info(Field_geom::type(), pack_length_no_ptr(), 1,
+ field_charset(), type_handler_geom()->geometry_type());
+}
+
#endif // HAVE_SPATIAL
diff --git a/sql/sql_type_geom.h b/sql/sql_type_geom.h
index 699b5280611..d8d03138fe0 100644
--- a/sql/sql_type_geom.h
+++ b/sql/sql_type_geom.h
@@ -435,6 +435,7 @@ public:
{
out->append(STRING_WITH_LEN("unprintable_geometry_value"));
}
+ Binlog_type_info binlog_type_info() const;
};
#endif // HAVE_SPATIAL
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index ca9988b660e..1ba58bf71c4 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -6209,6 +6209,19 @@ static Sys_var_enum Sys_binlog_row_image(
SESSION_VAR(binlog_row_image), CMD_LINE(REQUIRED_ARG),
binlog_row_image_names, DEFAULT(BINLOG_ROW_IMAGE_FULL));
+static const char *binlog_row_metadata_names[]= {"NO_LOG", "MINIMAL", "FULL", NullS};
+static Sys_var_enum Sys_binlog_row_metadata(
+ "binlog_row_metadata",
+ "Controls whether metadata is logged using FULL , MINIMAL format and NO_LOG."
+ "FULL causes all metadata to be logged; MINIMAL means that only "
+ "metadata actually required by slave is logged; NO_LOG NO metadata will be logged."
+ "Default: NO_LOG.",
+ GLOBAL_VAR(binlog_row_metadata), CMD_LINE(REQUIRED_ARG),
+ binlog_row_metadata_names, DEFAULT(Table_map_log_event::BINLOG_ROW_METADATA_NO_LOG),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(NULL),
+ ON_UPDATE(NULL));
+
+
static bool check_pseudo_slave_mode(sys_var *self, THD *thd, set_var *var)
{
longlong previous_val= thd->variables.pseudo_slave_mode;