diff options
author | Benety Goh <benety@mongodb.com> | 2016-03-25 17:09:44 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-04-26 14:37:44 -0400 |
commit | 821c605f9aff83958e1c584a82d691b725e9adb6 (patch) | |
tree | c9b734aece99811140d95939933830da6f70d7e4 /src | |
parent | 66c992d4b93f0c9e0fbe400ccf95344ec9437823 (diff) | |
download | mongo-821c605f9aff83958e1c584a82d691b725e9adb6.tar.gz |
SERVER-22775 integrated OplogFetcher into BackgroundSync
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 476 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 114 |
3 files changed, 241 insertions, 350 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 7552460f33f..8550b6560ba 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -8,6 +8,7 @@ env.Library( 'bgsync.cpp', ], LIBDEPS=[ + 'data_replicator_external_state_impl', 'repl_coordinator_interface', 'rollback_source_impl', 'rs_rollback', diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index cd25fbe664f..dfe7b287654 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -32,9 +32,8 @@ #include "mongo/db/repl/bgsync.h" -#include <memory> - #include "mongo/base/counter.h" +#include "mongo/bson/util/bson_extract.h" #include "mongo/client/connection_pool.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" @@ -42,6 +41,7 @@ #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/repl/data_replicator_external_state_impl.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/replication_coordinator_global.h" @@ -70,10 +70,40 @@ using std::string; namespace repl { namespace { -const char hashFieldName[] = "h"; -int SleepToAllowBatchingMillis = 2; -const int BatchIsSmallish = 40000; // bytes -const Milliseconds oplogSocketTimeout(30000); +const char kHashFieldName[] = "h"; +const int kSleepToAllowBatchingMillis = 2; +const int kSmallBatchLimitBytes = 40000; +const Milliseconds kOplogSocketTimeout(30000); + +/** + * Extends DataReplicatorExternalStateImpl to be member state aware. + */ +class DataReplicatorExternalStateBackgroundSync : public DataReplicatorExternalStateImpl { +public: + DataReplicatorExternalStateBackgroundSync(ReplicationCoordinator* replicationCoordinator, + BackgroundSync* bgsync); + bool shouldStopFetching(const HostAndPort& source, + const OpTime& sourceOpTime, + bool sourceHasSyncSource) override; + +private: + BackgroundSync* _bgsync; +}; + +DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackgroundSync( + ReplicationCoordinator* replicationCoordinator, BackgroundSync* bgsync) + : DataReplicatorExternalStateImpl(replicationCoordinator), _bgsync(bgsync) {} + +bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(const HostAndPort& source, + const OpTime& sourceOpTime, + bool sourceHasSyncSource) { + if (_bgsync->shouldStopFetching()) { + return true; + } + + return DataReplicatorExternalStateImpl::shouldStopFetching( + source, sourceOpTime, sourceHasSyncSource); +} /** * Returns new thread pool for thead pool task executor. @@ -84,35 +114,6 @@ std::unique_ptr<ThreadPool> makeThreadPool() { return stdx::make_unique<ThreadPool>(threadPoolOptions); } -/** - * Checks the criteria for rolling back. - * 'getNextOperation' returns the first result of the oplog tailing query. - * 'lastOpTimeFetched' should be consistent with the predicate in the query. - * Returns RemoteOplogStale if the oplog query has no results. - * Returns OplogStartMissing if we cannot find the timestamp of the last fetched operation in - * the remote oplog. - */ -Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperation, - OpTime lastOpTimeFetched, - long long lastHashFetched) { - auto result = getNextOperation(); - if (!result.isOK()) { - // The GTE query from upstream returns nothing, so we're ahead of the upstream. - return Status(ErrorCodes::RemoteOplogStale, - "we are ahead of the sync source, will try to roll back"); - } - BSONObj o = result.getValue(); - OpTime opTime = fassertStatusOK(28778, OpTime::parseFromOplogEntry(o)); - long long hash = o["h"].numberLong(); - if (opTime != lastOpTimeFetched || hash != lastHashFetched) { - return Status(ErrorCodes::OplogStartMissing, - str::stream() << "our last op time fetched: " << lastOpTimeFetched.toString() - << ". source's GTE: " << opTime.toString() << " hashes: (" - << lastHashFetched << "/" << hash << ")"); - } - return Status::OK(); -} - size_t getSize(const BSONObj& o) { // SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion return static_cast<size_t>(o.objsize()); @@ -155,14 +156,12 @@ BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize), _threadPoolTaskExecutor(makeThreadPool(), executor::makeNetworkInterface("NetworkInterfaceASIO-BGSync")), - _lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0), - std::numeric_limits<long long>::max()), - _lastFetchedHash(0), - _stopped(true), _replCoord(getGlobalReplicationCoordinator()), + _dataReplicatorExternalState( + stdx::make_unique<DataReplicatorExternalStateBackgroundSync>(_replCoord, this)), _syncSourceResolver(_replCoord), - _initialSyncRequestedFlag(false), - _indexPrefetchConfig(PREFETCH_ALL) {} + _lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0), + std::numeric_limits<long long>::max()) {} BackgroundSync* BackgroundSync::get() { stdx::unique_lock<stdx::mutex> lock(s_mutex); @@ -179,6 +178,10 @@ void BackgroundSync::shutdown() { invariant(inShutdown()); clearBuffer(); _stopped = true; + + if (_oplogFetcher) { + _oplogFetcher->shutdown(); + } } void BackgroundSync::producerThread() { @@ -334,59 +337,46 @@ void BackgroundSync::_produce(OperationContext* txn) { _replCoord->signalUpstreamUpdater(); } - const auto isV1ElectionProtocol = _replCoord->isV1ElectionProtocol(); - // Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent on the election - // timeout. This enables the sync source to communicate liveness of the primary to secondaries. - // Under protocol version 0, use a default timeout of 2 seconds for awaitData. - const Milliseconds fetcherMaxTimeMS( - isV1ElectionProtocol ? _replCoord->getConfig().getElectionTimeoutPeriod() / 2 : Seconds(2)); - + // "lastFetched" not used. Already set in _enqueueDocuments. Status fetcherReturnStatus = Status::OK(); - auto fetcherCallback = stdx::bind(&BackgroundSync::_fetcherCallback, - this, - stdx::placeholders::_1, - stdx::placeholders::_3, - stdx::cref(source), - lastOpTimeFetched, - lastHashFetched, - fetcherMaxTimeMS, - &fetcherReturnStatus); - - - BSONObjBuilder cmdBob; - cmdBob.append("find", nsToCollectionSubstring(rsOplogName)); - cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp()))); - cmdBob.append("tailable", true); - cmdBob.append("oplogReplay", true); - cmdBob.append("awaitData", true); - cmdBob.append("maxTimeMS", durationCount<Milliseconds>(Minutes(1))); // 1 min initial find. - - BSONObjBuilder metadataBob; - if (isV1ElectionProtocol) { - cmdBob.append("term", _replCoord->getTerm()); - metadataBob.append(rpc::kReplSetMetadataFieldName, 1); - } - - auto dbName = nsToDatabase(rsOplogName); - auto cmdObj = cmdBob.obj(); - auto metadataObj = metadataBob.obj(); - Fetcher fetcher(&_threadPoolTaskExecutor, - source, - dbName, - cmdObj, - fetcherCallback, - metadataObj, - _replCoord->getConfig().getElectionTimeoutPeriod()); - - LOG(1) << "scheduling fetcher to read remote oplog on " << source << " starting at " - << cmdObj["filter"]; - auto scheduleStatus = fetcher.schedule(); + OplogFetcher* oplogFetcher; + try { + auto config = _replCoord->getConfig(); + auto onOplogFetcherShutdownCallbackFn = + [&fetcherReturnStatus](const Status& status, const OpTimeWithHash& lastFetched) { + fetcherReturnStatus = status; + }; + + stdx::lock_guard<stdx::mutex> lock(_mutex); + _oplogFetcher = + stdx::make_unique<OplogFetcher>(&_threadPoolTaskExecutor, + OpTimeWithHash(lastHashFetched, lastOpTimeFetched), + source, + NamespaceString(rsOplogName), + config, + _dataReplicatorExternalState.get(), + stdx::bind(&BackgroundSync::_enqueueDocuments, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3, + stdx::placeholders::_4), + onOplogFetcherShutdownCallbackFn); + oplogFetcher = _oplogFetcher.get(); + } catch (const mongo::DBException& ex) { + fassertFailedWithStatus(34440, exceptionToStatus()); + } + + LOG(1) << "scheduling fetcher to read remote oplog on " << _syncSourceHost << " starting at " + << oplogFetcher->getCommandObject_forTest()["filter"]; + auto scheduleStatus = oplogFetcher->startup(); if (!scheduleStatus.isOK()) { warning() << "unable to schedule fetcher to read remote oplog on " << source << ": " << scheduleStatus; return; } - fetcher.wait(); + + oplogFetcher->join(); LOG(1) << "fetcher stopped reading remote oplog on " << source; // If the background sync is stopped after the fetcher is started, we need to @@ -413,7 +403,7 @@ void BackgroundSync::_produce(OperationContext* txn) { auto getConnection = [&connection, &connectionPool, source]() -> DBClientBase* { if (!connection.get()) { connection.reset(new ConnectionPool::ConnectionPtr( - &connectionPool, source, Date_t::now(), oplogSocketTimeout)); + &connectionPool, source, Date_t::now(), kOplogSocketTimeout)); }; return connection->get(); }; @@ -461,180 +451,22 @@ void BackgroundSync::_produce(OperationContext* txn) { } } -void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, - BSONObjBuilder* bob, - const HostAndPort& source, - OpTime lastOpTimeFetched, - long long lastFetchedHash, - Milliseconds fetcherMaxTimeMS, - Status* returnStatus) { - // if target cut connections between connecting and querying (for - // example, because it stepped down) we might not have a cursor - if (!result.isOK()) { - LOG(2) << "Error returned from oplog query: " << result.getStatus(); - *returnStatus = result.getStatus(); - return; - } - - if (inShutdown()) { - LOG(2) << "Interrupted by shutdown while querying oplog. 1"; // 1st instance. - return; - } - - // Check if we have been stopped. - if (isStopped()) { - LOG(2) << "Interrupted by stop request while querying the oplog. 1"; // 1st instance. - return; - } - - const auto& queryResponse = result.getValue(); - bool syncSourceHasSyncSource = false; - OpTime sourcesLastOpTime; - - // Forward metadata (containing liveness information) to replication coordinator. - bool receivedMetadata = - queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); - if (receivedMetadata) { - auto metadataResult = - rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata); - if (!metadataResult.isOK()) { - error() << "invalid replication metadata from sync source " << source << ": " - << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata; - return; - } - const auto& metadata = metadataResult.getValue(); - _replCoord->processReplSetMetadata(metadata); - if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) { - _replCoord->cancelAndRescheduleElectionTimeout(); - } - syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; - sourcesLastOpTime = metadata.getLastOpVisible(); - } - - const auto& documents = queryResponse.documents; - auto firstDocToApply = documents.cbegin(); - auto lastDocToApply = documents.cend(); - - if (!documents.empty()) { - LOG(2) << "fetcher read " << documents.size() - << " operations from remote oplog starting at " << documents.front()["ts"] - << " and ending at " << documents.back()["ts"]; - } else { - LOG(2) << "fetcher read 0 operations from remote oplog"; - } - - // Check start of remote oplog and, if necessary, stop fetcher to execute rollback. - if (queryResponse.first) { - auto getNextOperation = [&firstDocToApply, lastDocToApply]() -> StatusWith<BSONObj> { - if (firstDocToApply == lastDocToApply) { - return Status(ErrorCodes::OplogStartMissing, "remote oplog start missing"); - } - return *(firstDocToApply++); - }; - - *returnStatus = checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash); - if (!returnStatus->isOK()) { - // Stop fetcher and execute rollback. - return; - } - - // If this is the first batch and no rollback is needed, we should have advanced - // the document iterator. - invariant(firstDocToApply != documents.cbegin()); - } - - // No work to do if we are draining/primary. - if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary()) { - LOG(2) << "Interrupted by waiting for applier to drain " - << "or becoming primary while querying the oplog. 1"; // 1st instance. - return; - } - - // The count of the bytes of the documents read off the network. - int networkDocumentBytes = 0; - Timestamp lastTS; - { - stdx::unique_lock<stdx::mutex> lock(_mutex); - // If we are stopped then return without queueing this batch to apply. - if (_stopped) { - LOG(2) << "Interrupted by stop request while querying the oplog. 2"; // 2nd instance. - return; - } - lastTS = _lastOpTimeFetched.getTimestamp(); - } - int count = 0; - for (auto&& doc : documents) { - networkDocumentBytes += doc.objsize(); - ++count; - - // If this is the first response (to the $gte query) then we already applied the first doc. - if (queryResponse.first && count == 1) { - continue; - } - - // Check to see if the oplog entry goes back in time for this document. - const auto docOpTime = OpTime::parseFromOplogEntry(doc); - fassertStatusOK(34362, docOpTime.getStatus()); // entries must have a "ts" field. - const auto docTS = docOpTime.getValue().getTimestamp(); - - if (lastTS >= docTS) { - *returnStatus = Status( - ErrorCodes::OplogOutOfOrder, - str::stream() << "Reading the oplog from" << source.toString() - << " returned out of order entries. lastTS: " << lastTS.toString() - << " outOfOrderTS:" << docTS.toString() << " at count:" << count); - return; - } - lastTS = docTS; - } - - // These numbers are for the documents we will apply. - auto toApplyDocumentCount = documents.size(); - auto toApplyDocumentBytes = networkDocumentBytes; - if (queryResponse.first) { - // The count is one less since the first document found was already applied ($gte $ts query) - // and we will not apply it again. We just needed to check it so we didn't rollback, or - // error above. - --toApplyDocumentCount; - const auto alreadyAppliedDocument = documents.cbegin(); - toApplyDocumentBytes -= alreadyAppliedDocument->objsize(); - } - - if (toApplyDocumentBytes > 0) { - // Wait for enough space. - _buffer.waitForSpace(toApplyDocumentBytes); - - OCCASIONALLY { - LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes"; - } - - // Buffer docs for later application. - std::vector<BSONObj> objs{firstDocToApply, lastDocToApply}; - _buffer.pushAllNonBlocking(objs); - - // Inc stats. - opsReadStats.increment(documents.size()); // we read all of the docs in the query. - networkByteStats.increment(networkDocumentBytes); - bufferCountGauge.increment(toApplyDocumentCount); - bufferSizeGauge.increment(toApplyDocumentBytes); - - // Update last fetched info. - auto lastDoc = objs.back(); - { - stdx::unique_lock<stdx::mutex> lock(_mutex); - _lastFetchedHash = lastDoc["h"].numberLong(); - _lastOpTimeFetched = fassertStatusOK(28770, OpTime::parseFromOplogEntry(lastDoc)); - LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched; - } - } +void BackgroundSync::_recordStats(const OplogFetcher::DocumentsInfo& info, + Milliseconds getMoreElapsedTime) { + // Inc stats. + // We read all of the docs in the query. + opsReadStats.increment(info.networkDocumentCount); + networkByteStats.increment(info.networkDocumentBytes); + bufferCountGauge.increment(info.toApplyDocumentCount); + bufferSizeGauge.increment(info.toApplyDocumentBytes); // record time for each batch - getmoreReplStats.recordMillis(durationCount<Milliseconds>(queryResponse.elapsedMillis)); + getmoreReplStats.recordMillis(durationCount<Milliseconds>(getMoreElapsedTime)); // Check some things periodically // (whenever we run out of items in the // current cursor batch) - if (networkDocumentBytes > 0 && networkDocumentBytes < BatchIsSmallish) { + if (info.networkDocumentBytes > 0 && info.networkDocumentBytes < kSmallBatchLimitBytes) { // on a very low latency network, if we don't wait a little, we'll be // getting ops to write almost one at a time. this will both be expensive // for the upstream server as well as potentially defeating our parallel @@ -643,47 +475,41 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& // the inference here is basically if the batch is really small, we are // "caught up". // - sleepmillis(SleepToAllowBatchingMillis); + sleepmillis(kSleepToAllowBatchingMillis); } +} - if (inShutdown()) { - LOG(2) << "Interrupted by shutdown while querying oplog. 2"; // 2nd instance. +void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, + Fetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info, + Milliseconds getMoreElapsed) { + // If this is the first batch of operations returned from the query, "toApplyDocumentCount" will + // be one fewer than "networkDocumentCount" because the first document (which was applied + // previously) is skipped. + if (info.toApplyDocumentCount == 0) { + _recordStats(info, getMoreElapsed); return; } - // If we are transitioning to primary state, we need to leave - // this loop in order to go into bgsync-stop mode. - if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary()) { - LOG(2) << "Interrupted by waiting for applier to drain " - << "or becoming primary while querying the oplog. 2"; // 2nd instance. - return; - } + // Wait for enough space. + _buffer.waitForSpace(info.toApplyDocumentBytes); - // re-evaluate quality of sync target - if (getSyncTarget().empty() || - _replCoord->shouldChangeSyncSource(source, sourcesLastOpTime, syncSourceHasSyncSource)) { - LOG(1) << "Cancelling oplog query because we have to choose a sync source. Current source: " - << source << ", OpTime" << sourcesLastOpTime - << ", hasSyncSource:" << syncSourceHasSyncSource; - return; + OCCASIONALLY { + LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes"; } - // Check if we have been stopped. - if (isStopped()) { - LOG(2) << "Interrupted by a stop request while fetching the oplog so starting a new query."; - return; - } + // Buffer docs for later application. + _buffer.pushAllNonBlocking(begin, end); - // We fill in 'bob' to signal the fetcher to process with another getMore, if needed. - if (bob) { - bob->append("getMore", queryResponse.cursorId); - bob->append("collection", queryResponse.nss.coll()); - bob->append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS)); - if (receivedMetadata) { - bob->append("term", _replCoord->getTerm()); - _replCoord->getLastCommittedOpTime().append(bob, "lastKnownCommittedOpTime"); - } + // Update last fetched info. + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _lastFetchedHash = info.lastDocument.value; + _lastOpTimeFetched = info.lastDocument.opTime; + LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched; } + + _recordStats(info, getMoreElapsed); } bool BackgroundSync::peek(BSONObj* op) { @@ -743,7 +569,7 @@ void BackgroundSync::_rollback(OperationContext* txn, warning() << "rollback cannot proceed at this time (retrying later): " << status; } -HostAndPort BackgroundSync::getSyncTarget() { +HostAndPort BackgroundSync::getSyncTarget() const { stdx::unique_lock<stdx::mutex> lock(_mutex); return _syncSourceHost; } @@ -755,6 +581,11 @@ void BackgroundSync::clearSyncTarget() { void BackgroundSync::cancelFetcher() { _threadPoolTaskExecutor.cancelAllCommands(); + + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_oplogFetcher) { + _oplogFetcher->shutdown(); + } } void BackgroundSync::stop() { @@ -764,6 +595,10 @@ void BackgroundSync::stop() { _syncSourceHost = HostAndPort(); _lastOpTimeFetched = OpTime(); _lastFetchedHash = 0; + + if (_oplogFetcher) { + _oplogFetcher->shutdown(); + } } void BackgroundSync::start(OperationContext* txn) { @@ -811,22 +646,17 @@ long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) { severe() << "Problem reading " << rsOplogName << ": " << ex.toStatus(); fassertFailed(18904); } - BSONElement hashElement = oplogEntry[hashFieldName]; - if (hashElement.eoo()) { - severe() << "Most recent entry in " << rsOplogName << " missing \"" << hashFieldName - << "\" field. Oplog entry: " << oplogEntry; - + long long hash; + auto status = bsonExtractIntegerField(oplogEntry, kHashFieldName, &hash); + if (!status.isOK()) { + severe() << "Most recent entry in " << rsOplogName << " is missing or has invalid \"" + << kHashFieldName << "\" field. Oplog entry: " << oplogEntry << ": " << status; fassertFailed(18902); } - if (hashElement.type() != NumberLong) { - severe() << "Expected type of \"" << hashFieldName << "\" in most recent " << rsOplogName - << " entry to have type NumberLong, but found " << typeName(hashElement.type()); - fassertFailed(18903); - } - return hashElement.safeNumberLong(); + return hash; } -bool BackgroundSync::getInitialSyncRequestedFlag() { +bool BackgroundSync::getInitialSyncRequestedFlag() const { stdx::lock_guard<stdx::mutex> lock(_initialSyncMutex); return _initialSyncRequestedFlag; } @@ -836,12 +666,54 @@ void BackgroundSync::setInitialSyncRequestedFlag(bool value) { _initialSyncRequestedFlag = value; } +BackgroundSync::IndexPrefetchConfig BackgroundSync::getIndexPrefetchConfig() const { + stdx::lock_guard<stdx::mutex> lock(_indexPrefetchMutex); + return _indexPrefetchConfig; +} + +void BackgroundSync::setIndexPrefetchConfig(const IndexPrefetchConfig cfg) { + stdx::lock_guard<stdx::mutex> lock(_indexPrefetchMutex); + _indexPrefetchConfig = cfg; +} + +bool BackgroundSync::shouldStopFetching() const { + if (inShutdown()) { + LOG(2) << "Interrupted by shutdown while checking sync source."; + return true; + } + + // If we are transitioning to primary state, we need to stop fetching in order to go into + // bgsync-stop mode. + if (_replCoord->isWaitingForApplierToDrain()) { + LOG(2) << "Interrupted by waiting for applier to drain while checking sync source."; + return true; + } + + if (_replCoord->getMemberState().primary()) { + LOG(2) << "Interrupted by becoming primary while checking sync source."; + return true; + } + + // Check if we have been stopped. + if (isStopped()) { + LOG(2) << "Interrupted by a stop request while checking sync source."; + return true; + } + + // Check current sync target. + if (getSyncTarget().empty()) { + LOG(1) << "Canceling oplog query because we have no valid sync source."; + return true; + } + + return false; +} + void BackgroundSync::pushTestOpToBuffer(const BSONObj& op) { _buffer.push(op); bufferCountGauge.increment(); bufferSizeGauge.increment(op.objsize()); } - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 0f9619f7280..f9291c91b44 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -28,9 +28,12 @@ #pragma once +#include <memory> + #include "mongo/base/status_with.h" -#include "mongo/client/fetcher.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/data_replicator_external_state.h" +#include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/sync_source_resolver.h" #include "mongo/executor/thread_pool_task_executor.h" @@ -104,7 +107,7 @@ public: // starts the sync target notifying thread void notifierThread(); - HostAndPort getSyncTarget(); + HostAndPort getSyncTarget() const; // Interface implementation @@ -124,46 +127,25 @@ public: */ void cancelFetcher(); - bool getInitialSyncRequestedFlag(); + bool getInitialSyncRequestedFlag() const; void setInitialSyncRequestedFlag(bool value); - void setIndexPrefetchConfig(const IndexPrefetchConfig cfg) { - _indexPrefetchConfig = cfg; - } - - IndexPrefetchConfig getIndexPrefetchConfig() { - return _indexPrefetchConfig; - } + IndexPrefetchConfig getIndexPrefetchConfig() const; + void setIndexPrefetchConfig(const IndexPrefetchConfig cfg); + /** + * Returns true if any of the following is true: + * 1) We are shutting down; + * 2) We are primary; + * 3) We are in drain mode; or + * 4) We are stopped. + */ + bool shouldStopFetching() const; // Testing related stuff void pushTestOpToBuffer(const BSONObj& op); private: - static BackgroundSync* s_instance; - // protects creation of s_instance - static stdx::mutex s_mutex; - - // Production thread - BlockingQueue<BSONObj> _buffer; - - // Task executor used to run find/getMore commands on sync source. - executor::ThreadPoolTaskExecutor _threadPoolTaskExecutor; - - // _mutex protects all of the class variables except _buffer - mutable stdx::mutex _mutex; - - OpTime _lastOpTimeFetched; - - // lastFetchedHash is used to match ops to determine if we need to rollback, when - // a secondary. - long long _lastFetchedHash; - - // if producer thread should not be running - bool _stopped; - - HostAndPort _syncSourceHost; - BackgroundSync(); BackgroundSync(const BackgroundSync& s); BackgroundSync operator=(const BackgroundSync& s); @@ -181,15 +163,18 @@ private: void _signalNoNewDataForApplier(); /** - * Processes query responses from fetcher. + * Record metrics. */ - void _fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, - BSONObjBuilder* bob, - const HostAndPort& source, - OpTime lastOpTimeFetched, - long long lastFetchedHash, - Milliseconds fetcherMaxTimeMS, - Status* returnStatus); + void _recordStats(const OplogFetcher::DocumentsInfo& info, Milliseconds getMoreElapsedTime); + + /** + * Checks current background sync state before pushing operations into blocking queue and + * updating metrics. If the queue is full, might block. + */ + void _enqueueDocuments(Fetcher::Documents::const_iterator begin, + Fetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info, + Milliseconds elapsed); /** * Executes a rollback. @@ -204,20 +189,53 @@ private: long long _readLastAppliedHash(OperationContext* txn); + static BackgroundSync* s_instance; + // protects creation of s_instance + static stdx::mutex s_mutex; + + // Production thread + BlockingQueue<BSONObj> _buffer; + + // Task executor used to run find/getMore commands on sync source. + executor::ThreadPoolTaskExecutor _threadPoolTaskExecutor; + + // bool for indicating resync need on this node and the mutex that protects it + // The resync command sets this flag; the Applier thread observes and clears it. + mutable stdx::mutex _initialSyncMutex; + bool _initialSyncRequestedFlag = false; + + // This setting affects the Applier prefetcher behavior. + mutable stdx::mutex _indexPrefetchMutex; + IndexPrefetchConfig _indexPrefetchConfig = PREFETCH_ALL; + // A pointer to the replication coordinator running the show. ReplicationCoordinator* _replCoord; + // Data replicator external state required by the oplog fetcher. + // Owned by us. + std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; + // Used to determine sync source. // TODO(dannenberg) move into DataReplicator. SyncSourceResolver _syncSourceResolver; - // bool for indicating resync need on this node and the mutex that protects it - // The resync command sets this flag; the Applier thread observes and clears it. - bool _initialSyncRequestedFlag; - stdx::mutex _initialSyncMutex; + // _mutex protects all of the class variables declared below. + mutable stdx::mutex _mutex; - // This setting affects the Applier prefetcher behavior. - IndexPrefetchConfig _indexPrefetchConfig; + OpTime _lastOpTimeFetched; + + // lastFetchedHash is used to match ops to determine if we need to rollback, when + // a secondary. + long long _lastFetchedHash = 0LL; + + // if producer thread should not be running + bool _stopped = true; + + HostAndPort _syncSourceHost; + + // Current oplog fetcher tailing the oplog on the sync source. + // Owned by us. + std::unique_ptr<OplogFetcher> _oplogFetcher; }; |