summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_source_resolver.cpp
diff options
context:
space:
mode:
author: Siyuan Zhou <siyuan.zhou@mongodb.com> 2017-10-05 21:28:49 -0400
committer: Siyuan Zhou <siyuan.zhou@mongodb.com> 2017-10-13 16:36:51 -0400
commit: f6e461f9a2680b48606658c2010ca6f20ee8e8b7 (patch)
tree: 107c044f5c1357e3bf157299e749ff53ea0b422b /src/mongo/db/repl/sync_source_resolver.cpp
parent: fa80e8fd37983fecbe97534e9a110a6d40daaa60 (diff)
download: mongo-f6e461f9a2680b48606658c2010ca6f20ee8e8b7.tar.gz
SERVER-31262 Storage of _rbidCommandHandle can race with destruction of SyncSourceResolver
Diffstat (limited to 'src/mongo/db/repl/sync_source_resolver.cpp')
-rw-r--r-- src/mongo/db/repl/sync_source_resolver.cpp | 72
1 file changed, 51 insertions(+), 21 deletions(-)
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index ad7f0ba0bec..ebb9f9649c7 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/stdx/memory.h"
@@ -179,7 +180,8 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher(
}
std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndPort candidate,
- OpTime earliestOpTimeSeen) {
+ OpTime earliestOpTimeSeen,
+ int rbid) {
// This query is structured so that it is executed on the sync source using the oplog
// start hack (oplogReplay=true and $gt/$gte predicate over "ts").
return stdx::make_unique<Fetcher>(
@@ -193,7 +195,8 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndP
this,
stdx::placeholders::_1,
candidate,
- earliestOpTimeSeen),
+ earliestOpTimeSeen,
+ rbid),
ReadPreferenceSetting::secondaryPreferredMetadata(),
kFetcherTimeout /* find network timeout */,
kFetcherTimeout /* getMore network timeout */);
@@ -206,6 +209,9 @@ Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) {
// executor.
auto status = fetcher->schedule();
if (status.isOK()) {
+ // Fetcher destruction blocks on all outstanding callbacks. If we are currently in a
+ // Fetcher-related callback, we can't destroy the Fetcher just yet, so we assign it to a
+ // temporary unique pointer to allow the destruction to run to completion.
_shuttingDownFetcher = std::move(_fetcher);
_fetcher = std::move(fetcher);
} else {
@@ -313,10 +319,26 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback(
return;
}
- _scheduleRBIDRequest(candidate, earliestOpTimeSeen);
+ auto status = _scheduleRBIDRequest(candidate, earliestOpTimeSeen);
+ if (!status.isOK()) {
+ _finishCallback(status).ignore();
+ }
}
-void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) {
+Status SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) {
+ // Once a work is scheduled, nothing prevents it finishing. We need the mutex to protect the
+ // access of member variables after scheduling, because otherwise the scheduled callback could
+ // finish and allow the destructor to fire before we access the member variables.
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_state == State::kShuttingDown) {
+ return Status(
+ ErrorCodes::CallbackCanceled,
+ str::stream()
+ << "sync source resolver shut down while checking rollbackId on candidate: "
+ << candidate);
+ }
+
+ invariant(_state == State::kRunning);
auto handle = _taskExecutor->scheduleRemoteCommand(
{candidate, "admin", BSON("replSetGetRBID" << 1), nullptr, kFetcherTimeout},
stdx::bind(&SyncSourceResolver::_rbidRequestCallback,
@@ -326,15 +348,11 @@ void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earl
stdx::placeholders::_1));
if (!handle.isOK()) {
- _finishCallback(handle.getStatus()).transitional_ignore();
- return;
+ return handle.getStatus();
}
- stdx::lock_guard<stdx::mutex> lk(_mutex);
_rbidCommandHandle = std::move(handle.getValue());
- if (_state == State::kShuttingDown) {
- _taskExecutor->cancel(_rbidCommandHandle);
- }
+ return Status::OK();
}
void SyncSourceResolver::_rbidRequestCallback(
@@ -346,10 +364,11 @@ void SyncSourceResolver::_rbidRequestCallback(
return;
}
+ int rbid = ReplicationProcess::kUninitializedRollbackId;
try {
uassertStatusOK(rbidReply.response.status);
uassertStatusOK(getStatusFromCommandResult(rbidReply.response.data));
- _rbid = rbidReply.response.data["rbid"].Int();
+ rbid = rbidReply.response.data["rbid"].Int();
} catch (const DBException& ex) {
const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration;
log() << "Blacklisting " << candidate << " due to error: '" << ex << "' for "
@@ -362,13 +381,15 @@ void SyncSourceResolver::_rbidRequestCallback(
if (!_requiredOpTime.isNull()) {
// Schedule fetcher to look for '_requiredOpTime' in the remote oplog.
// Unittest requires that this kind of failure be handled specially.
- auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen));
+ auto status =
+ _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen, rbid));
if (!status.isOK()) {
_finishCallback(status).transitional_ignore();
}
return;
}
- _finishCallback(candidate).transitional_ignore();
+
+ _finishCallback(candidate, rbid).ignore();
}
Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse(
@@ -401,7 +422,8 @@ Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse(
void SyncSourceResolver::_requiredOpTimeFetcherCallback(
const StatusWith<Fetcher::QueryResponse>& queryResult,
HostAndPort candidate,
- OpTime earliestOpTimeSeen) {
+ OpTime earliestOpTimeSeen,
+ int rbid) {
if (_isShuttingDown()) {
_finishCallback(Status(ErrorCodes::CallbackCanceled,
str::stream() << "sync source resolver shut down while looking for "
@@ -447,18 +469,19 @@ void SyncSourceResolver::_requiredOpTimeFetcherCallback(
return;
}
- _finishCallback(candidate).transitional_ignore();
+ _finishCallback(candidate, rbid).ignore();
}
Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSeen) {
auto candidateResult = _chooseNewSyncSource();
if (!candidateResult.isOK()) {
- return _finishCallback(candidateResult);
+ return _finishCallback(candidateResult.getStatus());
}
if (candidateResult.getValue().empty()) {
if (earliestOpTimeSeen.isNull()) {
- return _finishCallback(candidateResult);
+ return _finishCallback(candidateResult.getValue(),
+ ReplicationProcess::kUninitializedRollbackId);
}
SyncSourceResolverResponse response;
@@ -476,15 +499,22 @@ Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSe
return Status::OK();
}
-Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) {
+Status SyncSourceResolver::_finishCallback(HostAndPort hostAndPort, int rbid) {
SyncSourceResolverResponse response;
- response.syncSourceStatus = std::move(result);
- if (response.isOK() && !response.getSyncSource().empty()) {
- response.rbid = _rbid;
+ response.syncSourceStatus = std::move(hostAndPort);
+ if (rbid != ReplicationProcess::kUninitializedRollbackId) {
+ response.rbid = rbid;
}
return _finishCallback(response);
}
+Status SyncSourceResolver::_finishCallback(Status status) {
+ invariant(!status.isOK());
+ SyncSourceResolverResponse response;
+ response.syncSourceStatus = std::move(status);
+ return _finishCallback(response);
+}
+
Status SyncSourceResolver::_finishCallback(const SyncSourceResolverResponse& response) {
try {
_onCompletion(response);