diff options
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 51 |
1 files changed, 40 insertions, 11 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 8b6ba0e44e5..bce687ebcb9 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1,4 +1,5 @@ -/* Copyright (C) 2000-2006 MySQL AB & Sasha +/* Copyright (C) 2000, 2011, Oracle and/or its affiliates. + Copyright (c) 2009-2011, Monty Program Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -21,6 +22,7 @@ #include "log_event.h" #include "rpl_filter.h" #include <my_dir.h> +#include "debug_sync.h" int max_binlog_dump_events = 0; // unlimited my_bool opt_sporadic_binlog_dump_fail = 0; @@ -464,11 +466,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, pthread_mutex_t *log_lock; bool binlog_can_be_corrupted= FALSE; uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; - + int old_max_allowed_packet= thd->variables.max_allowed_packet; #ifndef DBUG_OFF int left_events = max_binlog_dump_events; #endif - int old_max_allowed_packet= thd->variables.max_allowed_packet; DBUG_ENTER("mysql_binlog_send"); DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); @@ -563,7 +564,7 @@ impossible position"; and fake Rotates. */ if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg, - get_binlog_checksum_value_at_connect(current_thd))) + get_binlog_checksum_value_at_connect(thd))) { /* This error code is not perfect, as fake_rotate_event() does not @@ -680,9 +681,11 @@ impossible position"; while (!net->error && net->vio != 0 && !thd->killed) { + my_off_t prev_pos= pos; while (!(error = Log_event::read_log_event(&log, packet, log_lock, current_checksum_alg))) { + prev_pos= my_b_tell(&log); #ifndef DBUG_OFF if (max_binlog_dump_events && !left_events--) { @@ -692,8 +695,21 @@ impossible position"; goto err; } #endif - - if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", + { + if ((*packet)[EVENT_TYPE_OFFSET+1] == XID_EVENT) + { + net_flush(net); + const char act[]= + "now " + "wait_for signal.continue"; + DBUG_ASSERT(opt_debug_sync_timeout > 0); + DBUG_ASSERT(!debug_sync_set_action(thd, + STRING_WITH_LEN(act))); + } + }); + + if ((uchar) (*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) { current_checksum_alg= get_checksum_alg(packet->ptr() + 1, packet->length() - 1); @@ -729,15 +745,23 @@ impossible position"; goto err; } + DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", + { + if ((*packet)[EVENT_TYPE_OFFSET+1] == XID_EVENT) + { + net_flush(net); + } + }); + DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] )); - if ((uchar)(*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + if ((uchar) (*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) { if (send_file(thd)) { - errmsg = "failed in send_file()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; + errmsg = "failed in send_file()"; + my_errno= ER_UNKNOWN_ERROR; + goto err; } } } @@ -749,8 +773,13 @@ impossible position"; of a crash ?). treat any corruption as EOF */ if (binlog_can_be_corrupted && - (error != LOG_READ_MEM && error != LOG_READ_CHECKSUM_FAILURE)) + (error != LOG_READ_MEM && error != LOG_READ_CHECKSUM_FAILURE && + error != LOG_READ_EOF)) + { + my_b_seek(&log, prev_pos); error=LOG_READ_EOF; + } + /* TODO: now that we are logging the offset, check to make sure the recorded offset and the actual match. |