/**
* Copyright 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/sync_source_resolver.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
namespace repl {
const NamespaceString SyncSourceResolver::kLocalOplogNss("local.oplog.rs");
const Seconds SyncSourceResolver::kFetcherTimeout(30);
const Seconds SyncSourceResolver::kFetcherErrorBlacklistDuration(10);
const Seconds SyncSourceResolver::kOplogEmptyBlacklistDuration(10);
const Seconds SyncSourceResolver::kFirstOplogEntryEmptyBlacklistDuration(10);
const Seconds SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration(10);
const Minutes SyncSourceResolver::kTooStaleBlacklistDuration(1);
const Seconds SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration(60);
SyncSourceResolver::SyncSourceResolver(executor::TaskExecutor* taskExecutor,
SyncSourceSelector* syncSourceSelector,
const OpTime& lastOpTimeFetched,
const OpTime& requiredOpTime,
const OnCompletionFn& onCompletion)
: _taskExecutor(taskExecutor),
_syncSourceSelector(syncSourceSelector),
_lastOpTimeFetched(lastOpTimeFetched),
_requiredOpTime(requiredOpTime),
_onCompletion(onCompletion) {
uassert(ErrorCodes::BadValue, "task executor cannot be null", taskExecutor);
uassert(ErrorCodes::BadValue, "sync source selector cannot be null", syncSourceSelector);
uassert(
ErrorCodes::BadValue, "last fetched optime cannot be null", !lastOpTimeFetched.isNull());
uassert(ErrorCodes::BadValue,
str::stream() << "required optime (if provided) must be more recent than last "
"fetched optime. requiredOpTime: "
<< requiredOpTime.toString()
<< ", lastOpTimeFetched: "
<< lastOpTimeFetched.toString(),
requiredOpTime.isNull() || requiredOpTime > lastOpTimeFetched);
uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
}
SyncSourceResolver::~SyncSourceResolver() {
DESTRUCTOR_GUARD(shutdown(); join(););
}
bool SyncSourceResolver::isActive() const {
stdx::lock_guard lock(_mutex);
return _isActive_inlock();
}
bool SyncSourceResolver::_isActive_inlock() const {
return State::kRunning == _state || State::kShuttingDown == _state;
}
Status SyncSourceResolver::startup() {
{
stdx::lock_guard lock(_mutex);
switch (_state) {
case State::kPreStart:
_state = State::kRunning;
break;
case State::kRunning:
return Status(ErrorCodes::IllegalOperation, "sync source resolver already started");
case State::kShuttingDown:
return Status(ErrorCodes::ShutdownInProgress, "sync source resolver shutting down");
case State::kComplete:
return Status(ErrorCodes::ShutdownInProgress, "sync source resolver completed");
}
}
return _chooseAndProbeNextSyncSource(OpTime());
}
void SyncSourceResolver::shutdown() {
stdx::lock_guard lock(_mutex);
// Transition directly from PreStart to Complete if not started yet.
if (State::kPreStart == _state) {
_state = State::kComplete;
return;
}
// Nothing to do if we are already in ShuttingDown or Complete state.
if (State::kShuttingDown == _state || State::kComplete == _state) {
return;
}
invariant(_state == State::kRunning);
_state = State::kShuttingDown;
if (_fetcher) {
_fetcher->shutdown();
}
if (_rbidCommandHandle) {
_taskExecutor->cancel(_rbidCommandHandle);
}
}
void SyncSourceResolver::join() {
stdx::unique_lock lk(_mutex);
_condition.wait(lk, [this]() { return !_isActive_inlock(); });
}
bool SyncSourceResolver::_isShuttingDown() const {
stdx::lock_guard lock(_mutex);
return State::kShuttingDown == _state;
}
StatusWith SyncSourceResolver::_chooseNewSyncSource() {
HostAndPort candidate;
try {
candidate = _syncSourceSelector->chooseNewSyncSource(_lastOpTimeFetched);
} catch (...) {
return exceptionToStatus();
}
if (_isShuttingDown()) {
return Status(ErrorCodes::CallbackCanceled,
str::stream() << "sync source resolver shut down before probing candidate: "
<< candidate);
}
return candidate;
}
std::unique_ptr SyncSourceResolver::_makeFirstOplogEntryFetcher(
HostAndPort candidate, OpTime earliestOpTimeSeen) {
return stdx::make_unique(
_taskExecutor,
candidate,
kLocalOplogNss.db().toString(),
BSON("find" << kLocalOplogNss.coll() << "limit" << 1 << "sort" << BSON("$natural" << 1)
<< "projection"
<< BSON(OplogEntryBase::kTimestampFieldName << 1
<< OplogEntryBase::kTermFieldName
<< 1)),
[=](const StatusWith& response,
Fetcher::NextAction*,
BSONObjBuilder*) {
return _firstOplogEntryFetcherCallback(response, candidate, earliestOpTimeSeen);
},
ReadPreferenceSetting::secondaryPreferredMetadata(),
kFetcherTimeout /* find network timeout */,
kFetcherTimeout /* getMore network timeout */);
}
std::unique_ptr SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndPort candidate,
OpTime earliestOpTimeSeen,
int rbid) {
// This query is structured so that it is executed on the sync source using the oplog
// start hack (oplogReplay=true and $gt/$gte predicate over "ts").
return stdx::make_unique(
_taskExecutor,
candidate,
kLocalOplogNss.db().toString(),
BSON("find" << kLocalOplogNss.coll() << "oplogReplay" << true << "filter"
<< BSON("ts" << BSON("$gte" << _requiredOpTime.getTimestamp() << "$lte"
<< _requiredOpTime.getTimestamp()))),
[=](const StatusWith& response,
Fetcher::NextAction*,
BSONObjBuilder*) {
return _requiredOpTimeFetcherCallback(response, candidate, earliestOpTimeSeen, rbid);
},
ReadPreferenceSetting::secondaryPreferredMetadata(),
kFetcherTimeout /* find network timeout */,
kFetcherTimeout /* getMore network timeout */);
}
Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr fetcher) {
stdx::lock_guard lk(_mutex);
// TODO SERVER-27499 need to check if _state is kShuttingDown inside the mutex.
// Must schedule fetcher inside lock in case fetcher's callback gets invoked immediately by task
// executor.
auto status = fetcher->schedule();
if (status.isOK()) {
// Fetcher destruction blocks on all outstanding callbacks. If we are currently in a
// Fetcher-related callback, we can't destroy the Fetcher just yet, so we assign it to a
// temporary unique pointer to allow the destruction to run to completion.
_shuttingDownFetcher = std::move(_fetcher);
_fetcher = std::move(fetcher);
} else {
error() << "Error scheduling fetcher to evaluate host as sync source, host:"
<< fetcher->getSource() << ", error: " << status;
}
return status;
}
OpTime SyncSourceResolver::_parseRemoteEarliestOpTime(const HostAndPort& candidate,
const Fetcher::QueryResponse& queryResponse) {
if (queryResponse.documents.empty()) {
// Remote oplog is empty.
const auto until = _taskExecutor->now() + kOplogEmptyBlacklistDuration;
log() << "Blacklisting " << candidate << " due to empty oplog for "
<< kOplogEmptyBlacklistDuration << " until: " << until;
_syncSourceSelector->blacklistSyncSource(candidate, until);
return OpTime();
}
const auto& firstObjFound = queryResponse.documents.front();
if (firstObjFound.isEmpty()) {
// First document in remote oplog is empty.
const auto until = _taskExecutor->now() + kFirstOplogEntryEmptyBlacklistDuration;
log() << "Blacklisting " << candidate << " due to empty first document for "
<< kFirstOplogEntryEmptyBlacklistDuration << " until: " << until;
_syncSourceSelector->blacklistSyncSource(candidate, until);
return OpTime();
}
const auto remoteEarliestOpTime = OpTime::parseFromOplogEntry(firstObjFound);
if (!remoteEarliestOpTime.isOK()) {
const auto until = _taskExecutor->now() + kFirstOplogEntryNullTimestampBlacklistDuration;
log() << "Blacklisting " << candidate << " due to error parsing OpTime from the oldest"
<< " oplog entry for " << kFirstOplogEntryNullTimestampBlacklistDuration
<< " until: " << until << ". Error: " << remoteEarliestOpTime.getStatus()
<< ", Entry: " << redact(firstObjFound);
_syncSourceSelector->blacklistSyncSource(candidate, until);
return OpTime();
}
if (remoteEarliestOpTime.getValue().isNull()) {
// First document in remote oplog is empty.
const auto until = _taskExecutor->now() + kFirstOplogEntryNullTimestampBlacklistDuration;
log() << "Blacklisting " << candidate << " due to null timestamp in first document for "
<< kFirstOplogEntryNullTimestampBlacklistDuration << " until: " << until;
_syncSourceSelector->blacklistSyncSource(candidate, until);
return OpTime();
}
return remoteEarliestOpTime.getValue();
}
void SyncSourceResolver::_firstOplogEntryFetcherCallback(
const StatusWith& queryResult,
HostAndPort candidate,
OpTime earliestOpTimeSeen) {
if (_isShuttingDown()) {
_finishCallback(Status(ErrorCodes::CallbackCanceled,
str::stream()
<< "sync source resolver shut down while probing candidate: "
<< candidate))
.transitional_ignore();
return;
}
if (ErrorCodes::CallbackCanceled == queryResult.getStatus()) {
_finishCallback(queryResult.getStatus()).transitional_ignore();
return;
}
if (!queryResult.isOK()) {
// We got an error.
const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration;
log() << "Blacklisting " << candidate << " due to error: '" << queryResult.getStatus()
<< "' for " << kFetcherErrorBlacklistDuration << " until: " << until;
_syncSourceSelector->blacklistSyncSource(candidate, until);
_chooseAndProbeNextSyncSource(earliestOpTimeSeen).transitional_ignore();
return;
}
const auto& queryResponse = queryResult.getValue();
const auto remoteEarliestOpTime = _parseRemoteEarliestOpTime(candidate, queryResponse);
if (remoteEarliestOpTime.isNull()) {
_chooseAndProbeNextSyncSource(earliestOpTimeSeen).transitional_ignore();
return;
}
// remoteEarliestOpTime may come from a very old config, so we cannot compare their terms.
if (_lastOpTimeFetched.getTimestamp() < remoteEarliestOpTime.getTimestamp()) {
// We're too stale to use this sync source.
const auto blacklistDuration = kTooStaleBlacklistDuration;
const auto until = _taskExecutor->now() + Minutes(1);
log() << "We are too stale to use " << candidate << " as a sync source. "
<< "Blacklisting this sync source"
<< " because our last fetched timestamp: " << _lastOpTimeFetched.getTimestamp()
<< " is before their earliest timestamp: " << remoteEarliestOpTime.getTimestamp()
<< " for " << blacklistDuration << " until: " << until;
_syncSourceSelector->blacklistSyncSource(candidate, until);
// If all the viable sync sources are too far ahead of us (i.e. we are "too stale" relative
// each sync source), we will want to return the starting timestamp of the sync source
// candidate that is closest to us. See SyncSourceResolverResponse::earliestOpTimeSeen.
// We use "earliestOpTimeSeen" to keep track of the current minimum starting timestamp.
if (earliestOpTimeSeen.isNull() ||
earliestOpTimeSeen.getTimestamp() > remoteEarliestOpTime.getTimestamp()) {
earliestOpTimeSeen = remoteEarliestOpTime;
}
_chooseAndProbeNextSyncSource(earliestOpTimeSeen).transitional_ignore();
return;
}
auto status = _scheduleRBIDRequest(candidate, earliestOpTimeSeen);
if (!status.isOK()) {
_finishCallback(status).ignore();
}
}
Status SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) {
// Once a work is scheduled, nothing prevents it finishing. We need the mutex to protect the
// access of member variables after scheduling, because otherwise the scheduled callback could
// finish and allow the destructor to fire before we access the member variables.
stdx::lock_guard lk(_mutex);
if (_state == State::kShuttingDown) {
return Status(
ErrorCodes::CallbackCanceled,
str::stream()
<< "sync source resolver shut down while checking rollbackId on candidate: "
<< candidate);
}
invariant(_state == State::kRunning);
auto handle = _taskExecutor->scheduleRemoteCommand(
{candidate, "admin", BSON("replSetGetRBID" << 1), nullptr, kFetcherTimeout},
[=](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) {
_rbidRequestCallback(candidate, earliestOpTimeSeen, rbidReply);
});
if (!handle.isOK()) {
return handle.getStatus();
}
_rbidCommandHandle = std::move(handle.getValue());
return Status::OK();
}
void SyncSourceResolver::_rbidRequestCallback(
HostAndPort candidate,
OpTime earliestOpTimeSeen,
const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) {
if (rbidReply.response.status == ErrorCodes::CallbackCanceled) {
_finishCallback(rbidReply.response.status).transitional_ignore();
return;
}
int rbid = ReplicationProcess::kUninitializedRollbackId;
try {
uassertStatusOK(rbidReply.response.status);
uassertStatusOK(getStatusFromCommandResult(rbidReply.response.data));
rbid = rbidReply.response.data["rbid"].Int();
} catch (const DBException& ex) {
const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration;
log() << "Blacklisting " << candidate << " due to error: '" << ex << "' for "
<< kFetcherErrorBlacklistDuration << " until: " << until;
_syncSourceSelector->blacklistSyncSource(candidate, until);
_chooseAndProbeNextSyncSource(earliestOpTimeSeen).transitional_ignore();
return;
}
if (!_requiredOpTime.isNull()) {
// Schedule fetcher to look for '_requiredOpTime' in the remote oplog.
// Unittest requires that this kind of failure be handled specially.
auto status =
_scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen, rbid));
if (!status.isOK()) {
_finishCallback(status).transitional_ignore();
}
return;
}
_finishCallback(candidate, rbid).ignore();
}
Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse(
const Fetcher::QueryResponse& queryResponse) {
if (queryResponse.documents.empty()) {
return Status(
ErrorCodes::NoMatchingDocument,
"remote oplog does not contain entry with optime matching our required optime");
}
const OplogEntry oplogEntry(queryResponse.documents.front());
const auto opTime = oplogEntry.getOpTime();
if (_requiredOpTime != opTime) {
return Status(ErrorCodes::BadValue,
str::stream() << "remote oplog contain entry with matching timestamp "
<< opTime.getTimestamp().toString()
<< " but optime "
<< opTime.toString()
<< " does not "
"match our required optime");
}
if (_requiredOpTime.getTerm() != opTime.getTerm()) {
return Status(ErrorCodes::BadValue,
str::stream() << "remote oplog contain entry with term " << opTime.getTerm()
<< " that does not "
"match the term in our required optime");
}
return Status::OK();
}
void SyncSourceResolver::_requiredOpTimeFetcherCallback(
const StatusWith& queryResult,
HostAndPort candidate,
OpTime earliestOpTimeSeen,
int rbid) {
if (_isShuttingDown()) {
_finishCallback(Status(ErrorCodes::CallbackCanceled,
str::stream() << "sync source resolver shut down while looking for "
"required optime "
<< _requiredOpTime.toString()
<< " in candidate's oplog: "
<< candidate))
.transitional_ignore();
return;
}
if (ErrorCodes::CallbackCanceled == queryResult.getStatus()) {
_finishCallback(queryResult.getStatus()).transitional_ignore();
return;
}
if (!queryResult.isOK()) {
// We got an error.
const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration;
log() << "Blacklisting " << candidate << " due to required optime fetcher error: '"
<< queryResult.getStatus() << "' for " << kFetcherErrorBlacklistDuration
<< " until: " << until << ". required optime: " << _requiredOpTime;
_syncSourceSelector->blacklistSyncSource(candidate, until);
_chooseAndProbeNextSyncSource(earliestOpTimeSeen).transitional_ignore();
return;
}
const auto& queryResponse = queryResult.getValue();
auto status = _compareRequiredOpTimeWithQueryResponse(queryResponse);
if (!status.isOK()) {
const auto until = _taskExecutor->now() + kNoRequiredOpTimeBlacklistDuration;
warning() << "We cannot use " << candidate.toString()
<< " as a sync source because it does not contain the necessary "
"operations for us to reach a consistent state: "
<< status << " last fetched optime: " << _lastOpTimeFetched
<< ". required optime: " << _requiredOpTime
<< ". Blacklisting this sync source for " << kNoRequiredOpTimeBlacklistDuration
<< " until: " << until;
_syncSourceSelector->blacklistSyncSource(candidate, until);
_chooseAndProbeNextSyncSource(earliestOpTimeSeen).transitional_ignore();
return;
}
_finishCallback(candidate, rbid).ignore();
}
Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSeen) {
auto candidateResult = _chooseNewSyncSource();
if (!candidateResult.isOK()) {
return _finishCallback(candidateResult.getStatus());
}
if (candidateResult.getValue().empty()) {
if (earliestOpTimeSeen.isNull()) {
return _finishCallback(candidateResult.getValue(),
ReplicationProcess::kUninitializedRollbackId);
}
SyncSourceResolverResponse response;
response.syncSourceStatus = {ErrorCodes::OplogStartMissing, "too stale to catch up"};
response.earliestOpTimeSeen = earliestOpTimeSeen;
return _finishCallback(response);
}
auto status = _scheduleFetcher(
_makeFirstOplogEntryFetcher(candidateResult.getValue(), earliestOpTimeSeen));
if (!status.isOK()) {
return _finishCallback(status);
}
return Status::OK();
}
Status SyncSourceResolver::_finishCallback(HostAndPort hostAndPort, int rbid) {
SyncSourceResolverResponse response;
response.syncSourceStatus = std::move(hostAndPort);
if (rbid != ReplicationProcess::kUninitializedRollbackId) {
response.rbid = rbid;
}
return _finishCallback(response);
}
Status SyncSourceResolver::_finishCallback(Status status) {
invariant(!status.isOK());
SyncSourceResolverResponse response;
response.syncSourceStatus = std::move(status);
return _finishCallback(response);
}
Status SyncSourceResolver::_finishCallback(const SyncSourceResolverResponse& response) {
try {
_onCompletion(response);
} catch (...) {
warning() << "sync source resolver finish callback threw exception: "
<< exceptionToStatus();
}
stdx::lock_guard lock(_mutex);
invariant(_state != State::kComplete);
_state = State::kComplete;
_condition.notify_all();
return response.syncSourceStatus.getStatus();
}
} // namespace repl
} // namespace mongo