/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
namespace mongo {
namespace repl {
Seconds OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout(2);
namespace {
/**
* Calculates await data timeout based on the current replica set configuration.
*/
/**
 * Calculates the awaitData timeout to use for getMore commands, based on the current
 * replica set configuration.
 */
Milliseconds calculateAwaitDataTimeout(const ReplicaSetConfig& config) {
    // Under protocol version 1, the awaitData timeout (maxTimeMS) is derived from the election
    // timeout so the sync source can communicate liveness of the primary to secondaries.
    // Under protocol version 0 there is no such linkage, so fall back to a fixed 2-second default.
    const bool isProtocolVersionOne = (config.getProtocolVersion() == 1LL);
    if (!isProtocolVersionOne) {
        return OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout;
    }
    return config.getElectionTimeoutPeriod() / 2;
}
/**
* Returns find command object suitable for tailing remote oplog.
*/
/**
 * Returns a find command object suitable for tailing the remote oplog.
 *
 * The command starts a tailable, awaitData cursor over entries with timestamps at or after
 * 'lastOpTimeFetched', and attaches the current term (when initialized) so the sync source
 * can reject stale requests.
 */
BSONObj makeFindCommandObject(DataReplicatorExternalState* dataReplicatorExternalState,
                              const NamespaceString& nss,
                              OpTime lastOpTimeFetched) {
    invariant(dataReplicatorExternalState);
    BSONObjBuilder cmdBob;
    cmdBob.append("find", nss.coll());
    cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
    cmdBob.append("tailable", true);
    cmdBob.append("oplogReplay", true);
    cmdBob.append("awaitData", true);
    // durationCount requires an explicit target duration type; express the 1-minute initial
    // find timeout in milliseconds as maxTimeMS expects.
    cmdBob.append("maxTimeMS", durationCount<Milliseconds>(Minutes(1)));
    auto opTimeWithTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
    if (opTimeWithTerm.value != OpTime::kUninitializedTerm) {
        cmdBob.append("term", opTimeWithTerm.value);
    }
    return cmdBob.obj();
}
/**
* Returns getMore command object suitable for tailing remote oplog.
*/
/**
 * Returns a getMore command object suitable for tailing the remote oplog.
 *
 * Attaches the current term and last committed optime (when the term is initialized) so the
 * sync source can propagate replication progress information.
 */
BSONObj makeGetMoreCommandObject(DataReplicatorExternalState* dataReplicatorExternalState,
                                 const NamespaceString& nss,
                                 CursorId cursorId,
                                 Milliseconds fetcherMaxTimeMS) {
    BSONObjBuilder cmdBob;
    cmdBob.append("getMore", cursorId);
    cmdBob.append("collection", nss.coll());
    // durationCount requires an explicit target duration type for the maxTimeMS count.
    cmdBob.append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS));
    auto opTimeWithTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
    if (opTimeWithTerm.value != OpTime::kUninitializedTerm) {
        cmdBob.append("term", opTimeWithTerm.value);
        opTimeWithTerm.opTime.append(&cmdBob, "lastKnownCommittedOpTime");
    }
    return cmdBob.obj();
}
/**
* Returns command metadata object suitable for tailing remote oplog.
*/
/**
 * Returns a command metadata object suitable for tailing the remote oplog.
 *
 * Under the V1 election protocol this requests replica set metadata (liveness information)
 * from the sync source; otherwise an empty metadata object is used.
 */
StatusWith<BSONObj> makeMetadataObject(bool isV1ElectionProtocol) {
    return isV1ElectionProtocol ? BSON(rpc::kReplSetMetadataFieldName << 1)
                                : rpc::makeEmptyMetadata();
}
/**
* Checks the first batch of results from query.
* 'documents' are the first batch of results returned from tailing the remote oplog.
* 'lastFetched' optime and hash should be consistent with the predicate in the query.
* Returns RemoteOplogStale if the oplog query has no results.
* Returns OplogStartMissing if we cannot find the optime of the last fetched operation in
* the remote oplog.
*/
/**
 * Checks the first batch of results from the query.
 * 'documents' are the first batch of results returned from tailing the remote oplog.
 * 'lastFetched' optime and hash should be consistent with the predicate in the query.
 * Returns RemoteOplogStale if the oplog query has no results.
 * Returns OplogStartMissing if we cannot find the optime of the last fetched operation in
 * the remote oplog.
 */
Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash lastFetched) {
    // An empty response to our $gte query means the sync source has nothing at or after our
    // last fetched optime, i.e. we are ahead of it.
    if (documents.empty()) {
        return Status(ErrorCodes::RemoteOplogStale,
                      str::stream() << "We are ahead of the sync source. Our last op time fetched: "
                                    << lastFetched.opTime.toString());
    }

    const auto& firstEntry = documents.front();
    const auto remoteOpTimeResult = OpTime::parseFromOplogEntry(firstEntry);
    if (!remoteOpTimeResult.isOK()) {
        return Status(ErrorCodes::OplogStartMissing,
                      str::stream() << "our last op time fetched: " << lastFetched.opTime.toString()
                                    << " (hash: "
                                    << lastFetched.value
                                    << ")"
                                    << ". failed to parse optime from first oplog on source: "
                                    << firstEntry.toString()
                                    << ": "
                                    << remoteOpTimeResult.getStatus().toString());
    }

    // The first entry must match both the optime and the hash of our last fetched operation;
    // a mismatch means our oplog has diverged from the sync source's.
    const auto remoteOpTime = remoteOpTimeResult.getValue();
    const long long remoteHash = firstEntry["h"].numberLong();
    if (remoteOpTime == lastFetched.opTime && remoteHash == lastFetched.value) {
        return Status::OK();
    }
    return Status(ErrorCodes::OplogStartMissing,
                  str::stream() << "our last op time fetched: " << lastFetched.opTime.toString()
                                << ". source's GTE: "
                                << remoteOpTime.toString()
                                << " hashes: ("
                                << lastFetched.value
                                << "/"
                                << remoteHash
                                << ")");
}
} // namespace
/**
 * Validates a batch of oplog entries and computes batch statistics.
 *
 * 'first' indicates this is the first batch from the $gte query, whose first document has
 * already been applied locally and is therefore skipped by the ordering check and excluded
 * from the to-apply counts. 'lastTS' is the timestamp of the last fetched operation.
 *
 * Returns OplogStartMissing if the first batch is empty, OplogOutOfOrder if timestamps are
 * not strictly increasing, or the parse error for an entry without a valid "ts" field.
 */
StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
    const Fetcher::Documents& documents, bool first, Timestamp lastTS) {
    if (first && documents.empty()) {
        return Status(ErrorCodes::OplogStartMissing,
                      str::stream() << "The first batch of oplog entries is empty, but expected at "
                                       "least 1 document matching ts: "
                                    << lastTS.toString());
    }

    DocumentsInfo info;
    // The count of the bytes of the documents read off the network.
    info.networkDocumentBytes = 0;
    info.networkDocumentCount = 0;
    for (auto&& doc : documents) {
        info.networkDocumentBytes += doc.objsize();
        ++info.networkDocumentCount;

        // If this is the first response (to the $gte query) then we already applied the first doc.
        if (first && info.networkDocumentCount == 1U) {
            continue;
        }

        // Check to see if the oplog entry goes back in time for this document.
        const auto docOpTime = OpTime::parseFromOplogEntry(doc);
        // Entries must have a "ts" field.
        if (!docOpTime.isOK()) {
            return docOpTime.getStatus();
        }

        info.lastDocument = {doc["h"].numberLong(), docOpTime.getValue()};

        const auto docTS = info.lastDocument.opTime.getTimestamp();
        if (lastTS >= docTS) {
            return Status(ErrorCodes::OplogOutOfOrder,
                          str::stream() << "Out of order entries in oplog. lastTS: "
                                        << lastTS.toString()
                                        << " outOfOrderTS:"
                                        << docTS.toString()
                                        << " at count:"
                                        << info.networkDocumentCount);
        }
        lastTS = docTS;
    }

    // These numbers are for the documents we will apply.
    info.toApplyDocumentCount = documents.size();
    info.toApplyDocumentBytes = info.networkDocumentBytes;
    if (first) {
        // The count is one less since the first document found was already applied ($gte $ts query)
        // and we will not apply it again.
        --info.toApplyDocumentCount;
        auto alreadyAppliedDocument = documents.cbegin();
        info.toApplyDocumentBytes -= alreadyAppliedDocument->objsize();
    }
    return info;
}
// Constructs an OplogFetcher that tails the oplog 'oplogNSS' on 'source' starting at
// 'lastFetched'. Each validated batch is handed to 'enqueueDocumentsFn'; 'onShutdownCallbackFn'
// runs once when the fetcher stops (with the terminating status and the last fetched
// optime/hash). Throws (uassert) on invalid arguments.
OplogFetcher::OplogFetcher(executor::TaskExecutor* exec,
                           OpTimeWithHash lastFetched,
                           HostAndPort source,
                           NamespaceString oplogNSS,
                           ReplicaSetConfig config,
                           DataReplicatorExternalState* dataReplicatorExternalState,
                           EnqueueDocumentsFn enqueueDocumentsFn,
                           OnShutdownCallbackFn onShutdownCallbackFn)
    : _dataReplicatorExternalState(dataReplicatorExternalState),
      // The underlying Fetcher issues the initial find command; _callback receives each batch
      // (only placeholders _1 and _3 of the Fetcher callback signature are used).
      _fetcher(exec,
               source,
               oplogNSS.db().toString(),
               makeFindCommandObject(dataReplicatorExternalState, oplogNSS, lastFetched.opTime),
               stdx::bind(
                   &OplogFetcher::_callback, this, stdx::placeholders::_1, stdx::placeholders::_3),
               uassertStatusOK(makeMetadataObject(config.getProtocolVersion() == 1LL)),
               // Remote command timeout is tied to the election timeout period.
               config.getElectionTimeoutPeriod()),
      _enqueueDocumentsFn(enqueueDocumentsFn),
      _awaitDataTimeout(calculateAwaitDataTimeout(config)),
      _onShutdownCallbackFn(onShutdownCallbackFn),
      _lastFetched(lastFetched) {
    // Validate arguments after member initialization; uassert throws on failure.
    uassert(ErrorCodes::BadValue, "null last optime fetched", !lastFetched.opTime.isNull());
    uassert(ErrorCodes::InvalidReplicaSetConfig,
            "uninitialized replica set configuration",
            config.isInitialized());
    uassert(ErrorCodes::BadValue, "null enqueueDocuments function", enqueueDocumentsFn);
    uassert(ErrorCodes::BadValue, "null onShutdownCallback function", onShutdownCallbackFn);
}
// Returns a human-readable diagnostic summary of this fetcher's state.
// NOTE(review): reads _lastFetched without holding _mutex — confirm this is only used for
// diagnostics where a torn read is acceptable.
std::string OplogFetcher::toString() const {
    str::stream output;
    output << "OplogReader -"
           << " last optime fetched: " << _lastFetched.opTime.toString()
           << " last hash fetched: " << _lastFetched.value
           << " fetcher: " << _fetcher.getDiagnosticString();
    return output;
}
// Returns true while the underlying fetcher is still running.
bool OplogFetcher::isActive() const {
    return _fetcher.isActive();
}
// Schedules the initial find command; returns the scheduling status from the fetcher.
Status OplogFetcher::startup() {
    return _fetcher.schedule();
}
// Cancels the underlying fetcher; the shutdown callback will fire via _callback's error path.
void OplogFetcher::shutdown() {
    _fetcher.cancel();
}
// Blocks until the underlying fetcher has completed.
void OplogFetcher::join() {
    _fetcher.wait();
}
// Returns the optime and hash of the last fetched operation, under _mutex since _lastFetched
// is updated from the fetcher callback thread.
OpTimeWithHash OplogFetcher::getLastOpTimeWithHashFetched() const {
    // stdx::lock_guard needs an explicit mutex type; class template argument deduction is
    // not available before C++17.
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    return _lastFetched;
}
// Test-only accessor for the find command the fetcher was constructed with.
BSONObj OplogFetcher::getCommandObject_forTest() const {
    return _fetcher.getCommandObject();
}
// Test-only accessor for the command metadata the fetcher was constructed with.
BSONObj OplogFetcher::getMetadataObject_forTest() const {
    return _fetcher.getMetadataObject();
}
// Test-only accessor for the remote command timeout passed to the fetcher.
Milliseconds OplogFetcher::getRemoteCommandTimeout_forTest() const {
    return _fetcher.getTimeout();
}
// Test-only accessor for the awaitData timeout used in getMore commands.
Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const {
    return _awaitDataTimeout;
}
// Processes one batch from the remote oplog cursor: forwards replica set metadata, validates
// the batch (including the start-of-oplog check on the first batch), enqueues documents for
// application, advances _lastFetched, and either schedules the next getMore (by filling
// 'getMoreBob') or invokes the shutdown callback.
void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
                             BSONObjBuilder* getMoreBob) {
    // If the target cut connections between connecting and querying (for example, because it
    // stepped down) we might not have a cursor.
    if (!result.isOK()) {
        LOG(2) << "Error returned from oplog query: " << result.getStatus();
        _onShutdown(result.getStatus());
        return;
    }

    const auto& queryResponse = result.getValue();
    rpc::ReplSetMetadata metadata;

    // Forward metadata (containing liveness information) to data replicator external state.
    bool receivedMetadata =
        queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
    if (receivedMetadata) {
        const auto& metadataObj = queryResponse.otherFields.metadata;
        auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj);
        if (!metadataResult.isOK()) {
            error() << "invalid replication metadata from sync source " << _fetcher.getSource()
                    << ": " << metadataResult.getStatus() << ": " << metadataObj;
            _onShutdown(metadataResult.getStatus());
            return;
        }
        metadata = metadataResult.getValue();
        _dataReplicatorExternalState->processMetadata(metadata);
    }

    const auto& documents = queryResponse.documents;
    auto firstDocToApply = documents.cbegin();

    if (!documents.empty()) {
        LOG(2) << "oplog fetcher read " << documents.size()
               << " operations from remote oplog starting at " << documents.front()["ts"]
               << " and ending at " << documents.back()["ts"];
    } else {
        LOG(2) << "oplog fetcher read 0 operations from remote oplog";
    }

    auto opTimeWithHash = getLastOpTimeWithHashFetched();

    // Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
    if (queryResponse.first) {
        auto status = checkRemoteOplogStart(documents, opTimeWithHash);
        if (!status.isOK()) {
            // Stop oplog fetcher and execute rollback.
            _onShutdown(status, opTimeWithHash);
            return;
        }

        // If this is the first batch and no rollback is needed, skip the first document, which
        // matched our last fetched operation and has already been applied.
        firstDocToApply++;
    }

    auto validateResult = OplogFetcher::validateDocuments(
        documents, queryResponse.first, opTimeWithHash.opTime.getTimestamp());
    if (!validateResult.isOK()) {
        _onShutdown(validateResult.getStatus(), opTimeWithHash);
        return;
    }
    auto info = validateResult.getValue();

    // TODO: back pressure handling will be added in SERVER-23499.
    _enqueueDocumentsFn(firstDocToApply, documents.cend(), info, queryResponse.elapsedMillis);

    // Update last fetched info.
    if (firstDocToApply != documents.cend()) {
        opTimeWithHash = info.lastDocument;
        LOG(3) << "batch resetting last fetched optime: " << opTimeWithHash.opTime
               << "; hash: " << opTimeWithHash.value;

        // stdx::unique_lock needs an explicit mutex type; class template argument deduction
        // is not available before C++17.
        stdx::unique_lock<stdx::mutex> lock(_mutex);
        _lastFetched = opTimeWithHash;
    }

    if (_dataReplicatorExternalState->shouldStopFetching(_fetcher.getSource(), metadata)) {
        _onShutdown(Status(ErrorCodes::InvalidSyncSource,
                           str::stream() << "sync source " << _fetcher.getSource().toString()
                                         << " (last optime: "
                                         << metadata.getLastOpVisible().toString()
                                         << "; sync source index: "
                                         << metadata.getSyncSourceIndex()
                                         << "; primary index: "
                                         << metadata.getPrimaryIndex()
                                         << ") is no longer valid"),
                    opTimeWithHash);
        return;
    }

    // No more data. Stop processing and return Status::OK along with last fetch info.
    if (!getMoreBob) {
        _onShutdown(Status::OK(), opTimeWithHash);
        return;
    }
    getMoreBob->appendElements(makeGetMoreCommandObject(_dataReplicatorExternalState,
                                                        queryResponse.nss,
                                                        queryResponse.cursorId,
                                                        _awaitDataTimeout));
}
// Convenience overload: reports shutdown using the most recently recorded fetch position.
void OplogFetcher::_onShutdown(Status status) {
    auto lastFetched = getLastOpTimeWithHashFetched();
    _onShutdown(status, lastFetched);
}
// Invokes the registered shutdown callback with the terminating status and the given
// last-fetched optime/hash.
void OplogFetcher::_onShutdown(Status status, OpTimeWithHash opTimeWithHash) {
    _onShutdownCallbackFn(status, opTimeWithHash);
}
} // namespace repl
} // namespace mongo