diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 78 |
1 files changed, 66 insertions, 12 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 3a6517f54c4..f59a9142c8d 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3772,7 +3772,7 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev) } /* Check for an event that starts or stops a transaction */ - if (typ == QUERY_EVENT) + if (LOG_EVENT_IS_QUERY(typ)) { Query_log_event *qev= (Query_log_event*) ev; /* @@ -3912,7 +3912,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, */ DBUG_EXECUTE_IF("incomplete_group_in_relay_log", if ((typ == XID_EVENT) || - ((typ == QUERY_EVENT) && + (LOG_EVENT_IS_QUERY(typ) && strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0)) { DBUG_ASSERT(thd->transaction.all.modified_non_trans_table); @@ -5710,6 +5710,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) bool gtid_skip_enqueue= false; bool got_gtid_event= false; rpl_gtid event_gtid; + bool is_compress_event = false; + char* new_buf = NULL; + char new_buf_arr[4096]; + bool is_malloc = false; /* FD_q must have been prepared for the first R_a event @@ -5756,7 +5760,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) // Emulate the network corruption DBUG_EXECUTE_IF("corrupt_queue_event", - if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) + if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) { char *debug_event_buf_c = (char*) buf; int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN); @@ -6190,6 +6194,51 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) inc_pos= event_len; } break; + /* + Binlog compressed event should uncompress in IO thread + */ + case QUERY_COMPRESSED_EVENT: + inc_pos= event_len; + if (query_event_uncompress(rli->relay_log.description_event_for_queue, + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + buf, event_len, new_buf_arr, sizeof(new_buf_arr), + &is_malloc, (char **)&new_buf, &event_len)) + { + char llbuf[22]; + error = ER_BINLOG_UNCOMPRESS_ERROR; + error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: ")); + llstr(mi->master_log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + buf = new_buf; + is_compress_event = true; + goto default_action; + + case WRITE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + case DELETE_ROWS_COMPRESSED_EVENT_V1: + inc_pos = event_len; + { + if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + buf, event_len, new_buf_arr, sizeof(new_buf_arr), + &is_malloc, (char **)&new_buf, &event_len)) + { + char llbuf[22]; + error = ER_BINLOG_UNCOMPRESS_ERROR; + error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: ")); + llstr(mi->master_log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + } + buf = new_buf; + is_compress_event = true; + goto default_action; #ifndef DBUG_OFF case XID_EVENT: @@ -6208,7 +6257,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) DBUG_EXECUTE_IF("kill_slave_io_after_2_events", { if (mi->dbug_do_disconnect && - (((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT) || + (LOG_EVENT_IS_QUERY((Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]) || ((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT)) && (--mi->dbug_event_counter == 0)) { @@ -6221,7 +6270,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) DBUG_EXECUTE_IF("kill_slave_io_before_commit", { if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || - ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(buf, event_len, checksum_alg))) { @@ -6241,7 +6290,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ++mi->events_queued_since_last_gtid; } - inc_pos= event_len; + if (!is_compress_event) + inc_pos= event_len; + break; } @@ -6332,8 +6383,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) /* everything is filtered out from non-master */ (s_id != mi->master_id || /* for the master meta information is necessary */ - (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) || + ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) || /* Check whether it needs to be filtered based on domain_id @@ -6362,9 +6413,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) */ if (!(s_id == global_system_variables.server_id && !mi->rli.replicate_same_server_id) || - (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && - buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) + ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) { mi->master_log_pos+= inc_pos; memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); @@ -6405,7 +6456,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) buf[EVENT_TYPE_OFFSET])) || (!mi->last_queued_gtid_standalone && ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || - ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(buf, event_len, checksum_alg)))))) { @@ -6435,6 +6486,9 @@ err: mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error), error_msg.ptr()); + if(is_malloc) + my_free((void *)new_buf); + DBUG_RETURN(error); } |