diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 602 |
1 files changed, 476 insertions, 126 deletions
diff --git a/sql/log.cc b/sql/log.cc index 430f0a0bf60..50c66135f24 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -479,7 +479,14 @@ public: */ bool using_xa; my_xid xa_xid; - ulong cookie; + bool need_unlog; + /* + Id of binlog that transaction was written to; only needed if need_unlog is + true. + */ + ulong binlog_id; + /* Set if we get an error during commit that must be returned from unlog(). */ + bool delayed_error; private: @@ -1678,8 +1685,7 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, So there is no work to do. Therefore, we will not increment any XID count, so we must not decrement any XID count in unlog(). */ - if (cache_mngr->using_xa && cache_mngr->xa_xid) - cache_mngr->cookie= BINLOG_COOKIE_DUMMY; + cache_mngr->need_unlog= 0; } cache_mngr->reset(using_stmt, using_trx); @@ -2904,16 +2910,16 @@ const char *MYSQL_LOG::generate_name(const char *log_name, MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) - :current_binlog_id(BINLOG_COOKIE_START), reset_master_pending(false), + :reset_master_pending(false), bytes_written(0), file_id(1), open_count(1), - need_start_event(TRUE), group_commit_queue(0), group_commit_queue_busy(FALSE), num_commits(0), num_group_commits(0), sync_period_ptr(sync_period), sync_counter(0), is_relay_log(0), signal_cnt(0), checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF), relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), - description_event_for_exec(0), description_event_for_queue(0) + description_event_for_exec(0), description_event_for_queue(0), + current_binlog_id(0) { /* We don't want to initialize locks here as such initialization depends on @@ -2963,10 +2969,9 @@ void MYSQL_BIN_LOG::cleanup() /* Init binlog-specific vars */ -void MYSQL_BIN_LOG::init(bool no_auto_events_arg, ulong max_size_arg) +void MYSQL_BIN_LOG::init(ulong max_size_arg) { DBUG_ENTER("MYSQL_BIN_LOG::init"); - no_auto_events= no_auto_events_arg; max_size= max_size_arg; DBUG_PRINT("info",("max_size: %lu", max_size)); DBUG_VOID_RETURN; @@ -3070,12 +3075,12 @@ bool MYSQL_BIN_LOG::open(const char *log_name, enum_log_type log_type_arg, const char *new_name, enum cache_type io_cache_type_arg, - bool no_auto_events_arg, ulong max_size_arg, bool null_created_arg, bool need_mutex) { File file= -1; + xid_count_per_binlog *new_xid_list_entry= NULL, *b; DBUG_ENTER("MYSQL_BIN_LOG::open"); DBUG_PRINT("enter",("log_type: %d",(int) log_type_arg)); @@ -3131,7 +3136,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, DBUG_RETURN(1); /* all warnings issued */ } - init(no_auto_events_arg, max_size_arg); + init(max_size_arg); open_count++; @@ -3155,11 +3160,10 @@ bool MYSQL_BIN_LOG::open(const char *log_name, write_file_name_to_index_file= 1; } - if (need_start_event && !no_auto_events) { /* - In 4.x we set need_start_event=0 here, but in 5.0 we want a Start event - even if this is not the very first binlog. + In 4.x we put Start event only in the first binlog. But from 5.0 we + want a Start event even if this is not the very first binlog. */ Format_description_log_event s(BINLOG_VERSION); /* @@ -3191,42 +3195,51 @@ bool MYSQL_BIN_LOG::open(const char *log_name, { char buf[FN_REFLEN]; /* - Put this one into the list of active binlogs. + Construct an entry in the binlog_xid_count_list for the new binlog + file (we will not link it into the list until we know the new file + is successfully created; otherwise we would have to remove it again + if creation failed, which gets tricky since other threads may have + seen the entry in the meantime - and we do not want to hold + LOCK_xid_list for long periods of time). + Write the current binlog checkpoint into the log, so XA recovery will know from where to start recovery. */ uint off= dirname_length(log_file_name); uint len= strlen(log_file_name) - off; char *entry_mem, *name_mem; - xid_count_per_binlog *b, *b2; - if (!(b = (xid_count_per_binlog *) + if (!(new_xid_list_entry = (xid_count_per_binlog *) my_multi_malloc(MYF(MY_WME), &entry_mem, sizeof(xid_count_per_binlog), &name_mem, len, NULL))) goto err; memcpy(name_mem, log_file_name+off, len); - b->binlog_name= name_mem; - b->binlog_name_len= len; - b->xid_count= 0; - - mysql_mutex_lock(&LOCK_xid_list); - b->binlog_id= ++current_binlog_id; + new_xid_list_entry->binlog_name= name_mem; + new_xid_list_entry->binlog_name_len= len; + new_xid_list_entry->xid_count= 0; /* - Remove any initial entries with no pending XIDs. - Normally this will be done in unlog(), but if there are no - transactions with an XA-capable engine at all in a given binlog - file, unlog() will never be used and we will remove the entry here. - */ - while ((b2= binlog_xid_count_list.head()) && b2->xid_count == 0) - my_free(binlog_xid_count_list.get()); + Find the name for the Initial binlog checkpoint. - binlog_xid_count_list.push_back(b); - b2= binlog_xid_count_list.head(); - strmake(buf, b2->binlog_name, b2->binlog_name_len); + Normally this will just be the first entry, as we delete entries + when their count drops to zero. But we scan the list to handle any + corner case, eg. for the first binlog file opened after startup, the + list will be empty. + */ + mysql_mutex_lock(&LOCK_xid_list); + I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); + while ((b= it++) && b->xid_count == 0) + ; mysql_mutex_unlock(&LOCK_xid_list); + if (!b) + b= new_xid_list_entry; + strmake(buf, b->binlog_name, b->binlog_name_len); Binlog_checkpoint_log_event ev(buf, len); + DBUG_EXECUTE_IF("crash_before_write_checkpoint_event", + flush_io_cache(&log_file); + mysql_file_sync(log_file.file, MYF(MY_WME)); + DBUG_SUICIDE();); if (ev.write(&log_file)) goto err; bytes_written+= ev.data_written; @@ -3302,6 +3315,23 @@ bool MYSQL_BIN_LOG::open(const char *log_name, #endif } } + + if (!is_relay_log) + { + /* + Now the file was created successfully, so we can link in the entry for + the new binlog file in binlog_xid_count_list. + */ + mysql_mutex_lock(&LOCK_xid_list); + ++current_binlog_id; + new_xid_list_entry->binlog_id= current_binlog_id; + /* Remove any initial entries with no pending XIDs. */ + while ((b= binlog_xid_count_list.head()) && b->xid_count == 0) + my_free(binlog_xid_count_list.get()); + binlog_xid_count_list.push_back(new_xid_list_entry); + mysql_mutex_unlock(&LOCK_xid_list); + } + log_state= LOG_OPENED; #ifdef HAVE_REPLICATION @@ -3320,6 +3350,8 @@ err: Turning logging off for the whole duration of the MySQL server process. \ To turn it on again: fix the cause, \ shutdown the MySQL server and restart it.", name, errno); + if (new_xid_list_entry) + my_free(new_xid_list_entry); if (file >= 0) mysql_file_close(file, MYF(0)); close(LOG_CLOSE_INDEX); @@ -3599,12 +3631,40 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) if (!is_relay_log) { /* + Mark that a RESET MASTER is in progress. + This ensures that a binlog checkpoint will not try to write binlog + checkpoint events, which would be useless (as we are deleting the binlog + anyway) and could deadlock, as we are holding LOCK_log. + */ + mysql_mutex_lock(&LOCK_xid_list); + reset_master_pending= true; + mysql_mutex_unlock(&LOCK_xid_list); + + /* We are going to nuke all binary log files. - So first wait until all pending binlog checkpoints have completed. + Without binlog, we cannot XA recover prepared-but-not-committed + transactions in engines. So force a commit checkpoint first. + + Note that we take and immediately release LOCK_commit_ordered. This has + the effect to ensure that any on-going group commit (in + trx_group_commit_leader()) has completed before we request the checkpoint, + due to the chaining of LOCK_log and LOCK_commit_ordered in that function. + (We are holding LOCK_log, so no new group commit can start). + + Without this, it is possible (though perhaps unlikely) that the RESET + MASTER could run in-between the write to the binlog and the + commit_ordered() in the engine of some transaction, and then a crash + later would leave such transaction not recoverable. */ + mysql_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_commit_ordered); + + mark_xids_active(current_binlog_id, 1); + do_checkpoint_request(current_binlog_id); + + /* Now wait for all checkpoint requests and pending unlog() to complete. */ mysql_mutex_lock(&LOCK_xid_list); xid_count_per_binlog *b; - reset_master_pending= true; for (;;) { I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); @@ -3626,9 +3686,6 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) Now all XIDs are fully flushed to disk, and we are holding LOCK_log so no new ones will be written. So we can proceed to delete the logs. */ - while ((b= binlog_xid_count_list.get())) - my_free(b); - reset_master_pending= false; mysql_mutex_unlock(&LOCK_xid_list); } @@ -3722,10 +3779,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) goto err; } } - if (!thd->slave_thread) - need_start_event=1; if (!open_index_file(index_file_name, 0, FALSE)) - if ((error= open(save_name, log_type, 0, io_cache_type, no_auto_events, max_size, 0, FALSE))) + if ((error= open(save_name, log_type, 0, io_cache_type, max_size, 0, FALSE))) goto err; my_free((void *) save_name); @@ -3733,6 +3788,31 @@ err: if (error == 1) name= const_cast<char*>(save_name); mysql_mutex_unlock(&LOCK_thread_count); + + if (!is_relay_log) + { + xid_count_per_binlog *b; + /* + Remove all entries in the xid_count list except the last. + Normally we will just be deleting all the entries that we waited for to + drop to zero above. But if we fail during RESET MASTER for some reason + then we will not have created any new log file, and we may keep the last + of the old entries. + */ + mysql_mutex_lock(&LOCK_xid_list); + for (;;) + { + b= binlog_xid_count_list.head(); + DBUG_ASSERT(b /* List can never become empty. */); + if (b->binlog_id == current_binlog_id) + break; + DBUG_ASSERT(b->xid_count == 0); + my_free(binlog_xid_count_list.get()); + } + reset_master_pending= false; + mysql_mutex_unlock(&LOCK_xid_list); + } + mysql_mutex_unlock(&LOCK_index); mysql_mutex_unlock(&LOCK_log); DBUG_RETURN(error); @@ -4476,7 +4556,6 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) if (log_type == LOG_BIN) { - if (!no_auto_events) { /* We log the whole file name for log file as the user may decide @@ -4551,7 +4630,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) /* reopen the binary log file. */ file_to_open= new_name_ptr; error= open(old_name, log_type, new_name_ptr, io_cache_type, - no_auto_events, max_size, 1, FALSE); + max_size, 1, FALSE); } /* handle reopening errors */ @@ -5176,6 +5255,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) bool is_trans_cache= FALSE; bool using_trans= event_info->use_trans_cache(); bool direct= event_info->use_direct_logging(); + ulong prev_binlog_id; + LINT_INIT(prev_binlog_id); if (thd->binlog_evt_union.do_union) { @@ -5227,6 +5308,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) file= &log_file; my_org_b_tell= my_b_tell(file); mysql_mutex_lock(&LOCK_log); + prev_binlog_id= current_binlog_id; } else { @@ -5372,7 +5454,7 @@ err: mysql_mutex_unlock(&LOCK_log); if (check_purge) - purge(); + checkpoint_and_purge(prev_binlog_id); } if (error) @@ -5457,6 +5539,64 @@ bool general_log_write(THD *thd, enum enum_server_command command, return FALSE; } + +/* + I would like to make this function static, but this causes compiler warnings + when it is declared as friend function in log.h. +*/ +void +binlog_checkpoint_callback(void *cookie) +{ + MYSQL_BIN_LOG::xid_count_per_binlog *entry= + (MYSQL_BIN_LOG::xid_count_per_binlog *)cookie; + /* + For every supporting engine, we increment the xid_count and issue a + commit_checkpoint_request(). Then we can count when all + commit_checkpoint_notify() callbacks have occured, and then log a new + binlog checkpoint event. + */ + mysql_bin_log.mark_xids_active(entry->binlog_id, 1); +} + + +/* + Request a commit checkpoint from each supporting engine. + This must be called after each binlog rotate, and after LOCK_log has been + released. The xid_count value in the xid_count_per_binlog entry was + incremented by 1 and will be decremented in this function; this ensures + that the entry will not go away early despite LOCK_log not being held. +*/ +void +MYSQL_BIN_LOG::do_checkpoint_request(ulong binlog_id) +{ + xid_count_per_binlog *entry; + + /* + Find the binlog entry, and invoke commit_checkpoint_request() on it in + each supporting storage engine. + */ + mysql_mutex_lock(&LOCK_xid_list); + I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); + do { + entry= it++; + DBUG_ASSERT(entry /* binlog_id is always somewhere in the list. */); + } while (entry->binlog_id != binlog_id); + mysql_mutex_unlock(&LOCK_xid_list); + + ha_commit_checkpoint_request(entry, binlog_checkpoint_callback); + /* + When we rotated the binlog, we incremented xid_count to make sure the + entry would not go away until this point, where we have done all necessary + commit_checkpoint_request() calls. + So now we can (and must) decrease the count - when it reaches zero, we + will know that both all pending unlog() and all pending + commit_checkpoint_notify() calls are done, and we can log a new binlog + checkpoint. + */ + mark_xid_done(binlog_id, true); +} + + /** The method executes rotation when LOCK_log is already acquired by the caller. @@ -5465,6 +5605,15 @@ bool general_log_write(THD *thd, enum enum_server_command command, @param check_purge is set to true if rotation took place @note + Caller _must_ check the check_purge variable. If this is set, it means + that the binlog was rotated, and caller _must_ ensure that + do_checkpoint_request() is called later with the binlog_id of the rotated + binlog file. The call to do_checkpoint_request() must happen after + LOCK_log is released (which is why we cannot simply do it here). + Usually, checkpoint_and_purge() is appropriate, as it will both handle + the checkpointing and any needed purging of old logs. + + @note If rotation fails, for instance the server was unable to create a new log file, we still try to write an incident event to the current log. @@ -5482,7 +5631,27 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) if (force_rotate || (my_b_tell(&log_file) >= (my_off_t) max_size)) { + ulong binlog_id= current_binlog_id; + /* + We rotate the binlog, so we need to start a commit checkpoint in all + supporting engines - when it finishes, we can log a new binlog checkpoint + event. + + But we cannot start the checkpoint here - there could be a group commit + still in progress which needs to be included in the checkpoint, and + besides we do not want to do the (possibly expensive) checkpoint while + LOCK_log is held. + + On the other hand, we must be sure that the xid_count entry for the + previous log does not go away until we start the checkpoint - which it + could do as it is no longer the most recent. So we increment xid_count + (to count the pending checkpoint request) - this will fix the entry in + place until we decrement again in do_checkpoint_request(). + */ + mark_xids_active(binlog_id, 1); + if ((error= new_file_without_locking())) + { /** Be conservative... There are possible lost events (eg, failing to log the Execute_load_query_log_event @@ -5495,7 +5664,14 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) if (!write_incident_already_locked(current_thd)) flush_and_sync(0); - *check_purge= true; + /* + We failed to rotate - so we have to decrement the xid_count back that + we incremented before attempting the rotate. + */ + mark_xid_done(binlog_id, false); + } + else + *check_purge= true; } DBUG_RETURN(error); } @@ -5523,6 +5699,13 @@ void MYSQL_BIN_LOG::purge() #endif } + +void MYSQL_BIN_LOG::checkpoint_and_purge(ulong binlog_id) +{ + do_checkpoint_request(binlog_id); + purge(); +} + /** The method is a shortcut of @c rotate() and @c purge(). LOCK_log is acquired prior to rotate and is released after it. @@ -5535,11 +5718,13 @@ void MYSQL_BIN_LOG::purge() int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate) { int error= 0; + ulong prev_binlog_id; DBUG_ENTER("MYSQL_BIN_LOG::rotate_and_purge"); bool check_purge= false; //todo: fix the macro def and restore safe_mutex_assert_not_owner(&LOCK_log); mysql_mutex_lock(&LOCK_log); + prev_binlog_id= current_binlog_id; if ((error= rotate(force_rotate, &check_purge))) check_purge= false; /* @@ -5549,7 +5734,7 @@ int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate) mysql_mutex_unlock(&LOCK_log); if (check_purge) - purge(); + checkpoint_and_purge(prev_binlog_id); DBUG_RETURN(error); } @@ -5880,11 +6065,13 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) uint error= 0; my_off_t offset; bool check_purge= false; + ulong prev_binlog_id; DBUG_ENTER("MYSQL_BIN_LOG::write_incident"); mysql_mutex_lock(&LOCK_log); if (likely(is_open())) { + prev_binlog_id= current_binlog_id; if (!(error= write_incident_already_locked(thd)) && !(error= flush_and_sync(0))) { @@ -5904,7 +6091,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) mysql_mutex_unlock(&LOCK_log); if (check_purge) - purge(); + checkpoint_and_purge(prev_binlog_id); } DBUG_RETURN(error); @@ -5914,6 +6101,7 @@ void MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name, uint len) { + my_off_t offset; Binlog_checkpoint_log_event ev(name, len); /* Note that we must sync the binlog checkpoint to disk. @@ -5922,22 +6110,29 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name, */ if (!ev.write(&log_file) && !flush_and_sync(0)) { - bool check_purge= false; signal_update(); - rotate(false, &check_purge); - if (check_purge) - purge(); - return; + } + else + { + /* + If we fail to write the checkpoint event, something is probably really + bad with the binlog. We complain in the error log. + + Note that failure to write binlog checkpoint does not compromise the + ability to do crash recovery - crash recovery will just have to scan a + bit more of the binlog than strictly necessary. + */ + sql_print_error("Failed to write binlog checkpoint event to binary log\n"); } + offset= my_b_tell(&log_file); /* - If we fail to write the checkpoint event, something is probably really - bad with the binlog. We complain in the error log. - Note that failure to write binlog checkpoint does not compromise the - ability to do crash recovery - crash recovery will just have to scan a - bit more of the binlog than strictly necessary. + Take mutex to protect against a reader seeing partial writes of 64-bit + offset on 32-bit CPUs. */ - sql_print_error("Failed to write binlog checkpoint event to binary log\n"); + mysql_mutex_lock(&LOCK_commit_ordered); + last_commit_pos_offset= offset; + mysql_mutex_unlock(&LOCK_commit_ordered); } @@ -5973,6 +6168,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, bool using_trx_cache) { group_commit_entry entry; + Ha_trx_info *ha_info; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); entry.thd= thd; @@ -5981,6 +6177,15 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, entry.all= all; entry.using_stmt_cache= using_stmt_cache; entry.using_trx_cache= using_trx_cache; + entry.need_unlog= false; + ha_info= all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + for (; ha_info; ha_info= ha_info->next()) + { + if (ha_info->is_started() && ha_info->ht() != binlog_hton && + !ha_info->ht()->commit_checkpoint_request) + entry.need_unlog= true; + break; + } /* Log "BEGIN" at the beginning of every transaction. Here, a transaction is @@ -6069,6 +6274,18 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { next->thd->signal_wakeup_ready(); } + else + { + /* + If we rotated the binlog, and if we are using the unoptimized thread + scheduling where every thread runs its own commit_ordered(), then we + must do the commit checkpoint and log purge here, after all + commit_ordered() calls have finished, and locks have been released. + */ + if (entry->check_purge) + checkpoint_and_purge(entry->binlog_id); + } + } if (likely(!entry->error)) @@ -6099,8 +6316,9 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) we need to mark it as not needed for recovery (unlog() is not called for a transaction if log_xid() fails). */ - if (entry->cache_mngr->using_xa && entry->cache_mngr->xa_xid) - mark_xid_done(entry->cache_mngr->cookie); + if (entry->cache_mngr->using_xa && entry->cache_mngr->xa_xid && + entry->cache_mngr->need_unlog) + mark_xid_done(entry->cache_mngr->binlog_id, true); return 1; } @@ -6120,10 +6338,12 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) { uint xid_count= 0; my_off_t UNINIT_VAR(commit_offset); - group_commit_entry *current; + group_commit_entry *current, *last_in_queue; group_commit_entry *queue= NULL; bool check_purge= false; + ulong binlog_id; DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); + LINT_INIT(binlog_id); DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true @@ -6134,6 +6354,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ mysql_mutex_lock(&LOCK_log); DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); + binlog_id= current_binlog_id; mysql_mutex_lock(&LOCK_prepare_ordered); current= group_commit_queue; @@ -6141,6 +6362,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) mysql_mutex_unlock(&LOCK_prepare_ordered); /* As the queue is in reverse order of entering, reverse it. */ + last_in_queue= current; while (current) { group_commit_entry *next= current->next; @@ -6180,8 +6402,22 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) cache_mngr->last_commit_pos_offset= commit_offset; if (cache_mngr->using_xa && cache_mngr->xa_xid) { - xid_count++; - cache_mngr->cookie= current_binlog_id; + /* + If all storage engines support commit_checkpoint_request(), then we + do not need to keep track of when this XID is durably committed. + Instead we will just ask the storage engine to durably commit all its + XIDs when we rotate a binlog file. + */ + if (current->need_unlog) + { + xid_count++; + cache_mngr->need_unlog= true; + cache_mngr->binlog_id= binlog_id; + } + else + cache_mngr->need_unlog= false; + + cache_mngr->delayed_error= false; } } @@ -6232,21 +6468,27 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ if (xid_count > 0) { - mark_xids_active(current_binlog_id, xid_count); + mark_xids_active(binlog_id, xid_count); } - else + + if (rotate(false, &check_purge)) { - if (rotate(false, &check_purge)) - { - /* - If we fail to rotate, which thread should get the error? - We give the error to the leader, as any my_error() thrown inside - rotate() will have been registered for the leader THD. - */ - leader->error= ER_ERROR_ON_WRITE; - leader->commit_errno= errno; - check_purge= false; - } + /* + If we fail to rotate, which thread should get the error? + We give the error to the leader, as any my_error() thrown inside + rotate() will have been registered for the leader THD. + + However we must not return error from here - that would cause + ha_commit_trans() to abort and rollback the transaction, which would + leave an inconsistent state with the transaction committed in the + binlog but rolled back in the engine. + + Instead set a flag so that we can return error later, from unlog(), + when the transaction has been safely committed in the engine. + */ + leader->cache_mngr->delayed_error= true; + my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, errno); + check_purge= false; } } @@ -6278,6 +6520,15 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) mysql_cond_wait(&COND_queue_busy, &LOCK_commit_ordered); group_commit_queue_busy= TRUE; + /* + Set these so parent can run checkpoint_and_purge() in last thread. + (When using optimized thread scheduling, we run checkpoint_and_purge() + in this function, so parent does not need to and we need not set these + values). + */ + last_in_queue->check_purge= check_purge; + last_in_queue->binlog_id= binlog_id; + /* Note that we return with LOCK_commit_ordered locked! */ DBUG_VOID_RETURN; } @@ -6308,9 +6559,10 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) } DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered"); mysql_mutex_unlock(&LOCK_commit_ordered); + DEBUG_SYNC(leader->thd, "commit_after_group_release_commit_ordered"); if (check_purge) - purge(); + checkpoint_and_purge(binlog_id); DBUG_VOID_RETURN; } @@ -6470,7 +6722,7 @@ void MYSQL_BIN_LOG::close(uint exiting) if (log_state == LOG_OPENED) { #ifdef HAVE_REPLICATION - if (log_type == LOG_BIN && !no_auto_events && + if (log_type == LOG_BIN && (exiting & LOG_CLOSE_STOP_EVENT)) { Stop_log_event s; @@ -7104,6 +7356,8 @@ int TC_LOG_MMAP::open(const char *opt_name) mysql_mutex_init(key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_active, &LOCK_active, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_pool, &LOCK_pool, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_pending_checkpoint, &LOCK_pending_checkpoint, + MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_active, &COND_active, 0); mysql_cond_init(key_COND_pool, &COND_pool, 0); mysql_cond_init(key_TC_LOG_MMAP_COND_queue_busy, &COND_queue_busy, 0); @@ -7356,17 +7610,93 @@ int TC_LOG_MMAP::sync() return err; } +static void +mmap_do_checkpoint_callback(void *data) +{ + TC_LOG_MMAP::pending_cookies *pending= + static_cast<TC_LOG_MMAP::pending_cookies *>(data); + ++pending->pending_count; +} + +int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid) +{ + pending_cookies *full_buffer= NULL; + DBUG_ASSERT(*(my_xid *)(data+cookie) == xid); + + /* + Do not delete the entry immediately, as there may be participating storage + engines which implement commit_checkpoint_request(), and thus have not yet + flushed the commit durably to disk. + + Instead put it in a queue - and periodically, we will request a checkpoint + from all engines and delete a whole batch at once. + */ + mysql_mutex_lock(&LOCK_pending_checkpoint); + if (pending_checkpoint == NULL) + { + uint32 size= sizeof(*pending_checkpoint); + if (!(pending_checkpoint= + (pending_cookies *)my_malloc(size, MYF(MY_ZEROFILL)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), size); + mysql_mutex_unlock(&LOCK_pending_checkpoint); + return 1; + } + } + + pending_checkpoint->cookies[pending_checkpoint->count++]= cookie; + if (pending_checkpoint->count == sizeof(pending_checkpoint->cookies) / + sizeof(pending_checkpoint->cookies[0])) + { + full_buffer= pending_checkpoint; + pending_checkpoint= NULL; + } + mysql_mutex_unlock(&LOCK_pending_checkpoint); + + if (full_buffer) + { + /* + We do an extra increment and notify here - this ensures that + things work also if there are no engines at all that support + commit_checkpoint_request. + */ + ++full_buffer->pending_count; + ha_commit_checkpoint_request(full_buffer, mmap_do_checkpoint_callback); + commit_checkpoint_notify(full_buffer); + } + return 0; +} + + +void +TC_LOG_MMAP::commit_checkpoint_notify(void *cookie) +{ + uint count; + pending_cookies *pending= static_cast<pending_cookies *>(cookie); + mysql_mutex_lock(&LOCK_pending_checkpoint); + DBUG_ASSERT(pending->pending_count > 0); + count= --pending->pending_count; + mysql_mutex_unlock(&LOCK_pending_checkpoint); + if (count == 0) + { + uint i; + for (i= 0; i < sizeof(pending->cookies)/sizeof(pending->cookies[0]); ++i) + delete_entry(pending->cookies[i]); + my_free(pending); + } +} + + /** erase xid from the page, update page free space counters/pointers. cookie points directly to the memory where xid was logged. */ -int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid) +int TC_LOG_MMAP::delete_entry(ulong cookie) { PAGE *p=pages+(cookie/tc_log_page_size); my_xid *x=(my_xid *)(data+cookie); - DBUG_ASSERT(*x == xid); DBUG_ASSERT(x >= p->start && x < p->end); mysql_mutex_lock(&p->lock); @@ -7390,6 +7720,7 @@ void TC_LOG_MMAP::close() mysql_mutex_destroy(&LOCK_sync); mysql_mutex_destroy(&LOCK_active); mysql_mutex_destroy(&LOCK_pool); + mysql_mutex_destroy(&LOCK_pending_checkpoint); mysql_cond_destroy(&COND_pool); mysql_cond_destroy(&COND_active); mysql_cond_destroy(&COND_queue_busy); @@ -7412,9 +7743,12 @@ void TC_LOG_MMAP::close() } if (inited>=5) // cannot do in the switch because of Windows mysql_file_delete(key_file_tclog, logname, MYF(MY_WME)); + if (pending_checkpoint) + my_free(pending_checkpoint); inited=0; } + int TC_LOG_MMAP::recover() { HASH xids; @@ -7518,7 +7852,7 @@ int TC_LOG_BINLOG::open(const char *opt_name) if (using_heuristic_recover()) { /* generate a new binlog to mask a corrupted one */ - open(opt_name, LOG_BIN, 0, WRITE_CACHE, 0, max_binlog_size, 0, TRUE); + open(opt_name, LOG_BIN, 0, WRITE_CACHE, max_binlog_size, 0, TRUE); cleanup(); return 1; } @@ -7606,9 +7940,6 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, cache_mngr->using_xa= TRUE; cache_mngr->xa_xid= xid; -#ifndef DBUG_OFF - cache_mngr->cookie= 0; -#endif err= binlog_commit_flush_xid_caches(thd, cache_mngr, all, xid); DEBUG_SYNC(thd, "binlog_after_log_and_order"); @@ -7619,10 +7950,11 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, If using explicit user XA, we will not have XID. We must still return a non-zero cookie (as zero cookie signals error). */ - if (!xid) - DBUG_RETURN(BINLOG_COOKIE_DUMMY); - DBUG_ASSERT(cache_mngr->cookie != 0); - DBUG_RETURN(cache_mngr->cookie); + if (!xid || !cache_mngr->need_unlog) + DBUG_RETURN(BINLOG_COOKIE_DUMMY(cache_mngr->delayed_error)); + else + DBUG_RETURN(BINLOG_COOKIE_MAKE(cache_mngr->binlog_id, + cache_mngr->delayed_error)); } /* @@ -7637,19 +7969,18 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, binary log. */ void -TC_LOG_BINLOG::mark_xids_active(ulong cookie, uint xid_count) +TC_LOG_BINLOG::mark_xids_active(ulong binlog_id, uint xid_count) { xid_count_per_binlog *b; DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active"); - DBUG_PRINT("info", ("cookie=%lu xid_count=%u", cookie, xid_count)); - DBUG_ASSERT(cookie != 0 && cookie != BINLOG_COOKIE_DUMMY); + DBUG_PRINT("info", ("binlog_id=%lu xid_count=%u", binlog_id, xid_count)); mysql_mutex_lock(&LOCK_xid_list); I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); while ((b= it++)) { - if (b->binlog_id == cookie) + if (b->binlog_id == binlog_id) { b->xid_count += xid_count; break; @@ -7675,15 +8006,13 @@ TC_LOG_BINLOG::mark_xids_active(ulong cookie, uint xid_count) checkpoint. */ void -TC_LOG_BINLOG::mark_xid_done(ulong cookie) +TC_LOG_BINLOG::mark_xid_done(ulong binlog_id, bool write_checkpoint) { xid_count_per_binlog *b; bool first; ulong current; DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done"); - if (cookie == BINLOG_COOKIE_DUMMY) - DBUG_VOID_RETURN; /* Nothing to do. */ mysql_mutex_lock(&LOCK_xid_list); current= current_binlog_id; @@ -7691,7 +8020,7 @@ TC_LOG_BINLOG::mark_xid_done(ulong cookie) first= true; while ((b= it++)) { - if (b->binlog_id == cookie) + if (b->binlog_id == binlog_id) { --b->xid_count; break; @@ -7700,8 +8029,22 @@ TC_LOG_BINLOG::mark_xid_done(ulong cookie) } /* Binlog is always found, as we do not remove until count reaches 0 */ DBUG_ASSERT(b); - if (likely(cookie == current && !reset_master_pending) || - b->xid_count != 0 || !first) + /* + If a RESET MASTER is pending, we are about to remove all log files, and + the RESET MASTER thread is waiting for all pending unlog() calls to + complete while holding LOCK_log. In this case we should not log a binlog + checkpoint event (it would be deleted immediately anyway and we would + deadlock on LOCK_log) but just signal the thread. + */ + if (unlikely(reset_master_pending)) + { + mysql_cond_signal(&COND_xid_list); + mysql_mutex_unlock(&LOCK_xid_list); + DBUG_VOID_RETURN; + } + + if (likely(binlog_id == current) || b->xid_count != 0 || !first || + !write_checkpoint) { /* No new binlog checkpoint reached yet. */ mysql_mutex_unlock(&LOCK_xid_list); @@ -7726,40 +8069,27 @@ TC_LOG_BINLOG::mark_xid_done(ulong cookie) LOCK_log, then re-aquire LOCK_xid_list. If we were to take LOCK_log while holding LOCK_xid_list, we might deadlock with other threads that take the locks in the opposite order. - - If a RESET MASTER is pending, we are about to remove all log files, and - the RESET MASTER thread is waiting for all pending unlog() calls to - complete while holding LOCK_log. In this case we should not log a binlog - checkpoint event (it would be deleted immediately anywat and we would - deadlock on LOCK_log) but just signal the thread. */ - if (!reset_master_pending) - { - mysql_mutex_unlock(&LOCK_xid_list); - mysql_mutex_lock(&LOCK_log); - mysql_mutex_lock(&LOCK_xid_list); - } + + mysql_mutex_unlock(&LOCK_xid_list); + mysql_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_xid_list); + /* We need to reload current_binlog_id due to release/re-take of lock. */ + current= current_binlog_id; + for (;;) { /* Remove initial element(s) with zero count. */ b= binlog_xid_count_list.head(); /* - Normally, we must not remove all elements in the list. - Only if a RESET MASTER is in progress may we delete everything - RESET - MASTER has LOCK_log held, and will create a new initial element before - releasing the lock. + We must not remove all elements in the list - the entry for the current + binlog must be present always. */ - DBUG_ASSERT(b || reset_master_pending); - if (unlikely(!b) || b->binlog_id == current || b->xid_count > 0) + DBUG_ASSERT(b); + if (b->binlog_id == current || b->xid_count > 0) break; my_free(binlog_xid_count_list.get()); } - if (reset_master_pending) - { - mysql_cond_signal(&COND_xid_list); - mysql_mutex_unlock(&LOCK_xid_list); - DBUG_VOID_RETURN; - } mysql_mutex_unlock(&LOCK_xid_list); write_binlog_checkpoint_event_already_locked(b->binlog_name, @@ -7771,10 +8101,22 @@ TC_LOG_BINLOG::mark_xid_done(ulong cookie) int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) { DBUG_ENTER("TC_LOG_BINLOG::unlog"); - if (xid) - mark_xid_done(cookie); - /* As ::write_transaction_to_binlog() did not rotate, do it here. */ - DBUG_RETURN(rotate_and_purge(0)); + if (!xid) + DBUG_RETURN(0); + + if (!BINLOG_COOKIE_IS_DUMMY(cookie)) + mark_xid_done(BINLOG_COOKIE_GET_ID(cookie), true); + /* + See comment in trx_group_commit_leader() - if rotate() gave a failure, + we delay the return of error code to here. + */ + DBUG_RETURN(BINLOG_COOKIE_GET_ERROR_FLAG(cookie)); +} + +void +TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie) +{ + mark_xid_done(((xid_count_per_binlog *)cookie)->binlog_id, true); } int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, @@ -7871,6 +8213,11 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, if (!binlog_checkpoint_found) break; first_round= false; + DBUG_EXECUTE_IF("xa_recover_expect_master_bin_000004", + if (0 != strcmp("./master-bin.000004", binlog_checkpoint_name) && + 0 != strcmp(".\\master-bin.000004", binlog_checkpoint_name)) + DBUG_SUICIDE(); + ); if (find_log_pos(linfo, binlog_checkpoint_name, 1)) { sql_print_error("Binlog file '%s' not found in binlog index, needed " @@ -7983,10 +8330,13 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var, { ulong value= *((ulong *)save); bool check_purge= false; + ulong prev_binlog_id; + LINT_INIT(prev_binlog_id); mysql_mutex_lock(mysql_bin_log.get_log_lock()); if(mysql_bin_log.is_open()) { + prev_binlog_id= mysql_bin_log.current_binlog_id; if (binlog_checksum_options != value) mysql_bin_log.checksum_alg_reset= (uint8) value; if (mysql_bin_log.rotate(true, &check_purge)) @@ -8000,7 +8350,7 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var, mysql_bin_log.checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF; mysql_mutex_unlock(mysql_bin_log.get_log_lock()); if (check_purge) - mysql_bin_log.purge(); + mysql_bin_log.checkpoint_and_purge(prev_binlog_id); } |