diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-10-05 21:28:49 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-01-12 15:13:05 -0500 |
commit | 2676b7aa412853c608c2cd834d53cb3c81aba985 (patch) | |
tree | f50e8e24c61d4432680ef6f28ef29b662a743baa | |
parent | f1f38099c3c964cc445f4805de0ce072b436e5cc (diff) | |
download | mongo-2676b7aa412853c608c2cd834d53cb3c81aba985.tar.gz |
SERVER-31262 Storage of _rbidCommandHandle can race with destruction of SyncSourceResolver
(cherry picked from commit f6e461f9a2680b48606658c2010ca6f20ee8e8b7)
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.cpp | 72 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.h | 15 |
2 files changed, 58 insertions, 29 deletions
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index f65da1f66c9..17a706f1b61 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -54,6 +54,7 @@ const Seconds SyncSourceResolver::kFirstOplogEntryEmptyBlacklistDuration(10); const Seconds SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration(10); const Minutes SyncSourceResolver::kTooStaleBlacklistDuration(1); const Seconds SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration(60); +const int SyncSourceResolver::kUninitializedRollbackId(-1); SyncSourceResolver::SyncSourceResolver(executor::TaskExecutor* taskExecutor, SyncSourceSelector* syncSourceSelector, @@ -180,7 +181,8 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher( } std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndPort candidate, - OpTime earliestOpTimeSeen) { + OpTime earliestOpTimeSeen, + int rbid) { // This query is structured so that it is executed on the sync source using the oplog // start hack (oplogReplay=true and $gt/$gte predicate over "ts"). return stdx::make_unique<Fetcher>( @@ -194,7 +196,8 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndP this, stdx::placeholders::_1, candidate, - earliestOpTimeSeen), + earliestOpTimeSeen, + rbid), rpc::ServerSelectionMetadata(true, boost::none).toBSON(), kFetcherTimeout /* find network timeout */, kFetcherTimeout /* getMore network timeout */); @@ -207,6 +210,9 @@ Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) { // executor. auto status = fetcher->schedule(); if (status.isOK()) { + // Fetcher destruction blocks on all outstanding callbacks. If we are currently in a + // Fetcher-related callback, we can't destroy the Fetcher just yet, so we assign it to a + // temporary unique pointer to allow the destruction to run to completion. _shuttingDownFetcher = std::move(_fetcher); _fetcher = std::move(fetcher); } else { @@ -313,10 +319,26 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback( return; } - _scheduleRBIDRequest(candidate, earliestOpTimeSeen); + auto status = _scheduleRBIDRequest(candidate, earliestOpTimeSeen); + if (!status.isOK()) { + _finishCallback(status); + } } -void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) { +Status SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) { + // Once a work is scheduled, nothing prevents it finishing. We need the mutex to protect the + // access of member variables after scheduling, because otherwise the scheduled callback could + // finish and allow the destructor to fire before we access the member variables. + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_state == State::kShuttingDown) { + return Status( + ErrorCodes::CallbackCanceled, + str::stream() + << "sync source resolver shut down while checking rollbackId on candidate: " + << candidate); + } + + invariant(_state == State::kRunning); auto handle = _taskExecutor->scheduleRemoteCommand( {candidate, "admin", BSON("replSetGetRBID" << 1), nullptr, kFetcherTimeout}, stdx::bind(&SyncSourceResolver::_rbidRequestCallback, @@ -326,15 +348,11 @@ void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earl stdx::placeholders::_1)); if (!handle.isOK()) { - _finishCallback(handle.getStatus()); - return; + return handle.getStatus(); } - stdx::lock_guard<stdx::mutex> lk(_mutex); _rbidCommandHandle = std::move(handle.getValue()); - if (_state == State::kShuttingDown) { - _taskExecutor->cancel(_rbidCommandHandle); - } + return Status::OK(); } void SyncSourceResolver::_rbidRequestCallback( @@ -346,10 +364,11 @@ void SyncSourceResolver::_rbidRequestCallback( return; } + int rbid = kUninitializedRollbackId; try { uassertStatusOK(rbidReply.response.status); uassertStatusOK(getStatusFromCommandResult(rbidReply.response.data)); - _rbid = rbidReply.response.data["rbid"].Int(); + rbid = rbidReply.response.data["rbid"].Int(); } catch (const DBException& ex) { const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration; log() << "Blacklisting " << candidate << " due to error: '" << ex << "' for " @@ -362,13 +381,15 @@ void SyncSourceResolver::_rbidRequestCallback( if (!_requiredOpTime.isNull()) { // Schedule fetcher to look for '_requiredOpTime' in the remote oplog. // Unittest requires that this kind of failure be handled specially. - auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen)); + auto status = + _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen, rbid)); if (!status.isOK()) { _finishCallback(status); } return; } - _finishCallback(candidate); + + _finishCallback(candidate, rbid); } Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse( @@ -401,7 +422,8 @@ Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse( void SyncSourceResolver::_requiredOpTimeFetcherCallback( const StatusWith<Fetcher::QueryResponse>& queryResult, HostAndPort candidate, - OpTime earliestOpTimeSeen) { + OpTime earliestOpTimeSeen, + int rbid) { if (_isShuttingDown()) { _finishCallback(Status(ErrorCodes::CallbackCanceled, str::stream() << "sync source resolver shut down while looking for " @@ -446,18 +468,18 @@ void SyncSourceResolver::_requiredOpTimeFetcherCallback( return; } - _finishCallback(candidate); + _finishCallback(candidate, rbid); } Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSeen) { auto candidateResult = _chooseNewSyncSource(); if (!candidateResult.isOK()) { - return _finishCallback(candidateResult); + return _finishCallback(candidateResult.getStatus()); } if (candidateResult.getValue().empty()) { if (earliestOpTimeSeen.isNull()) { - return _finishCallback(candidateResult); + return _finishCallback(candidateResult.getValue(), kUninitializedRollbackId); } SyncSourceResolverResponse response; @@ -475,16 +497,22 @@ Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSe return Status::OK(); } -Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) { +Status SyncSourceResolver::_finishCallback(HostAndPort hostAndPort, int rbid) { SyncSourceResolverResponse response; - response.syncSourceStatus = std::move(result); - if (response.isOK() && !response.getSyncSource().empty()) { - invariant(_requiredOpTime.isNull() || _rbid); - response.rbid = _rbid; + response.syncSourceStatus = std::move(hostAndPort); + if (rbid != kUninitializedRollbackId) { + response.rbid = rbid; } return _finishCallback(response); } +Status SyncSourceResolver::_finishCallback(Status status) { + invariant(!status.isOK()); + SyncSourceResolverResponse response; + response.syncSourceStatus = std::move(status); + return _finishCallback(response); +} + Status SyncSourceResolver::_finishCallback(const SyncSourceResolverResponse& response) { try { _onCompletion(response); diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h index 201658caacf..0f9343cfcdb 100644 --- a/src/mongo/db/repl/sync_source_resolver.h +++ b/src/mongo/db/repl/sync_source_resolver.h @@ -102,6 +102,7 @@ public: static const Seconds kFirstOplogEntryNullTimestampBlacklistDuration; static const Minutes kTooStaleBlacklistDuration; static const Seconds kNoRequiredOpTimeBlacklistDuration; + static const int kUninitializedRollbackId; /** * Callback function to report final status of resolving sync source. @@ -154,7 +155,8 @@ private: * Creates fetcher to check the remote oplog for '_requiredOpTime'. */ std::unique_ptr<Fetcher> _makeRequiredOpTimeFetcher(HostAndPort candidate, - OpTime earliestOpTimeSeen); + OpTime earliestOpTimeSeen, + int rbid); /** * Schedules fetcher to read oplog on sync source. @@ -179,7 +181,7 @@ private: /** * Schedules a replSetGetRBID command against the candidate to fetch its current rollback id. */ - void _scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen); + Status _scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen); void _rbidRequestCallback(HostAndPort candidate, OpTime earliestOpTimeSeen, const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply); @@ -194,7 +196,8 @@ private: */ void _requiredOpTimeFetcherCallback(const StatusWith<Fetcher::QueryResponse>& queryResult, HostAndPort candidate, - OpTime earliestOpTimeSeen); + OpTime earliestOpTimeSeen, + int rbid); /** * Obtains new sync source candidate and schedules remote command to fetcher first oplog entry. @@ -207,7 +210,8 @@ private: * Invokes completion callback and transitions state to State::kComplete. * Returns result.getStatus(). */ - Status _finishCallback(StatusWith<HostAndPort> result); + Status _finishCallback(HostAndPort hostAndPort, int rbid); + Status _finishCallback(Status status); Status _finishCallback(const SyncSourceResolverResponse& response); // Executor used to send remote commands to sync source candidates. @@ -229,9 +233,6 @@ private: // resolver via this callback in a SyncSourceResolverResponse struct when the resolver finishes. const OnCompletionFn _onCompletion; - // The rbid we will return to our caller. - int _rbid; - // Protects members of this sync source resolver defined below. mutable stdx::mutex _mutex; mutable stdx::condition_variable _condition; |