diff options
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 47 |
1 files changed, 37 insertions, 10 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 19b25760397..27a4b3cb221 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -54,6 +54,7 @@ #include "mongo/db/stats/timer_stats.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/memory.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/exit.h" @@ -328,16 +329,24 @@ void BackgroundSync::_produce(OperationContext* txn, executor::TaskExecutor* tas lastHashFetched, &remoteOplogStartStatus); - auto cmdObj = BSON("find" << nsToCollectionSubstring(rsOplogName) << "filter" - << BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())) - << "tailable" << true << "oplogReplay" << true << "awaitData" << true - << "maxTimeMS" << durationCount<Milliseconds>(fetcherMaxTimeMS)); - Fetcher fetcher(taskExecutor, - source, - nsToDatabase(rsOplogName), - cmdObj, - fetcherCallback, - rpc::makeEmptyMetadata()); + BSONObjBuilder cmdBob; + cmdBob.append("find", nsToCollectionSubstring(rsOplogName)); + cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp()))); + cmdBob.append("tailable", true); + cmdBob.append("oplogReplay", true); + cmdBob.append("awaitData", true); + cmdBob.append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS)); + + BSONObjBuilder metadataBob; + if (_replCoord->isV1ElectionProtocol()) { + cmdBob.append("term", _replCoord->getTerm()); + metadataBob.append(rpc::kReplSetMetadataFieldName, 1); + } + + auto cmdObj = cmdBob.obj(); + auto metadataObj = metadataBob.obj(); + Fetcher fetcher( + taskExecutor, source, nsToDatabase(rsOplogName), cmdObj, fetcherCallback, metadataObj); auto scheduleStatus = fetcher.schedule(); if (!scheduleStatus.isOK()) { warning() << "unable to schedule fetcher to read remote oplog on " << source << ": " @@ -396,6 +405,21 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& } const auto& queryResponse = result.getValue(); + + // Forward metadata (containing liveness information) to replication coordinator. + bool receivedMetadata = + queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); + if (receivedMetadata) { + auto metadataResult = + rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata); + if (!metadataResult.isOK()) { + error() << "invalid replication metadata from sync source " << source << ": " + << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata; + return; + } + _replCoord->processReplSetMetadata(metadataResult.getValue()); + } + const auto& documents = queryResponse.documents; auto documentBegin = documents.cbegin(); auto documentEnd = documents.cend(); @@ -507,6 +531,9 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& bob->append("getMore", queryResponse.cursorId); bob->append("collection", queryResponse.nss.coll()); bob->append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS)); + if (receivedMetadata) { + bob->append("term", _replCoord->getTerm()); + } } bool BackgroundSync::_shouldChangeSyncSource(const HostAndPort& syncSource) { |