summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2016-11-03 14:48:51 +0100
committerKristian Nielsen <knielsen@knielsen-hq.org>2016-11-03 14:48:51 +0100
commitb002509b67860df9f0634709b5fb8a2ab98d130a (patch)
tree3c1fb6c45a3a2275efc18e003bae531e81b3395e /sql/slave.cc
parentd665e79c5b8582f44dc280e5e6df4a8ff4945623 (diff)
parent56a041cde657e5618c519a3c50e8075136d4a1ce (diff)
downloadmariadb-git-b002509b67860df9f0634709b5fb8a2ab98d130a.tar.gz
MDEV-11065: Compressed binary log. Merge code into current 10.2.
Conflicts: sql/share/errmsg-utf8.txt
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc78
1 files changed, 66 insertions, 12 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 3a6517f54c4..f59a9142c8d 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3772,7 +3772,7 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev)
}
/* Check for an event that starts or stops a transaction */
- if (typ == QUERY_EVENT)
+ if (LOG_EVENT_IS_QUERY(typ))
{
Query_log_event *qev= (Query_log_event*) ev;
/*
@@ -3912,7 +3912,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
*/
DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
if ((typ == XID_EVENT) ||
- ((typ == QUERY_EVENT) &&
+ (LOG_EVENT_IS_QUERY(typ) &&
strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0))
{
DBUG_ASSERT(thd->transaction.all.modified_non_trans_table);
@@ -5710,6 +5710,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
bool gtid_skip_enqueue= false;
bool got_gtid_event= false;
rpl_gtid event_gtid;
+ bool is_compress_event = false;
+ char* new_buf = NULL;
+ char new_buf_arr[4096];
+ bool is_malloc = false;
/*
FD_q must have been prepared for the first R_a event
@@ -5756,7 +5760,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
// Emulate the network corruption
DBUG_EXECUTE_IF("corrupt_queue_event",
- if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
+ if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
{
char *debug_event_buf_c = (char*) buf;
int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN);
@@ -6190,6 +6194,51 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
inc_pos= event_len;
}
break;
+ /*
+ Binlog compressed event should uncompress in IO thread
+ */
+ case QUERY_COMPRESSED_EVENT:
+ inc_pos= event_len;
+ if (query_event_uncompress(rli->relay_log.description_event_for_queue,
+ checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+ buf, event_len, new_buf_arr, sizeof(new_buf_arr),
+ &is_malloc, (char **)&new_buf, &event_len))
+ {
+ char llbuf[22];
+ error = ER_BINLOG_UNCOMPRESS_ERROR;
+ error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: "));
+ llstr(mi->master_log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ buf = new_buf;
+ is_compress_event = true;
+ goto default_action;
+
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
+ inc_pos = event_len;
+ {
+ if (row_log_event_uncompress(rli->relay_log.description_event_for_queue,
+ checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+ buf, event_len, new_buf_arr, sizeof(new_buf_arr),
+ &is_malloc, (char **)&new_buf, &event_len))
+ {
+ char llbuf[22];
+ error = ER_BINLOG_UNCOMPRESS_ERROR;
+ error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: "));
+ llstr(mi->master_log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ }
+ buf = new_buf;
+ is_compress_event = true;
+ goto default_action;
#ifndef DBUG_OFF
case XID_EVENT:
@@ -6208,7 +6257,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
DBUG_EXECUTE_IF("kill_slave_io_after_2_events",
{
if (mi->dbug_do_disconnect &&
- (((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT) ||
+ (LOG_EVENT_IS_QUERY((Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]) ||
((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT))
&& (--mi->dbug_event_counter == 0))
{
@@ -6221,7 +6270,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
DBUG_EXECUTE_IF("kill_slave_io_before_commit",
{
if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
- ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
Query_log_event::peek_is_commit_rollback(buf, event_len,
checksum_alg)))
{
@@ -6241,7 +6290,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
++mi->events_queued_since_last_gtid;
}
- inc_pos= event_len;
+ if (!is_compress_event)
+ inc_pos= event_len;
+
break;
}
@@ -6332,8 +6383,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
/* everything is filtered out from non-master */
(s_id != mi->master_id ||
/* for the master meta information is necessary */
- (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) ||
/*
Check whether it needs to be filtered based on domain_id
@@ -6362,9 +6413,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
*/
if (!(s_id == global_system_variables.server_id &&
!mi->rli.replicate_same_server_id) ||
- (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
- buf[EVENT_TYPE_OFFSET] != STOP_EVENT))
+ ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != STOP_EVENT))
{
mi->master_log_pos+= inc_pos;
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
@@ -6405,7 +6456,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
buf[EVENT_TYPE_OFFSET])) ||
(!mi->last_queued_gtid_standalone &&
((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
- ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
Query_log_event::peek_is_commit_rollback(buf, event_len,
checksum_alg))))))
{
@@ -6435,6 +6486,9 @@ err:
mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error),
error_msg.ptr());
+ if(is_malloc)
+ my_free((void *)new_buf);
+
DBUG_RETURN(error);
}