summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/bgsync.cpp47
1 files changed, 37 insertions, 10 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 19b25760397..27a4b3cb221 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/stats/timer_stats.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/exit.h"
@@ -328,16 +329,24 @@ void BackgroundSync::_produce(OperationContext* txn, executor::TaskExecutor* tas
lastHashFetched,
&remoteOplogStartStatus);
- auto cmdObj = BSON("find" << nsToCollectionSubstring(rsOplogName) << "filter"
- << BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp()))
- << "tailable" << true << "oplogReplay" << true << "awaitData" << true
- << "maxTimeMS" << durationCount<Milliseconds>(fetcherMaxTimeMS));
- Fetcher fetcher(taskExecutor,
- source,
- nsToDatabase(rsOplogName),
- cmdObj,
- fetcherCallback,
- rpc::makeEmptyMetadata());
+ BSONObjBuilder cmdBob;
+ cmdBob.append("find", nsToCollectionSubstring(rsOplogName));
+ cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
+ cmdBob.append("tailable", true);
+ cmdBob.append("oplogReplay", true);
+ cmdBob.append("awaitData", true);
+ cmdBob.append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS));
+
+ BSONObjBuilder metadataBob;
+ if (_replCoord->isV1ElectionProtocol()) {
+ cmdBob.append("term", _replCoord->getTerm());
+ metadataBob.append(rpc::kReplSetMetadataFieldName, 1);
+ }
+
+ auto cmdObj = cmdBob.obj();
+ auto metadataObj = metadataBob.obj();
+ Fetcher fetcher(
+ taskExecutor, source, nsToDatabase(rsOplogName), cmdObj, fetcherCallback, metadataObj);
auto scheduleStatus = fetcher.schedule();
if (!scheduleStatus.isOK()) {
warning() << "unable to schedule fetcher to read remote oplog on " << source << ": "
@@ -396,6 +405,21 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
}
const auto& queryResponse = result.getValue();
+
+ // Forward metadata (containing liveness information) to replication coordinator.
+ bool receivedMetadata =
+ queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
+ if (receivedMetadata) {
+ auto metadataResult =
+ rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata);
+ if (!metadataResult.isOK()) {
+ error() << "invalid replication metadata from sync source " << source << ": "
+ << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
+ return;
+ }
+ _replCoord->processReplSetMetadata(metadataResult.getValue());
+ }
+
const auto& documents = queryResponse.documents;
auto documentBegin = documents.cbegin();
auto documentEnd = documents.cend();
@@ -507,6 +531,9 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
bob->append("getMore", queryResponse.cursorId);
bob->append("collection", queryResponse.nss.coll());
bob->append("maxTimeMS", durationCount<Milliseconds>(fetcherMaxTimeMS));
+ if (receivedMetadata) {
+ bob->append("term", _replCoord->getTerm());
+ }
}
bool BackgroundSync::_shouldChangeSyncSource(const HostAndPort& syncSource) {