diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-10-05 21:28:49 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-10-13 16:36:51 -0400 |
commit | f6e461f9a2680b48606658c2010ca6f20ee8e8b7 (patch) | |
tree | 107c044f5c1357e3bf157299e749ff53ea0b422b /src/mongo/db/repl/sync_source_resolver.cpp | |
parent | fa80e8fd37983fecbe97534e9a110a6d40daaa60 (diff) | |
download | mongo-f6e461f9a2680b48606658c2010ca6f20ee8e8b7.tar.gz |
SERVER-31262 Storage of _rbidCommandHandle can race with destruction of SyncSourceResolver
Diffstat (limited to 'src/mongo/db/repl/sync_source_resolver.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.cpp | 72 |
1 files changed, 51 insertions, 21 deletions
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index ad7f0ba0bec..ebb9f9649c7 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -34,6 +34,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/sync_source_selector.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/stdx/memory.h" @@ -179,7 +180,8 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher( } std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndPort candidate, - OpTime earliestOpTimeSeen) { + OpTime earliestOpTimeSeen, + int rbid) { // This query is structured so that it is executed on the sync source using the oplog // start hack (oplogReplay=true and $gt/$gte predicate over "ts"). return stdx::make_unique<Fetcher>( @@ -193,7 +195,8 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndP this, stdx::placeholders::_1, candidate, - earliestOpTimeSeen), + earliestOpTimeSeen, + rbid), ReadPreferenceSetting::secondaryPreferredMetadata(), kFetcherTimeout /* find network timeout */, kFetcherTimeout /* getMore network timeout */); @@ -206,6 +209,9 @@ Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) { // executor. auto status = fetcher->schedule(); if (status.isOK()) { + // Fetcher destruction blocks on all outstanding callbacks. If we are currently in a + // Fetcher-related callback, we can't destroy the Fetcher just yet, so we assign it to a + // temporary unique pointer to allow the destruction to run to completion. _shuttingDownFetcher = std::move(_fetcher); _fetcher = std::move(fetcher); } else { @@ -313,10 +319,26 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback( return; } - _scheduleRBIDRequest(candidate, earliestOpTimeSeen); + auto status = _scheduleRBIDRequest(candidate, earliestOpTimeSeen); + if (!status.isOK()) { + _finishCallback(status).ignore(); + } } -void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) { +Status SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) { + // Once a work is scheduled, nothing prevents it finishing. We need the mutex to protect the + // access of member variables after scheduling, because otherwise the scheduled callback could + // finish and allow the destructor to fire before we access the member variables. + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_state == State::kShuttingDown) { + return Status( + ErrorCodes::CallbackCanceled, + str::stream() + << "sync source resolver shut down while checking rollbackId on candidate: " + << candidate); + } + + invariant(_state == State::kRunning); auto handle = _taskExecutor->scheduleRemoteCommand( {candidate, "admin", BSON("replSetGetRBID" << 1), nullptr, kFetcherTimeout}, stdx::bind(&SyncSourceResolver::_rbidRequestCallback, @@ -326,15 +348,11 @@ void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earl stdx::placeholders::_1)); if (!handle.isOK()) { - _finishCallback(handle.getStatus()).transitional_ignore(); - return; + return handle.getStatus(); } - stdx::lock_guard<stdx::mutex> lk(_mutex); _rbidCommandHandle = std::move(handle.getValue()); - if (_state == State::kShuttingDown) { - _taskExecutor->cancel(_rbidCommandHandle); - } + return Status::OK(); } void SyncSourceResolver::_rbidRequestCallback( @@ -346,10 +364,11 @@ void SyncSourceResolver::_rbidRequestCallback( return; } + int rbid = ReplicationProcess::kUninitializedRollbackId; try { uassertStatusOK(rbidReply.response.status); uassertStatusOK(getStatusFromCommandResult(rbidReply.response.data)); - _rbid = rbidReply.response.data["rbid"].Int(); + rbid = rbidReply.response.data["rbid"].Int(); } catch (const DBException& ex) { const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration; log() << "Blacklisting " << candidate << " due to error: '" << ex << "' for " @@ -362,13 +381,15 @@ void SyncSourceResolver::_rbidRequestCallback( if (!_requiredOpTime.isNull()) { // Schedule fetcher to look for '_requiredOpTime' in the remote oplog. // Unittest requires that this kind of failure be handled specially. - auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen)); + auto status = + _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen, rbid)); if (!status.isOK()) { _finishCallback(status).transitional_ignore(); } return; } - _finishCallback(candidate).transitional_ignore(); + + _finishCallback(candidate, rbid).ignore(); } Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse( @@ -401,7 +422,8 @@ Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse( void SyncSourceResolver::_requiredOpTimeFetcherCallback( const StatusWith<Fetcher::QueryResponse>& queryResult, HostAndPort candidate, - OpTime earliestOpTimeSeen) { + OpTime earliestOpTimeSeen, + int rbid) { if (_isShuttingDown()) { _finishCallback(Status(ErrorCodes::CallbackCanceled, str::stream() << "sync source resolver shut down while looking for " @@ -447,18 +469,19 @@ void SyncSourceResolver::_requiredOpTimeFetcherCallback( return; } - _finishCallback(candidate).transitional_ignore(); + _finishCallback(candidate, rbid).ignore(); } Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSeen) { auto candidateResult = _chooseNewSyncSource(); if (!candidateResult.isOK()) { - return _finishCallback(candidateResult); + return _finishCallback(candidateResult.getStatus()); } if (candidateResult.getValue().empty()) { if (earliestOpTimeSeen.isNull()) { - return _finishCallback(candidateResult); + return _finishCallback(candidateResult.getValue(), + ReplicationProcess::kUninitializedRollbackId); } SyncSourceResolverResponse response; @@ -476,15 +499,22 @@ Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSe return Status::OK(); } -Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) { +Status SyncSourceResolver::_finishCallback(HostAndPort hostAndPort, int rbid) { SyncSourceResolverResponse response; - response.syncSourceStatus = std::move(result); - if (response.isOK() && !response.getSyncSource().empty()) { - response.rbid = _rbid; + response.syncSourceStatus = std::move(hostAndPort); + if (rbid != ReplicationProcess::kUninitializedRollbackId) { + response.rbid = rbid; } return _finishCallback(response); } +Status SyncSourceResolver::_finishCallback(Status status) { + invariant(!status.isOK()); + SyncSourceResolverResponse response; + response.syncSourceStatus = std::move(status); + return _finishCallback(response); +} + Status SyncSourceResolver::_finishCallback(const SyncSourceResolverResponse& response) { try { _onCompletion(response); |