summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/rpl/r/rpl_semi_sync.result31
-rw-r--r--mysql-test/suite/rpl/t/rpl_semi_sync.test33
-rw-r--r--plugin/semisync/semisync_master.cc25
-rw-r--r--plugin/semisync/semisync_master.h284
-rw-r--r--sql/rpl_handler.cc10
5 files changed, 335 insertions, 48 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync.result b/mysql-test/suite/rpl/r/rpl_semi_sync.result
index 1e220b28d78..74eb14d33e0 100644
--- a/mysql-test/suite/rpl/r/rpl_semi_sync.result
+++ b/mysql-test/suite/rpl/r/rpl_semi_sync.result
@@ -120,8 +120,27 @@ min(a)
select max(a) from t1;
max(a)
300
+
+# BUG#50157
+# semi-sync replication crashes when replicating a transaction which
+# include 'CREATE TEMPORARY TABLE `MyISAM_t` SELECT * FROM `Innodb_t` ;
+[ on master ]
+SET SESSION AUTOCOMMIT= 0;
+CREATE TABLE t2(c1 INT) ENGINE=innodb;
+BEGIN;
+
+# Even though it is in a transaction, this statement is binlogged into binlog
+# file immediately.
+CREATE TEMPORARY TABLE t3 SELECT c1 FROM t2 where 1=1;
+
+# These statements will not be binlogged until the transaction is committed
+INSERT INTO t2 VALUES(11);
+INSERT INTO t2 VALUES(22);
+COMMIT;
+DROP TABLE t2, t3;
+SET SESSION AUTOCOMMIT= 1;
#
-# Test semi-sync master will switch OFF after one transacton
+# Test semi-sync master will switch OFF after one transaction
# timeout waiting for slave reply.
#
include/stop_slave.inc
@@ -135,7 +154,7 @@ Variable_name Value
Rpl_semi_sync_master_no_tx 0
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
-Rpl_semi_sync_master_yes_tx 301
+Rpl_semi_sync_master_yes_tx 304
show status like 'Rpl_semi_sync_master_clients';
Variable_name Value
Rpl_semi_sync_master_clients 1
@@ -150,7 +169,7 @@ Variable_name Value
Rpl_semi_sync_master_no_tx 1
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
-Rpl_semi_sync_master_yes_tx 301
+Rpl_semi_sync_master_yes_tx 304
insert into t1 values (100);
[ master status should be OFF ]
show status like 'Rpl_semi_sync_master_status';
@@ -161,7 +180,7 @@ Variable_name Value
Rpl_semi_sync_master_no_tx 302
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
-Rpl_semi_sync_master_yes_tx 301
+Rpl_semi_sync_master_yes_tx 304
#
# Test semi-sync status on master will be ON again when slave catches up
#
@@ -194,7 +213,7 @@ Variable_name Value
Rpl_semi_sync_master_no_tx 302
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
-Rpl_semi_sync_master_yes_tx 301
+Rpl_semi_sync_master_yes_tx 304
show status like 'Rpl_semi_sync_master_clients';
Variable_name Value
Rpl_semi_sync_master_clients 1
@@ -213,7 +232,7 @@ Variable_name Value
Rpl_semi_sync_master_no_tx 302
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
-Rpl_semi_sync_master_yes_tx 302
+Rpl_semi_sync_master_yes_tx 305
FLUSH NO_WRITE_TO_BINLOG STATUS;
[ Semi-sync master status variables after FLUSH STATUS ]
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';
diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync.test b/mysql-test/suite/rpl/t/rpl_semi_sync.test
index 4900acc1e91..b04541aba21 100644
--- a/mysql-test/suite/rpl/t/rpl_semi_sync.test
+++ b/mysql-test/suite/rpl/t/rpl_semi_sync.test
@@ -11,6 +11,7 @@ disable_query_log;
connection master;
call mtr.add_suppression("Timeout waiting for reply of binlog");
call mtr.add_suppression("Read semi-sync reply");
+call mtr.add_suppression("Unsafe statement binlogged in statement format since BINLOG_FORMAT = STATEMENT.");
connection slave;
call mtr.add_suppression("Master server does not support semi-sync");
call mtr.add_suppression("Semi-sync slave .* reply");
@@ -193,8 +194,38 @@ select count(distinct a) from t1;
select min(a) from t1;
select max(a) from t1;
+--echo
+--echo # BUG#50157
+--echo # semi-sync replication crashes when replicating a transaction which
+--echo # include 'CREATE TEMPORARY TABLE `MyISAM_t` SELECT * FROM `Innodb_t` ;
+
+connection master;
+echo [ on master ];
+SET SESSION AUTOCOMMIT= 0;
+CREATE TABLE t2(c1 INT) ENGINE=innodb;
+sync_slave_with_master;
+
+connection master;
+BEGIN;
+--echo
+--echo # Even though it is in a transaction, this statement is binlogged into binlog
+--echo # file immediately.
+--disable_warnings
+CREATE TEMPORARY TABLE t3 SELECT c1 FROM t2 where 1=1;
+--enable_warnings
+--echo
+--echo # These statements will not be binlogged until the transaction is committed
+INSERT INTO t2 VALUES(11);
+INSERT INTO t2 VALUES(22);
+COMMIT;
+
+DROP TABLE t2, t3;
+SET SESSION AUTOCOMMIT= 1;
+sync_slave_with_master;
+
+
--echo #
---echo # Test semi-sync master will switch OFF after one transacton
+--echo # Test semi-sync master will switch OFF after one transaction
--echo # timeout waiting for slave reply.
--echo #
connection slave;
diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc
index c2e329e1fe4..bbbc2a669c4 100644
--- a/plugin/semisync/semisync_master.cc
+++ b/plugin/semisync/semisync_master.cc
@@ -65,7 +65,7 @@ static int gettimeofday(struct timeval *tv, void *tz)
ActiveTranx::ActiveTranx(pthread_mutex_t *lock,
unsigned long trace_level)
- : Trace(trace_level),
+ : Trace(trace_level), allocator_(max_connections),
num_entries_(max_connections << 1), /* Transaction hash table size
* is set to double the size
* of max_connections */
@@ -115,25 +115,6 @@ unsigned int ActiveTranx::get_hash_value(const char *log_file_name,
return (hash1 + hash2) % num_entries_;
}
-ActiveTranx::TranxNode* ActiveTranx::alloc_tranx_node()
-{
- MYSQL_THD thd= (MYSQL_THD)current_thd;
- /* The memory allocated for TranxNode will be automatically freed at
- the end of the command of current THD. And because
- ha_autocommit_or_rollback() will always be called before that, so
- we are sure that the node will be removed from the active list
- before it get freed. */
- TranxNode *trx_node = (TranxNode *)thd_alloc(thd, sizeof(TranxNode));
- if (trx_node)
- {
- trx_node->log_name_[0] = '\0';
- trx_node->log_pos_= 0;
- trx_node->next_= 0;
- trx_node->hash_next_= 0;
- }
- return trx_node;
-}
-
int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
const char *log_file_name2, my_off_t log_file_pos2)
{
@@ -159,7 +140,7 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
function_enter(kWho);
- ins_node = alloc_tranx_node();
+ ins_node = allocator_.allocate_node();
if (!ins_node)
{
sql_print_error("%s: transaction node allocation failed for: (%s, %lu)",
@@ -271,6 +252,7 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
/* Clear the hash table. */
memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *));
+ allocator_.free_all_nodes();
/* Clear the active transaction list. */
if (trx_front_ != NULL)
@@ -311,6 +293,7 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
}
trx_front_ = new_front;
+ allocator_.free_nodes_before(trx_front_);
if (trace_level_ & kTraceDetail)
sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h
index bfb1cb74cd0..982a7f77a59 100644
--- a/plugin/semisync/semisync_master.h
+++ b/plugin/semisync/semisync_master.h
@@ -20,6 +20,267 @@
#include "semisync.h"
+struct TranxNode {
+ char log_name_[FN_REFLEN];
+ my_off_t log_pos_;
+ struct TranxNode *next_; /* the next node in the sorted list */
+ struct TranxNode *hash_next_; /* the next node during hash collision */
+};
+
+/**
+ @class TranxNodeAllocator
+
+ This class provides memory allocating and freeing methods for
+ TranxNode. The main target is performance.
+
+ @section ALLOCATE How to allocate a node
+ The pointer of the first node after 'last_node' in current_block is
+ returned. current_block will move to the next free Block when all nodes of
+ it are in use. A new Block is allocated and is put into the rear of the
+ Block link table if no Block is free.
+
+ The list starts up empty (ie, there is no allocated Block).
+
+ After some nodes are freed, there probably are some free nodes before
+ the sequence of the allocated nodes, but we do not reuse it. It is better
+ to keep the allocated nodes are in the sequence, for it is more efficient
+ for allocating and freeing TranxNode.
+
+ @section FREENODE How to free nodes
+ There are two methods for freeing nodes. They are free_all_nodes and
+ free_nodes_before.
+
+ 'A Block is free' means all of its nodes are free.
+ @subsection free_nodes_before
+ As all allocated nodes are in the sequence, 'Before one node' means all
+ nodes before given node in the same Block and all Blocks before the Block
+ which containing the given node. As such, all Blocks before the given one
+ ('node') are free Block and moved into the rear of the Block link table.
+ The Block containing the given 'node', however, is not. For at least the
+ given 'node' is still in use. This will waste at most one Block, but it is
+ more efficient.
+ */
+#define BLOCK_TRANX_NODES 16
+class TranxNodeAllocator
+{
+public:
+ /**
+ @param reserved_nodes
+ The number of reserved TranxNodes. It is used to set 'reserved_blocks'
+ which can contain at least 'reserved_nodes' number of TranxNodes. When
+ freeing memory, we will reserve at least reserved_blocks of Blocks not
+ freed.
+ */
+ TranxNodeAllocator(uint reserved_nodes) :
+ reserved_blocks(reserved_nodes/BLOCK_TRANX_NODES +
+ (reserved_nodes%BLOCK_TRANX_NODES > 1 ? 2 : 1)),
+ first_block(NULL), last_block(NULL),
+ current_block(NULL), last_node(-1), block_num(0) {}
+
+ ~TranxNodeAllocator()
+ {
+ Block *block= first_block;
+ while (block != NULL)
+ {
+ Block *next= block->next;
+ free_block(block);
+ block= next;
+ }
+ }
+
+ /**
+ The pointer of the first node after 'last_node' in current_block is
+ returned. current_block will move to the next free Block when all nodes of
+ it are in use. A new Block is allocated and is put into the rear of the
+ Block link table if no Block is free.
+
+ @return Return a TranxNode *, or NULL if an error occured.
+ */
+ TranxNode *allocate_node()
+ {
+ TranxNode *trx_node;
+ Block *block= current_block;
+
+ if (last_node == BLOCK_TRANX_NODES-1)
+ {
+ current_block= current_block->next;
+ last_node= -1;
+ }
+
+ if (current_block == NULL && allocate_block())
+ {
+ current_block= block;
+ if (current_block)
+ last_node= BLOCK_TRANX_NODES-1;
+ return NULL;
+ }
+
+ trx_node= &(current_block->nodes[++last_node]);
+ trx_node->log_name_[0] = '\0';
+ trx_node->log_pos_= 0;
+ trx_node->next_= 0;
+ trx_node->hash_next_= 0;
+ return trx_node;
+ }
+
+ /**
+ All nodes are freed.
+
+ @return Return 0, or 1 if an error occured.
+ */
+ int free_all_nodes()
+ {
+ current_block= first_block;
+ last_node= -1;
+ free_blocks();
+ return 0;
+ }
+
+ /**
+ All Blocks before the given 'node' are free Block and moved into the rear
+ of the Block link table.
+
+ @param node All nodes before 'node' will be freed
+
+ @return Return 0, or 1 if an error occured.
+ */
+ int free_nodes_before(TranxNode* node)
+ {
+ Block *block;
+ Block *prev_block;
+
+ block= first_block;
+ while (block != current_block->next)
+ {
+ /* Find the Block containing the given node */
+ if (&(block->nodes[0]) <= node && &(block->nodes[BLOCK_TRANX_NODES]) >= node)
+ {
+ /* All Blocks before the given node are put into the rear */
+ if (first_block != block)
+ {
+ last_block->next= first_block;
+ first_block= block;
+ last_block= prev_block;
+ last_block->next= NULL;
+ free_blocks();
+ }
+ return 0;
+ }
+ prev_block= block;
+ block= block->next;
+ }
+
+ /* Node does not find should never happen */
+ DBUG_ASSERT(0);
+ return 1;
+ }
+
+private:
+ uint reserved_blocks;
+
+ /**
+ A sequence memory which contains BLOCK_TRANX_NODES TranxNodes.
+
+ BLOCK_TRANX_NODES The number of TranxNodes which are in a Block.
+
+ next Every Block has a 'next' pointer which points to the next Block.
+ These linking Blocks constitute a Block link table.
+ */
+ struct Block {
+ Block *next;
+ TranxNode nodes[BLOCK_TRANX_NODES];
+ };
+
+ /**
+ The 'first_block' is the head of the Block link table;
+ */
+ Block *first_block;
+ /**
+ The 'last_block' is the rear of the Block link table;
+ */
+ Block *last_block;
+
+ /**
+ current_block always points the Block in the Block link table in
+ which the last allocated node is. The Blocks before it are all in use
+ and the Blocks after it are all free.
+ */
+ Block *current_block;
+
+ /**
+ It always points to the last node which has been allocated in the
+ current_block.
+ */
+ int last_node;
+
+ /**
+ How many Blocks are in the Block link table.
+ */
+ uint block_num;
+
+ /**
+ Allocate a block and then assign it to current_block.
+ */
+ int allocate_block()
+ {
+ Block *block= (Block *)my_malloc(sizeof(Block), MYF(0));
+ if (block)
+ {
+ block->next= NULL;
+
+ if (first_block == NULL)
+ first_block= block;
+ else
+ last_block->next= block;
+
+ /* New Block is always put into the rear */
+ last_block= block;
+ /* New Block is always the current_block */
+ current_block= block;
+ ++block_num;
+ return 0;
+ }
+ return 1;
+ }
+
+ /**
+ Free a given Block.
+ @param block The Block will be freed.
+ */
+ void free_block(Block *block)
+ {
+ my_free(block, MYF(0));
+ --block_num;
+ }
+
+
+ /**
+ If there are some free Blocks and the total number of the Blocks in the
+ Block link table is larger than the 'reserved_blocks', Some free Blocks
+ will be freed until the total number of the Blocks is equal to the
+ 'reserved_blocks' or there is only one free Block behind the
+ 'current_block'.
+ */
+ void free_blocks()
+ {
+ if (current_block == NULL || current_block->next == NULL)
+ return;
+
+ /* One free Block is always kept behind the current block */
+ Block *block= current_block->next->next;
+ while (block_num > reserved_blocks && block != NULL)
+ {
+ Block *next= block->next;
+ free_block(block);
+ block= next;
+ }
+ current_block->next->next= block;
+ if (block == NULL)
+ last_block= current_block->next;
+ }
+};
+
+
/**
This class manages memory for active transaction list.
@@ -31,13 +292,8 @@
class ActiveTranx
:public Trace {
private:
- struct TranxNode {
- char log_name_[FN_REFLEN];
- my_off_t log_pos_;
- struct TranxNode *next_; /* the next node in the sorted list */
- struct TranxNode *hash_next_; /* the next node during hash collision */
- };
+ TranxNodeAllocator allocator_;
/* These two record the active transaction list in sort order. */
TranxNode *trx_front_, *trx_rear_;
@@ -48,24 +304,22 @@ private:
inline void assert_lock_owner();
- inline TranxNode* alloc_tranx_node();
-
inline unsigned int calc_hash(const unsigned char *key,unsigned int length);
unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos);
int compare(const char *log_file_name1, my_off_t log_file_pos1,
- const TranxNode *node2) {
+ const TranxNode *node2) {
return compare(log_file_name1, log_file_pos1,
- node2->log_name_, node2->log_pos_);
+ node2->log_name_, node2->log_pos_);
}
int compare(const TranxNode *node1,
- const char *log_file_name2, my_off_t log_file_pos2) {
+ const char *log_file_name2, my_off_t log_file_pos2) {
return compare(node1->log_name_, node1->log_pos_,
- log_file_name2, log_file_pos2);
+ log_file_name2, log_file_pos2);
}
int compare(const TranxNode *node1, const TranxNode *node2) {
return compare(node1->log_name_, node1->log_pos_,
- node2->log_name_, node2->log_pos_);
+ node2->log_name_, node2->log_pos_);
}
public:
@@ -88,7 +342,7 @@ public:
* 0: success; non-zero: error
*/
int clear_active_tranx_nodes(const char *log_file_name,
- my_off_t log_file_pos);
+ my_off_t log_file_pos);
/* Given a position, check to see whether the position is an active
* transaction's ending position by probing the hash table.
@@ -99,7 +353,7 @@ public:
* (file_name, file_position).
*/
static int compare(const char *log_file_name1, my_off_t log_file_pos1,
- const char *log_file_name2, my_off_t log_file_pos2);
+ const char *log_file_name2, my_off_t log_file_pos2);
};
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
index c4b55e3d068..b347b7c751d 100644
--- a/sql/rpl_handler.cc
+++ b/sql/rpl_handler.cc
@@ -190,8 +190,8 @@ int Trans_delegate::after_commit(THD *thd, bool all)
{
Trans_param param;
bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
- if (is_real_trans)
- param.flags |= TRANS_IS_REAL_TRANS;
+
+ param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
Trans_binlog_info *log_info=
my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
@@ -218,8 +218,8 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
{
Trans_param param;
bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
- if (is_real_trans)
- param.flags |= TRANS_IS_REAL_TRANS;
+
+ param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
Trans_binlog_info *log_info=
my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
@@ -228,7 +228,7 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
param.log_pos= log_info ? log_info->log_pos : 0;
int ret= 0;
- FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+ FOREACH_OBSERVER(ret, after_rollback, thd, (&param));
/*
This is the end of a real transaction or autocommit statement, we