summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc51
1 files changed, 40 insertions, 11 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 8b6ba0e44e5..bce687ebcb9 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -1,4 +1,5 @@
-/* Copyright (C) 2000-2006 MySQL AB & Sasha
+/* Copyright (C) 2000, 2011, Oracle and/or its affiliates.
+ Copyright (c) 2009-2011, Monty Program Ab
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -21,6 +22,7 @@
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
+#include "debug_sync.h"
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -464,11 +466,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
pthread_mutex_t *log_lock;
bool binlog_can_be_corrupted= FALSE;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
-
+ int old_max_allowed_packet= thd->variables.max_allowed_packet;
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
#endif
- int old_max_allowed_packet= thd->variables.max_allowed_packet;
DBUG_ENTER("mysql_binlog_send");
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
@@ -563,7 +564,7 @@ impossible position";
and fake Rotates.
*/
if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
- get_binlog_checksum_value_at_connect(current_thd)))
+ get_binlog_checksum_value_at_connect(thd)))
{
/*
This error code is not perfect, as fake_rotate_event() does not
@@ -680,9 +681,11 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed)
{
+ my_off_t prev_pos= pos;
while (!(error = Log_event::read_log_event(&log, packet, log_lock,
current_checksum_alg)))
{
+ prev_pos= my_b_tell(&log);
#ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--)
{
@@ -692,8 +695,21 @@ impossible position";
goto err;
}
#endif
-
- if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
+ {
+ if ((*packet)[EVENT_TYPE_OFFSET+1] == XID_EVENT)
+ {
+ net_flush(net);
+ const char act[]=
+ "now "
+ "wait_for signal.continue";
+ DBUG_ASSERT(opt_debug_sync_timeout > 0);
+ DBUG_ASSERT(!debug_sync_set_action(thd,
+ STRING_WITH_LEN(act)));
+ }
+ });
+
+ if ((uchar) (*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
current_checksum_alg= get_checksum_alg(packet->ptr() + 1,
packet->length() - 1);
@@ -729,15 +745,23 @@ impossible position";
goto err;
}
+ DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
+ {
+ if ((*packet)[EVENT_TYPE_OFFSET+1] == XID_EVENT)
+ {
+ net_flush(net);
+ }
+ });
+
DBUG_PRINT("info", ("log event code %d",
(*packet)[LOG_EVENT_OFFSET+1] ));
- if ((uchar)(*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ if ((uchar) (*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
{
if (send_file(thd))
{
- errmsg = "failed in send_file()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
+ errmsg = "failed in send_file()";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
}
}
}
@@ -749,8 +773,13 @@ impossible position";
of a crash ?). treat any corruption as EOF
*/
if (binlog_can_be_corrupted &&
- (error != LOG_READ_MEM && error != LOG_READ_CHECKSUM_FAILURE))
+ (error != LOG_READ_MEM && error != LOG_READ_CHECKSUM_FAILURE &&
+ error != LOG_READ_EOF))
+ {
+ my_b_seek(&log, prev_pos);
error=LOG_READ_EOF;
+ }
+
/*
TODO: now that we are logging the offset, check to make sure
the recorded offset and the actual match.