diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 83 |
1 files changed, 42 insertions, 41 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 09aa1f97936..8c213e0a3eb 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -171,7 +171,7 @@ static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi); static int safe_reconnect(THD*, MYSQL*, Master_info*, bool); static int connect_to_master(THD*, MYSQL*, Master_info*, bool, bool); static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size); -static int queue_event(Master_info* mi,const char* buf,ulong event_len); +static int queue_event(Master_info *mi,const uchar *buf, ulong event_len); static int terminate_slave_thread(THD *, mysql_mutex_t *, mysql_cond_t *, volatile uint *, bool); static bool check_io_slave_killed(Master_info *mi, const char *info); @@ -4794,6 +4794,8 @@ connected: thd->set_command(COM_SLAVE_IO); while (!io_slave_killed(mi)) { + const uchar *event_buf; + THD_STAGE_INFO(thd, stage_requesting_binlog_dump); if (request_dump(thd, mysql, mi, &suppress_warnings)) { @@ -4805,8 +4807,6 @@ connected: goto connected; } - const char *event_buf; - mi->slave_running= MYSQL_SLAVE_RUN_READING; DBUG_ASSERT(mi->last_error().number == 0); ulonglong lastchecktime = my_hrtime().val; @@ -4858,10 +4858,11 @@ Stopping slave I/O thread due to out-of-memory error from master"); retry_count=0; // ok event, reset retry counter THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log); - event_buf= (const char*)mysql->net.read_pos + 1; + event_buf= mysql->net.read_pos + 1; mi->semi_ack= 0; if (repl_semisync_slave. - slave_read_sync_header((const char*)mysql->net.read_pos + 1, event_len, + slave_read_sync_header((const uchar*) mysql->net.read_pos + 1, + event_len, &(mi->semi_ack), &event_buf, &event_len)) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, @@ -5941,13 +5942,13 @@ static int process_io_rotate(Master_info *mi, Rotate_log_event *rev) Reads a 3.23 event and converts it to the slave's format. This code was copied from MySQL 4.0. */ -static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, - ulong event_len) +static int queue_binlog_ver_1_event(Master_info *mi, const uchar *buf, + ulong event_len) { const char *errmsg = 0; ulong inc_pos; bool ignore_event= 0; - char *tmp_buf = 0; + uchar *tmp_buf = 0; Relay_log_info *rli= &mi->rli; DBUG_ENTER("queue_binlog_ver_1_event"); @@ -5957,8 +5958,8 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, */ if ((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) { - if (unlikely(!(tmp_buf=(char*)my_malloc(key_memory_binlog_ver_1_event, - event_len+1,MYF(MY_WME))))) + if (unlikely(!(tmp_buf= (uchar*) my_malloc(key_memory_binlog_ver_1_event, + event_len+1, MYF(MY_WME))))) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed"); @@ -5974,7 +5975,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, */ tmp_buf[event_len++]=0; int4store(tmp_buf+EVENT_LEN_OFFSET, event_len); - buf = (const char*)tmp_buf; + buf= tmp_buf; } /* This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to @@ -6061,8 +6062,8 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, Reads a 4.0 event and converts it to the slave's format. This code was copied from queue_binlog_ver_1_event(), with some affordable simplifications. */ -static int queue_binlog_ver_3_event(Master_info *mi, const char *buf, - ulong event_len) +static int queue_binlog_ver_3_event(Master_info *mi, const uchar *buf, + ulong event_len) { const char *errmsg = 0; ulong inc_pos; @@ -6072,7 +6073,7 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf, /* read_log_event() will adjust log_pos to be end_log_pos */ Log_event *ev= - Log_event::read_log_event(buf,event_len, &errmsg, + Log_event::read_log_event(buf, event_len, &errmsg, mi->rli.relay_log.description_event_for_queue, 0); if (unlikely(!ev)) { @@ -6127,13 +6128,11 @@ err: setup with 3.23 master or 4.0 master */ -static int queue_old_event(Master_info *mi, const char *buf, - ulong event_len) +static int queue_old_event(Master_info *mi, const uchar *buf, ulong event_len) { DBUG_ENTER("queue_old_event"); - switch (mi->rli.relay_log.description_event_for_queue->binlog_version) - { + switch (mi->rli.relay_log.description_event_for_queue->binlog_version) { case 1: DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len)); case 3: @@ -6155,7 +6154,7 @@ static int queue_old_event(Master_info *mi, const char *buf, any >=5.0.0 format. */ -static int queue_event(Master_info* mi,const char* buf, ulong event_len) +static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) { int error= 0; StringBuffer<1024> error_msg; @@ -6170,8 +6169,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) rpl_gtid event_gtid; static uint dbug_rows_event_count __attribute__((unused))= 0; bool is_compress_event = false; - char* new_buf = NULL; - char new_buf_arr[4096]; + uchar *new_buf = NULL; + uchar new_buf_arr[4096]; bool is_malloc = false; bool is_rows_event= false; /* @@ -6184,8 +6183,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF ? mi->checksum_alg_before_fd : mi->rli.relay_log.relay_log_checksum_alg; - char *save_buf= NULL; // needed for checksumming the fake Rotate event - char rot_buf[LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + FN_REFLEN]; + const uchar *save_buf= NULL; // needed for checksumming the fake Rotate event + uchar rot_buf[LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + FN_REFLEN]; DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_OFF || checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || @@ -6219,9 +6218,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) // Emulate the network corruption DBUG_EXECUTE_IF("corrupt_queue_event", - if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) + if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) { - char *debug_event_buf_c = (char*) buf; + uchar *debug_event_buf_c= const_cast<uchar*>(buf); int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN); debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos]; DBUG_PRINT("info", ("Corrupt the event at queue_event: byte on position %d", debug_cor_pos)); @@ -6229,7 +6228,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } ); - if (event_checksum_test((uchar *) buf, event_len, checksum_alg)) + if (event_checksum_test((uchar*) buf, event_len, checksum_alg)) { error= ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE; unlock_data_lock= FALSE; @@ -6237,7 +6236,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 && - (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) + buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) DBUG_RETURN(queue_old_event(mi,buf,event_len)); #ifdef ENABLED_DEBUG_SYNC @@ -6263,7 +6262,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) #endif mysql_mutex_lock(&mi->data_lock); - switch ((uchar)buf[EVENT_TYPE_OFFSET]) { + switch (buf[EVENT_TYPE_OFFSET]) { case STOP_EVENT: /* We needn't write this event to the relay log. Indeed, it just indicates a @@ -6398,7 +6397,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->rli.relay_log.relay_log_checksum_alg); /* the first one */ DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); - save_buf= (char *) buf; + save_buf= buf; buf= rot_buf; } else @@ -6418,7 +6417,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->rli.relay_log.relay_log_checksum_alg); /* the first one */ DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); - save_buf= (char *) buf; + save_buf= buf; buf= rot_buf; } /* @@ -6503,7 +6502,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) error= ER_SLAVE_HEARTBEAT_FAILURE; error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;")); error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); - error_msg.append(hb.get_log_ident(), (uint) hb.get_ident_len()); + error_msg.append((char*) hb.get_log_ident(), (uint) hb.get_ident_len()); error_msg.append(STRING_WITH_LEN(" log_pos ")); error_msg.append_ulonglong(hb.log_pos); goto err; @@ -6529,7 +6528,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) error= ER_SLAVE_HEARTBEAT_FAILURE; error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;")); error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); - error_msg.append(hb.get_log_ident(), (uint) hb.get_ident_len()); + error_msg.append((char*) hb.get_log_ident(), (uint) hb.get_ident_len()); error_msg.append(STRING_WITH_LEN(" log_pos ")); error_msg.append_ulonglong(hb.log_pos); goto err; @@ -6712,7 +6711,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, event_len, new_buf_arr, sizeof(new_buf_arr), - &is_malloc, (char **)&new_buf, &event_len)) + &is_malloc, &new_buf, &event_len)) { char llbuf[22]; error = ER_BINLOG_UNCOMPRESS_ERROR; @@ -6735,8 +6734,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) { if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, - buf, event_len, new_buf_arr, sizeof(new_buf_arr), - &is_malloc, (char **)&new_buf, &event_len)) + buf, event_len, new_buf_arr, + sizeof(new_buf_arr), + &is_malloc, &new_buf, &event_len)) { char llbuf[22]; error = ER_BINLOG_UNCOMPRESS_ERROR; @@ -6817,7 +6817,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) { if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ - Query_log_event::peek_is_commit_rollback(buf, event_len, + Query_log_event::peek_is_commit_rollback(buf, + event_len, checksum_alg))) { error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; @@ -8073,19 +8074,19 @@ bool rpl_master_erroneous_autoinc(THD *thd) } -static bool get_row_event_stmt_end(const char* buf, +static bool get_row_event_stmt_end(const uchar *buf, const Format_description_log_event *fdle) { uint8 const common_header_len= fdle->common_header_len; Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]; uint8 const post_header_len= fdle->post_header_len[event_type-1]; - const char *flag_start= buf + common_header_len; + const uchar *flag_start= buf + common_header_len; /* The term 4 below signifies that master is of 'an intermediate source', see Rows_log_event::Rows_log_event. */ - flag_start += RW_MAPID_OFFSET + ((post_header_len == 6) ? 4 : RW_FLAGS_OFFSET); + flag_start += RW_MAPID_OFFSET + ((post_header_len == 6) ? 4 : RW_FLAGS_OFFSET); return (uint2korr(flag_start) & Rows_log_event::STMT_END_F) != 0; } @@ -8110,8 +8111,8 @@ void Rows_event_tracker::reset() well as the end-of-statement status of the last one. */ -void Rows_event_tracker::update(const char* file_name, my_off_t pos, - const char* buf, +void Rows_event_tracker::update(const char *file_name, my_off_t pos, + const uchar *buf, const Format_description_log_event *fdle) { DBUG_ENTER("Rows_event_tracker::update"); |