summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc271
1 files changed, 129 insertions, 142 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 24e52838e47..2c20ea318c8 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -25,7 +25,7 @@
Abort logging when we get an error in reading or writing log files
*/
-#include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */
+#include <my_global.h> /* NO_EMBEDDED_ACCESS_CHECKS */
#include "sql_priv.h"
#include "log.h"
#include "sql_base.h" // open_log_table
@@ -2515,6 +2515,7 @@ bool MYSQL_LOG::open(
const char *new_name, enum cache_type io_cache_type_arg)
{
char buff[FN_REFLEN];
+ MY_STAT f_stat;
File file= -1;
int open_flags= O_CREAT | O_BINARY;
DBUG_ENTER("MYSQL_LOG::open");
@@ -2532,6 +2533,10 @@ bool MYSQL_LOG::open(
log_type_arg, io_cache_type_arg))
goto err;
+ /* File is regular writable file */
+ if (my_stat(log_file_name, &f_stat, MYF(0)) && !MY_S_ISREG(f_stat.st_mode))
+ goto err;
+
if (io_cache_type == SEQ_READ_APPEND)
open_flags |= O_RDWR | O_APPEND;
else
@@ -5874,7 +5879,10 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
if (direct)
{
+ int res;
DBUG_PRINT("info", ("direct is set"));
+ if ((res= thd->wait_for_prior_commit()))
+ DBUG_RETURN(res);
file= &log_file;
my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log);
@@ -6807,6 +6815,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
to commit. If so, we add those to the queue as well, transitively for all
waiters.
+ And if a transaction is marked to wait for a prior transaction, but that
+ prior transaction is already queued for group commit, then we can queue the
+ new transaction directly to participate in the group commit.
+
@retval < 0 Error
@retval > 0 If queued as the first entry in the queue (meaning this
is the leader)
@@ -6816,8 +6828,8 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
int
MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
{
- group_commit_entry *entry, *orig_queue;
- wait_for_commit *cur, *last;
+ group_commit_entry *entry, *orig_queue, *last;
+ wait_for_commit *cur;
wait_for_commit *wfc;
DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit");
@@ -6834,8 +6846,17 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
if (wfc && wfc->waitee)
{
mysql_mutex_lock(&wfc->LOCK_wait_commit);
- /* Do an extra check here, this time safely under lock. */
- if (wfc->waitee)
+ /*
+ Do an extra check here, this time safely under lock.
+
+ If waitee->commit_started is set, it means that the transaction we need
+ to wait for has already queued up for group commit. In this case it is
+ safe for us to queue up immediately as well, increasing the opprtunities
+ for group commit. Because waitee has taken the LOCK_prepare_ordered
+ before setting the flag, so there is no risk that we can queue ahead of
+ it.
+ */
+ if (wfc->waitee && !wfc->waitee->commit_started)
{
PSI_stage_info old_stage;
wait_for_commit *loc_waitee;
@@ -6848,6 +6869,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
This other transaction may then take over the commit process for us to
get us included in its own group commit. If this happens, the
queued_by_other flag is set.
+
+ Setting this flag may or may not be seen by the other thread, but we
+ are safe in any case: The other thread will set queued_by_other under
+ its LOCK_wait_commit, and we will not check queued_by_other only after
+ we have been woken up.
*/
wfc->opaque_pointer= orig_entry;
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
@@ -6920,41 +6946,41 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
/*
Iteratively process everything added to the queue, looking for waiters,
and their waiters, and so on. If a waiter is ready to commit, we
- immediately add it to the queue; if not we just wake it up.
+ immediately add it to the queue, and mark it as queued_by_other.
This would be natural to do with recursion, but we want to avoid
potentially unbounded recursion blowing the C stack, so we use the list
approach instead.
- We keep a list of all the waiters that need to be processed in `list',
- linked through the next_subsequent_commit pointer. Initially this list
- contains only the entry passed into this function.
+ We keep a list of the group_commit_entry of all the waiters that need to
+ be processed. Initially this list contains only the entry passed into this
+ function.
We process entries in the list one by one. The element currently being
- processed is pointed to by `cur`, and the element at the end of the list
+ processed is pointed to by `entry`, and the element at the end of the list
is pointed to by `last` (we do not use NULL to terminate the list).
- As we process an element, it is first added to the group_commit_queue.
- Then any waiters for that element are added at the end of the list, to
- be processed in subsequent iterations. This continues until the list
- is exhausted, with all elements ever added eventually processed.
+ As we process an entry, any waiters for that entry are added at the end of
+ the list, to be processed in subsequent iterations. The the entry is added
+ to the group_commit_queue. This continues until the list is exhausted,
+ with all entries ever added eventually processed.
The end result is a breath-first traversal of the tree of waiters,
- re-using the next_subsequent_commit pointers in place of extra stack
- space in a recursive traversal.
+ re-using the `next' pointers of the group_commit_entry objects in place of
+ extra stack space in a recursive traversal.
- The temporary list created in next_subsequent_commit is not
- used by the caller or any other function.
+ The temporary list linked through these `next' pointers is not used by the
+ caller or any other function; it only exists while doing the iterative
+ tree traversal. After, all the processed entries are linked into the
+ group_commit_queue.
*/
cur= wfc;
- last= wfc;
+ last= orig_entry;
entry= orig_entry;
for (;;)
{
- /* Add the entry to the group commit queue. */
- entry->next= group_commit_queue;
- group_commit_queue= entry;
+ group_commit_entry *next_entry;
if (entry->cache_mngr->using_xa)
{
@@ -6963,135 +6989,95 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
}
- if (!cur)
- break; // Can happen if initial entry has no wait_for_commit
-
- /*
- Check if this transaction has other transaction waiting for it to commit.
-
- If so, process the waiting transactions, and their waiters and so on,
- transitively.
- */
- if (cur->subsequent_commits_list)
+ if (cur)
{
- wait_for_commit *waiter;
- wait_for_commit *wakeup_list= NULL;
- wait_for_commit **wakeup_next_ptr= &wakeup_list;
+ /*
+ Now that we have taken LOCK_prepare_ordered and will queue up in the
+ group commit queue, it is safe for following transactions to queue
+ themselves. We will grab here any transaction that is now ready to
+ queue up, but after that, more transactions may become ready while the
+ leader is waiting to start the group commit. So set the flag
+ `commit_started', so that later transactions can still participate in
+ the group commit..
+ */
+ cur->commit_started= true;
- mysql_mutex_lock(&cur->LOCK_wait_commit);
/*
- Grab the list, now safely under lock, and process it if still
- non-empty.
+ Check if this transaction has other transaction waiting for it to
+ commit.
+
+ If so, process the waiting transactions, and their waiters and so on,
+ transitively.
*/
- waiter= cur->subsequent_commits_list;
- cur->subsequent_commits_list= NULL;
- while (waiter)
- {
- wait_for_commit *next= waiter->next_subsequent_commit;
- group_commit_entry *entry2=
- (group_commit_entry *)waiter->opaque_pointer;
- if (entry2)
- {
- /*
- This is another transaction ready to be written to the binary
- log. We can put it into the queue directly, without needing a
- separate context switch to the other thread. We just set a flag
- so that the other thread will know when it wakes up that it was
- already processed.
-
- So put it at the end of the list to be processed in a subsequent
- iteration of the outer loop.
- */
- entry2->queued_by_other= true;
- last->next_subsequent_commit= waiter;
- last= waiter;
- /*
- As a small optimisation, we do not actually need to set
- waiter->next_subsequent_commit to NULL, as we can use the
- pointer `last' to check for end-of-list.
- */
- }
- else
- {
- /*
- Wake up the waiting transaction.
-
- For this, we need to set the "wakeup running" flag and release
- the waitee lock to avoid a deadlock, see comments on
- THD::wakeup_subsequent_commits2() for details.
-
- So we need to put these on a list and delay the wakeup until we
- have released the lock.
- */
- *wakeup_next_ptr= waiter;
- wakeup_next_ptr= &waiter->next_subsequent_commit;
- }
- waiter= next;
- }
- if (wakeup_list)
+ if (cur->subsequent_commits_list)
{
- /* Now release our lock and do the wakeups that were delayed above. */
- cur->wakeup_subsequent_commits_running= true;
- mysql_mutex_unlock(&cur->LOCK_wait_commit);
- for (;;)
+ wait_for_commit *waiter, **waiter_ptr;
+
+ mysql_mutex_lock(&cur->LOCK_wait_commit);
+ /*
+ Grab the list, now safely under lock, and process it if still
+ non-empty.
+ */
+ waiter= cur->subsequent_commits_list;
+ waiter_ptr= &cur->subsequent_commits_list;
+ while (waiter)
{
- wait_for_commit *next;
-
- /*
- ToDo: We wakeup the waiter here, so that it can have the chance to
- reach its own commit state and queue up for this same group commit,
- if it is still pending.
-
- One problem with this is that if the waiter does not reach its own
- commit state before this group commit starts, and then the group
- commit fails (binlog write failure), we do not get to propagate
- the error to the waiter.
-
- A solution for this could be to delay the wakeup until commit is
- successful. But then we need to set a flag in the waitee that it is
- already queued for group commit, so that the waiter can check this
- flag and queue itself if it _does_ reach the commit state in time.
-
- (But error handling in case of binlog write failure is currently
- broken in other ways, as well).
- */
- if (&wakeup_list->next_subsequent_commit == wakeup_next_ptr)
+ wait_for_commit *next_waiter= waiter->next_subsequent_commit;
+ group_commit_entry *entry2=
+ (group_commit_entry *)waiter->opaque_pointer;
+ if (entry2)
+ {
+ /*
+ This is another transaction ready to be written to the binary
+ log. We can put it into the queue directly, without needing a
+ separate context switch to the other thread. We just set a flag
+ so that the other thread will know when it wakes up that it was
+ already processed.
+
+ So remove it from the list of our waiters, and instead put it at
+ the end of the list to be processed in a subsequent iteration of
+ the outer loop.
+ */
+ *waiter_ptr= next_waiter;
+ entry2->queued_by_other= true;
+ last->next= entry2;
+ last= entry2;
+ /*
+ As a small optimisation, we do not actually need to set
+ entry2->next to NULL, as we can use the pointer `last' to check
+ for end-of-list.
+ */
+ }
+ else
{
- /* The last one in the list. */
- wakeup_list->wakeup(0);
- break;
+ /*
+ This transaction is not ready to participate in the group commit
+ yet, so leave it in the waiter list. It might join the group
+ commit later, if it completes soon enough to do so (it will see
+ our wfc->commit_started flag set), or it might commit later in a
+ later group commit.
+ */
+ waiter_ptr= &waiter->next_subsequent_commit;
}
- /*
- Important: don't access wakeup_list->next after the wakeup() call,
- it may be invalidated by the other thread.
- */
- next= wakeup_list->next_subsequent_commit;
- wakeup_list->wakeup(0);
- wakeup_list= next;
+ waiter= next_waiter;
}
- /*
- We need a full memory barrier between walking the list and clearing
- the flag wakeup_subsequent_commits_running. This barrier is needed
- to ensure that no other thread will start to modify the list
- pointers before we are done traversing the list.
-
- But wait_for_commit::wakeup(), which was called above, does a full
- memory barrier already (it locks a mutex).
- */
- cur->wakeup_subsequent_commits_running= false;
- }
- else
mysql_mutex_unlock(&cur->LOCK_wait_commit);
+ }
}
- if (cur == last)
+
+ /* Add the entry to the group commit queue. */
+ next_entry= entry->next;
+ entry->next= group_commit_queue;
+ group_commit_queue= entry;
+ if (entry == last)
break;
/*
Move to the next entry in the flattened list of waiting transactions
that still need to be processed transitively.
*/
- cur= cur->next_subsequent_commit;
- entry= (group_commit_entry *)cur->opaque_pointer;
+ entry= next_entry;
DBUG_ASSERT(entry != NULL);
+ cur= entry->thd->wait_for_commit_ptr;
}
if (opt_binlog_commit_wait_count > 0)
@@ -7147,6 +7133,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered");
}
mysql_mutex_unlock(&LOCK_commit_ordered);
+ entry->thd->wakeup_subsequent_commits(entry->error);
if (next)
{
@@ -7180,7 +7167,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
}
if (likely(!entry->error))
- return 0;
+ return entry->thd->wait_for_prior_commit();
switch (entry->error)
{
@@ -7238,10 +7225,15 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
LINT_INIT(binlog_id);
{
+ DBUG_EXECUTE_IF("inject_binlog_commit_before_get_LOCK_log",
+ DBUG_ASSERT(!debug_sync_set_action(leader->thd, STRING_WITH_LEN
+ ("commit_before_get_LOCK_log SIGNAL waiting WAIT_FOR cont TIMEOUT 1")));
+ );
/*
Lock the LOCK_log(), and once we get it, collect any additional writes
that queued up while we were waiting.
*/
+ DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_log");
mysql_mutex_lock(&LOCK_log);
DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
@@ -7448,6 +7440,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
if (current->cache_mngr->using_xa && !current->error &&
DBUG_EVALUATE_IF("skip_commit_ordered", 0, 1))
run_commit_ordered(current->thd, current->all);
+ current->thd->wakeup_subsequent_commits(current->error);
/*
Careful not to access current->next after waking up the other thread! As
@@ -7488,7 +7481,6 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
{
entry->error_cache= &mngr->stmt_cache.cache_log;
- entry->commit_errno= errno;
DBUG_RETURN(ER_ERROR_ON_WRITE);
}
@@ -7509,7 +7501,6 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE)))
{
entry->error_cache= &mngr->trx_cache.cache_log;
- entry->commit_errno= errno;
DBUG_RETURN(ER_ERROR_ON_WRITE);
}
}
@@ -7517,14 +7508,13 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
DBUG_EXECUTE_IF("inject_error_writing_xid",
{
entry->error_cache= NULL;
- entry->commit_errno= 28;
+ errno= 28;
DBUG_RETURN(ER_ERROR_ON_WRITE);
});
if (entry->end_event->write(&log_file))
{
entry->error_cache= NULL;
- entry->commit_errno= errno;
DBUG_RETURN(ER_ERROR_ON_WRITE);
}
status_var_add(entry->thd->status_var.binlog_bytes_written,
@@ -7535,7 +7525,6 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
if (entry->incident_event->write(&log_file))
{
entry->error_cache= NULL;
- entry->commit_errno= errno;
DBUG_RETURN(ER_ERROR_ON_WRITE);
}
}
@@ -7543,13 +7532,11 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
if (mngr->get_binlog_cache_log(FALSE)->error) // Error on read
{
entry->error_cache= &mngr->stmt_cache.cache_log;
- entry->commit_errno= errno;
DBUG_RETURN(ER_ERROR_ON_WRITE);
}
if (mngr->get_binlog_cache_log(TRUE)->error) // Error on read
{
entry->error_cache= &mngr->trx_cache.cache_log;
- entry->commit_errno= errno;
DBUG_RETURN(ER_ERROR_ON_WRITE);
}