diff options
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 236 |
1 files changed, 133 insertions, 103 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 3e652a4d5a6..9cc596f9bb5 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -180,6 +180,27 @@ err: } +/* + Adjust the position pointer in the binary log file for all running slaves + + SYNOPSIS + adjust_linfo_offsets() + purge_offset Number of bytes removed from start of log index file + + NOTES + - This is called when doing a PURGE when we delete lines from the + index log file + + REQUIREMENTS + - Before calling this function, we have to ensure that no threads are + using any binary log file before purge_offset.a + + TODO + - Inform the slave threads that they should sync the position + in the binary log file with flush_relay_log_info. + Now they sync is done for next read. +*/ + void adjust_linfo_offsets(my_off_t purge_offset) { THD *tmp; @@ -193,9 +214,10 @@ void adjust_linfo_offsets(my_off_t purge_offset) if ((linfo = tmp->current_linfo)) { pthread_mutex_lock(&linfo->lock); - /* index file offset can be less that purge offset - only if we just started reading the index file. In that case - we have nothing to adjust + /* + Index file offset can be less that purge offset only if + we just started reading the index file. In that case + we have nothing to adjust */ if (linfo->index_file_offset < purge_offset) linfo->fatal = (linfo->index_file_offset != 0); @@ -274,7 +296,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) { LOG_INFO linfo; char *log_file_name = linfo.log_file_name; - char search_file_name[FN_REFLEN]; + char search_file_name[FN_REFLEN], *name; IO_CACHE log; File file = -1; String* packet = &thd->packet; @@ -295,7 +317,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) } #endif - if (!mysql_bin_log.is_open()) { errmsg = "Binary log is not open"; @@ -307,17 +328,18 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) goto err; } + name=search_file_name; if (log_ident[0]) mysql_bin_log.make_log_name(search_file_name, log_ident); else - search_file_name[0] = 0; + name=0; // Find first log linfo.index_file_offset = 0; thd->current_linfo = &linfo; - if (mysql_bin_log.find_first_log(&linfo, search_file_name)) + if (mysql_bin_log.find_log_pos(&linfo, name)) { - errmsg = "Could not find first log"; + errmsg = "Could not find first log file name in binary log index file"; goto err; } @@ -332,19 +354,19 @@ impossible position"; } my_b_seek(&log, pos); // Seek will done on next read - packet->length(0); - // we need to start a packet with something other than 255 - // to distiquish it from error - packet->append("\0", 1); + /* + We need to start a packet with something other than 255 + to distiquish it from error + */ + packet->set("\0", 1); // if we are at the start of the log - if (pos == 4) + if (pos == BIN_LOG_HEADER_SIZE) { // tell the client log name with a fake rotate_event if (fake_rotate_event(net, packet, log_file_name, &errmsg)) goto err; - packet->length(0); - packet->append("\0", 1); + packet->set("\0", 1); } while (!net->error && net->vio != 0 && !thd->killed) @@ -376,20 +398,21 @@ impossible position"; goto err; } } - packet->length(0); - packet->append("\0",1); + packet->set("\0", 1); } - // TODO: now that we are logging the offset, check to make sure - // the recorded offset and the actual match + /* + TODO: now that we are logging the offset, check to make sure + the recorded offset and the actual match + */ if (error != LOG_READ_EOF) { - switch(error) { + switch (error) { case LOG_READ_BOGUS: errmsg = "bogus data in log event"; break; case LOG_READ_TOO_LARGE: - errmsg = "log event entry exceeded max_allowed_packet -\ - increase max_allowed_packet on master"; + errmsg = "log event entry exceeded max_allowed_packet; \ +Increase max_allowed_packet on master"; break; case LOG_READ_IO: errmsg = "I/O error reading log event"; @@ -410,18 +433,21 @@ impossible position"; if (!(flags & BINLOG_DUMP_NON_BLOCK) && mysql_bin_log.is_active(log_file_name)) { - // block until there is more data in the log - // unless non-blocking mode requested + /* + Block until there is more data in the log + */ if (net_flush(net)) { errmsg = "failed on net_flush()"; goto err; } - // we may have missed the update broadcast from the log - // that has just happened, let's try to catch it if it did - // if we did not miss anything, we just wait for other threads - // to signal us + /* + We may have missed the update broadcast from the log + that has just happened, let's try to catch it if it did. + If we did not miss anything, we just wait for other threads + to signal us. + */ { log.error=0; bool read_packet = 0, fatal_error = 0; @@ -435,32 +461,32 @@ impossible position"; } #endif - // no one will update the log while we are reading - // now, but we'll be quick and just read one record + /* + No one will update the log while we are reading + now, but we'll be quick and just read one record + + To be able to handle EOF properly, we have to have the + pthread_mutex_unlock() statements in the case statements. + */ pthread_mutex_lock(log_lock); - switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) - { + switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) { case 0: - pthread_mutex_unlock(log_lock); + /* we read successfully, so we'll need to send it to the slave */ read_packet = 1; - // we read successfully, so we'll need to send it to the - // slave break; + case LOG_READ_EOF: DBUG_PRINT("wait",("waiting for data in binary log")); - // wait_for_update unlocks the log lock - needed to avoid race if (!thd->killed) mysql_bin_log.wait_for_update(thd); - else - pthread_mutex_unlock(log_lock); DBUG_PRINT("wait",("binary log received update")); break; default: - pthread_mutex_unlock(log_lock); fatal_error = 1; break; } + pthread_mutex_unlock(log_lock); if (read_packet) { @@ -479,10 +505,11 @@ impossible position"; goto err; } } - packet->length(0); - packet->append("\0",1); - // no need to net_flush because we will get to flush later when - // we hit EOF pretty quick + packet->set("\0", 1); + /* + No need to net_flush because we will get to flush later when + we hit EOF pretty quick + */ } if (fatal_error) @@ -539,7 +566,6 @@ impossible position"; err: thd->proc_info = "waiting to finalize termination"; end_io_cache(&log); - pthread_mutex_lock(&LOCK_thread_count); /* Exclude iteration through thread list this is needed for purge_logs() - it will iterate through @@ -547,6 +573,7 @@ impossible position"; this mutex will make sure that it never tried to update our linfo after we return from this stack frame */ + pthread_mutex_lock(&LOCK_thread_count); thd->current_linfo = 0; pthread_mutex_unlock(&LOCK_thread_count); if (file >= 0) @@ -561,16 +588,19 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report) if (!thd) thd = current_thd; NET* net = &thd->net; int thread_mask; + DBUG_ENTER("start_slave"); if (check_access(thd, SUPER_ACL, any_db)) - return 1; + DBUG_RETURN(1); lock_slave_threads(mi); // this allows us to cleanly read slave_running init_thread_mask(&thread_mask,mi,1 /* inverse */); if (thd->lex.slave_thd_opt) thread_mask &= thd->lex.slave_thd_opt; if (thread_mask) { - if (server_id_supplied && (!mi->inited || (mi->inited && *mi->host))) + if (init_master_info(mi,master_info_file,relay_log_info_file, 0)) + slave_errno=ER_MASTER_INFO; + else if (server_id_supplied && *mi->host) slave_errno = start_slave_threads(0 /*no mutex */, 1 /* wait for start */, mi, @@ -588,12 +618,12 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report) { if (net_report) send_error(net, slave_errno); - return 1; + DBUG_RETURN(1); } else if (net_report) send_ok(net); - return 0; + DBUG_RETURN(0); } int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report ) @@ -628,7 +658,7 @@ int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report ) return 0; } -int reset_slave(MASTER_INFO* mi) +int reset_slave(THD *thd, MASTER_INFO* mi) { MY_STAT stat_area; char fname[FN_REFLEN]; @@ -639,7 +669,9 @@ int reset_slave(MASTER_INFO* mi) lock_slave_threads(mi); init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */); if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/)) - || (error=purge_relay_logs(&mi->rli,1 /*just reset*/,&errmsg))) + || (error=purge_relay_logs(&mi->rli, thd, + 1 /* just reset */, + &errmsg))) goto err; end_master_info(mi); @@ -713,16 +745,17 @@ int change_master(THD* thd, MASTER_INFO* mi) thd->proc_info = "changing master"; LEX_MASTER_INFO* lex_mi = &thd->lex.mi; // TODO: see if needs re-write - if (init_master_info(mi,master_info_file,relay_log_info_file)) + if (init_master_info(mi, master_info_file, relay_log_info_file, 0)) { send_error(&thd->net, 0, "Could not initialize master info"); unlock_slave_threads(mi); DBUG_RETURN(1); } - /* data lock not needed since we have already stopped the running threads, - and we have the hold on the run locks which will keep all threads that - could possibly modify the data structures from running + /* + Data lock not needed since we have already stopped the running threads, + and we have the hold on the run locks which will keep all threads that + could possibly modify the data structures from running */ if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos) { @@ -772,7 +805,8 @@ int change_master(THD* thd, MASTER_INFO* mi) { mi->rli.skip_log_purge=0; thd->proc_info="purging old relay logs"; - if (purge_relay_logs(&mi->rli,0 /* not only reset, but also reinit*/, + if (purge_relay_logs(&mi->rli, thd, + 0 /* not only reset, but also reinit */, &errmsg)) { net_printf(&thd->net, 0, "Failed purging old relay logs: %s",errmsg); @@ -782,12 +816,13 @@ int change_master(THD* thd, MASTER_INFO* mi) else { const char* msg; - if (init_relay_log_pos(&mi->rli,0/*log already inited*/, - 0 /*pos already inited*/, + /* Relay log is already initialized */ + if (init_relay_log_pos(&mi->rli, + mi->rli.relay_log_name, + mi->rli.relay_log_pos, 0 /*no data lock*/, &msg)) { - //Sasha: note that I had to change net_printf() to make this work net_printf(&thd->net,0,"Failed initializing relay log position: %s",msg); unlock_slave_threads(mi); DBUG_RETURN(1); @@ -857,26 +892,27 @@ int show_binlog_events(THD* thd) if (mysql_bin_log.is_open()) { - LOG_INFO linfo; - char search_file_name[FN_REFLEN]; - LEX_MASTER_INFO* lex_mi = &thd->lex.mi; + LEX_MASTER_INFO *lex_mi = &thd->lex.mi; uint event_count, limit_start, limit_end; - const char* log_file_name = lex_mi->log_file_name; - Log_event* ev; my_off_t pos = lex_mi->pos; + char search_file_name[FN_REFLEN], *name; + const char *log_file_name = lex_mi->log_file_name; + LOG_INFO linfo; + Log_event* ev; limit_start = thd->lex.select->offset_limit; limit_end = thd->lex.select->select_limit + limit_start; + name= search_file_name; if (log_file_name) mysql_bin_log.make_log_name(search_file_name, log_file_name); else - search_file_name[0] = 0; + name=0; // Find first log linfo.index_file_offset = 0; thd->current_linfo = &linfo; - if (mysql_bin_log.find_first_log(&linfo, search_file_name)) + if (mysql_bin_log.find_log_pos(&linfo, name)) { errmsg = "Could not find target log"; goto err; @@ -981,71 +1017,65 @@ int show_binlog_info(THD* thd) } +/* + Send a lost of all binary logs to client + + SYNOPSIS + show_binlogs() + thd Thread specific variable + + RETURN VALUES + 0 ok + 1 error (Error message sent to client) +*/ + int show_binlogs(THD* thd) { - const char* errmsg = 0; - File index_file; + const char *errmsg; + IO_CACHE *index_file; char fname[FN_REFLEN]; NET* net = &thd->net; List<Item> field_list; - String* packet = &thd->packet; - IO_CACHE io_cache; + String *packet = &thd->packet; uint length; if (!mysql_bin_log.is_open()) { - errmsg = "binlog is not open"; - goto err; + //TODO: Replace with ER() error message + errmsg= "You are not using binary logging"; + goto err_with_msg; } - field_list.push_back(new Item_empty_string("Log_name", 128)); + field_list.push_back(new Item_empty_string("Log_name", 255)); if (send_fields(thd, field_list, 1)) - { - sql_print_error("Failed in send_fields"); return 1; - } - mysql_bin_log.lock_index(); - index_file = mysql_bin_log.get_index_file(); - if (index_file < 0) - { - errmsg = "Uninitialized index file pointer"; - goto err2; - } - if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, 0, 0, - MYF(MY_WME))) - { - errmsg = "Failed on init_io_cache()"; - goto err2; - } - while ((length=my_b_gets(&io_cache, fname, sizeof(fname)))) + index_file=mysql_bin_log.get_index_file(); + + reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0); + + /* The file ends with EOF or empty line */ + while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1) { - fname[--length]=0; int dir_len = dirname_length(fname); packet->length(0); - net_store_data(packet, fname + dir_len, length-dir_len); + /* The -1 is for removing newline from fname */ + net_store_data(packet, fname + dir_len, length-1-dir_len); if (my_net_write(net, (char*) packet->ptr(), packet->length())) - { - sql_print_error("Failed in my_net_write"); - end_io_cache(&io_cache); - mysql_bin_log.unlock_index(); - return 1; - } + goto err; } - mysql_bin_log.unlock_index(); - end_io_cache(&io_cache); send_eof(net); return 0; -err2: - mysql_bin_log.unlock_index(); - end_io_cache(&io_cache); -err: +err_with_msg: send_error(net, 0, errmsg); +err: + mysql_bin_log.unlock_index(); return 1; } + int log_loaded_block(IO_CACHE* file) { LOAD_FILE_INFO* lf_info; |