summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2017-10-05 21:28:49 -0400
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2018-01-12 15:13:05 -0500
commit2676b7aa412853c608c2cd834d53cb3c81aba985 (patch)
treef50e8e24c61d4432680ef6f28ef29b662a743baa
parentf1f38099c3c964cc445f4805de0ce072b436e5cc (diff)
downloadmongo-2676b7aa412853c608c2cd834d53cb3c81aba985.tar.gz
SERVER-31262 Storage of _rbidCommandHandle can race with destruction of SyncSourceResolver
(cherry picked from commit f6e461f9a2680b48606658c2010ca6f20ee8e8b7)
-rw-r--r--src/mongo/db/repl/sync_source_resolver.cpp72
-rw-r--r--src/mongo/db/repl/sync_source_resolver.h15
2 files changed, 58 insertions, 29 deletions
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index f65da1f66c9..17a706f1b61 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -54,6 +54,7 @@ const Seconds SyncSourceResolver::kFirstOplogEntryEmptyBlacklistDuration(10);
const Seconds SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration(10);
const Minutes SyncSourceResolver::kTooStaleBlacklistDuration(1);
const Seconds SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration(60);
+const int SyncSourceResolver::kUninitializedRollbackId(-1);
SyncSourceResolver::SyncSourceResolver(executor::TaskExecutor* taskExecutor,
SyncSourceSelector* syncSourceSelector,
@@ -180,7 +181,8 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher(
}
std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndPort candidate,
- OpTime earliestOpTimeSeen) {
+ OpTime earliestOpTimeSeen,
+ int rbid) {
// This query is structured so that it is executed on the sync source using the oplog
// start hack (oplogReplay=true and $gt/$gte predicate over "ts").
return stdx::make_unique<Fetcher>(
@@ -194,7 +196,8 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndP
this,
stdx::placeholders::_1,
candidate,
- earliestOpTimeSeen),
+ earliestOpTimeSeen,
+ rbid),
rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
kFetcherTimeout /* find network timeout */,
kFetcherTimeout /* getMore network timeout */);
@@ -207,6 +210,9 @@ Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) {
// executor.
auto status = fetcher->schedule();
if (status.isOK()) {
+ // Fetcher destruction blocks on all outstanding callbacks. If we are currently in a
+ // Fetcher-related callback, we can't destroy the Fetcher just yet, so we assign it to a
+ // temporary unique pointer to allow the destruction to run to completion.
_shuttingDownFetcher = std::move(_fetcher);
_fetcher = std::move(fetcher);
} else {
@@ -313,10 +319,26 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback(
return;
}
- _scheduleRBIDRequest(candidate, earliestOpTimeSeen);
+ auto status = _scheduleRBIDRequest(candidate, earliestOpTimeSeen);
+ if (!status.isOK()) {
+ _finishCallback(status);
+ }
}
-void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) {
+Status SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) {
+ // Once a work is scheduled, nothing prevents it finishing. We need the mutex to protect the
+ // access of member variables after scheduling, because otherwise the scheduled callback could
+ // finish and allow the destructor to fire before we access the member variables.
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_state == State::kShuttingDown) {
+ return Status(
+ ErrorCodes::CallbackCanceled,
+ str::stream()
+ << "sync source resolver shut down while checking rollbackId on candidate: "
+ << candidate);
+ }
+
+ invariant(_state == State::kRunning);
auto handle = _taskExecutor->scheduleRemoteCommand(
{candidate, "admin", BSON("replSetGetRBID" << 1), nullptr, kFetcherTimeout},
stdx::bind(&SyncSourceResolver::_rbidRequestCallback,
@@ -326,15 +348,11 @@ void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earl
stdx::placeholders::_1));
if (!handle.isOK()) {
- _finishCallback(handle.getStatus());
- return;
+ return handle.getStatus();
}
- stdx::lock_guard<stdx::mutex> lk(_mutex);
_rbidCommandHandle = std::move(handle.getValue());
- if (_state == State::kShuttingDown) {
- _taskExecutor->cancel(_rbidCommandHandle);
- }
+ return Status::OK();
}
void SyncSourceResolver::_rbidRequestCallback(
@@ -346,10 +364,11 @@ void SyncSourceResolver::_rbidRequestCallback(
return;
}
+ int rbid = kUninitializedRollbackId;
try {
uassertStatusOK(rbidReply.response.status);
uassertStatusOK(getStatusFromCommandResult(rbidReply.response.data));
- _rbid = rbidReply.response.data["rbid"].Int();
+ rbid = rbidReply.response.data["rbid"].Int();
} catch (const DBException& ex) {
const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration;
log() << "Blacklisting " << candidate << " due to error: '" << ex << "' for "
@@ -362,13 +381,15 @@ void SyncSourceResolver::_rbidRequestCallback(
if (!_requiredOpTime.isNull()) {
// Schedule fetcher to look for '_requiredOpTime' in the remote oplog.
// Unittest requires that this kind of failure be handled specially.
- auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen));
+ auto status =
+ _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen, rbid));
if (!status.isOK()) {
_finishCallback(status);
}
return;
}
- _finishCallback(candidate);
+
+ _finishCallback(candidate, rbid);
}
Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse(
@@ -401,7 +422,8 @@ Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse(
void SyncSourceResolver::_requiredOpTimeFetcherCallback(
const StatusWith<Fetcher::QueryResponse>& queryResult,
HostAndPort candidate,
- OpTime earliestOpTimeSeen) {
+ OpTime earliestOpTimeSeen,
+ int rbid) {
if (_isShuttingDown()) {
_finishCallback(Status(ErrorCodes::CallbackCanceled,
str::stream() << "sync source resolver shut down while looking for "
@@ -446,18 +468,18 @@ void SyncSourceResolver::_requiredOpTimeFetcherCallback(
return;
}
- _finishCallback(candidate);
+ _finishCallback(candidate, rbid);
}
Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSeen) {
auto candidateResult = _chooseNewSyncSource();
if (!candidateResult.isOK()) {
- return _finishCallback(candidateResult);
+ return _finishCallback(candidateResult.getStatus());
}
if (candidateResult.getValue().empty()) {
if (earliestOpTimeSeen.isNull()) {
- return _finishCallback(candidateResult);
+ return _finishCallback(candidateResult.getValue(), kUninitializedRollbackId);
}
SyncSourceResolverResponse response;
@@ -475,16 +497,22 @@ Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSe
return Status::OK();
}
-Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) {
+Status SyncSourceResolver::_finishCallback(HostAndPort hostAndPort, int rbid) {
SyncSourceResolverResponse response;
- response.syncSourceStatus = std::move(result);
- if (response.isOK() && !response.getSyncSource().empty()) {
- invariant(_requiredOpTime.isNull() || _rbid);
- response.rbid = _rbid;
+ response.syncSourceStatus = std::move(hostAndPort);
+ if (rbid != kUninitializedRollbackId) {
+ response.rbid = rbid;
}
return _finishCallback(response);
}
+Status SyncSourceResolver::_finishCallback(Status status) {
+ invariant(!status.isOK());
+ SyncSourceResolverResponse response;
+ response.syncSourceStatus = std::move(status);
+ return _finishCallback(response);
+}
+
Status SyncSourceResolver::_finishCallback(const SyncSourceResolverResponse& response) {
try {
_onCompletion(response);
diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h
index 201658caacf..0f9343cfcdb 100644
--- a/src/mongo/db/repl/sync_source_resolver.h
+++ b/src/mongo/db/repl/sync_source_resolver.h
@@ -102,6 +102,7 @@ public:
static const Seconds kFirstOplogEntryNullTimestampBlacklistDuration;
static const Minutes kTooStaleBlacklistDuration;
static const Seconds kNoRequiredOpTimeBlacklistDuration;
+ static const int kUninitializedRollbackId;
/**
* Callback function to report final status of resolving sync source.
@@ -154,7 +155,8 @@ private:
* Creates fetcher to check the remote oplog for '_requiredOpTime'.
*/
std::unique_ptr<Fetcher> _makeRequiredOpTimeFetcher(HostAndPort candidate,
- OpTime earliestOpTimeSeen);
+ OpTime earliestOpTimeSeen,
+ int rbid);
/**
* Schedules fetcher to read oplog on sync source.
@@ -179,7 +181,7 @@ private:
/**
* Schedules a replSetGetRBID command against the candidate to fetch its current rollback id.
*/
- void _scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen);
+ Status _scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen);
void _rbidRequestCallback(HostAndPort candidate,
OpTime earliestOpTimeSeen,
const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply);
@@ -194,7 +196,8 @@ private:
*/
void _requiredOpTimeFetcherCallback(const StatusWith<Fetcher::QueryResponse>& queryResult,
HostAndPort candidate,
- OpTime earliestOpTimeSeen);
+ OpTime earliestOpTimeSeen,
+ int rbid);
/**
* Obtains new sync source candidate and schedules remote command to fetcher first oplog entry.
@@ -207,7 +210,8 @@ private:
* Invokes completion callback and transitions state to State::kComplete.
* Returns result.getStatus().
*/
- Status _finishCallback(StatusWith<HostAndPort> result);
+ Status _finishCallback(HostAndPort hostAndPort, int rbid);
+ Status _finishCallback(Status status);
Status _finishCallback(const SyncSourceResolverResponse& response);
// Executor used to send remote commands to sync source candidates.
@@ -229,9 +233,6 @@ private:
// resolver via this callback in a SyncSourceResolverResponse struct when the resolver finishes.
const OnCompletionFn _onCompletion;
- // The rbid we will return to our caller.
- int _rbid;
-
// Protects members of this sync source resolver defined below.
mutable stdx::mutex _mutex;
mutable stdx::condition_variable _condition;