summaryrefslogtreecommitdiff
path: root/plugin
diff options
context:
space:
mode:
authorHe Zhenxing <zhenxing.he@sun.com>2009-10-12 20:55:01 +0800
committerHe Zhenxing <zhenxing.he@sun.com>2009-10-12 20:55:01 +0800
commit55842061e782593608808dc5c3e95ebc652650d7 (patch)
tree0f1ff7d4db0e95f267665506c40a22068fe849c8 /plugin
parentd8b4e637a07cea438c03747e33a7d82a18aa5e71 (diff)
downloadmariadb-git-55842061e782593608808dc5c3e95ebc652650d7.tar.gz
Backport BUG#45848 Semisynchronous replication internals are visible in SHOW PROCESSLIST and logs
Semi-sync used an extra connection from the slave to the master to send replies. This was a normal client connection that used a normal SET query to set the reply information on the master, so it was visible to users and could cause confusion and complaints. This problem is fixed by sending the reply over the same connection that the master dump thread uses to send the binlog to the slave. Since the semi-sync plugins are now integrated with the server code, it is not a problem to use the internal net interfaces to do this. The master dump thread marks an event as requiring a reply, and waits for that reply, when the event just sent is the last event of a transaction and semi-sync status is ON; the slave sends a reply to the master when it receives an event that requires a reply.
Diffstat (limited to 'plugin')
-rw-r--r--plugin/semisync/Makefile.am1
-rw-r--r--plugin/semisync/semisync.h30
-rw-r--r--plugin/semisync/semisync_master.cc162
-rw-r--r--plugin/semisync/semisync_master.h27
-rw-r--r--plugin/semisync/semisync_master_plugin.cc50
-rw-r--r--plugin/semisync/semisync_slave.cc50
-rw-r--r--plugin/semisync/semisync_slave.h12
-rw-r--r--plugin/semisync/semisync_slave_plugin.cc10
8 files changed, 234 insertions, 108 deletions
diff --git a/plugin/semisync/Makefile.am b/plugin/semisync/Makefile.am
index dd9a630670c..dfe539b8386 100644
--- a/plugin/semisync/Makefile.am
+++ b/plugin/semisync/Makefile.am
@@ -18,6 +18,7 @@
pkgplugindir = $(pkglibdir)/plugin
INCLUDES = -I$(top_srcdir)/include \
-I$(top_srcdir)/sql \
+ -I$(top_srcdir)/regex \
-I$(srcdir)
noinst_HEADERS = semisync.h semisync_master.h semisync_slave.h
diff --git a/plugin/semisync/semisync.h b/plugin/semisync/semisync.h
index c9d35a093f6..ced25c40534 100644
--- a/plugin/semisync/semisync.h
+++ b/plugin/semisync/semisync.h
@@ -18,25 +18,9 @@
#ifndef SEMISYNC_H
#define SEMISYNC_H
-#include <stdint.h>
-#include <string.h>
-#include <assert.h>
-#include <sys/time.h>
-#include <time.h>
-#include <stdio.h>
-#include <pthread.h>
-#include <mysql.h>
-
-typedef uint32_t uint32;
-typedef unsigned long long my_off_t;
-#define FN_REFLEN 512 /* Max length of full path-name */
-void sql_print_error(const char *format, ...);
-void sql_print_warning(const char *format, ...);
-void sql_print_information(const char *format, ...);
-extern unsigned long max_connections;
-
#define MYSQL_SERVER
#define HAVE_REPLICATION
+#include <mysql_priv.h>
#include <my_global.h>
#include <my_pthread.h>
#include <mysql/plugin.h>
@@ -92,4 +76,16 @@ public:
static const unsigned char kPacketFlagSync;
};
+/* The layout of a semisync slave reply packet:
+ 1 byte for the magic num
+ 8 bytes for the binlog positon
+ n bytes for the binlog filename, terminated with a '\0'
+*/
+#define REPLY_MAGIC_NUM_LEN 1
+#define REPLY_BINLOG_POS_LEN 8
+#define REPLY_BINLOG_NAME_LEN (FN_REFLEN + 1)
+#define REPLY_MAGIC_NUM_OFFSET 0
+#define REPLY_BINLOG_POS_OFFSET (REPLY_MAGIC_NUM_OFFSET + REPLY_MAGIC_NUM_LEN)
+#define REPLY_BINLOG_NAME_OFFSET (REPLY_BINLOG_POS_OFFSET + REPLY_BINLOG_POS_LEN)
+
#endif /* SEMISYNC_H */
diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc
index 3641b658268..decab205674 100644
--- a/plugin/semisync/semisync_master.cc
+++ b/plugin/semisync/semisync_master.cc
@@ -546,19 +546,6 @@ bool ReplSemiSyncMaster::is_semi_sync_slave()
return val;
}
-int ReplSemiSyncMaster::reportReplyBinlog(const char *log_file_pos)
-{
- char log_name[FN_REFLEN];
- char *endptr;
- my_off_t log_pos= strtoull(log_file_pos, &endptr, 10);
- if (!log_pos || !endptr || *endptr != ':' )
- return 1;
- endptr++; // skip the ':' seperator
- strncpy(log_name, endptr, FN_REFLEN);
- uint32 server_id= 0;
- return reportReplyBinlog(server_id, log_name, log_pos);
-}
-
int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
const char *log_file_name,
my_off_t log_file_pos)
@@ -679,7 +666,7 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
"Waiting for semi-sync ACK from slave");
/* This is the real check inside the mutex. */
- if (!getMasterEnabled() || !is_on() || !rpl_semi_sync_master_clients)
+ if (!getMasterEnabled() || !is_on())
goto l_end;
if (trace_level_ & kTraceDetail)
@@ -691,17 +678,20 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
while (is_on())
{
- int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
- trx_wait_binlog_name, trx_wait_binlog_pos);
- if (cmp >= 0)
+ if (reply_file_name_inited_)
{
- /* We have already sent the relevant binlog to the slave: no need to
- * wait here.
- */
- if (trace_level_ & kTraceDetail)
- sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
- kWho, reply_file_name_, (unsigned long)reply_file_pos_);
- break;
+ int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
+ trx_wait_binlog_name, trx_wait_binlog_pos);
+ if (cmp >= 0)
+ {
+ /* We have already sent the relevant binlog to the slave: no need to
+ * wait here.
+ */
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
+ kWho, reply_file_name_, (unsigned long)reply_file_pos_);
+ break;
+ }
}
/* Let us update the info about the minimum binlog position of waiting
@@ -709,8 +699,8 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
*/
if (wait_file_name_inited_)
{
- cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
- wait_file_name_, wait_file_pos_);
+ int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
+ wait_file_name_, wait_file_pos_);
if (cmp <= 0)
{
/* This thd has a lower position, let's update the minimum info. */
@@ -824,6 +814,13 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
}
l_end:
+ /*
+ At this point, the binlog file and position of this transaction
+ must have been removed from ActiveTranx.
+ */
+ assert(!active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
+ trx_wait_binlog_pos));
+
/* Update the status counter. */
if (is_on() && rpl_semi_sync_master_clients)
enabled_transactions_++;
@@ -1045,7 +1042,9 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
* reserve the packet header.
*/
if (sync)
+ {
(packet)[2] = kPacketFlagSync;
+ }
return function_exit(kWho, 0);
}
@@ -1098,8 +1097,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
if insert tranx_node failed, print a warning message
and turn off semi-sync
*/
- sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %ul",
- log_file_name, log_file_pos);
+ sql_print_warning("Semi-sync failed to insert tranx_node for binlog file: %s, position: %lu",
+ log_file_name, (ulong)log_file_pos);
switch_off();
}
}
@@ -1110,6 +1109,113 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
return function_exit(kWho, result);
}
+int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
+ const char *event_buf)
+{
+ const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
+ const unsigned char *packet;
+ char log_file_name[FN_REFLEN];
+ my_off_t log_file_pos;
+ ulong packet_len;
+ int result = -1;
+
+ struct timeval start_tv;
+ int start_time_err= 0;
+ ulong trc_level = trace_level_;
+
+ function_enter(kWho);
+
+ assert((unsigned char)event_buf[1] == kPacketMagicNum);
+ if ((unsigned char)event_buf[2] != kPacketFlagSync)
+ {
+ /* current event does not require reply */
+ result = 0;
+ goto l_end;
+ }
+
+ if (trc_level & kTraceNetWait)
+ start_time_err = gettimeofday(&start_tv, 0);
+
+ /* We flush to make sure that the current event is sent to the network,
+ * instead of being buffered in the TCP/IP stack.
+ */
+ if (net_flush(net))
+ {
+ sql_print_error("Semi-sync master failed on net_flush() "
+ "before waiting for slave reply");
+ goto l_end;
+ }
+
+ net_clear(net, 0);
+ if (trc_level & kTraceDetail)
+ sql_print_information("%s: Wait for replica's reply", kWho);
+
+ /* Wait for the network here. Though binlog dump thread can indefinitely wait
+ * here, transactions would not wait indefintely.
+ * Transactions wait on binlog replies detected by binlog dump threads. If
+ * binlog dump threads wait too long, transactions will timeout and continue.
+ */
+ packet_len = my_net_read(net);
+
+ if (trc_level & kTraceNetWait)
+ {
+ if (start_time_err != 0)
+ {
+ sql_print_error("Semi-sync master wait for reply "
+ "gettimeofday fail to get start time");
+ timefunc_fails_++;
+ }
+ else
+ {
+ int wait_time;
+
+ wait_time = getWaitTime(start_tv);
+ if (wait_time < 0)
+ {
+ sql_print_error("Semi-sync master wait for reply "
+ "gettimeofday fail to get wait time.");
+ timefunc_fails_++;
+ }
+ else
+ {
+ total_net_wait_num_++;
+ total_net_wait_time_ += wait_time;
+ }
+ }
+ }
+
+ if (packet_len == packet_error || packet_len < REPLY_BINLOG_NAME_OFFSET)
+ {
+ if (packet_len == packet_error)
+ sql_print_error("Read semi-sync reply network error: %s (errno: %d)",
+ net->last_error, net->last_errno);
+ else
+ sql_print_error("Read semi-sync reply length error: %s (errno: %d)",
+ net->last_error, net->last_errno);
+ goto l_end;
+ }
+
+ packet = net->read_pos;
+ if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
+ {
+ sql_print_error("Read semi-sync reply magic number error");
+ goto l_end;
+ }
+
+ log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
+ strcpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET);
+
+ if (trc_level & kTraceDetail)
+ sql_print_information("%s: Got reply (%s, %lu)",
+ kWho, log_file_name, (ulong)log_file_pos);
+
+ result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
+
+ l_end:
+ return function_exit(kWho, result);
+}
+
+
int ReplSemiSyncMaster::resetMaster()
{
const char *kWho = "ReplSemiSyncMaster::resetMaster";
diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h
index bb63cece18a..50a4665efb8 100644
--- a/plugin/semisync/semisync_master.h
+++ b/plugin/semisync/semisync_master.h
@@ -81,7 +81,7 @@ public:
/* Insert an active transaction node with the specified position.
*
* Return:
- * 0: success; -1 or otherwise: error
+ * 0: success; non-zero: error
*/
int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
@@ -91,7 +91,7 @@ public:
* list and the hash table will be reset to empty.
*
* Return:
- * 0: success; -1 or otherwise: error
+ * 0: success; non-zero: error
*/
int clear_active_tranx_nodes(const char *log_file_name,
my_off_t log_file_pos);
@@ -253,8 +253,6 @@ class ReplSemiSyncMaster
/* Is the slave servered by the thread requested semi-sync */
bool is_semi_sync_slave();
- int reportReplyBinlog(const char *log_file_pos);
-
/* In semi-sync replication, reports up to which binlog position we have
* received replies from the slave indicating that it already get the events.
*
@@ -265,7 +263,7 @@ class ReplSemiSyncMaster
* the replies from the slave
*
* Return:
- * 0: success; -1 or otherwise: error
+ * 0: success; non-zero: error
*/
int reportReplyBinlog(uint32 server_id,
const char* log_file_name,
@@ -284,7 +282,7 @@ class ReplSemiSyncMaster
* trx_wait_binlog_pos - (IN) ending position's file offset
*
* Return:
- * 0: success; -1 or otherwise: error
+ * 0: success; non-zero: error
*/
int commitTrx(const char* trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos);
@@ -313,7 +311,7 @@ class ReplSemiSyncMaster
* server_id - (IN) master server id number
*
* Return:
- * 0: success; -1 or otherwise: error
+ * 0: success; non-zero: error
*/
int updateSyncHeader(unsigned char *packet,
const char *log_file_name,
@@ -330,10 +328,23 @@ class ReplSemiSyncMaster
* log_file_pos - (IN) transaction ending position's file offset
*
* Return:
- * 0: success; -1 or otherwise: error
+ * 0: success; non-zero: error
*/
int writeTranxInBinlog(const char* log_file_name, my_off_t log_file_pos);
+ /* Read the slave's reply so that we know how much progress the slave makes
+ * on receive replication events.
+ *
+ * Input:
+ * net - (IN) the connection to master
+ * server_id - (IN) master server id number
+ * event_buf - (IN) pointer to the event packet
+ *
+ * Return:
+ * 0: success; non-zero: error
+ */
+ int readSlaveReply(NET *net, uint32 server_id, const char *event_buf);
+
/* Export internal statistics for semi-sync replication. */
void setExportStats();
diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc
index dc19d09e622..9c76b5369b6 100644
--- a/plugin/semisync/semisync_master_plugin.cc
+++ b/plugin/semisync/semisync_master_plugin.cc
@@ -69,8 +69,16 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
if (semi_sync_slave)
+ {
/* One more semi-sync slave */
repl_semisync.add_slave();
+
+ /*
+ Let's assume this semi-sync slave has already received all
+ binlog events before the filename and position it requests.
+ */
+ repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
+ }
sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
semi_sync_slave ? "semi-sync" : "asynchronous",
param->server_id, log_file, (unsigned long)log_pos);
@@ -114,6 +122,18 @@ int repl_semi_before_send_event(Binlog_transmit_param *param,
int repl_semi_after_send_event(Binlog_transmit_param *param,
const char *event_buf, unsigned long len)
{
+ if (repl_semisync.is_semi_sync_slave())
+ {
+ THD *thd= current_thd;
+ /*
+ Possible errors in reading slave reply are ignored deliberately
+ because we do not want dump thread to quit on this. Error
+ messages are already reported.
+ */
+ (void) repl_semisync.readSlaveReply(&thd->net,
+ param->server_id, event_buf);
+ thd->clear_error();
+ }
return 0;
}
@@ -142,11 +162,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
void *ptr,
const void *val);
-static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
- SYS_VAR *var,
- void *ptr,
- const void *val);
-
static MYSQL_SYSVAR_BOOL(enabled, rpl_semi_sync_master_enabled,
PLUGIN_VAR_OPCMDARG,
"Enable semi-synchronous replication master (disabled by default). ",
@@ -168,22 +183,10 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
&fix_rpl_semi_sync_master_trace_level, // update
32, 0, ~0L, 1);
-/*
- Use a SESSION instead of GLOBAL variable for slave to send reply to
- avoid requiring SUPER privilege.
-*/
-static MYSQL_THDVAR_STR(reply_log_file_pos,
- PLUGIN_VAR_NOCMDOPT,
- "The log filename and position slave has queued to relay log.",
- NULL, // check
- &fix_rpl_semi_sync_master_reply_log_file_pos,
- "");
-
static SYS_VAR* semi_sync_master_system_vars[]= {
MYSQL_SYSVAR(enabled),
MYSQL_SYSVAR(timeout),
MYSQL_SYSVAR(trace_level),
- MYSQL_SYSVAR(reply_log_file_pos),
NULL,
};
@@ -228,19 +231,6 @@ static void fix_rpl_semi_sync_master_enabled(MYSQL_THD thd,
return;
}
-static void fix_rpl_semi_sync_master_reply_log_file_pos(MYSQL_THD thd,
- SYS_VAR *var,
- void *ptr,
- const void *val)
-{
- const char *log_file_pos= *(char **)val;
-
- if (repl_semisync.reportReplyBinlog(log_file_pos))
- sql_print_error("report slave binlog reply failed.");
-
- return;
-}
-
Trans_observer trans_observer = {
sizeof(Trans_observer), // len
diff --git a/plugin/semisync/semisync_slave.cc b/plugin/semisync/semisync_slave.cc
index 3298ce316a8..6e773b2f743 100644
--- a/plugin/semisync/semisync_slave.cc
+++ b/plugin/semisync/semisync_slave.cc
@@ -104,19 +104,45 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
return 0;
}
-int ReplSemiSyncSlave::slaveReply(const char *log_name, my_off_t log_pos)
+int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
+ const char *binlog_filename,
+ my_off_t binlog_filepos)
{
- char query[FN_REFLEN + 100];
- sprintf(query, "SET SESSION rpl_semi_sync_master_reply_log_file_pos='%llu:%s'",
- (unsigned long long)log_pos, log_name);
- if (mysql_real_query(mysql_reply, query, strlen(query)))
+ const char *kWho = "ReplSemiSyncSlave::slaveReply";
+ NET *net= &mysql->net;
+ uchar reply_buffer[REPLY_MAGIC_NUM_LEN
+ + REPLY_BINLOG_POS_LEN
+ + REPLY_BINLOG_NAME_LEN];
+ int reply_res, name_len = strlen(binlog_filename);
+
+ function_enter(kWho);
+
+ /* Prepare the buffer of the reply. */
+ reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum;
+ int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
+ memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
+ binlog_filename,
+ name_len + 1 /* including trailing '\0' */);
+
+ if (trace_level_ & kTraceDetail)
+ sql_print_information("%s: reply (%s, %lu)", kWho,
+ binlog_filename, (ulong)binlog_filepos);
+
+ net_clear(net, 0);
+ /* Send the reply. */
+ reply_res = my_net_write(net, reply_buffer,
+ name_len + REPLY_BINLOG_NAME_OFFSET);
+ if (!reply_res)
{
- sql_print_error("Set 'rpl_semi_sync_master_reply_log_file_pos' on master failed");
- mysql_free_result(mysql_store_result(mysql_reply));
- mysql_close(mysql_reply);
- mysql_reply= 0;
- return 1;
+ reply_res = net_flush(net);
+ if (reply_res)
+ sql_print_error("Semi-sync slave net_flush() reply failed");
}
- mysql_free_result(mysql_store_result(mysql_reply));
- return 0;
+ else
+ {
+ sql_print_error("Semi-sync slave send reply failed: %s (%d)",
+ net->last_error, net->last_errno);
+ }
+
+ return function_exit(kWho, reply_res);
}
diff --git a/plugin/semisync/semisync_slave.h b/plugin/semisync/semisync_slave.h
index 16fa31c69eb..f212911c49d 100644
--- a/plugin/semisync/semisync_slave.h
+++ b/plugin/semisync/semisync_slave.h
@@ -57,7 +57,7 @@ public:
* payload_len - (IN) payload length
*
* Return:
- * 0: success; -1 or otherwise: error
+ * 0: success; non-zero: error
*/
int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
const char **payload, unsigned long *payload_len);
@@ -67,13 +67,15 @@ public:
* binlog position.
*
* Input:
- * log_name - (IN) the reply point's binlog file name
- * log_pos - (IN) the reply point's binlog file offset
+ * mysql - (IN) the mysql network connection
+ * binlog_filename - (IN) the reply point's binlog file name
+ * binlog_filepos - (IN) the reply point's binlog file offset
*
* Return:
- * 0: success; -1 or otherwise: error
+ * 0: success; non-zero: error
*/
- int slaveReply(const char *log_name, my_off_t log_pos);
+ int slaveReply(MYSQL *mysql, const char *binlog_filename,
+ my_off_t binlog_filepos);
/*
Connect to master for sending reply
diff --git a/plugin/semisync/semisync_slave_plugin.cc b/plugin/semisync/semisync_slave_plugin.cc
index ffc663c9bdb..40c6ed9838c 100644
--- a/plugin/semisync/semisync_slave_plugin.cc
+++ b/plugin/semisync/semisync_slave_plugin.cc
@@ -45,13 +45,6 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
if (!repl_semisync.getSlaveEnabled())
return 0;
- /*
- Create the connection that is used to send slave ACK replies to
- master
- */
- if (repl_semisync.slaveReplyConnect())
- return 1;
-
/* Check if master server has semi-sync plugin installed */
query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
if (mysql_real_query(mysql, query, strlen(query)) ||
@@ -106,7 +99,8 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
uint32 flags)
{
if (rpl_semi_sync_slave_status && semi_sync_need_reply)
- return repl_semisync.slaveReply(param->master_log_name,
+ return repl_semisync.slaveReply(param->mysql,
+ param->master_log_name,
param->master_log_pos);
return 0;
}