diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-11-01 12:00:11 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-11-01 12:00:11 +0100 |
commit | 57a267a8c00471bbe13724e7d9ba89d23acef3c2 (patch) | |
tree | 808a9f9b165bfe034304e4974dd618eaafcdedb2 /sql/log.cc | |
parent | bd3dc54261f10f387a03ad99ce74c3824c42e462 (diff) | |
parent | cb86ce60b9bade5ae7712d8f3f74668208ee3fd2 (diff) | |
download | mariadb-git-57a267a8c00471bbe13724e7d9ba89d23acef3c2.tar.gz |
Merge from 10.0-base to 10.0 the feature MDEV-4506: Parallel replication.
The merge is still missing a few hunks related to temporary tables and
InnoDB log file size. The associated code did not seem to exist in
10.0, so the merge of that needs more work. Until this is fixed, there
are a number of test failures as a result.
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 393 |
1 files changed, 365 insertions, 28 deletions
diff --git a/sql/log.cc b/sql/log.cc index b2cd03de481..6f08a924116 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -88,6 +88,7 @@ ulong opt_binlog_dbug_fsync_sleep= 0; #endif mysql_mutex_t LOCK_prepare_ordered; +mysql_cond_t COND_prepare_ordered; mysql_mutex_t LOCK_commit_ordered; static ulonglong binlog_status_var_num_commits; @@ -5402,7 +5403,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, /* Generate a new global transaction ID, and write it to the binlog */ bool MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, - bool is_transactional) + bool is_transactional, uint64 commit_id) { rpl_gtid gtid; uint32 domain_id= thd->variables.gtid_domain_id; @@ -5440,7 +5441,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, return true; Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone, - LOG_EVENT_SUPPRESS_USE_F, is_transactional); + LOG_EVENT_SUPPRESS_USE_F, is_transactional, + commit_id); /* Write the event to the binary log. */ if (gtid_event.write(&mysql_bin_log.log_file)) @@ -5722,7 +5724,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) my_org_b_tell= my_b_tell(file); mysql_mutex_lock(&LOCK_log); prev_binlog_id= current_binlog_id; - if (write_gtid_event(thd, true, using_trans)) + if (write_gtid_event(thd, true, using_trans, 0)) goto err; } else @@ -6611,45 +6613,284 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, } } + +/* + Put a transaction that is ready to commit in the group commit queue. + The transaction is identified by the ENTRY object passed into this function. + + To facilitate group commit for the binlog, we first queue up ourselves in + this function. Then later the first thread to enter the queue waits for + the LOCK_log mutex, and commits for everyone in the queue once it gets the + lock. Any other threads in the queue just wait for the first one to finish + the commit and wake them up. This way, all transactions in the queue get + committed in a single disk operation. + + The main work in this function is when the commit in one transaction has + been marked to wait for the commit of another transaction to happen + first. This is used to support in-order parallel replication, where + transactions can execute out-of-order but need to be committed in-order with + how they happened on the master. The waiting of one commit on another needs + to be integrated with the group commit queue, to ensure that the waiting + transaction can participate in the same group commit as the waited-for + transaction. + + So when we put a transaction in the queue, we check if there were other + transactions already prepared to commit but just waiting for the first one + to commit. If so, we add those to the queue as well, transitively for all + waiters. + + @retval TRUE If queued as the first entry in the queue (meaning this + is the leader) + @retval FALSE Otherwise +*/ + bool -MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) +MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) { + group_commit_entry *entry, *orig_queue; + wait_for_commit *list, *cur, *last; + wait_for_commit *wfc; + DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit"); + /* - To facilitate group commit for the binlog, we first queue up ourselves in - the group commit queue. Then the first thread to enter the queue waits for - the LOCK_log mutex, and commits for everyone in the queue once it gets the - lock. Any other threads in the queue just wait for the first one to finish - the commit and wake them up. + Check if we need to wait for another transaction to commit before us. + + It is safe to do a quick check without lock first in the case where we do + not have to wait. But if the quick check shows we need to wait, we must do + another safe check under lock, to avoid the race where the other + transaction wakes us up between the check and the wait. */ + wfc= orig_entry->thd->wait_for_commit_ptr; + orig_entry->queued_by_other= false; + if (wfc && wfc->waiting_for_commit) + { + mysql_mutex_lock(&wfc->LOCK_wait_commit); + /* Do an extra check here, this time safely under lock. */ + if (wfc->waiting_for_commit) + { + /* + By setting wfc->opaque_pointer to our own entry, we mark that we are + ready to commit, but waiting for another transaction to commit before + us. + + This other transaction may then take over the commit process for us to + get us included in its own group commit. If this happens, the + queued_by_other flag is set. + */ + wfc->opaque_pointer= orig_entry; + DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior"); + do + { + mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); + } while (wfc->waiting_for_commit); + wfc->opaque_pointer= NULL; + DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d", + orig_entry->queued_by_other)); + } + mysql_mutex_unlock(&wfc->LOCK_wait_commit); + } - entry->thd->clear_wakeup_ready(); + /* + If the transaction we were waiting for has already put us into the group + commit queue (and possibly already done the entire binlog commit for us), + then there is nothing else to do. + */ + if (orig_entry->queued_by_other) + DBUG_RETURN(false); + + /* Now enqueue ourselves in the group commit queue. */ + DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue"); + orig_entry->thd->clear_wakeup_ready(); mysql_mutex_lock(&LOCK_prepare_ordered); - group_commit_entry *orig_queue= group_commit_queue; - entry->next= orig_queue; - group_commit_queue= entry; + orig_queue= group_commit_queue; + + /* + Iteratively process everything added to the queue, looking for waiters, + and their waiters, and so on. If a waiter is ready to commit, we + immediately add it to the queue; if not we just wake it up. + + This would be natural to do with recursion, but we want to avoid + potentially unbounded recursion blowing the C stack, so we use the list + approach instead. + + We keep a list of all the waiters that need to be processed in `list', + linked through the next_subsequent_commit pointer. Initially this list + contains only the entry passed into this function. + + We process entries in the list one by one. The element currently being + processed is pointed to by `cur`, and the element at the end of the list + is pointed to by `last` (we do not use NULL to terminate the list). + + As we process an element, it is first added to the group_commit_queue. + Then any waiters for that element are added at the end of the list, to + be processed in subsequent iterations. This continues until the list + is exhausted, with all elements ever added eventually processed. + + The end result is a breath-first traversal of the tree of waiters, + re-using the next_subsequent_commit pointers in place of extra stack + space in a recursive traversal. + + The temporary list created in next_subsequent_commit is not + used by the caller or any other function. + */ + + list= wfc; + cur= list; + last= list; + entry= orig_entry; + for (;;) + { + /* Add the entry to the group commit queue. */ + entry->next= group_commit_queue; + group_commit_queue= entry; + + if (entry->cache_mngr->using_xa) + { + DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered"); + run_prepare_ordered(entry->thd, entry->all); + DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered"); + } + + if (!cur) + break; // Can happen if initial entry has no wait_for_commit + + /* + Check if this transaction has other transaction waiting for it to commit. + + If so, process the waiting transactions, and their waiters and so on, + transitively. + */ + if (cur->subsequent_commits_list) + { + bool have_lock; + wait_for_commit *waiter; + + mysql_mutex_lock(&cur->LOCK_wait_commit); + have_lock= true; + /* + Grab the list, now safely under lock, and process it if still + non-empty. + */ + waiter= cur->subsequent_commits_list; + cur->subsequent_commits_list= NULL; + while (waiter) + { + wait_for_commit *next= waiter->next_subsequent_commit; + group_commit_entry *entry2= + (group_commit_entry *)waiter->opaque_pointer; + if (entry2) + { + /* + This is another transaction ready to be written to the binary + log. We can put it into the queue directly, without needing a + separate context switch to the other thread. We just set a flag + so that the other thread will know when it wakes up that it was + already processed. + + So put it at the end of the list to be processed in a subsequent + iteration of the outer loop. + */ + entry2->queued_by_other= true; + last->next_subsequent_commit= waiter; + last= waiter; + /* + As a small optimisation, we do not actually need to set + waiter->next_subsequent_commit to NULL, as we can use the + pointer `last' to check for end-of-list. + */ + } + else + { + /* + Wake up the waiting transaction. + + For this, we need to set the "wakeup running" flag and release + the waitee lock to avoid a deadlock, see comments on + THD::wakeup_subsequent_commits2() for details. + */ + if (have_lock) + { + have_lock= false; + cur->wakeup_subsequent_commits_running= true; + mysql_mutex_unlock(&cur->LOCK_wait_commit); + } + waiter->wakeup(0); + } + waiter= next; + } + if (have_lock) + mysql_mutex_unlock(&cur->LOCK_wait_commit); + } + if (cur == last) + break; + /* + Move to the next entry in the flattened list of waiting transactions + that still need to be processed transitively. + */ + cur= cur->next_subsequent_commit; + entry= (group_commit_entry *)cur->opaque_pointer; + DBUG_ASSERT(entry != NULL); + } + + /* + Now we need to clear the wakeup_subsequent_commits_running flags. + + We need a full memory barrier between walking the list above, and clearing + the flag wakeup_subsequent_commits_running below. This barrier is needed + to ensure that no other thread will start to modify the list pointers + before we are done traversing the list. - if (entry->cache_mngr->using_xa) + But wait_for_commit::wakeup(), which was called above for any other thread + that might modify the list in parallel, does a full memory barrier already + (it locks a mutex). + */ + if (list) { - DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered"); - run_prepare_ordered(entry->thd, entry->all); - DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered"); + for (;;) + { + list->wakeup_subsequent_commits_running= false; + if (list == last) + break; + list= list->next_subsequent_commit; + } } + + if (opt_binlog_commit_wait_count > 0) + mysql_cond_signal(&COND_prepare_ordered); mysql_mutex_unlock(&LOCK_prepare_ordered); - DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered"); + DEBUG_SYNC(orig_entry->thd, "commit_after_release_LOCK_prepare_ordered"); + + DBUG_PRINT("info", ("Queued for group commit as %s\n", + (orig_queue == NULL) ? "leader" : "participant")); + DBUG_RETURN(orig_queue == NULL); +} + +bool +MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) +{ + bool is_leader= queue_for_group_commit(entry); /* - The first in the queue handle group commit for all; the others just wait + The first in the queue handles group commit for all; the others just wait to be signalled when group commit is done. */ - if (orig_queue != NULL) + if (is_leader) + trx_group_commit_leader(entry); + else if (!entry->queued_by_other) entry->thd->wait_for_wakeup_ready(); else - trx_group_commit_leader(entry); + { + /* + If we were queued by another prior commit, then we are woken up + only when the leader has already completed the commit for us. + So nothing to do here then. + */ + } if (!opt_optimize_thread_scheduling) { /* For the leader, trx_group_commit_leader() already took the lock. */ - if (orig_queue != NULL) + if (!is_leader) mysql_mutex_lock(&LOCK_commit_ordered); DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered"); @@ -6668,7 +6909,20 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) if (next) { - next->thd->signal_wakeup_ready(); + /* + Wake up the next thread in the group commit. + + The next thread can be waiting in two different ways, depending on + whether it put itself in the queue, or if it was put in queue by us + because it had to wait for us to commit first. + + So execute the appropriate wakeup, identified by the queued_by_other + field. + */ + if (next->queued_by_other) + next->thd->wait_for_commit_ptr->wakeup(entry->error); + else + next->thd->signal_wakeup_ready(); } else { @@ -6738,6 +6992,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) group_commit_entry *queue= NULL; bool check_purge= false; ulong binlog_id; + uint64 commit_id; DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); LINT_INIT(binlog_id); @@ -6748,12 +7003,18 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ mysql_mutex_lock(&LOCK_log); DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); - binlog_id= current_binlog_id; mysql_mutex_lock(&LOCK_prepare_ordered); + if (opt_binlog_commit_wait_count) + wait_for_sufficient_commits(); + /* + Note that wait_for_sufficient_commits() may have released and + re-acquired the LOCK_log and LOCK_prepare_ordered if it needed to wait. + */ current= group_commit_queue; group_commit_queue= NULL; mysql_mutex_unlock(&LOCK_prepare_ordered); + binlog_id= current_binlog_id; /* As the queue is in reverse order of entering, reverse it. */ last_in_queue= current; @@ -6772,6 +7033,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { + commit_id= (last_in_queue == leader ? 0 : (uint64)leader->thd->query_id); /* Commit every transaction in the queue. @@ -6792,7 +7054,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty()); - if ((current->error= write_transaction_or_stmt(current))) + if ((current->error= write_transaction_or_stmt(current, commit_id))) current->commit_errno= errno; strmake_buf(cache_mngr->last_commit_pos_file, log_file_name); @@ -6952,7 +7214,12 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ next= current->next; if (current != leader) // Don't wake up ourself - current->thd->signal_wakeup_ready(); + { + if (current->queued_by_other) + current->thd->wait_for_commit_ptr->wakeup(current->error); + else + current->thd->signal_wakeup_ready(); + } current= next; } DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered"); @@ -6967,11 +7234,12 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) int -MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) +MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, + uint64 commit_id) { binlog_cache_mngr *mngr= entry->cache_mngr; - if (write_gtid_event(entry->thd, false, entry->using_trx_cache)) + if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id)) return ER_ERROR_ON_WRITE; if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && @@ -7039,6 +7307,72 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) return 0; } + +/* + Wait for sufficient commits to queue up for group commit, according to the + values of binlog_commit_wait_count and binlog_commit_wait_usec. + + Note that this function may release and re-acquire LOCK_log and + LOCK_prepare_ordered if it needs to wait. +*/ + +void +MYSQL_BIN_LOG::wait_for_sufficient_commits() +{ + size_t count; + group_commit_entry *e; + group_commit_entry *last_head; + struct timespec wait_until; + + mysql_mutex_assert_owner(&LOCK_log); + mysql_mutex_assert_owner(&LOCK_prepare_ordered); + + for (e= last_head= group_commit_queue, count= 0; e; e= e->next) + if (++count >= opt_binlog_commit_wait_count) + return; + + mysql_mutex_unlock(&LOCK_log); + set_timespec_nsec(wait_until, (ulonglong)1000*opt_binlog_commit_wait_usec); + + for (;;) + { + int err; + group_commit_entry *head; + + err= mysql_cond_timedwait(&COND_prepare_ordered, &LOCK_prepare_ordered, + &wait_until); + if (err == ETIMEDOUT) + break; + head= group_commit_queue; + for (e= head; e && e != last_head; e= e->next) + ++count; + if (count >= opt_binlog_commit_wait_count) + break; + last_head= head; + } + + /* + We must not wait for LOCK_log while holding LOCK_prepare_ordered. + LOCK_log can be held for long periods (eg. we do I/O under it), while + LOCK_prepare_ordered must only be held for short periods. + + In addition, waiting for LOCK_log while holding LOCK_prepare_ordered would + violate locking order of LOCK_log-before-LOCK_prepare_ordered. This could + cause SAFEMUTEX warnings (even if it cannot actually deadlock with current + code, as there can be at most one group commit leader thread at a time). + + So release and re-acquire LOCK_prepare_ordered if we need to wait for the + LOCK_log. + */ + if (mysql_mutex_trylock(&LOCK_log)) + { + mysql_mutex_unlock(&LOCK_prepare_ordered); + mysql_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_prepare_ordered); + } +} + + /** Wait until we get a signal that the relay log has been updated. @@ -7580,6 +7914,9 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, mysql_mutex_unlock(&LOCK_prepare_ordered); } + if (thd->wait_for_prior_commit()) + return 0; + cookie= 0; if (xid) cookie= log_one_transaction(xid); |