diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 280 |
1 files changed, 171 insertions, 109 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 17379eda5a6..64a35ff0119 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -171,7 +171,7 @@ static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi); static int safe_reconnect(THD*, MYSQL*, Master_info*, bool); static int connect_to_master(THD*, MYSQL*, Master_info*, bool, bool); static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size); -static int queue_event(Master_info* mi,const char* buf,ulong event_len); +static int queue_event(Master_info *mi,const uchar *buf, ulong event_len); static int terminate_slave_thread(THD *, mysql_mutex_t *, mysql_cond_t *, volatile uint *, bool); static bool check_io_slave_killed(Master_info *mi, const char *info); @@ -315,9 +315,11 @@ build_gtid_pos_create_query(THD *thd, String *query, LEX_CSTRING *engine_name) { bool err= false; - err|= query->append(gtid_pos_table_definition1); + err|= query->append(gtid_pos_table_definition1, + sizeof(gtid_pos_table_definition1)-1); err|= append_identifier(thd, query, table_name); - err|= query->append(gtid_pos_table_definition2); + err|= query->append(gtid_pos_table_definition2, + sizeof(gtid_pos_table_definition2)-1); err|= append_identifier(thd, query, engine_name); return err; } @@ -348,8 +350,7 @@ gtid_pos_table_creation(THD *thd, plugin_ref engine, LEX_CSTRING *table_name) err= parser_state.init(thd, thd->query(), thd->query_length()); if (err) goto end; - mysql_parse(thd, thd->query(), thd->query_length(), &parser_state, - FALSE, FALSE); + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); if (unlikely(thd->is_error())) err= 1; /* The warning is relevant to 10.3 and earlier. */ @@ -1767,7 +1768,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) { errmsg= err_buff2; snprintf(err_buff2, sizeof(err_buff2), - "Master reported unrecognized MySQL version: %s", + "Master reported unrecognized MariaDB version: %s", mysql->server_version); err_code= ER_SLAVE_FATAL_ERROR; sprintf(err_buff, ER_DEFAULT(err_code), err_buff2); @@ -1783,7 +1784,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) case 2: errmsg= err_buff2; snprintf(err_buff2, sizeof(err_buff2), - "Master reported unrecognized MySQL version: %s", + "Master reported unrecognized MariaDB version: %s", mysql->server_version); err_code= ER_SLAVE_FATAL_ERROR; sprintf(err_buff, ER_DEFAULT(err_code), err_buff2); @@ -1956,7 +1957,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) !mi->rli.replicate_same_server_id) { errmsg= "The slave I/O thread stops because master and slave have equal \ -MySQL server ids; these ids must be different for replication to work (or \ +MariaDB server ids; these ids must be different for replication to work (or \ the --replicate-same-server-id option must be used on slave but this does \ not always make sense; please check the manual before using it)."; err_code= ER_SLAVE_FATAL_ERROR; @@ -2029,7 +2030,8 @@ maybe it is a *VERY OLD MASTER*."); (master_res= mysql_store_result(mysql)) && (master_row= mysql_fetch_row(master_res))) { - if (strcmp(master_row[0], global_system_variables.collation_server->name)) + if (strcmp(master_row[0], + global_system_variables.collation_server->coll_name.str)) { errmsg= "The slave I/O thread stops because master and slave have \ different values for the COLLATION_SERVER global variable. The values must \ @@ -2116,7 +2118,7 @@ be equal for the Statement-format replication to work"; /* We use ERROR_LEVEL to get the error logged to file */ mi->report(ERROR_LEVEL, err_code, NULL, - "MySQL master doesn't have a TIME_ZONE variable. Note that" + "MariaDB master doesn't have a TIME_ZONE variable. Note that" "if your timezone is not same between master and slave, your " "slave may get wrong data into timestamp columns"); } @@ -2541,15 +2543,20 @@ after_set_capability: char quote_buf[2*sizeof(mi->master_log_name)+1]; char str_buf[28+2*sizeof(mi->master_log_name)+10]; String query(str_buf, sizeof(str_buf), system_charset_info); + size_t quote_length; + my_bool overflow; query.length(0); - query.append("SELECT binlog_gtid_pos('"); - escape_quotes_for_mysql(&my_charset_bin, quote_buf, sizeof(quote_buf), - mi->master_log_name, strlen(mi->master_log_name)); - query.append(quote_buf); - query.append("',"); + query.append(STRING_WITH_LEN("SELECT binlog_gtid_pos('")); + quote_length= escape_quotes_for_mysql(&my_charset_bin, quote_buf, + sizeof(quote_buf), + mi->master_log_name, + strlen(mi->master_log_name), + &overflow); + query.append(quote_buf, quote_length); + query.append(STRING_WITH_LEN("',")); query.append_ulonglong(mi->master_log_pos); - query.append(")"); + query.append(')'); if (!mysql_real_query(mysql, query.c_ptr_safe(), query.length()) && (master_res= mysql_store_result(mysql)) && @@ -3115,7 +3122,20 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list, } /* Text for Slave_IO_Running */ -static const char *slave_running[]= { "No", "Connecting", "Preparing", "Yes" }; +static const LEX_CSTRING slave_running[]= +{ + { STRING_WITH_LEN("No") }, + { STRING_WITH_LEN("Connecting") }, + { STRING_WITH_LEN("Preparing") }, + { STRING_WITH_LEN("Yes") } +}; + +static const LEX_CSTRING msg_yes= { STRING_WITH_LEN("Yes") }; +static const LEX_CSTRING msg_no= { STRING_WITH_LEN("No") }; +#ifndef HAVE_OPENSSL +static const LEX_CSTRING msg_ignored= { STRING_WITH_LEN("Ignored") }; +#endif + static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, String *gtid_pos) @@ -3129,6 +3149,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, Protocol *protocol= thd->protocol; Rpl_filter *rpl_filter= mi->rpl_filter; StringBuffer<256> tmp; + const char *msg; protocol->prepare_for_resend(); @@ -3146,11 +3167,13 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, Show what the sql driver replication thread is doing This is only meaningful if there is only one slave thread. */ - protocol->store(mi->rli.sql_driver_thd ? - mi->rli.sql_driver_thd->get_proc_info() : "", - &my_charset_bin); + msg= (mi->rli.sql_driver_thd ? + mi->rli.sql_driver_thd->get_proc_info() : ""); + protocol->store_string_or_null(msg, &my_charset_bin); } - protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin); + msg= mi->io_thd ? mi->io_thd->get_proc_info() : ""; + protocol->store_string_or_null(msg, &my_charset_bin); + mysql_mutex_unlock(&mi->run_lock); mysql_mutex_lock(&mi->data_lock); @@ -3159,19 +3182,22 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, mysql_mutex_lock(&mi->err_lock); /* err_lock is to protect mi->rli.last_error() */ mysql_mutex_lock(&mi->rli.err_lock); - protocol->store(mi->host, &my_charset_bin); - protocol->store(mi->user, &my_charset_bin); + protocol->store_string_or_null(mi->host, &my_charset_bin); + protocol->store_string_or_null(mi->user, &my_charset_bin); protocol->store((uint32) mi->port); protocol->store((uint32) mi->connect_retry); - protocol->store(mi->master_log_name, &my_charset_bin); - protocol->store((ulonglong) mi->master_log_pos); - protocol->store(mi->rli.group_relay_log_name + - dirname_length(mi->rli.group_relay_log_name), + protocol->store(mi->master_log_name, strlen(mi->master_log_name), &my_charset_bin); + protocol->store((ulonglong) mi->master_log_pos); + msg= (mi->rli.group_relay_log_name + + dirname_length(mi->rli.group_relay_log_name)); + protocol->store(msg, strlen(msg), &my_charset_bin); protocol->store((ulonglong) mi->rli.group_relay_log_pos); - protocol->store(mi->rli.group_master_log_name, &my_charset_bin); - protocol->store(slave_running[mi->slave_running], &my_charset_bin); - protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin); + protocol->store(mi->rli.group_master_log_name, + strlen(mi->rli.group_master_log_name), + &my_charset_bin); + protocol->store(&slave_running[mi->slave_running], &my_charset_bin); + protocol->store(mi->rli.slave_running ? &msg_yes : &msg_no, &my_charset_bin); protocol->store(rpl_filter->get_do_db()); protocol->store(rpl_filter->get_ignore_db()); @@ -3185,29 +3211,30 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, protocol->store(&tmp); protocol->store(mi->rli.last_error().number); - protocol->store(mi->rli.last_error().message, &my_charset_bin); + protocol->store_string_or_null(mi->rli.last_error().message, + &my_charset_bin); protocol->store((uint32) mi->rli.slave_skip_counter); protocol->store((ulonglong) mi->rli.group_master_log_pos); protocol->store((ulonglong) mi->rli.log_space_total); - protocol->store( - mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None": - ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master": - ( mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay": - "Gtid")), &my_charset_bin); - protocol->store(mi->rli.until_log_name, &my_charset_bin); + msg= (mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None" : + (mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master": + (mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay": + "Gtid"))); + protocol->store(msg, strlen(msg), &my_charset_bin); + protocol->store_string_or_null(mi->rli.until_log_name, &my_charset_bin); protocol->store((ulonglong) mi->rli.until_log_pos); #ifdef HAVE_OPENSSL - protocol->store(mi->ssl? "Yes":"No", &my_charset_bin); + protocol->store(mi->ssl ? &msg_yes : &msg_no, &my_charset_bin); #else - protocol->store(mi->ssl? "Ignored":"No", &my_charset_bin); + protocol->store(mi->ssl ? &msg_ignored: &msg_no, &my_charset_bin); #endif - protocol->store(mi->ssl_ca, &my_charset_bin); - protocol->store(mi->ssl_capath, &my_charset_bin); - protocol->store(mi->ssl_cert, &my_charset_bin); - protocol->store(mi->ssl_cipher, &my_charset_bin); - protocol->store(mi->ssl_key, &my_charset_bin); + protocol->store_string_or_null(mi->ssl_ca, &my_charset_bin); + protocol->store_string_or_null(mi->ssl_capath, &my_charset_bin); + protocol->store_string_or_null(mi->ssl_cert, &my_charset_bin); + protocol->store_string_or_null(mi->ssl_cipher, &my_charset_bin); + protocol->store_string_or_null(mi->ssl_key, &my_charset_bin); /* Seconds_Behind_Master: if SQL thread is running and I/O thread is @@ -3262,27 +3289,30 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, { protocol->store_null(); } - protocol->store(mi->ssl_verify_server_cert? "Yes":"No", &my_charset_bin); + protocol->store(mi->ssl_verify_server_cert? &msg_yes : &msg_no, + &my_charset_bin); // Last_IO_Errno protocol->store(mi->last_error().number); // Last_IO_Error - protocol->store(mi->last_error().message, &my_charset_bin); + protocol->store_string_or_null(mi->last_error().message, &my_charset_bin); // Last_SQL_Errno protocol->store(mi->rli.last_error().number); // Last_SQL_Error - protocol->store(mi->rli.last_error().message, &my_charset_bin); + protocol->store_string_or_null(mi->rli.last_error().message, + &my_charset_bin); // Replicate_Ignore_Server_Ids prot_store_ids(thd, &mi->ignore_server_ids); // Master_Server_id protocol->store((uint32) mi->master_id); // SQL_Delay // Master_Ssl_Crl - protocol->store(mi->ssl_crl, &my_charset_bin); + protocol->store_string_or_null(mi->ssl_crl, &my_charset_bin); // Master_Ssl_Crlpath - protocol->store(mi->ssl_crlpath, &my_charset_bin); + protocol->store_string_or_null(mi->ssl_crlpath, &my_charset_bin); // Using_Gtid - protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin); + protocol->store_string_or_null(mi->using_gtid_astext(mi->using_gtid), + &my_charset_bin); // Gtid_IO_Pos { mi->gtid_current_pos.to_string(&tmp); @@ -3313,7 +3343,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, else protocol->store_null(); // Slave_SQL_Running_State - protocol->store(slave_sql_running_state, &my_charset_bin); + protocol->store_string_or_null(slave_sql_running_state, &my_charset_bin); protocol->store(mi->total_ddl_groups); protocol->store(mi->total_non_trans_groups); @@ -4329,6 +4359,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, DBUG_RETURN(1); } + rli->last_seen_gtid= serial_rgi->current_gtid; + rli->last_trans_retry_count= serial_rgi->trans_retries; if (opt_gtid_ignore_duplicates && rli->mi->using_gtid != Master_info::USE_GTID_NO) { @@ -4481,7 +4513,7 @@ Could not parse relay log event entry. The possible reasons are: the master's \ binary log is corrupted (you can check this by running 'mysqlbinlog' on the \ binary log), the slave's relay log is corrupted (you can check this by running \ 'mysqlbinlog' on the relay log), a network problem, or a bug in the master's \ -or slave's MySQL code. If you want to check the master's binary log or slave's \ +or slave's MariaDB code. If you want to check the master's binary log or slave's \ relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' \ on this slave.\ "); @@ -4804,6 +4836,8 @@ connected: thd->set_command(COM_SLAVE_IO); while (!io_slave_killed(mi)) { + const uchar *event_buf; + THD_STAGE_INFO(thd, stage_requesting_binlog_dump); if (request_dump(thd, mysql, mi, &suppress_warnings)) { @@ -4815,8 +4849,6 @@ connected: goto connected; } - const char *event_buf; - mi->slave_running= MYSQL_SLAVE_RUN_READING; DBUG_ASSERT(mi->last_error().number == 0); ulonglong lastchecktime = my_hrtime().val; @@ -4868,10 +4900,11 @@ Stopping slave I/O thread due to out-of-memory error from master"); retry_count=0; // ok event, reset retry counter THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log); - event_buf= (const char*)mysql->net.read_pos + 1; + event_buf= mysql->net.read_pos + 1; mi->semi_ack= 0; if (repl_semisync_slave. - slave_read_sync_header((const char*)mysql->net.read_pos + 1, event_len, + slave_read_sync_header((const uchar*) mysql->net.read_pos + 1, + event_len, &(mi->semi_ack), &event_buf, &event_len)) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, @@ -4993,20 +5026,17 @@ log space"); err: // print the current replication position if (mi->using_gtid == Master_info::USE_GTID_NO) - { sql_print_information("Slave I/O thread exiting, read up to log '%s', " - "position %llu", IO_RPL_LOG_NAME, mi->master_log_pos); - sql_print_information("master was %s:%d", mi->host, mi->port); - } + "position %llu, master %s:%d", IO_RPL_LOG_NAME, mi->master_log_pos, + mi->host, mi->port); else { StringBuffer<100> tmp; mi->gtid_current_pos.to_string(&tmp); sql_print_information("Slave I/O thread exiting, read up to log '%s', " - "position %llu; GTID position %s", + "position %llu; GTID position %s, master %s:%d", IO_RPL_LOG_NAME, mi->master_log_pos, - tmp.c_ptr_safe()); - sql_print_information("master was %s:%d", mi->host, mi->port); + tmp.c_ptr_safe(), mi->host, mi->port); } repl_semisync_slave.slave_stop(mi); thd->reset_query(); @@ -5048,6 +5078,7 @@ err_during_init: mi->abort_slave= 0; mi->slave_running= MYSQL_SLAVE_NOT_RUN; mi->io_thd= 0; + mi->do_accept_own_server_id= false; /* Note: the order of the two following calls (first broadcast, then unlock) is important. Otherwise a killer_thread can execute between the calls and @@ -5351,6 +5382,7 @@ pthread_handler_t handle_slave_sql(void *arg) serial_rgi->gtid_sub_id= 0; serial_rgi->gtid_pending= false; + rli->last_seen_gtid= serial_rgi->current_gtid; if (mi->using_gtid != Master_info::USE_GTID_NO && mi->using_parallel() && rli->restart_gtid_pos.count() > 0) { @@ -5608,9 +5640,9 @@ pthread_handler_t handle_slave_sql(void *arg) tmp.append(STRING_WITH_LEN("'")); } sql_print_information("Slave SQL thread exiting, replication stopped in " - "log '%s' at position %llu%s", RPL_LOG_NAME, - rli->group_master_log_pos, tmp.c_ptr_safe()); - sql_print_information("master was %s:%d", mi->host, mi->port); + "log '%s' at position %llu%s, master: %s:%d", RPL_LOG_NAME, + rli->group_master_log_pos, tmp.c_ptr_safe(), + mi->host, mi->port); } #ifdef WITH_WSREP wsrep_after_command_before_result(thd); @@ -5952,13 +5984,13 @@ static int process_io_rotate(Master_info *mi, Rotate_log_event *rev) Reads a 3.23 event and converts it to the slave's format. This code was copied from MySQL 4.0. */ -static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, - ulong event_len) +static int queue_binlog_ver_1_event(Master_info *mi, const uchar *buf, + ulong event_len) { const char *errmsg = 0; ulong inc_pos; bool ignore_event= 0; - char *tmp_buf = 0; + uchar *tmp_buf = 0; Relay_log_info *rli= &mi->rli; DBUG_ENTER("queue_binlog_ver_1_event"); @@ -5968,8 +6000,8 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, */ if ((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) { - if (unlikely(!(tmp_buf=(char*)my_malloc(key_memory_binlog_ver_1_event, - event_len+1,MYF(MY_WME))))) + if (unlikely(!(tmp_buf= (uchar*) my_malloc(key_memory_binlog_ver_1_event, + event_len+1, MYF(MY_WME))))) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed"); @@ -5985,7 +6017,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, */ tmp_buf[event_len++]=0; int4store(tmp_buf+EVENT_LEN_OFFSET, event_len); - buf = (const char*)tmp_buf; + buf= tmp_buf; } /* This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to @@ -6072,8 +6104,8 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, Reads a 4.0 event and converts it to the slave's format. This code was copied from queue_binlog_ver_1_event(), with some affordable simplifications. */ -static int queue_binlog_ver_3_event(Master_info *mi, const char *buf, - ulong event_len) +static int queue_binlog_ver_3_event(Master_info *mi, const uchar *buf, + ulong event_len) { const char *errmsg = 0; ulong inc_pos; @@ -6083,7 +6115,7 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf, /* read_log_event() will adjust log_pos to be end_log_pos */ Log_event *ev= - Log_event::read_log_event(buf,event_len, &errmsg, + Log_event::read_log_event(buf, event_len, &errmsg, mi->rli.relay_log.description_event_for_queue, 0); if (unlikely(!ev)) { @@ -6138,13 +6170,11 @@ err: setup with 3.23 master or 4.0 master */ -static int queue_old_event(Master_info *mi, const char *buf, - ulong event_len) +static int queue_old_event(Master_info *mi, const uchar *buf, ulong event_len) { DBUG_ENTER("queue_old_event"); - switch (mi->rli.relay_log.description_event_for_queue->binlog_version) - { + switch (mi->rli.relay_log.description_event_for_queue->binlog_version) { case 1: DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len)); case 3: @@ -6166,7 +6196,7 @@ static int queue_old_event(Master_info *mi, const char *buf, any >=5.0.0 format. */ -static int queue_event(Master_info* mi,const char* buf, ulong event_len) +static int queue_event(Master_info* mi, const uchar *buf, ulong event_len) { int error= 0; StringBuffer<1024> error_msg; @@ -6181,8 +6211,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) rpl_gtid event_gtid; static uint dbug_rows_event_count __attribute__((unused))= 0; bool is_compress_event = false; - char* new_buf = NULL; - char new_buf_arr[4096]; + uchar *new_buf = NULL; + uchar new_buf_arr[4096]; bool is_malloc = false; bool is_rows_event= false; /* @@ -6195,8 +6225,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF ? mi->checksum_alg_before_fd : mi->rli.relay_log.relay_log_checksum_alg; - char *save_buf= NULL; // needed for checksumming the fake Rotate event - char rot_buf[LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + FN_REFLEN]; + const uchar *save_buf= NULL; // needed for checksumming the fake Rotate event + uchar rot_buf[LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + FN_REFLEN]; DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_OFF || checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || @@ -6230,9 +6260,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) // Emulate the network corruption DBUG_EXECUTE_IF("corrupt_queue_event", - if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) + if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) { - char *debug_event_buf_c = (char*) buf; + uchar *debug_event_buf_c= const_cast<uchar*>(buf); int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN); debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos]; DBUG_PRINT("info", ("Corrupt the event at queue_event: byte on position %d", debug_cor_pos)); @@ -6240,15 +6270,16 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } ); - if (event_checksum_test((uchar *) buf, event_len, checksum_alg)) + if (event_checksum_test((uchar*) buf, event_len, checksum_alg)) { error= ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE; unlock_data_lock= FALSE; goto err; } + DBUG_ASSERT(((uchar) buf[FLAGS_OFFSET] & LOG_EVENT_ACCEPT_OWN_F) == 0); if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 && - (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) + buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) DBUG_RETURN(queue_old_event(mi,buf,event_len)); #ifdef ENABLED_DEBUG_SYNC @@ -6272,9 +6303,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) dbug_rows_event_count = 0; };); #endif + s_id= uint4korr(buf + SERVER_ID_OFFSET); + mysql_mutex_lock(&mi->data_lock); - switch ((uchar)buf[EVENT_TYPE_OFFSET]) { + switch (buf[EVENT_TYPE_OFFSET]) { case STOP_EVENT: /* We needn't write this event to the relay log. Indeed, it just indicates a @@ -6409,7 +6442,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->rli.relay_log.relay_log_checksum_alg); /* the first one */ DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); - save_buf= (char *) buf; + save_buf= buf; buf= rot_buf; } else @@ -6429,7 +6462,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->rli.relay_log.relay_log_checksum_alg); /* the first one */ DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); - save_buf= (char *) buf; + save_buf= buf; buf= rot_buf; } /* @@ -6514,7 +6547,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) error= ER_SLAVE_HEARTBEAT_FAILURE; error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;")); error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); - error_msg.append(hb.get_log_ident(), (uint) hb.get_ident_len()); + error_msg.append((char*) hb.get_log_ident(), (uint) hb.get_ident_len()); error_msg.append(STRING_WITH_LEN(" log_pos ")); error_msg.append_ulonglong(hb.log_pos); goto err; @@ -6540,7 +6573,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) error= ER_SLAVE_HEARTBEAT_FAILURE; error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;")); error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); - error_msg.append(hb.get_log_ident(), (uint) hb.get_ident_len()); + error_msg.append((char*) hb.get_log_ident(), (uint) hb.get_ident_len()); error_msg.append(STRING_WITH_LEN(" log_pos ")); error_msg.append_ulonglong(hb.log_pos); goto err; @@ -6771,6 +6804,19 @@ dbug_gtid_accept: ++mi->events_queued_since_last_gtid; inc_pos= event_len; + + /* + To compute `true` is normal for this *now* semisync slave server when + it has passed its crash-recovery as a former master. + */ + mi->do_accept_own_server_id= + (s_id == global_system_variables.server_id && + rpl_semi_sync_slave_enabled && opt_gtid_strict_mode && + mi->using_gtid != Master_info::USE_GTID_NO && + !mysql_bin_log.check_strict_gtid_sequence(event_gtid.domain_id, + event_gtid.server_id, + event_gtid.seq_no, + true)); // ...} eof else_likely } break; @@ -6782,7 +6828,7 @@ dbug_gtid_accept: if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, event_len, new_buf_arr, sizeof(new_buf_arr), - &is_malloc, (char **)&new_buf, &event_len)) + &is_malloc, &new_buf, &event_len)) { char llbuf[22]; error = ER_BINLOG_UNCOMPRESS_ERROR; @@ -6805,8 +6851,9 @@ dbug_gtid_accept: { if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, - buf, event_len, new_buf_arr, sizeof(new_buf_arr), - &is_malloc, (char **)&new_buf, &event_len)) + buf, event_len, new_buf_arr, + sizeof(new_buf_arr), + &is_malloc, &new_buf, &event_len)) { char llbuf[22]; error = ER_BINLOG_UNCOMPRESS_ERROR; @@ -6893,7 +6940,8 @@ dbug_gtid_accept: { if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ - Query_log_event::peek_is_commit_rollback(buf, event_len, + Query_log_event::peek_is_commit_rollback(buf, + event_len, checksum_alg))) { error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; @@ -6996,7 +7044,6 @@ dbug_gtid_accept: */ mysql_mutex_lock(log_lock); - s_id= uint4korr(buf + SERVER_ID_OFFSET); /* Write the event to the relay log, unless we reconnected in the middle of an event group and now need to skip the initial part of the group that @@ -7041,7 +7088,8 @@ dbug_gtid_accept: } else if ((s_id == global_system_variables.server_id && - !mi->rli.replicate_same_server_id) || + !(mi->rli.replicate_same_server_id || + mi->do_accept_own_server_id)) || event_that_should_be_ignored(buf) || /* the following conjunction deals with IGNORE_SERVER_IDS, if set @@ -7101,6 +7149,19 @@ dbug_gtid_accept: } else { + if (mi->do_accept_own_server_id) + { + int2store(const_cast<uchar*>(buf + FLAGS_OFFSET), + uint2korr(buf + FLAGS_OFFSET) | LOG_EVENT_ACCEPT_OWN_F); + if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF) + { + ha_checksum crc= 0; + + crc= my_checksum(crc, (const uchar *) buf, + event_len - BINLOG_CHECKSUM_LEN); + int4store(&buf[event_len - BINLOG_CHECKSUM_LEN], crc); + } + } if (likely(!rli->relay_log.write_event_buffer((uchar*)buf, event_len))) { mi->master_log_pos+= inc_pos; @@ -7320,16 +7381,16 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, charset, then set client charset to 'latin1' (default client charset). */ if (is_supported_parser_charset(default_charset_info)) - mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->cs_name.str); else { sql_print_information("'%s' can not be used as client character set. " "'%s' will be used as default client character set " "while connecting to master.", - default_charset_info->csname, - default_client_charset_info->csname); + default_charset_info->cs_name.str, + default_client_charset_info->cs_name.str); mysql_options(mysql, MYSQL_SET_CHARSET_NAME, - default_client_charset_info->csname); + default_client_charset_info->cs_name.str); } /* This one is not strictly needed but we have it here for completeness */ @@ -7476,7 +7537,8 @@ MYSQL *rpl_connect_master(MYSQL *mysql) } #endif - mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, + default_charset_info->cs_name.str); /* This one is not strictly needed but we have it here for completeness */ mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir); @@ -8191,19 +8253,19 @@ bool rpl_master_erroneous_autoinc(THD *thd) } -static bool get_row_event_stmt_end(const char* buf, +static bool get_row_event_stmt_end(const uchar *buf, const Format_description_log_event *fdle) { uint8 const common_header_len= fdle->common_header_len; Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]; uint8 const post_header_len= fdle->post_header_len[event_type-1]; - const char *flag_start= buf + common_header_len; + const uchar *flag_start= buf + common_header_len; /* The term 4 below signifies that master is of 'an intermediate source', see Rows_log_event::Rows_log_event. */ - flag_start += RW_MAPID_OFFSET + ((post_header_len == 6) ? 4 : RW_FLAGS_OFFSET); + flag_start += RW_MAPID_OFFSET + ((post_header_len == 6) ? 4 : RW_FLAGS_OFFSET); return (uint2korr(flag_start) & Rows_log_event::STMT_END_F) != 0; } @@ -8228,8 +8290,8 @@ void Rows_event_tracker::reset() well as the end-of-statement status of the last one. */ -void Rows_event_tracker::update(const char* file_name, my_off_t pos, - const char* buf, +void Rows_event_tracker::update(const char *file_name, my_off_t pos, + const uchar *buf, const Format_description_log_event *fdle) { DBUG_ENTER("Rows_event_tracker::update"); |