diff options
author | Judah Schvimer <judah@mongodb.com> | 2017-04-18 10:06:14 -0400 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2017-04-18 10:06:14 -0400 |
commit | da37567fe37e39a52a96dd75c8929fabd096d2cb (patch) | |
tree | ada3cef019fea8f0cb8f622e8a06f35400e0b450 /src/mongo/db/repl/oplog_fetcher.cpp | |
parent | bae70bcec33c9e45a8adfa54c8e4468b60093d04 (diff) | |
download | mongo-da37567fe37e39a52a96dd75c8929fabd096d2cb.tar.gz |
SERVER-28209 Implement RollbackCommonPointResolver
Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher.cpp')
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 281 |
1 files changed, 59 insertions, 222 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 32f0f462fed..54b572b3e68 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -39,10 +39,7 @@ #include "mongo/db/stats/timer_stats.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/mutex.h" #include "mongo/util/assert_util.h" -#include "mongo/util/destructor_guard.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/time_support.h" @@ -56,12 +53,6 @@ MONGO_FP_DECLARE(stopReplProducer); namespace { -Seconds kOplogInitialFindMaxTime{60}; -Seconds kOplogQueryNetworkTimeout{65}; // 5 seconds past the find command's 1 minute maxTimeMs - -Counter64 readersCreatedStats; -ServerStatusMetricField<Counter64> displayReadersCreated("repl.network.readersCreated", - &readersCreatedStats); // The number and time spent reading batches off the network TimerStats getmoreReplStats; ServerStatusMetricField<TimerStats> displayBatchesRecieved("repl.network.getmores", @@ -87,25 +78,6 @@ Milliseconds calculateAwaitDataTimeout(const ReplSetConfig& config) { } /** - * Returns find command object suitable for tailing remote oplog. - */ -BSONObj makeFindCommandObject(const NamespaceString& nss, - long long currentTerm, - OpTime lastOpTimeFetched) { - BSONObjBuilder cmdBob; - cmdBob.append("find", nss.coll()); - cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp()))); - cmdBob.append("tailable", true); - cmdBob.append("oplogReplay", true); - cmdBob.append("awaitData", true); - cmdBob.append("maxTimeMS", durationCount<Milliseconds>(kOplogInitialFindMaxTime)); - if (currentTerm != OpTime::kUninitializedTerm) { - cmdBob.append("term", currentTerm); - } - return cmdBob.obj(); -} - -/** * Returns getMore command object suitable for tailing remote oplog. */ BSONObj makeGetMoreCommandObject(const NamespaceString& nss, @@ -293,15 +265,13 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( continue; } - // Check to see if the oplog entry goes back in time for this document. - const auto docOpTime = OpTime::parseFromOplogEntry(doc); - // entries must have a "ts" field. - if (!docOpTime.isOK()) { - return docOpTime.getStatus(); + auto docOpTimeWithHash = AbstractOplogFetcher::parseOpTimeWithHash(doc); + if (!docOpTimeWithHash.isOK()) { + return docOpTimeWithHash.getStatus(); } + info.lastDocument = docOpTimeWithHash.getValue(); - info.lastDocument = {doc["h"].numberLong(), docOpTime.getValue()}; - + // Check to see if the oplog entry goes back in time for this document. const auto docTS = info.lastDocument.opTime.getTimestamp(); if (lastTS >= docTS) { return Status(ErrorCodes::OplogOutOfOrder, @@ -343,65 +313,50 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, DataReplicatorExternalState* dataReplicatorExternalState, EnqueueDocumentsFn enqueueDocumentsFn, OnShutdownCallbackFn onShutdownCallbackFn) - : AbstractAsyncComponent(executor, "oplog fetcher"), - _source(source), - _nss(nss), + : AbstractOplogFetcher(executor, + lastFetched, + source, + nss, + maxFetcherRestarts, + onShutdownCallbackFn, + "oplog fetcher"), _metadataObject(uassertStatusOK(makeMetadataObject(config.getProtocolVersion() == 1LL))), - _maxFetcherRestarts(maxFetcherRestarts), _requiredRBID(requiredRBID), _requireFresherSyncSource(requireFresherSyncSource), _dataReplicatorExternalState(dataReplicatorExternalState), _enqueueDocumentsFn(enqueueDocumentsFn), - _awaitDataTimeout(calculateAwaitDataTimeout(config)), - _onShutdownCallbackFn(onShutdownCallbackFn), - _lastFetched(lastFetched) { - uassert(ErrorCodes::BadValue, "null last optime fetched", !_lastFetched.opTime.isNull()); - uassert(ErrorCodes::InvalidReplicaSetConfig, - "uninitialized replica set configuration", - config.isInitialized()); - uassert(ErrorCodes::BadValue, "null enqueueDocuments function", enqueueDocumentsFn); - uassert(ErrorCodes::BadValue, "null onShutdownCallback function", onShutdownCallbackFn); - - auto currentTerm = dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().value; - _fetcher = _makeFetcher(currentTerm, _lastFetched.opTime); -} - -OplogFetcher::~OplogFetcher() { - DESTRUCTOR_GUARD(shutdown(); join();); -} - -std::string OplogFetcher::toString() const { - return str::stream() << "OplogReader -" - << " last optime fetched: " << _lastFetched.opTime.toString() - << " last hash fetched: " << _lastFetched.value - << " fetcher: " << _fetcher->getDiagnosticString(); -} - -Status OplogFetcher::_doStartup_inlock() noexcept { - return _scheduleFetcher_inlock(); -} + _awaitDataTimeout(calculateAwaitDataTimeout(config)) { -void OplogFetcher::_doShutdown_inlock() noexcept { - _fetcher->shutdown(); + invariant(config.isInitialized()); + invariant(enqueueDocumentsFn); } -stdx::mutex* OplogFetcher::_getMutex() noexcept { - return &_mutex; -} - -Status OplogFetcher::_scheduleFetcher_inlock() { - readersCreatedStats.increment(); - return _fetcher->schedule(); +OplogFetcher::~OplogFetcher() { + shutdown(); + join(); } -OpTimeWithHash OplogFetcher::getLastOpTimeWithHashFetched() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _lastFetched; +BSONObj OplogFetcher::_makeFindCommandObject(const NamespaceString& nss, + OpTime lastOpTimeFetched) const { + auto lastCommittedWithCurrentTerm = + _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(); + auto term = lastCommittedWithCurrentTerm.value; + BSONObjBuilder cmdBob; + cmdBob.append("find", nss.coll()); + cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp()))); + cmdBob.append("tailable", true); + cmdBob.append("oplogReplay", true); + cmdBob.append("awaitData", true); + cmdBob.append("maxTimeMS", + durationCount<Milliseconds>(AbstractOplogFetcher::kOplogInitialFindMaxTime)); + if (term != OpTime::kUninitializedTerm) { + cmdBob.append("term", term); + } + return cmdBob.obj(); } -BSONObj OplogFetcher::getCommandObject_forTest() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _fetcher->getCommandObject(); +BSONObj OplogFetcher::_makeMetadataObject() const { + return _metadataObject; } BSONObj OplogFetcher::getMetadataObject_forTest() const { @@ -412,76 +367,15 @@ Milliseconds OplogFetcher::getAwaitDataTimeout_forTest() const { return _awaitDataTimeout; } -void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, - BSONObjBuilder* getMoreBob) { - const auto& responseStatus = result.getStatus(); - if (ErrorCodes::CallbackCanceled == responseStatus) { - LOG(1) << "oplog query cancelled"; - _finishCallback(responseStatus); - return; - } - - // If target cut connections between connecting and querying (for - // example, because it stepped down) we might not have a cursor. - if (!responseStatus.isOK()) { - { - // We have to call into replication coordinator outside oplog fetcher's mutex. - // It is OK if the current term becomes stale after this line since requests - // to remote nodes are asynchronous anyway. - auto currentTerm = - _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime().value; - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_isShuttingDown_inlock()) { - log() << "Error returned from oplog query while canceling query: " - << redact(responseStatus); - } else if (_fetcherRestarts == _maxFetcherRestarts) { - log() << "Error returned from oplog query (no more query restarts left): " - << redact(responseStatus); - } else { - log() << "Restarting oplog query due to error: " << redact(responseStatus) - << ". Last fetched optime (with hash): " << _lastFetched - << ". Restarts remaining: " << (_maxFetcherRestarts - _fetcherRestarts); - _fetcherRestarts++; - // Destroying current instance in _shuttingDownFetcher will possibly block. - _shuttingDownFetcher.reset(); - // Move the old fetcher into the shutting down instance. - _shuttingDownFetcher.swap(_fetcher); - // Create and start fetcher with current term and new starting optime. - _fetcher = _makeFetcher(currentTerm, _lastFetched.opTime); - auto scheduleStatus = _scheduleFetcher_inlock(); - if (scheduleStatus.isOK()) { - log() << "Scheduled new oplog query " << _fetcher->toString(); - return; - } - error() << "Error scheduling new oplog query: " << redact(scheduleStatus) - << ". Returning current oplog query error: " << redact(responseStatus); - } - } - _finishCallback(responseStatus); - return; - } - - // Reset fetcher restart counter on successful response. - { - stdx::lock_guard<stdx::mutex> lock(_mutex); - invariant(_isActive_inlock()); - _fetcherRestarts = 0; - } - - if (_isShuttingDown()) { - _finishCallback(Status(ErrorCodes::CallbackCanceled, "oplog fetcher shutting down")); - return; - } +StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryResponse& queryResponse) { // Stop fetching and return on fail point. // This fail point makes the oplog fetcher ignore the downloaded batch of operations and not - // error out. + // error out. The FailPointEnabled error will be caught by the AbstractOplogFetcher. if (MONGO_FAIL_POINT(stopReplProducer)) { - _finishCallback(Status::OK()); - return; + return Status(ErrorCodes::FailPointEnabled, "stopReplProducer fail point is enabled"); } - const auto& queryResponse = result.getValue(); const auto& documents = queryResponse.documents; auto firstDocToApply = documents.cbegin(); @@ -493,45 +387,43 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, LOG(2) << "oplog fetcher read 0 operations from remote oplog"; } - auto opTimeWithHash = getLastOpTimeWithHashFetched(); - auto oqMetadataResult = parseOplogQueryMetadata(queryResponse); if (!oqMetadataResult.isOK()) { - error() << "invalid oplog query metadata from sync source " << _fetcher->getSource() << ": " + error() << "invalid oplog query metadata from sync source " << _getSource() << ": " << oqMetadataResult.getStatus() << ": " << queryResponse.otherFields.metadata; - _finishCallback(oqMetadataResult.getStatus()); - return; + return oqMetadataResult.getStatus(); } auto oqMetadata = oqMetadataResult.getValue(); + // This lastFetched value is the last OpTime from the previous batch. + auto lastFetched = _getLastOpTimeWithHashFetched(); + // Check start of remote oplog and, if necessary, stop fetcher to execute rollback. if (queryResponse.first) { auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none; auto remoteLastApplied = oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none; auto status = checkRemoteOplogStart(documents, - opTimeWithHash, + lastFetched, remoteLastApplied, _requiredRBID, remoteRBID, _requireFresherSyncSource); if (!status.isOK()) { // Stop oplog fetcher and execute rollback if necessary. - _finishCallback(status, opTimeWithHash); - return; + return status; } - LOG(1) << "oplog fetcher successfully fetched from " << _source; + LOG(1) << "oplog fetcher successfully fetched from " << _getSource(); // If this is the first batch and no rollback is needed, skip the first document. firstDocToApply++; } auto validateResult = OplogFetcher::validateDocuments( - documents, queryResponse.first, opTimeWithHash.opTime.getTimestamp()); + documents, queryResponse.first, lastFetched.opTime.getTimestamp()); if (!validateResult.isOK()) { - _finishCallback(validateResult.getStatus(), opTimeWithHash); - return; + return validateResult.getStatus(); } auto info = validateResult.getValue(); @@ -545,10 +437,9 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, const auto& metadataObj = queryResponse.otherFields.metadata; auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj); if (!metadataResult.isOK()) { - error() << "invalid replication metadata from sync source " << _fetcher->getSource() - << ": " << metadataResult.getStatus() << ": " << metadataObj; - _finishCallback(metadataResult.getStatus()); - return; + error() << "invalid replication metadata from sync source " << _getSource() << ": " + << metadataResult.getStatus() << ": " << metadataObj; + return metadataResult.getStatus(); } replSetMetadata = metadataResult.getValue(); @@ -567,24 +458,13 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, // TODO: back pressure handling will be added in SERVER-23499. auto status = _enqueueDocumentsFn(firstDocToApply, documents.cend(), info); if (!status.isOK()) { - _finishCallback(status); - return; - } - - // Update last fetched info. - if (firstDocToApply != documents.cend()) { - opTimeWithHash = info.lastDocument; - LOG(3) << "batch resetting last fetched optime: " << opTimeWithHash.opTime - << "; hash: " << opTimeWithHash.value; - - stdx::unique_lock<stdx::mutex> lock(_mutex); - _lastFetched = opTimeWithHash; + return status; } if (_dataReplicatorExternalState->shouldStopFetching( - _fetcher->getSource(), replSetMetadata, oqMetadata)) { + _getSource(), replSetMetadata, oqMetadata)) { str::stream errMsg; - errMsg << "sync source " << _fetcher->getSource().toString(); + errMsg << "sync source " << _getSource().toString(); errMsg << " (config version: " << replSetMetadata.getConfigVersion(); // If OplogQueryMetadata was provided, its values were used to determine if we should // stop fetching from this sync source. @@ -598,56 +478,13 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, errMsg << "; primary index: " << replSetMetadata.getPrimaryIndex(); } errMsg << ") is no longer valid"; - _finishCallback(Status(ErrorCodes::InvalidSyncSource, errMsg), opTimeWithHash); - return; - } - - // No more data. Stop processing and return Status::OK along with last - // fetch info. - if (!getMoreBob) { - _finishCallback(Status::OK(), opTimeWithHash); - return; + return Status(ErrorCodes::InvalidSyncSource, errMsg); } auto lastCommittedWithCurrentTerm = _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(); - getMoreBob->appendElements(makeGetMoreCommandObject(queryResponse.nss, - queryResponse.cursorId, - lastCommittedWithCurrentTerm, - _awaitDataTimeout)); -} - -void OplogFetcher::_finishCallback(Status status) { - _finishCallback(status, getLastOpTimeWithHashFetched()); + return makeGetMoreCommandObject( + queryResponse.nss, queryResponse.cursorId, lastCommittedWithCurrentTerm, _awaitDataTimeout); } - -void OplogFetcher::_finishCallback(Status status, OpTimeWithHash opTimeWithHash) { - invariant(isActive()); - - _onShutdownCallbackFn(status, opTimeWithHash); - - decltype(_onShutdownCallbackFn) onShutdownCallbackFn; - stdx::lock_guard<stdx::mutex> lock(_mutex); - _transitionToComplete_inlock(); - - // Release any resources that might be held by the '_onShutdownCallbackFn' function object. - // The function object will be destroyed outside the lock since the temporary variable - // 'onShutdownCallbackFn' is declared before 'lock'. - invariant(_onShutdownCallbackFn); - std::swap(_onShutdownCallbackFn, onShutdownCallbackFn); -} - -std::unique_ptr<Fetcher> OplogFetcher::_makeFetcher(long long currentTerm, - OpTime lastFetchedOpTime) { - return stdx::make_unique<Fetcher>( - _getExecutor(), - _source, - _nss.db().toString(), - makeFindCommandObject(_nss, currentTerm, lastFetchedOpTime), - stdx::bind(&OplogFetcher::_callback, this, stdx::placeholders::_1, stdx::placeholders::_3), - _metadataObject, - kOplogQueryNetworkTimeout); -} - } // namespace repl } // namespace mongo |