summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog_fetcher.cpp
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2017-04-18 10:06:14 -0400
committerJudah Schvimer <judah@mongodb.com>2017-04-18 10:06:14 -0400
commitda37567fe37e39a52a96dd75c8929fabd096d2cb (patch)
treeada3cef019fea8f0cb8f622e8a06f35400e0b450 /src/mongo/db/repl/oplog_fetcher.cpp
parentbae70bcec33c9e45a8adfa54c8e4468b60093d04 (diff)
downloadmongo-da37567fe37e39a52a96dd75c8929fabd096d2cb.tar.gz
SERVER-28209 Implement RollbackCommonPointResolver
Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher.cpp')
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp281
1 files changed, 59 insertions, 222 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 32f0f462fed..54b572b3e68 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -39,10 +39,7 @@
#include "mongo/db/stats/timer_stats.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
-#include "mongo/util/destructor_guard.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
@@ -56,12 +53,6 @@ MONGO_FP_DECLARE(stopReplProducer);
namespace {
-Seconds kOplogInitialFindMaxTime{60};
-Seconds kOplogQueryNetworkTimeout{65}; // 5 seconds past the find command's 1 minute maxTimeMs
-
-Counter64 readersCreatedStats;
-ServerStatusMetricField<Counter64> displayReadersCreated("repl.network.readersCreated",
- &readersCreatedStats);
// The number and time spent reading batches off the network
TimerStats getmoreReplStats;
ServerStatusMetricField<TimerStats> displayBatchesRecieved("repl.network.getmores",
@@ -87,25 +78,6 @@ Milliseconds calculateAwaitDataTimeout(const ReplSetConfig& config) {
}
/**
- * Returns find command object suitable for tailing remote oplog.
- */
-BSONObj makeFindCommandObject(const NamespaceString& nss,
- long long currentTerm,
- OpTime lastOpTimeFetched) {
- BSONObjBuilder cmdBob;
- cmdBob.append("find", nss.coll());
- cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
- cmdBob.append("tailable", true);
- cmdBob.append("oplogReplay", true);
- cmdBob.append("awaitData", true);
- cmdBob.append("maxTimeMS", durationCount<Milliseconds>(kOplogInitialFindMaxTime));
- if (currentTerm != OpTime::kUninitializedTerm) {
- cmdBob.append("term", currentTerm);
- }
- return cmdBob.obj();
-}
-
-/**
* Returns getMore command object suitable for tailing remote oplog.
*/
BSONObj makeGetMoreCommandObject(const NamespaceString& nss,
@@ -293,15 +265,13 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments(
continue;
}
- // Check to see if the oplog entry goes back in time for this document.
- const auto docOpTime = OpTime::parseFromOplogEntry(doc);
- // entries must have a "ts" field.
- if (!docOpTime.isOK()) {
- return docOpTime.getStatus();
+ auto docOpTimeWithHash = AbstractOplogFetcher::parseOpTimeWithHash(doc);
+ if (!docOpTimeWithHash.isOK()) {
+ return docOpTimeWithHash.getStatus();
}
+ info.lastDocument = docOpTimeWithHash.getValue();
- info.lastDocument = {doc["h"].numberLong(), docOpTime.getValue()};
-
+ // Check to see if the oplog entry goes back in time for this document.
const auto docTS = info.lastDocument.opTime.getTimestamp();
if (lastTS >= docTS) {
return Status(ErrorCodes::OplogOutOfOrder,
@@ -343,65 +313,50 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn)
- : AbstractAsyncComponent(executor, "oplog fetcher"),
- _source(source),
- _nss(nss),
+ : AbstractOplogFetcher(executor,
+ lastFetched,
+ source,
+ nss,
+ maxFetcherRestarts,
+ onShutdownCallbackFn,
+ "oplog fetcher"),
_metadataObject(uassertStatusOK(makeMetadataObject(config.getProtocolVersion() == 1LL))),
- _maxFetcherRestarts(maxFetcherRestarts),
_requiredRBID(requiredRBID),
_requireFresherSyncSource(requireFresherSyncSource),
_dataReplicatorExternalState(dataReplicatorExternalState),
_enqueueDocumentsFn(enqueueDocumentsFn),
- _awaitDataTimeout(calculateAwaitDataTimeout(config)),
- _onShutdownCallbackFn(onShutdownCallbackFn),
- _lastFetched(lastFetched) {
- uassert(ErrorCodes::BadValue, "null last optime fetched", !_lastFetched.opTime.isNull());
- uassert(ErrorCodes::InvalidReplicaSetConfig,
- "uninitialized replica set configuration",
- config.isInitialized());
- uassert(ErrorCodes::BadValue, "null enqueueDocuments function", enqueueDocumentsFn);
- uassert(ErrorCodes::BadValue, "null onShutdownCallback function", onShutdownCallbackFn);
-
- auto currentTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().value;
- _fetcher = _makeFetcher(currentTerm, _lastFetched.opTime);
-}
-
-OplogFetcher::~OplogFetcher() {
- DESTRUCTOR_GUARD(shutdown(); join(););
-}
-
-std::string OplogFetcher::toString() const {
- return str::stream() << "OplogReader -"
- << " last optime fetched: " << _lastFetched.opTime.toString()
- << " last hash fetched: " << _lastFetched.value
- << " fetcher: " << _fetcher->getDiagnosticString();
-}
-
-Status OplogFetcher::_doStartup_inlock() noexcept {
- return _scheduleFetcher_inlock();
-}
+ _awaitDataTimeout(calculateAwaitDataTimeout(config)) {
-void OplogFetcher::_doShutdown_inlock() noexcept {
- _fetcher->shutdown();
+ invariant(config.isInitialized());
+ invariant(enqueueDocumentsFn);
}
-stdx::mutex* OplogFetcher::_getMutex() noexcept {
- return &_mutex;
-}
-
-Status OplogFetcher::_scheduleFetcher_inlock() {
- readersCreatedStats.increment();
- return _fetcher->schedule();
+OplogFetcher::~OplogFetcher() {
+ shutdown();
+ join();
}
-OpTimeWithHash OplogFetcher::getLastOpTimeWithHashFetched() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _lastFetched;
+BSONObj OplogFetcher::_makeFindCommandObject(const NamespaceString& nss,
+ OpTime lastOpTimeFetched) const {
+ auto lastCommittedWithCurrentTerm =
+ _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
+ auto term = lastCommittedWithCurrentTerm.value;
+ BSONObjBuilder cmdBob;
+ cmdBob.append("find", nss.coll());
+ cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
+ cmdBob.append("tailable", true);
+ cmdBob.append("oplogReplay", true);
+ cmdBob.append("awaitData", true);
+ cmdBob.append("maxTimeMS",
+ durationCount<Milliseconds>(AbstractOplogFetcher::kOplogInitialFindMaxTime));
+ if (term != OpTime::kUninitializedTerm) {
+ cmdBob.append("term", term);
+ }
+ return cmdBob.obj();
}
-BSONObj OplogFetcher::getCommandObject_forTest() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _fetcher->getCommandObject();
+BSONObj OplogFetcher::_makeMetadataObject() const {
+ return _metadataObject;
}
BSONObj OplogFetcher::getMetadataObject_forTest() const {
@@ -412,76 +367,15 @@ Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const {
return _awaitDataTimeout;
}
-void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
- BSONObjBuilder* getMoreBob) {
- const auto& responseStatus = result.getStatus();
- if (ErrorCodes::CallbackCanceled == responseStatus) {
- LOG(1) << "oplog query cancelled";
- _finishCallback(responseStatus);
- return;
- }
-
- // If target cut connections between connecting and querying (for
- // example, because it stepped down) we might not have a cursor.
- if (!responseStatus.isOK()) {
- {
- // We have to call into replication coordinator outside oplog fetcher's mutex.
- // It is OK if the current term becomes stale after this line since requests
- // to remote nodes are asynchronous anyway.
- auto currentTerm =
- _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().value;
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_isShuttingDown_inlock()) {
- log() << "Error returned from oplog query while canceling query: "
- << redact(responseStatus);
- } else if (_fetcherRestarts == _maxFetcherRestarts) {
- log() << "Error returned from oplog query (no more query restarts left): "
- << redact(responseStatus);
- } else {
- log() << "Restarting oplog query due to error: " << redact(responseStatus)
- << ". Last fetched optime (with hash): " << _lastFetched
- << ". Restarts remaining: " << (_maxFetcherRestarts - _fetcherRestarts);
- _fetcherRestarts++;
- // Destroying current instance in _shuttingDownFetcher will possibly block.
- _shuttingDownFetcher.reset();
- // Move the old fetcher into the shutting down instance.
- _shuttingDownFetcher.swap(_fetcher);
- // Create and start fetcher with current term and new starting optime.
- _fetcher = _makeFetcher(currentTerm, _lastFetched.opTime);
- auto scheduleStatus = _scheduleFetcher_inlock();
- if (scheduleStatus.isOK()) {
- log() << "Scheduled new oplog query " << _fetcher->toString();
- return;
- }
- error() << "Error scheduling new oplog query: " << redact(scheduleStatus)
- << ". Returning current oplog query error: " << redact(responseStatus);
- }
- }
- _finishCallback(responseStatus);
- return;
- }
-
- // Reset fetcher restart counter on successful response.
- {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- invariant(_isActive_inlock());
- _fetcherRestarts = 0;
- }
-
- if (_isShuttingDown()) {
- _finishCallback(Status(ErrorCodes::CallbackCanceled, "oplog fetcher shutting down"));
- return;
- }
+StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) {
// Stop fetching and return on fail point.
// This fail point makes the oplog fetcher ignore the downloaded batch of operations and not
- // error out.
+ // error out. The FailPointEnabled error will be caught by the AbstractOplogFetcher.
if (MONGO_FAIL_POINT(stopReplProducer)) {
- _finishCallback(Status::OK());
- return;
+ return Status(ErrorCodes::FailPointEnabled, "stopReplProducer fail point is enabled");
}
- const auto& queryResponse = result.getValue();
const auto& documents = queryResponse.documents;
auto firstDocToApply = documents.cbegin();
@@ -493,45 +387,43 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
LOG(2) << "oplog fetcher read 0 operations from remote oplog";
}
- auto opTimeWithHash = getLastOpTimeWithHashFetched();
-
auto oqMetadataResult = parseOplogQueryMetadata(queryResponse);
if (!oqMetadataResult.isOK()) {
- error() << "invalid oplog query metadata from sync source " << _fetcher->getSource() << ": "
+ error() << "invalid oplog query metadata from sync source " << _getSource() << ": "
<< oqMetadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
- _finishCallback(oqMetadataResult.getStatus());
- return;
+ return oqMetadataResult.getStatus();
}
auto oqMetadata = oqMetadataResult.getValue();
+ // This lastFetched value is the last OpTime from the previous batch.
+ auto lastFetched = _getLastOpTimeWithHashFetched();
+
// Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
if (queryResponse.first) {
auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none;
auto remoteLastApplied =
oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none;
auto status = checkRemoteOplogStart(documents,
- opTimeWithHash,
+ lastFetched,
remoteLastApplied,
_requiredRBID,
remoteRBID,
_requireFresherSyncSource);
if (!status.isOK()) {
// Stop oplog fetcher and execute rollback if necessary.
- _finishCallback(status, opTimeWithHash);
- return;
+ return status;
}
- LOG(1) << "oplog fetcher successfully fetched from " << _source;
+ LOG(1) << "oplog fetcher successfully fetched from " << _getSource();
// If this is the first batch and no rollback is needed, skip the first document.
firstDocToApply++;
}
auto validateResult = OplogFetcher::validateDocuments(
- documents, queryResponse.first, opTimeWithHash.opTime.getTimestamp());
+ documents, queryResponse.first, lastFetched.opTime.getTimestamp());
if (!validateResult.isOK()) {
- _finishCallback(validateResult.getStatus(), opTimeWithHash);
- return;
+ return validateResult.getStatus();
}
auto info = validateResult.getValue();
@@ -545,10 +437,9 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
const auto& metadataObj = queryResponse.otherFields.metadata;
auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj);
if (!metadataResult.isOK()) {
- error() << "invalid replication metadata from sync source " << _fetcher->getSource()
- << ": " << metadataResult.getStatus() << ": " << metadataObj;
- _finishCallback(metadataResult.getStatus());
- return;
+ error() << "invalid replication metadata from sync source " << _getSource() << ": "
+ << metadataResult.getStatus() << ": " << metadataObj;
+ return metadataResult.getStatus();
}
replSetMetadata = metadataResult.getValue();
@@ -567,24 +458,13 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
// TODO: back pressure handling will be added in SERVER-23499.
auto status = _enqueueDocumentsFn(firstDocToApply, documents.cend(), info);
if (!status.isOK()) {
- _finishCallback(status);
- return;
- }
-
- // Update last fetched info.
- if (firstDocToApply != documents.cend()) {
- opTimeWithHash = info.lastDocument;
- LOG(3) << "batch resetting last fetched optime: " << opTimeWithHash.opTime
- << "; hash: " << opTimeWithHash.value;
-
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- _lastFetched = opTimeWithHash;
+ return status;
}
if (_dataReplicatorExternalState->shouldStopFetching(
- _fetcher->getSource(), replSetMetadata, oqMetadata)) {
+ _getSource(), replSetMetadata, oqMetadata)) {
str::stream errMsg;
- errMsg << "sync source " << _fetcher->getSource().toString();
+ errMsg << "sync source " << _getSource().toString();
errMsg << " (config version: " << replSetMetadata.getConfigVersion();
// If OplogQueryMetadata was provided, its values were used to determine if we should
// stop fetching from this sync source.
@@ -598,56 +478,13 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
errMsg << "; primary index: " << replSetMetadata.getPrimaryIndex();
}
errMsg << ") is no longer valid";
- _finishCallback(Status(ErrorCodes::InvalidSyncSource, errMsg), opTimeWithHash);
- return;
- }
-
- // No more data. Stop processing and return Status::OK along with last
- // fetch info.
- if (!getMoreBob) {
- _finishCallback(Status::OK(), opTimeWithHash);
- return;
+ return Status(ErrorCodes::InvalidSyncSource, errMsg);
}
auto lastCommittedWithCurrentTerm =
_dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
- getMoreBob->appendElements(makeGetMoreCommandObject(queryResponse.nss,
- queryResponse.cursorId,
- lastCommittedWithCurrentTerm,
- _awaitDataTimeout));
-}
-
-void OplogFetcher::_finishCallback(Status status) {
- _finishCallback(status, getLastOpTimeWithHashFetched());
+ return makeGetMoreCommandObject(
+ queryResponse.nss, queryResponse.cursorId, lastCommittedWithCurrentTerm, _awaitDataTimeout);
}
-
-void OplogFetcher::_finishCallback(Status status, OpTimeWithHash opTimeWithHash) {
- invariant(isActive());
-
- _onShutdownCallbackFn(status, opTimeWithHash);
-
- decltype(_onShutdownCallbackFn) onShutdownCallbackFn;
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _transitionToComplete_inlock();
-
- // Release any resources that might be held by the '_onShutdownCallbackFn' function object.
- // The function object will be destroyed outside the lock since the temporary variable
- // 'onShutdownCallbackFn' is declared before 'lock'.
- invariant(_onShutdownCallbackFn);
- std::swap(_onShutdownCallbackFn, onShutdownCallbackFn);
-}
-
-std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(long long currentTerm,
- OpTime lastFetchedOpTime) {
- return stdx::make_unique<Fetcher>(
- _getExecutor(),
- _source,
- _nss.db().toString(),
- makeFindCommandObject(_nss, currentTerm, lastFetchedOpTime),
- stdx::bind(&OplogFetcher::_callback, this, stdx::placeholders::_1, stdx::placeholders::_3),
- _metadataObject,
- kOplogQueryNetworkTimeout);
-}
-
} // namespace repl
} // namespace mongo