summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-03-25 17:09:44 -0400
committerBenety Goh <benety@mongodb.com>2016-04-26 14:37:44 -0400
commit821c605f9aff83958e1c584a82d691b725e9adb6 (patch)
treec9b734aece99811140d95939933830da6f70d7e4
parent66c992d4b93f0c9e0fbe400ccf95344ec9437823 (diff)
downloadmongo-821c605f9aff83958e1c584a82d691b725e9adb6.tar.gz
SERVER-22775 integrated OplogFetcher into BackgroundSync
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/bgsync.cpp476
-rw-r--r--src/mongo/db/repl/bgsync.h114
3 files changed, 241 insertions, 350 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 7552460f33f..8550b6560ba 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -8,6 +8,7 @@ env.Library(
'bgsync.cpp',
],
LIBDEPS=[
+ 'data_replicator_external_state_impl',
'repl_coordinator_interface',
'rollback_source_impl',
'rs_rollback',
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index cd25fbe664f..dfe7b287654 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -32,9 +32,8 @@
#include "mongo/db/repl/bgsync.h"
-#include <memory>
-
#include "mongo/base/counter.h"
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connection_pool.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
@@ -42,6 +41,7 @@
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
+#include "mongo/db/repl/data_replicator_external_state_impl.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/replication_coordinator_global.h"
@@ -70,10 +70,40 @@ using std::string;
namespace repl {
namespace {
-const char hashFieldName[] = "h";
-int SleepToAllowBatchingMillis = 2;
-const int BatchIsSmallish = 40000; // bytes
-const Milliseconds oplogSocketTimeout(30000);
+const char kHashFieldName[] = "h";
+const int kSleepToAllowBatchingMillis = 2;
+const int kSmallBatchLimitBytes = 40000;
+const Milliseconds kOplogSocketTimeout(30000);
+
+/**
+ * Extends DataReplicatorExternalStateImpl to be member state aware.
+ */
+class DataReplicatorExternalStateBackgroundSync : public DataReplicatorExternalStateImpl {
+public:
+ DataReplicatorExternalStateBackgroundSync(ReplicationCoordinator* replicationCoordinator,
+ BackgroundSync* bgsync);
+ bool shouldStopFetching(const HostAndPort& source,
+ const OpTime& sourceOpTime,
+ bool sourceHasSyncSource) override;
+
+private:
+ BackgroundSync* _bgsync;
+};
+
+DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackgroundSync(
+ ReplicationCoordinator* replicationCoordinator, BackgroundSync* bgsync)
+ : DataReplicatorExternalStateImpl(replicationCoordinator), _bgsync(bgsync) {}
+
+bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(const HostAndPort& source,
+ const OpTime& sourceOpTime,
+ bool sourceHasSyncSource) {
+ if (_bgsync->shouldStopFetching()) {
+ return true;
+ }
+
+ return DataReplicatorExternalStateImpl::shouldStopFetching(
+ source, sourceOpTime, sourceHasSyncSource);
+}
/**
 * Returns new thread pool for thread pool task executor.
@@ -84,35 +114,6 @@ std::unique_ptr<ThreadPool> makeThreadPool() {
return stdx::make_unique<ThreadPool>(threadPoolOptions);
}
-/**
- * Checks the criteria for rolling back.
- * 'getNextOperation' returns the first result of the oplog tailing query.
- * 'lastOpTimeFetched' should be consistent with the predicate in the query.
- * Returns RemoteOplogStale if the oplog query has no results.
- * Returns OplogStartMissing if we cannot find the timestamp of the last fetched operation in
- * the remote oplog.
- */
-Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperation,
- OpTime lastOpTimeFetched,
- long long lastHashFetched) {
- auto result = getNextOperation();
- if (!result.isOK()) {
- // The GTE query from upstream returns nothing, so we're ahead of the upstream.
- return Status(ErrorCodes::RemoteOplogStale,
- "we are ahead of the sync source, will try to roll back");
- }
- BSONObj o = result.getValue();
- OpTime opTime = fassertStatusOK(28778, OpTime::parseFromOplogEntry(o));
- long long hash = o["h"].numberLong();
- if (opTime != lastOpTimeFetched || hash != lastHashFetched) {
- return Status(ErrorCodes::OplogStartMissing,
- str::stream() << "our last op time fetched: " << lastOpTimeFetched.toString()
- << ". source's GTE: " << opTime.toString() << " hashes: ("
- << lastHashFetched << "/" << hash << ")");
- }
- return Status::OK();
-}
-
size_t getSize(const BSONObj& o) {
// SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion
return static_cast<size_t>(o.objsize());
@@ -155,14 +156,12 @@ BackgroundSync::BackgroundSync()
: _buffer(bufferMaxSizeGauge, &getSize),
_threadPoolTaskExecutor(makeThreadPool(),
executor::makeNetworkInterface("NetworkInterfaceASIO-BGSync")),
- _lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0),
- std::numeric_limits<long long>::max()),
- _lastFetchedHash(0),
- _stopped(true),
_replCoord(getGlobalReplicationCoordinator()),
+ _dataReplicatorExternalState(
+ stdx::make_unique<DataReplicatorExternalStateBackgroundSync>(_replCoord, this)),
_syncSourceResolver(_replCoord),
- _initialSyncRequestedFlag(false),
- _indexPrefetchConfig(PREFETCH_ALL) {}
+ _lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0),
+ std::numeric_limits<long long>::max()) {}
BackgroundSync* BackgroundSync::get() {
stdx::unique_lock<stdx::mutex> lock(s_mutex);
@@ -179,6 +178,10 @@ void BackgroundSync::shutdown() {
invariant(inShutdown());
clearBuffer();
_stopped = true;
+
+ if (_oplogFetcher) {
+ _oplogFetcher->shutdown();
+ }
}
void BackgroundSync::producerThread() {
@@ -334,59 +337,46 @@ void BackgroundSync::_produce(OperationContext* txn) {
_replCoord->signalUpstreamUpdater();
}
- const auto isV1ElectionProtocol = _replCoord->isV1ElectionProtocol();
- // Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent on the election
- // timeout. This enables the sync source to communicate liveness of the primary to secondaries.
- // Under protocol version 0, use a default timeout of 2 seconds for awaitData.
- const Milliseconds fetcherMaxTimeMS(
- isV1ElectionProtocol ? _replCoord->getConfig().getElectionTimeoutPeriod() / 2 : Seconds(2));
-
+ // "lastFetched" not used. Already set in _enqueueDocuments.
Status fetcherReturnStatus = Status::OK();
- auto fetcherCallback = stdx::bind(&BackgroundSync::_fetcherCallback,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_3,
- stdx::cref(source),
- lastOpTimeFetched,
- lastHashFetched,
- fetcherMaxTimeMS,
- &fetcherReturnStatus);
-
-
- BSONObjBuilder cmdBob;
- cmdBob.append("find", nsToCollectionSubstring(rsOplogName));
- cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
- cmdBob.append("tailable", true);
- cmdBob.append("oplogReplay", true);
- cmdBob.append("awaitData", true);
- cmdBob.append("maxTimeMS", durationCount<Milliseconds>(Minutes(1))); // 1 min initial find.
-
- BSONObjBuilder metadataBob;
- if (isV1ElectionProtocol) {
- cmdBob.append("term", _replCoord->getTerm());
- metadataBob.append(rpc::kReplSetMetadataFieldName, 1);
- }
-
- auto dbName = nsToDatabase(rsOplogName);
- auto cmdObj = cmdBob.obj();
- auto metadataObj = metadataBob.obj();
- Fetcher fetcher(&_threadPoolTaskExecutor,
- source,
- dbName,
- cmdObj,
- fetcherCallback,
- metadataObj,
- _replCoord->getConfig().getElectionTimeoutPeriod());
-
- LOG(1) << "scheduling fetcher to read remote oplog on " << source << " starting at "
- << cmdObj["filter"];
- auto scheduleStatus = fetcher.schedule();
+ OplogFetcher* oplogFetcher;
+ try {
+ auto config = _replCoord->getConfig();
+ auto onOplogFetcherShutdownCallbackFn =
+ [&fetcherReturnStatus](const Status& status, const OpTimeWithHash& lastFetched) {
+ fetcherReturnStatus = status;
+ };
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _oplogFetcher =
+ stdx::make_unique<OplogFetcher>(&_threadPoolTaskExecutor,
+ OpTimeWithHash(lastHashFetched, lastOpTimeFetched),
+ source,
+ NamespaceString(rsOplogName),
+ config,
+ _dataReplicatorExternalState.get(),
+ stdx::bind(&BackgroundSync::_enqueueDocuments,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3,
+ stdx::placeholders::_4),
+ onOplogFetcherShutdownCallbackFn);
+ oplogFetcher = _oplogFetcher.get();
+ } catch (const mongo::DBException& ex) {
+ fassertFailedWithStatus(34440, exceptionToStatus());
+ }
+
+ LOG(1) << "scheduling fetcher to read remote oplog on " << _syncSourceHost << " starting at "
+ << oplogFetcher->getCommandObject_forTest()["filter"];
+ auto scheduleStatus = oplogFetcher->startup();
if (!scheduleStatus.isOK()) {
warning() << "unable to schedule fetcher to read remote oplog on " << source << ": "
<< scheduleStatus;
return;
}
- fetcher.wait();
+
+ oplogFetcher->join();
LOG(1) << "fetcher stopped reading remote oplog on " << source;
// If the background sync is stopped after the fetcher is started, we need to
@@ -413,7 +403,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
auto getConnection = [&connection, &connectionPool, source]() -> DBClientBase* {
if (!connection.get()) {
connection.reset(new ConnectionPool::ConnectionPtr(
- &connectionPool, source, Date_t::now(), oplogSocketTimeout));
+ &connectionPool, source, Date_t::now(), kOplogSocketTimeout));
};
return connection->get();
};
@@ -461,180 +451,22 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
-void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
- BSONObjBuilder* bob,
- const HostAndPort& source,
- OpTime lastOpTimeFetched,
- long long lastFetchedHash,
- Milliseconds fetcherMaxTimeMS,
- Status* returnStatus) {
- // if target cut connections between connecting and querying (for
- // example, because it stepped down) we might not have a cursor
- if (!result.isOK()) {
- LOG(2) << "Error returned from oplog query: " << result.getStatus();
- *returnStatus = result.getStatus();
- return;
- }
-
- if (inShutdown()) {
- LOG(2) << "Interrupted by shutdown while querying oplog. 1"; // 1st instance.
- return;
- }
-
- // Check if we have been stopped.
- if (isStopped()) {
- LOG(2) << "Interrupted by stop request while querying the oplog. 1"; // 1st instance.
- return;
- }
-
- const auto& queryResponse = result.getValue();
- bool syncSourceHasSyncSource = false;
- OpTime sourcesLastOpTime;
-
- // Forward metadata (containing liveness information) to replication coordinator.
- bool receivedMetadata =
- queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
- if (receivedMetadata) {
- auto metadataResult =
- rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata);
- if (!metadataResult.isOK()) {
- error() << "invalid replication metadata from sync source " << source << ": "
- << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
- return;
- }
- const auto& metadata = metadataResult.getValue();
- _replCoord->processReplSetMetadata(metadata);
- if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
- _replCoord->cancelAndRescheduleElectionTimeout();
- }
- syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
- sourcesLastOpTime = metadata.getLastOpVisible();
- }
-
- const auto& documents = queryResponse.documents;
- auto firstDocToApply = documents.cbegin();
- auto lastDocToApply = documents.cend();
-
- if (!documents.empty()) {
- LOG(2) << "fetcher read " << documents.size()
- << " operations from remote oplog starting at " << documents.front()["ts"]
- << " and ending at " << documents.back()["ts"];
- } else {
- LOG(2) << "fetcher read 0 operations from remote oplog";
- }
-
- // Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
- if (queryResponse.first) {
- auto getNextOperation = [&firstDocToApply, lastDocToApply]() -> StatusWith<BSONObj> {
- if (firstDocToApply == lastDocToApply) {
- return Status(ErrorCodes::OplogStartMissing, "remote oplog start missing");
- }
- return *(firstDocToApply++);
- };
-
- *returnStatus = checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash);
- if (!returnStatus->isOK()) {
- // Stop fetcher and execute rollback.
- return;
- }
-
- // If this is the first batch and no rollback is needed, we should have advanced
- // the document iterator.
- invariant(firstDocToApply != documents.cbegin());
- }
-
- // No work to do if we are draining/primary.
- if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary()) {
- LOG(2) << "Interrupted by waiting for applier to drain "
- << "or becoming primary while querying the oplog. 1"; // 1st instance.
- return;
- }
-
- // The count of the bytes of the documents read off the network.
- int networkDocumentBytes = 0;
- Timestamp lastTS;
- {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- // If we are stopped then return without queueing this batch to apply.
- if (_stopped) {
- LOG(2) << "Interrupted by stop request while querying the oplog. 2"; // 2nd instance.
- return;
- }
- lastTS = _lastOpTimeFetched.getTimestamp();
- }
- int count = 0;
- for (auto&& doc : documents) {
- networkDocumentBytes += doc.objsize();
- ++count;
-
- // If this is the first response (to the $gte query) then we already applied the first doc.
- if (queryResponse.first && count == 1) {
- continue;
- }
-
- // Check to see if the oplog entry goes back in time for this document.
- const auto docOpTime = OpTime::parseFromOplogEntry(doc);
- fassertStatusOK(34362, docOpTime.getStatus()); // entries must have a "ts" field.
- const auto docTS = docOpTime.getValue().getTimestamp();
-
- if (lastTS >= docTS) {
- *returnStatus = Status(
- ErrorCodes::OplogOutOfOrder,
- str::stream() << "Reading the oplog from" << source.toString()
- << " returned out of order entries. lastTS: " << lastTS.toString()
- << " outOfOrderTS:" << docTS.toString() << " at count:" << count);
- return;
- }
- lastTS = docTS;
- }
-
- // These numbers are for the documents we will apply.
- auto toApplyDocumentCount = documents.size();
- auto toApplyDocumentBytes = networkDocumentBytes;
- if (queryResponse.first) {
- // The count is one less since the first document found was already applied ($gte $ts query)
- // and we will not apply it again. We just needed to check it so we didn't rollback, or
- // error above.
- --toApplyDocumentCount;
- const auto alreadyAppliedDocument = documents.cbegin();
- toApplyDocumentBytes -= alreadyAppliedDocument->objsize();
- }
-
- if (toApplyDocumentBytes > 0) {
- // Wait for enough space.
- _buffer.waitForSpace(toApplyDocumentBytes);
-
- OCCASIONALLY {
- LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes";
- }
-
- // Buffer docs for later application.
- std::vector<BSONObj> objs{firstDocToApply, lastDocToApply};
- _buffer.pushAllNonBlocking(objs);
-
- // Inc stats.
- opsReadStats.increment(documents.size()); // we read all of the docs in the query.
- networkByteStats.increment(networkDocumentBytes);
- bufferCountGauge.increment(toApplyDocumentCount);
- bufferSizeGauge.increment(toApplyDocumentBytes);
-
- // Update last fetched info.
- auto lastDoc = objs.back();
- {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- _lastFetchedHash = lastDoc["h"].numberLong();
- _lastOpTimeFetched = fassertStatusOK(28770, OpTime::parseFromOplogEntry(lastDoc));
- LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched;
- }
- }
+void BackgroundSync::_recordStats(const OplogFetcher::DocumentsInfo& info,
+ Milliseconds getMoreElapsedTime) {
+ // Inc stats.
+ // We read all of the docs in the query.
+ opsReadStats.increment(info.networkDocumentCount);
+ networkByteStats.increment(info.networkDocumentBytes);
+ bufferCountGauge.increment(info.toApplyDocumentCount);
+ bufferSizeGauge.increment(info.toApplyDocumentBytes);
// record time for each batch
- getmoreReplStats.recordMillis(durationCount<Milliseconds>(queryResponse.elapsedMillis));
+ getmoreReplStats.recordMillis(durationCount<Milliseconds>(getMoreElapsedTime));
// Check some things periodically
// (whenever we run out of items in the
// current cursor batch)
- if (networkDocumentBytes > 0 && networkDocumentBytes < BatchIsSmallish) {
+ if (info.networkDocumentBytes > 0 && info.networkDocumentBytes < kSmallBatchLimitBytes) {
// on a very low latency network, if we don't wait a little, we'll be
// getting ops to write almost one at a time. this will both be expensive
// for the upstream server as well as potentially defeating our parallel
@@ -643,47 +475,41 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
// the inference here is basically if the batch is really small, we are
// "caught up".
//
- sleepmillis(SleepToAllowBatchingMillis);
+ sleepmillis(kSleepToAllowBatchingMillis);
}
+}
- if (inShutdown()) {
- LOG(2) << "Interrupted by shutdown while querying oplog. 2"; // 2nd instance.
+void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info,
+ Milliseconds getMoreElapsed) {
+ // If this is the first batch of operations returned from the query, "toApplyDocumentCount" will
+ // be one fewer than "networkDocumentCount" because the first document (which was applied
+ // previously) is skipped.
+ if (info.toApplyDocumentCount == 0) {
+ _recordStats(info, getMoreElapsed);
return;
}
- // If we are transitioning to primary state, we need to leave
- // this loop in order to go into bgsync-stop mode.
- if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary()) {
- LOG(2) << "Interrupted by waiting for applier to drain "
- << "or becoming primary while querying the oplog. 2"; // 2nd instance.
- return;
- }
+ // Wait for enough space.
+ _buffer.waitForSpace(info.toApplyDocumentBytes);
- // re-evaluate quality of sync target
- if (getSyncTarget().empty() ||
- _replCoord->shouldChangeSyncSource(source, sourcesLastOpTime, syncSourceHasSyncSource)) {
- LOG(1) << "Cancelling oplog query because we have to choose a sync source. Current source: "
- << source << ", OpTime" << sourcesLastOpTime
- << ", hasSyncSource:" << syncSourceHasSyncSource;
- return;
+ OCCASIONALLY {
+ LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes";
}
- // Check if we have been stopped.
- if (isStopped()) {
- LOG(2) << "Interrupted by a stop request while fetching the oplog so starting a new query.";
- return;
- }
+ // Buffer docs for later application.
+ _buffer.pushAllNonBlocking(begin, end);
- // We fill in 'bob' to signal the fetcher to process with another getMore, if needed.
- if (bob) {
- bob->append("getMore", queryResponse.cursorId);
- bob->append("collection", queryResponse.nss.coll());
- bob->append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS));
- if (receivedMetadata) {
- bob->append("term", _replCoord->getTerm());
- _replCoord->getLastCommittedOpTime().append(bob, "lastKnownCommittedOpTime");
- }
+ // Update last fetched info.
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _lastFetchedHash = info.lastDocument.value;
+ _lastOpTimeFetched = info.lastDocument.opTime;
+ LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched;
}
+
+ _recordStats(info, getMoreElapsed);
}
bool BackgroundSync::peek(BSONObj* op) {
@@ -743,7 +569,7 @@ void BackgroundSync::_rollback(OperationContext* txn,
warning() << "rollback cannot proceed at this time (retrying later): " << status;
}
-HostAndPort BackgroundSync::getSyncTarget() {
+HostAndPort BackgroundSync::getSyncTarget() const {
stdx::unique_lock<stdx::mutex> lock(_mutex);
return _syncSourceHost;
}
@@ -755,6 +581,11 @@ void BackgroundSync::clearSyncTarget() {
void BackgroundSync::cancelFetcher() {
_threadPoolTaskExecutor.cancelAllCommands();
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (_oplogFetcher) {
+ _oplogFetcher->shutdown();
+ }
}
void BackgroundSync::stop() {
@@ -764,6 +595,10 @@ void BackgroundSync::stop() {
_syncSourceHost = HostAndPort();
_lastOpTimeFetched = OpTime();
_lastFetchedHash = 0;
+
+ if (_oplogFetcher) {
+ _oplogFetcher->shutdown();
+ }
}
void BackgroundSync::start(OperationContext* txn) {
@@ -811,22 +646,17 @@ long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) {
severe() << "Problem reading " << rsOplogName << ": " << ex.toStatus();
fassertFailed(18904);
}
- BSONElement hashElement = oplogEntry[hashFieldName];
- if (hashElement.eoo()) {
- severe() << "Most recent entry in " << rsOplogName << " missing \"" << hashFieldName
- << "\" field. Oplog entry: " << oplogEntry;
-
+ long long hash;
+ auto status = bsonExtractIntegerField(oplogEntry, kHashFieldName, &hash);
+ if (!status.isOK()) {
+ severe() << "Most recent entry in " << rsOplogName << " is missing or has invalid \""
+ << kHashFieldName << "\" field. Oplog entry: " << oplogEntry << ": " << status;
fassertFailed(18902);
}
- if (hashElement.type() != NumberLong) {
- severe() << "Expected type of \"" << hashFieldName << "\" in most recent " << rsOplogName
- << " entry to have type NumberLong, but found " << typeName(hashElement.type());
- fassertFailed(18903);
- }
- return hashElement.safeNumberLong();
+ return hash;
}
-bool BackgroundSync::getInitialSyncRequestedFlag() {
+bool BackgroundSync::getInitialSyncRequestedFlag() const {
stdx::lock_guard<stdx::mutex> lock(_initialSyncMutex);
return _initialSyncRequestedFlag;
}
@@ -836,12 +666,54 @@ void BackgroundSync::setInitialSyncRequestedFlag(bool value) {
_initialSyncRequestedFlag = value;
}
+BackgroundSync::IndexPrefetchConfig BackgroundSync::getIndexPrefetchConfig() const {
+ stdx::lock_guard<stdx::mutex> lock(_indexPrefetchMutex);
+ return _indexPrefetchConfig;
+}
+
+void BackgroundSync::setIndexPrefetchConfig(const IndexPrefetchConfig cfg) {
+ stdx::lock_guard<stdx::mutex> lock(_indexPrefetchMutex);
+ _indexPrefetchConfig = cfg;
+}
+
+bool BackgroundSync::shouldStopFetching() const {
+ if (inShutdown()) {
+ LOG(2) << "Interrupted by shutdown while checking sync source.";
+ return true;
+ }
+
+ // If we are transitioning to primary state, we need to stop fetching in order to go into
+ // bgsync-stop mode.
+ if (_replCoord->isWaitingForApplierToDrain()) {
+ LOG(2) << "Interrupted by waiting for applier to drain while checking sync source.";
+ return true;
+ }
+
+ if (_replCoord->getMemberState().primary()) {
+ LOG(2) << "Interrupted by becoming primary while checking sync source.";
+ return true;
+ }
+
+ // Check if we have been stopped.
+ if (isStopped()) {
+ LOG(2) << "Interrupted by a stop request while checking sync source.";
+ return true;
+ }
+
+ // Check current sync target.
+ if (getSyncTarget().empty()) {
+ LOG(1) << "Canceling oplog query because we have no valid sync source.";
+ return true;
+ }
+
+ return false;
+}
+
void BackgroundSync::pushTestOpToBuffer(const BSONObj& op) {
_buffer.push(op);
bufferCountGauge.increment();
bufferSizeGauge.increment(op.objsize());
}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 0f9619f7280..f9291c91b44 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -28,9 +28,12 @@
#pragma once
+#include <memory>
+
#include "mongo/base/status_with.h"
-#include "mongo/client/fetcher.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/data_replicator_external_state.h"
+#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/sync_source_resolver.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -104,7 +107,7 @@ public:
// starts the sync target notifying thread
void notifierThread();
- HostAndPort getSyncTarget();
+ HostAndPort getSyncTarget() const;
// Interface implementation
@@ -124,46 +127,25 @@ public:
*/
void cancelFetcher();
- bool getInitialSyncRequestedFlag();
+ bool getInitialSyncRequestedFlag() const;
void setInitialSyncRequestedFlag(bool value);
- void setIndexPrefetchConfig(const IndexPrefetchConfig cfg) {
- _indexPrefetchConfig = cfg;
- }
-
- IndexPrefetchConfig getIndexPrefetchConfig() {
- return _indexPrefetchConfig;
- }
+ IndexPrefetchConfig getIndexPrefetchConfig() const;
+ void setIndexPrefetchConfig(const IndexPrefetchConfig cfg);
+ /**
+ * Returns true if any of the following is true:
+ * 1) We are shutting down;
+ * 2) We are primary;
+ * 3) We are in drain mode; or
+ * 4) We are stopped.
+ */
+ bool shouldStopFetching() const;
// Testing related stuff
void pushTestOpToBuffer(const BSONObj& op);
private:
- static BackgroundSync* s_instance;
- // protects creation of s_instance
- static stdx::mutex s_mutex;
-
- // Production thread
- BlockingQueue<BSONObj> _buffer;
-
- // Task executor used to run find/getMore commands on sync source.
- executor::ThreadPoolTaskExecutor _threadPoolTaskExecutor;
-
- // _mutex protects all of the class variables except _buffer
- mutable stdx::mutex _mutex;
-
- OpTime _lastOpTimeFetched;
-
- // lastFetchedHash is used to match ops to determine if we need to rollback, when
- // a secondary.
- long long _lastFetchedHash;
-
- // if producer thread should not be running
- bool _stopped;
-
- HostAndPort _syncSourceHost;
-
BackgroundSync();
BackgroundSync(const BackgroundSync& s);
BackgroundSync operator=(const BackgroundSync& s);
@@ -181,15 +163,18 @@ private:
void _signalNoNewDataForApplier();
/**
- * Processes query responses from fetcher.
+ * Record metrics.
*/
- void _fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
- BSONObjBuilder* bob,
- const HostAndPort& source,
- OpTime lastOpTimeFetched,
- long long lastFetchedHash,
- Milliseconds fetcherMaxTimeMS,
- Status* returnStatus);
+ void _recordStats(const OplogFetcher::DocumentsInfo& info, Milliseconds getMoreElapsedTime);
+
+ /**
+ * Checks current background sync state before pushing operations into blocking queue and
+ * updating metrics. If the queue is full, might block.
+ */
+ void _enqueueDocuments(Fetcher::Documents::const_iterator begin,
+ Fetcher::Documents::const_iterator end,
+ const OplogFetcher::DocumentsInfo& info,
+ Milliseconds elapsed);
/**
* Executes a rollback.
@@ -204,20 +189,53 @@ private:
long long _readLastAppliedHash(OperationContext* txn);
+ static BackgroundSync* s_instance;
+ // protects creation of s_instance
+ static stdx::mutex s_mutex;
+
+ // Production thread
+ BlockingQueue<BSONObj> _buffer;
+
+ // Task executor used to run find/getMore commands on sync source.
+ executor::ThreadPoolTaskExecutor _threadPoolTaskExecutor;
+
+ // bool for indicating resync need on this node and the mutex that protects it
+ // The resync command sets this flag; the Applier thread observes and clears it.
+ mutable stdx::mutex _initialSyncMutex;
+ bool _initialSyncRequestedFlag = false;
+
+ // This setting affects the Applier prefetcher behavior.
+ mutable stdx::mutex _indexPrefetchMutex;
+ IndexPrefetchConfig _indexPrefetchConfig = PREFETCH_ALL;
+
// A pointer to the replication coordinator running the show.
ReplicationCoordinator* _replCoord;
+ // Data replicator external state required by the oplog fetcher.
+ // Owned by us.
+ std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState;
+
// Used to determine sync source.
// TODO(dannenberg) move into DataReplicator.
SyncSourceResolver _syncSourceResolver;
- // bool for indicating resync need on this node and the mutex that protects it
- // The resync command sets this flag; the Applier thread observes and clears it.
- bool _initialSyncRequestedFlag;
- stdx::mutex _initialSyncMutex;
+ // _mutex protects all of the class variables declared below.
+ mutable stdx::mutex _mutex;
- // This setting affects the Applier prefetcher behavior.
- IndexPrefetchConfig _indexPrefetchConfig;
+ OpTime _lastOpTimeFetched;
+
+ // lastFetchedHash is used to match ops to determine if we need to rollback, when
+ // a secondary.
+ long long _lastFetchedHash = 0LL;
+
+ // if producer thread should not be running
+ bool _stopped = true;
+
+ HostAndPort _syncSourceHost;
+
+ // Current oplog fetcher tailing the oplog on the sync source.
+ // Owned by us.
+ std::unique_ptr<OplogFetcher> _oplogFetcher;
};