diff options
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_semi_sync.result | 31 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_semi_sync.test | 33 | ||||
-rw-r--r-- | plugin/semisync/semisync_master.cc | 25 | ||||
-rw-r--r-- | plugin/semisync/semisync_master.h | 284 | ||||
-rw-r--r-- | sql/rpl_handler.cc | 10 |
5 files changed, 335 insertions, 48 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync.result b/mysql-test/suite/rpl/r/rpl_semi_sync.result index 1e220b28d78..74eb14d33e0 100644 --- a/mysql-test/suite/rpl/r/rpl_semi_sync.result +++ b/mysql-test/suite/rpl/r/rpl_semi_sync.result @@ -120,8 +120,27 @@ min(a) select max(a) from t1; max(a) 300 + +# BUG#50157 +# semi-sync replication crashes when replicating a transaction which +# include 'CREATE TEMPORARY TABLE `MyISAM_t` SELECT * FROM `Innodb_t` ; +[ on master ] +SET SESSION AUTOCOMMIT= 0; +CREATE TABLE t2(c1 INT) ENGINE=innodb; +BEGIN; + +# Even though it is in a transaction, this statement is binlogged into binlog +# file immediately. +CREATE TEMPORARY TABLE t3 SELECT c1 FROM t2 where 1=1; + +# These statements will not be binlogged until the transaction is committed +INSERT INTO t2 VALUES(11); +INSERT INTO t2 VALUES(22); +COMMIT; +DROP TABLE t2, t3; +SET SESSION AUTOCOMMIT= 1; # -# Test semi-sync master will switch OFF after one transacton +# Test semi-sync master will switch OFF after one transaction # timeout waiting for slave reply. 
# include/stop_slave.inc @@ -135,7 +154,7 @@ Variable_name Value Rpl_semi_sync_master_no_tx 0 show status like 'Rpl_semi_sync_master_yes_tx'; Variable_name Value -Rpl_semi_sync_master_yes_tx 301 +Rpl_semi_sync_master_yes_tx 304 show status like 'Rpl_semi_sync_master_clients'; Variable_name Value Rpl_semi_sync_master_clients 1 @@ -150,7 +169,7 @@ Variable_name Value Rpl_semi_sync_master_no_tx 1 show status like 'Rpl_semi_sync_master_yes_tx'; Variable_name Value -Rpl_semi_sync_master_yes_tx 301 +Rpl_semi_sync_master_yes_tx 304 insert into t1 values (100); [ master status should be OFF ] show status like 'Rpl_semi_sync_master_status'; @@ -161,7 +180,7 @@ Variable_name Value Rpl_semi_sync_master_no_tx 302 show status like 'Rpl_semi_sync_master_yes_tx'; Variable_name Value -Rpl_semi_sync_master_yes_tx 301 +Rpl_semi_sync_master_yes_tx 304 # # Test semi-sync status on master will be ON again when slave catches up # @@ -194,7 +213,7 @@ Variable_name Value Rpl_semi_sync_master_no_tx 302 show status like 'Rpl_semi_sync_master_yes_tx'; Variable_name Value -Rpl_semi_sync_master_yes_tx 301 +Rpl_semi_sync_master_yes_tx 304 show status like 'Rpl_semi_sync_master_clients'; Variable_name Value Rpl_semi_sync_master_clients 1 @@ -213,7 +232,7 @@ Variable_name Value Rpl_semi_sync_master_no_tx 302 SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx'; Variable_name Value -Rpl_semi_sync_master_yes_tx 302 +Rpl_semi_sync_master_yes_tx 305 FLUSH NO_WRITE_TO_BINLOG STATUS; [ Semi-sync master status variables after FLUSH STATUS ] SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx'; diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync.test b/mysql-test/suite/rpl/t/rpl_semi_sync.test index 4900acc1e91..b04541aba21 100644 --- a/mysql-test/suite/rpl/t/rpl_semi_sync.test +++ b/mysql-test/suite/rpl/t/rpl_semi_sync.test @@ -11,6 +11,7 @@ disable_query_log; connection master; call mtr.add_suppression("Timeout waiting for reply of binlog"); call mtr.add_suppression("Read semi-sync reply"); +call 
mtr.add_suppression("Unsafe statement binlogged in statement format since BINLOG_FORMAT = STATEMENT."); connection slave; call mtr.add_suppression("Master server does not support semi-sync"); call mtr.add_suppression("Semi-sync slave .* reply"); @@ -193,8 +194,38 @@ select count(distinct a) from t1; select min(a) from t1; select max(a) from t1; +--echo +--echo # BUG#50157 +--echo # semi-sync replication crashes when replicating a transaction which +--echo # include 'CREATE TEMPORARY TABLE `MyISAM_t` SELECT * FROM `Innodb_t` ; + +connection master; +echo [ on master ]; +SET SESSION AUTOCOMMIT= 0; +CREATE TABLE t2(c1 INT) ENGINE=innodb; +sync_slave_with_master; + +connection master; +BEGIN; +--echo +--echo # Even though it is in a transaction, this statement is binlogged into binlog +--echo # file immediately. +--disable_warnings +CREATE TEMPORARY TABLE t3 SELECT c1 FROM t2 where 1=1; +--enable_warnings +--echo +--echo # These statements will not be binlogged until the transaction is committed +INSERT INTO t2 VALUES(11); +INSERT INTO t2 VALUES(22); +COMMIT; + +DROP TABLE t2, t3; +SET SESSION AUTOCOMMIT= 1; +sync_slave_with_master; + + --echo # ---echo # Test semi-sync master will switch OFF after one transacton +--echo # Test semi-sync master will switch OFF after one transaction --echo # timeout waiting for slave reply. 
--echo # connection slave; diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc index c2e329e1fe4..bbbc2a669c4 100644 --- a/plugin/semisync/semisync_master.cc +++ b/plugin/semisync/semisync_master.cc @@ -65,7 +65,7 @@ static int gettimeofday(struct timeval *tv, void *tz) ActiveTranx::ActiveTranx(pthread_mutex_t *lock, unsigned long trace_level) - : Trace(trace_level), + : Trace(trace_level), allocator_(max_connections), num_entries_(max_connections << 1), /* Transaction hash table size * is set to double the size * of max_connections */ @@ -115,25 +115,6 @@ unsigned int ActiveTranx::get_hash_value(const char *log_file_name, return (hash1 + hash2) % num_entries_; } -ActiveTranx::TranxNode* ActiveTranx::alloc_tranx_node() -{ - MYSQL_THD thd= (MYSQL_THD)current_thd; - /* The memory allocated for TranxNode will be automatically freed at - the end of the command of current THD. And because - ha_autocommit_or_rollback() will always be called before that, so - we are sure that the node will be removed from the active list - before it get freed. */ - TranxNode *trx_node = (TranxNode *)thd_alloc(thd, sizeof(TranxNode)); - if (trx_node) - { - trx_node->log_name_[0] = '\0'; - trx_node->log_pos_= 0; - trx_node->next_= 0; - trx_node->hash_next_= 0; - } - return trx_node; -} - int ActiveTranx::compare(const char *log_file_name1, my_off_t log_file_pos1, const char *log_file_name2, my_off_t log_file_pos2) { @@ -159,7 +140,7 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name, function_enter(kWho); - ins_node = alloc_tranx_node(); + ins_node = allocator_.allocate_node(); if (!ins_node) { sql_print_error("%s: transaction node allocation failed for: (%s, %lu)", @@ -271,6 +252,7 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, /* Clear the hash table. */ memset(trx_htb_, 0, num_entries_ * sizeof(TranxNode *)); + allocator_.free_all_nodes(); /* Clear the active transaction list. 
*/ if (trx_front_ != NULL) @@ -311,6 +293,7 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name, } trx_front_ = new_front; + allocator_.free_nodes_before(trx_front_); if (trace_level_ & kTraceDetail) sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)", diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h index bfb1cb74cd0..982a7f77a59 100644 --- a/plugin/semisync/semisync_master.h +++ b/plugin/semisync/semisync_master.h @@ -20,6 +20,267 @@ #include "semisync.h" +struct TranxNode { + char log_name_[FN_REFLEN]; + my_off_t log_pos_; + struct TranxNode *next_; /* the next node in the sorted list */ + struct TranxNode *hash_next_; /* the next node during hash collision */ +}; + +/** + @class TranxNodeAllocator + + This class provides memory allocating and freeing methods for + TranxNode. The main target is performance. + + @section ALLOCATE How to allocate a node + The pointer of the first node after 'last_node' in current_block is + returned. current_block will move to the next free Block when all nodes of + it are in use. A new Block is allocated and is put into the rear of the + Block link table if no Block is free. + + The list starts up empty (ie, there is no allocated Block). + + After some nodes are freed, there probably are some free nodes before + the sequence of the allocated nodes, but we do not reuse it. It is better + to keep the allocated nodes are in the sequence, for it is more efficient + for allocating and freeing TranxNode. + + @section FREENODE How to free nodes + There are two methods for freeing nodes. They are free_all_nodes and + free_nodes_before. + + 'A Block is free' means all of its nodes are free. + @subsection free_nodes_before + As all allocated nodes are in the sequence, 'Before one node' means all + nodes before given node in the same Block and all Blocks before the Block + which containing the given node. 
As such, all Blocks before the given one + ('node') are free Block and moved into the rear of the Block link table. + The Block containing the given 'node', however, is not. For at least the + given 'node' is still in use. This will waste at most one Block, but it is + more efficient. + */ +#define BLOCK_TRANX_NODES 16 +class TranxNodeAllocator +{ +public: + /** + @param reserved_nodes + The number of reserved TranxNodes. It is used to set 'reserved_blocks' + which can contain at least 'reserved_nodes' number of TranxNodes. When + freeing memory, we will reserve at least reserved_blocks of Blocks not + freed. + */ + TranxNodeAllocator(uint reserved_nodes) : + reserved_blocks(reserved_nodes/BLOCK_TRANX_NODES + + (reserved_nodes%BLOCK_TRANX_NODES > 1 ? 2 : 1)), + first_block(NULL), last_block(NULL), + current_block(NULL), last_node(-1), block_num(0) {} + + ~TranxNodeAllocator() + { + Block *block= first_block; + while (block != NULL) + { + Block *next= block->next; + free_block(block); + block= next; + } + } + + /** + The pointer of the first node after 'last_node' in current_block is + returned. current_block will move to the next free Block when all nodes of + it are in use. A new Block is allocated and is put into the rear of the + Block link table if no Block is free. + + @return Return a TranxNode *, or NULL if an error occured. + */ + TranxNode *allocate_node() + { + TranxNode *trx_node; + Block *block= current_block; + + if (last_node == BLOCK_TRANX_NODES-1) + { + current_block= current_block->next; + last_node= -1; + } + + if (current_block == NULL && allocate_block()) + { + current_block= block; + if (current_block) + last_node= BLOCK_TRANX_NODES-1; + return NULL; + } + + trx_node= &(current_block->nodes[++last_node]); + trx_node->log_name_[0] = '\0'; + trx_node->log_pos_= 0; + trx_node->next_= 0; + trx_node->hash_next_= 0; + return trx_node; + } + + /** + All nodes are freed. + + @return Return 0, or 1 if an error occured. 
+ */ + int free_all_nodes() + { + current_block= first_block; + last_node= -1; + free_blocks(); + return 0; + } + + /** + All Blocks before the given 'node' are free Block and moved into the rear + of the Block link table. + + @param node All nodes before 'node' will be freed + + @return Return 0, or 1 if an error occured. + */ + int free_nodes_before(TranxNode* node) + { + Block *block; + Block *prev_block; + + block= first_block; + while (block != current_block->next) + { + /* Find the Block containing the given node */ + if (&(block->nodes[0]) <= node && &(block->nodes[BLOCK_TRANX_NODES]) >= node) + { + /* All Blocks before the given node are put into the rear */ + if (first_block != block) + { + last_block->next= first_block; + first_block= block; + last_block= prev_block; + last_block->next= NULL; + free_blocks(); + } + return 0; + } + prev_block= block; + block= block->next; + } + + /* Node does not find should never happen */ + DBUG_ASSERT(0); + return 1; + } + +private: + uint reserved_blocks; + + /** + A sequence memory which contains BLOCK_TRANX_NODES TranxNodes. + + BLOCK_TRANX_NODES The number of TranxNodes which are in a Block. + + next Every Block has a 'next' pointer which points to the next Block. + These linking Blocks constitute a Block link table. + */ + struct Block { + Block *next; + TranxNode nodes[BLOCK_TRANX_NODES]; + }; + + /** + The 'first_block' is the head of the Block link table; + */ + Block *first_block; + /** + The 'last_block' is the rear of the Block link table; + */ + Block *last_block; + + /** + current_block always points the Block in the Block link table in + which the last allocated node is. The Blocks before it are all in use + and the Blocks after it are all free. + */ + Block *current_block; + + /** + It always points to the last node which has been allocated in the + current_block. + */ + int last_node; + + /** + How many Blocks are in the Block link table. 
+ */ + uint block_num; + + /** + Allocate a block and then assign it to current_block. + */ + int allocate_block() + { + Block *block= (Block *)my_malloc(sizeof(Block), MYF(0)); + if (block) + { + block->next= NULL; + + if (first_block == NULL) + first_block= block; + else + last_block->next= block; + + /* New Block is always put into the rear */ + last_block= block; + /* New Block is always the current_block */ + current_block= block; + ++block_num; + return 0; + } + return 1; + } + + /** + Free a given Block. + @param block The Block will be freed. + */ + void free_block(Block *block) + { + my_free(block, MYF(0)); + --block_num; + } + + + /** + If there are some free Blocks and the total number of the Blocks in the + Block link table is larger than the 'reserved_blocks', Some free Blocks + will be freed until the total number of the Blocks is equal to the + 'reserved_blocks' or there is only one free Block behind the + 'current_block'. + */ + void free_blocks() + { + if (current_block == NULL || current_block->next == NULL) + return; + + /* One free Block is always kept behind the current block */ + Block *block= current_block->next->next; + while (block_num > reserved_blocks && block != NULL) + { + Block *next= block->next; + free_block(block); + block= next; + } + current_block->next->next= block; + if (block == NULL) + last_block= current_block->next; + } +}; + + /** This class manages memory for active transaction list. @@ -31,13 +292,8 @@ class ActiveTranx :public Trace { private: - struct TranxNode { - char log_name_[FN_REFLEN]; - my_off_t log_pos_; - struct TranxNode *next_; /* the next node in the sorted list */ - struct TranxNode *hash_next_; /* the next node during hash collision */ - }; + TranxNodeAllocator allocator_; /* These two record the active transaction list in sort order. 
*/ TranxNode *trx_front_, *trx_rear_; @@ -48,24 +304,22 @@ private: inline void assert_lock_owner(); - inline TranxNode* alloc_tranx_node(); - inline unsigned int calc_hash(const unsigned char *key,unsigned int length); unsigned int get_hash_value(const char *log_file_name, my_off_t log_file_pos); int compare(const char *log_file_name1, my_off_t log_file_pos1, - const TranxNode *node2) { + const TranxNode *node2) { return compare(log_file_name1, log_file_pos1, - node2->log_name_, node2->log_pos_); + node2->log_name_, node2->log_pos_); } int compare(const TranxNode *node1, - const char *log_file_name2, my_off_t log_file_pos2) { + const char *log_file_name2, my_off_t log_file_pos2) { return compare(node1->log_name_, node1->log_pos_, - log_file_name2, log_file_pos2); + log_file_name2, log_file_pos2); } int compare(const TranxNode *node1, const TranxNode *node2) { return compare(node1->log_name_, node1->log_pos_, - node2->log_name_, node2->log_pos_); + node2->log_name_, node2->log_pos_); } public: @@ -88,7 +342,7 @@ public: * 0: success; non-zero: error */ int clear_active_tranx_nodes(const char *log_file_name, - my_off_t log_file_pos); + my_off_t log_file_pos); /* Given a position, check to see whether the position is an active * transaction's ending position by probing the hash table. @@ -99,7 +353,7 @@ public: * (file_name, file_position). */ static int compare(const char *log_file_name1, my_off_t log_file_pos1, - const char *log_file_name2, my_off_t log_file_pos2); + const char *log_file_name2, my_off_t log_file_pos2); }; diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc index c4b55e3d068..b347b7c751d 100644 --- a/sql/rpl_handler.cc +++ b/sql/rpl_handler.cc @@ -190,8 +190,8 @@ int Trans_delegate::after_commit(THD *thd, bool all) { Trans_param param; bool is_real_trans= (all || thd->transaction.all.ha_list == 0); - if (is_real_trans) - param.flags |= TRANS_IS_REAL_TRANS; + + param.flags = is_real_trans ? 
TRANS_IS_REAL_TRANS : 0; Trans_binlog_info *log_info= my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); @@ -218,8 +218,8 @@ int Trans_delegate::after_rollback(THD *thd, bool all) { Trans_param param; bool is_real_trans= (all || thd->transaction.all.ha_list == 0); - if (is_real_trans) - param.flags |= TRANS_IS_REAL_TRANS; + + param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0; Trans_binlog_info *log_info= my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); @@ -228,7 +228,7 @@ int Trans_delegate::after_rollback(THD *thd, bool all) param.log_pos= log_info ? log_info->log_pos : 0; int ret= 0; - FOREACH_OBSERVER(ret, after_commit, thd, (&param)); + FOREACH_OBSERVER(ret, after_rollback, thd, (&param)); /* This is the end of a real transaction or autocommit statement, we |