summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/ha_innodb.cc287
-rw-r--r--sql/ha_innodb.h2
-rw-r--r--sql/handler.cc57
-rw-r--r--sql/handler.h6
-rw-r--r--sql/mysqld.cc2
-rw-r--r--sql/set_var.cc2
-rw-r--r--sql/sql_repl.cc37
7 files changed, 382 insertions, 11 deletions
diff --git a/sql/ha_innodb.cc b/sql/ha_innodb.cc
index 3f592e36219..e68a85bdac9 100644
--- a/sql/ha_innodb.cc
+++ b/sql/ha_innodb.cc
@@ -45,10 +45,58 @@ have disables the InnoDB inlining in this file. */
#include "ha_innodb.h"
-pthread_mutex_t innobase_share_mutex, // to protect innobase_open_files
- prepare_commit_mutex; // to force correct commit order in binlog
+pthread_mutex_t innobase_share_mutex, /* to protect innobase_open_files */
+ prepare_commit_mutex; /* to force correct commit order in
+ binlog */
bool innodb_inited= 0;
+/*-----------------------------------------------------------------*/
+/* These variables are used to implement (semi-)synchronous MySQL binlog
+replication for InnoDB tables. */
+
+pthread_cond_t innobase_repl_cond; /* Posix cond variable;
+ this variable is signaled
+ when enough binlog has been
+ sent to slave, so that a
+ waiting trx can return the
+ 'ok' message to the client
+ for a commit */
+pthread_mutex_t innobase_repl_cond_mutex; /* Posix cond variable mutex
+ that also protects the next
+ innobase_repl_... variables */
+uint innobase_repl_state; /* 1 if synchronous replication
+ is switched on and is working
+ ok; else 0 */
+uint innobase_repl_file_name_inited = 0; /* This is set to 1 when
+ innobase_repl_file_name
+ contains meaningful data */
+char* innobase_repl_file_name; /* The binlog name up to which
+ we have sent some binlog to
+ the slave */
+my_off_t innobase_repl_pos; /* The position in that file
+ up to which we have sent the
+ binlog to the slave */
+uint innobase_repl_n_wait_threads = 0; /* This tells how many
+ transactions currently are
+ waiting for the binlog to be
+ sent to the client */
+uint innobase_repl_wait_file_name_inited = 0; /* This is set to 1
+ when we know the 'smallest'
+ wait position */
+char* innobase_repl_wait_file_name; /* NULL, or the 'smallest'
+ innobase_repl_file_name that
+ a transaction is waiting for */
+my_off_t innobase_repl_wait_pos; /* The smallest position in
+ that file that a trx is
+ waiting for: the trx can
+ proceed and send an 'ok' to
+ the client when MySQL has sent
+ the binlog up to this position
+ to the slave */
+/*-----------------------------------------------------------------*/
+
+
+
/* Store MySQL definition of 'byte': in Linux it is char while InnoDB
uses unsigned char; the header univ.i which we include next defines
'byte' as a macro which expands to 'unsigned char' */
@@ -97,7 +145,7 @@ long innobase_mirrored_log_groups, innobase_log_files_in_group,
innobase_log_file_size, innobase_log_buffer_size,
innobase_buffer_pool_awe_mem_mb,
innobase_buffer_pool_size, innobase_additional_mem_pool_size,
- innobase_file_io_threads, innobase_lock_wait_timeout,
+ innobase_file_io_threads, innobase_lock_wait_timeout,
innobase_thread_concurrency, innobase_force_recovery,
innobase_open_files;
@@ -1531,10 +1579,10 @@ innobase_commit(
DBUG_RETURN(0);
}
-/* The following defined-out code will be enabled later when we put the
+/* TODO: put the
MySQL-4.1 functionality back to 5.0. This is needed to get InnoDB Hot Backup
to work. */
-#if 0
+
/*********************************************************************
This is called when MySQL writes the binlog entry for the current
transaction. Writes to the InnoDB tablespace info which tells where the
@@ -1563,6 +1611,24 @@ innobase_report_binlog_offset_and_commit(
trx->mysql_log_file_name = log_file_name;
trx->mysql_log_offset = (ib_longlong)end_offset;
+ if (thd->variables.sync_replication) {
+ /* Let us store the binlog file name and the position, so that
+ we know how long to wait for the binlog to the replicated to
+ the slave in synchronous replication. */
+
+ if (trx->repl_wait_binlog_name == NULL) {
+
+ trx->repl_wait_binlog_name =
+ (char*)mem_alloc(FN_REFLEN + 100);
+ }
+
+ ut_a(strlen(log_file_name) <= FN_REFLEN + 100);
+
+ strcpy(trx->repl_wait_binlog_name, log_file_name);
+
+ trx->repl_wait_binlog_pos = (ib_longlong)end_offset;
+ }
+
trx->flush_log_later = TRUE;
innobase_commit(thd, trx_handle);
@@ -1572,6 +1638,7 @@ innobase_report_binlog_offset_and_commit(
return(0);
}
+#if 0
/***********************************************************************
This function stores the binlog offset and flushes logs. */
@@ -1602,7 +1669,6 @@ innobase_store_binlog_offset_and_flush_log(
/* Syncronous flush of the log buffer to disk */
log_buffer_flush_to_disk();
}
-
#endif
/*********************************************************************
@@ -1615,7 +1681,10 @@ innobase_commit_complete(
/* out: 0 */
THD* thd) /* in: user thread */
{
+ struct timespec abstime;
trx_t* trx;
+ int cmp;
+ int ret;
trx = (trx_t*) thd->ha_data[innobase_hton.slot];
@@ -1631,10 +1700,216 @@ innobase_commit_complete(
trx_commit_complete_for_mysql(trx);
}
+ printf("Wait binlog name %s, repl state %lu\n",
+ trx->repl_wait_binlog_name,
+ (uint)innobase_repl_state);
+
+ if (thd->variables.sync_replication
+ && trx->repl_wait_binlog_name
+ && innobase_repl_state != 0) {
+
+ /* In synchronous replication, let us wait until the MySQL
+ replication has sent the relevant binlog segment to the
+ replication slave. */
+
+/* TODO: Make sure MySQL uses some way (TCP_NODELAY?) to ensure that the data
+has been received in the slave! */
+
+ pthread_mutex_lock(&innobase_repl_cond_mutex);
+try_again:
+ if (innobase_repl_state == 0) {
+
+ pthread_mutex_unlock(&innobase_repl_cond_mutex);
+
+ return(0);
+ }
+
+ cmp = strcmp(innobase_repl_file_name,
+ trx->repl_wait_binlog_name);
+ if (cmp > 0
+ || (cmp == 0 && innobase_repl_pos
+ >= (my_off_t)trx->repl_wait_binlog_pos)) {
+ /* We have already sent the relevant binlog to the
+ slave: no need to wait here */
+
+ pthread_mutex_unlock(&innobase_repl_cond_mutex);
+
+/* printf("Binlog now sent\n"); */
+
+ return(0);
+ }
+
+ /* Let us update the info about the minimum binlog position
+ of waiting threads in the innobase_repl_... variables */
+
+ if (innobase_repl_wait_file_name_inited != 0) {
+ cmp = strcmp(trx->repl_wait_binlog_name,
+ innobase_repl_wait_file_name);
+ if (cmp < 0
+ || (cmp == 0 && (my_off_t)trx->repl_wait_binlog_pos
+ <= innobase_repl_wait_pos)) {
+ /* This thd has an even lower position, let
+ us update the minimum info */
+
+ strcpy(innobase_repl_wait_file_name,
+ trx->repl_wait_binlog_name);
+
+ innobase_repl_wait_pos =
+ trx->repl_wait_binlog_pos;
+ }
+ } else {
+ strcpy(innobase_repl_wait_file_name,
+ trx->repl_wait_binlog_name);
+
+ innobase_repl_wait_pos = trx->repl_wait_binlog_pos;
+
+ innobase_repl_wait_file_name_inited = 1;
+ }
+ set_timespec(abstime, thd->variables.sync_replication_timeout);
+
+ /* Let us suspend this thread to wait on the condition;
+ when replication has progressed far enough, we will release
+ these waiting threads. The following call
+ pthread_cond_timedwait also atomically unlocks
+ innobase_repl_cond_mutex. */
+
+ innobase_repl_n_wait_threads++;
+
+/* printf("Waiting for binlog to be sent\n"); */
+
+ ret = pthread_cond_timedwait(&innobase_repl_cond,
+ &innobase_repl_cond_mutex, &abstime);
+ innobase_repl_n_wait_threads--;
+
+ if (ret != 0) {
+ ut_print_timestamp(stderr);
+
+ fprintf(stderr,
+" InnoDB: Error: MySQL synchronous replication\n"
+"InnoDB: was not able to send the binlog to the slave within the\n"
+"InnoDB: timeout %lu. We assume that the slave has become inaccessible,\n"
+"InnoDB: and switch off synchronous replication until the communication.\n"
+"InnoDB: to the slave works again.\n",
+ thd->variables.sync_replication_timeout);
+ fprintf(stderr,
+"InnoDB: MySQL synchronous replication has sent binlog\n"
+"InnoDB: to the slave up to file %s, position %lu\n", innobase_repl_file_name,
+ (ulong)innobase_repl_pos);
+ fprintf(stderr,
+"InnoDB: This transaction needs it to be sent up to\n"
+"InnoDB: file %s, position %lu\n", trx->repl_wait_binlog_name,
+ (uint)trx->repl_wait_binlog_pos);
+
+ innobase_repl_state = 0;
+
+ pthread_mutex_unlock(&innobase_repl_cond_mutex);
+
+ return(0);
+ }
+
+ goto try_again;
+ }
+
return(0);
}
/*********************************************************************
+In synchronous replication, reports to InnoDB up to which binlog position
+we have sent the binlog to the slave. Note that replication is synchronous
+for one slave only. For other slaves, we do nothing in this function. This
+function is used in a replication master. */
+
+int
+innobase_repl_report_sent_binlog(
+/*=============================*/
+ /* out: 0 */
+ THD* thd, /* in: thread doing the binlog communication to
+ the slave */
+ char* log_file_name, /* in: binlog file name */
+ my_off_t end_offset) /* in: the offset in the binlog file up to
+ which we sent the contents to the slave */
+{
+ int cmp;
+ ibool can_release_threads = 0;
+
+ /* If synchronous replication is not switched on, or this thd is
+ sending binlog to a slave where we do not need synchronous replication,
+ then return immediately */
+
+ if (thd->server_id != thd->variables.sync_replication_slave_id) {
+
+ /* Do nothing */
+
+ return(0);
+ }
+
+ pthread_mutex_lock(&innobase_repl_cond_mutex);
+
+ if (innobase_repl_state == 0) {
+
+ ut_print_timestamp(stderr);
+ fprintf(stderr,
+" InnoDB: Switching MySQL synchronous replication on again at\n"
+"InnoDB: binlog file %s, position %lu\n", log_file_name, (ulong)end_offset);
+
+ innobase_repl_state = 1;
+ }
+
+ /* The position should increase monotonically, since just one thread
+ is sending the binlog to the slave for which we want synchronous
+ replication. Let us check this, and print an error to the .err log
+ if that is not the case. */
+
+ if (innobase_repl_file_name_inited) {
+ cmp = strcmp(log_file_name, innobase_repl_file_name);
+
+ if (cmp < 0
+ || (cmp == 0 && end_offset < innobase_repl_pos)) {
+
+ ut_print_timestamp(stderr);
+ fprintf(stderr,
+" InnoDB: Error: MySQL synchronous replication has sent binlog\n"
+"InnoDB: to the slave up to file %s, position %lu\n", innobase_repl_file_name,
+ (ulong)innobase_repl_pos);
+ fprintf(stderr,
+"InnoDB: but now MySQL reports that it sent the binlog only up to\n"
+"InnoDB: file %s, position %lu\n", log_file_name, (ulong)end_offset);
+
+ }
+ }
+
+ strcpy(innobase_repl_file_name, log_file_name);
+ innobase_repl_pos = end_offset;
+ innobase_repl_file_name_inited = 1;
+
+ if (innobase_repl_n_wait_threads > 0) {
+ /* Let us check if some of the waiting threads doing a trx
+ commit can now proceed */
+
+ cmp = strcmp(innobase_repl_file_name,
+ innobase_repl_wait_file_name);
+ if (cmp > 0
+ || (cmp == 0 && innobase_repl_pos
+ >= innobase_repl_wait_pos)) {
+
+ /* Yes, at least one waiting thread can now proceed:
+ let us release all waiting threads with a broadcast */
+
+ can_release_threads = 1;
+
+ innobase_repl_wait_file_name_inited = 0;
+ }
+ }
+
+ pthread_mutex_unlock(&innobase_repl_cond_mutex);
+
+ if (can_release_threads) {
+
+ pthread_cond_broadcast(&innobase_repl_cond);
+ }
+}
+
+/*********************************************************************
Rolls back a transaction or the latest SQL statement. */
static int
diff --git a/sql/ha_innodb.h b/sql/ha_innodb.h
index 35f95ead757..6c412a889b2 100644
--- a/sql/ha_innodb.h
+++ b/sql/ha_innodb.h
@@ -321,3 +321,5 @@ int innobase_rollback_by_xid(
int innobase_xa_end(THD *thd);
+int innobase_repl_report_sent_binlog(THD *thd, char *log_file_name,
+ my_off_t end_offset);
diff --git a/sql/handler.cc b/sql/handler.cc
index 14b8974ece9..6ab4f7824ed 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -2411,3 +2411,60 @@ TYPELIB *ha_known_exts(void)
}
return &known_extensions;
}
+
+/*
+ Reports to table handlers up to which position we have sent the binlog
+ to a slave in replication
+
+ SYNOPSIS
+ ha_repl_report_sent_binlog()
+
+ NOTES
+ Only works for InnoDB at the moment
+
+ RETURN VALUE
+ Always 0 (= success)
+
+ PARAMETERS
+ THD *thd in: thread doing the binlog communication to
+ the slave
+ char *log_file_name in: binlog file name
+ my_off_t end_offset in: the offset in the binlog file up to
+ which we sent the contents to the slave
+*/
+
+int ha_repl_report_sent_binlog(THD *thd, char *log_file_name,
+ my_off_t end_offset)
+{
+#ifdef HAVE_INNOBASE_DB
+ return innobase_repl_report_sent_binlog(thd,log_file_name,end_offset);
+#else
+ /* remove warnings about unused parameters */
+ thd=thd; log_file_name=log_file_name; end_offset=end_offset;
+ return 0;
+#endif
+}
+
+/*
+ Reports to table handlers that we stop replication to a specific slave
+
+ SYNOPSIS
+ ha_repl_report_replication_stop()
+
+ NOTES
+ Does nothing at the moment
+
+ RETURN VALUE
+ Always 0 (= success)
+
+ PARAMETERS
+ THD *thd in: thread doing the binlog communication to
+ the slave
+*/
+
+int ha_repl_report_replication_stop(THD *thd)
+{
+ thd = thd;
+
+ return 0;
+}
diff --git a/sql/handler.h b/sql/handler.h
index 4c06fe8299d..5e25f038c36 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -843,7 +843,7 @@ int ha_change_key_cache_param(KEY_CACHE *key_cache);
int ha_change_key_cache(KEY_CACHE *old_key_cache, KEY_CACHE *new_key_cache);
int ha_end_key_cache(KEY_CACHE *key_cache);
-/* weird stuff */
+/* report to InnoDB that control passes to the client */
int ha_release_temporary_latches(THD *thd);
/* transactions: interface to handlerton functions */
@@ -875,3 +875,7 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht);
#define trans_need_2pc(thd, all) ((total_ha_2pc > 1) && \
!((all ? &thd->transaction.all : &thd->transaction.stmt)->no_2pc))
+/* semi-synchronous replication */
+int ha_repl_report_sent_binlog(THD *thd, char *log_file_name,
+ my_off_t end_offset);
+int ha_repl_report_replication_stop(THD *thd);
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 9c592d068ee..214915a3cb0 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -5495,7 +5495,6 @@ The minimum value for this variable is 4096.",
{"sync-frm", OPT_SYNC_FRM, "Sync .frm to disk on create. Enabled by default.",
(gptr*) &opt_sync_frm, (gptr*) &opt_sync_frm, 0, GET_BOOL, NO_ARG, 1, 0,
0, 0, 0, 0},
-#ifdef DOES_NOTHING_YET
{"sync-replication", OPT_SYNC_REPLICATION,
"Enable synchronous replication.",
(gptr*) &global_system_variables.sync_replication,
@@ -5511,7 +5510,6 @@ The minimum value for this variable is 4096.",
(gptr*) &global_system_variables.sync_replication_timeout,
(gptr*) &global_system_variables.sync_replication_timeout,
0, GET_ULONG, REQUIRED_ARG, 10, 0, ~0L, 0, 1, 0},
-#endif
{"table_cache", OPT_TABLE_CACHE,
"The number of open tables for all threads.", (gptr*) &table_cache_size,
(gptr*) &table_cache_size, 0, GET_ULONG, REQUIRED_ARG, 64, 1, 512*1024L,
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 70d64b5dac6..09fc7b20dad 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -957,12 +957,10 @@ struct show_var_st init_vars[]= {
{"sql_warnings", (char*) &sys_sql_warnings, SHOW_BOOL},
#ifdef HAVE_REPLICATION
{sys_sync_binlog_period.name,(char*) &sys_sync_binlog_period, SHOW_SYS},
-#ifdef DOES_NOTHING_YET
{sys_sync_replication.name, (char*) &sys_sync_replication, SHOW_SYS},
{sys_sync_replication_slave_id.name, (char*) &sys_sync_replication_slave_id,SHOW_SYS},
{sys_sync_replication_timeout.name, (char*) &sys_sync_replication_timeout,SHOW_SYS},
#endif
-#endif
{sys_sync_frm.name, (char*) &sys_sync_frm, SHOW_SYS},
#ifdef HAVE_TZNAME
{"system_time_zone", system_time_zone, SHOW_CHAR},
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 634b6ab0995..72470c487a3 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -385,6 +385,11 @@ impossible position";
goto err;
}
+ printf("Binlog file name %s\n", log_file_name);
+
+ if (thd->variables.sync_replication)
+ ha_repl_report_sent_binlog(thd, log_file_name, pos);
+
/*
We need to start a packet with something other than 255
to distinguish it from error
@@ -470,6 +475,10 @@ impossible position";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
+
+ if (thd->variables.sync_replication)
+ ha_repl_report_sent_binlog(thd, log_file_name, my_b_tell(&log));
+
/*
No need to save this event. We are only doing simple reads
(no real parsing of the events) so we don't need it. And so
@@ -527,6 +536,13 @@ impossible position";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
+
+ printf("Dump loop: %s: Current log position %lu\n", log_file_name,
+ (ulong)my_b_tell(&log));
+
+ if (thd->variables.sync_replication)
+ ha_repl_report_sent_binlog(thd, log_file_name, my_b_tell(&log));
+
DBUG_PRINT("info", ("log event code %d",
(*packet)[LOG_EVENT_OFFSET+1] ));
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
@@ -640,6 +656,12 @@ impossible position";
goto err;
}
+ printf("Second loop: %s: Current log position %lu\n", log_file_name,
+ (ulong)my_b_tell(&log));
+
+ if (thd->variables.sync_replication)
+ ha_repl_report_sent_binlog(thd, log_file_name, my_b_tell(&log));
+
if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
{
if (send_file(thd))
@@ -704,12 +726,22 @@ impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
+
+ if (thd->variables.sync_replication)
+ ha_repl_report_sent_binlog(thd, log_file_name, 0);
+
+ printf("Binlog file name of a new binlog %s\n", log_file_name);
+
packet->length(0);
packet->append('\0');
}
}
end:
+ printf("Ending replication\n");
+ if (thd->variables.sync_replication)
+ ha_repl_report_replication_stop(thd);
+
end_io_cache(&log);
(void)my_close(file, MYF(MY_WME));
@@ -721,6 +753,11 @@ end:
DBUG_VOID_RETURN;
err:
+ if (thd->variables.sync_replication)
+ ha_repl_report_replication_stop(thd);
+
+ printf("Ending replication in error %s\n", errmsg);
+
thd->proc_info = "Waiting to finalize termination";
end_io_cache(&log);
/*