summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/data_replicator_external_state_impl.cpp
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2017-02-15 11:49:36 -0500
committerJudah Schvimer <judah@mongodb.com>2017-02-15 11:49:36 -0500
commit7284884f3d8f3cf1d1489579180c2637efcc42b2 (patch)
tree7e2749c6940232f2a70a182e6856bfb0c97f2454 /src/mongo/db/repl/data_replicator_external_state_impl.cpp
parentf6006942e76377c9434a61e76a7803eb83430591 (diff)
downloadmongo-7284884f3d8f3cf1d1489579180c2637efcc42b2.tar.gz
SERVER-27543 Process OplogQueryMetadata with backwards and forwards compatibility
Diffstat (limited to 'src/mongo/db/repl/data_replicator_external_state_impl.cpp')
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp45
1 files changed, 35 insertions, 10 deletions
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 238f4717aa6..ab166f3bdc2 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -60,21 +60,46 @@ OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOp
return {_replicationCoordinator->getTerm(), _replicationCoordinator->getLastCommittedOpTime()};
}
-void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata& metadata) {
- _replicationCoordinator->processReplSetMetadata(metadata, true /*advance the commit point*/);
- if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
+void DataReplicatorExternalStateImpl::processMetadata(
+ const rpc::ReplSetMetadata& replMetadata, boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
+ OpTime newCommitPoint;
+ // If OplogQueryMetadata was provided, use its values, otherwise use the ones in
+ // ReplSetMetadata.
+ if (oqMetadata) {
+ newCommitPoint = oqMetadata->getLastOpCommitted();
+ } else {
+ newCommitPoint = replMetadata.getLastOpCommitted();
+ }
+ _replicationCoordinator->advanceCommitPoint(newCommitPoint);
+
+ _replicationCoordinator->processReplSetMetadata(replMetadata);
+
+ if ((oqMetadata && (oqMetadata->getPrimaryIndex() != rpc::OplogQueryMetadata::kNoPrimary)) ||
+ (replMetadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary)) {
_replicationCoordinator->cancelAndRescheduleElectionTimeout();
}
}
-bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& metadata) {
+bool DataReplicatorExternalStateImpl::shouldStopFetching(
+ const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata) {
// Re-evaluate quality of sync target.
- if (_replicationCoordinator->shouldChangeSyncSource(source, metadata)) {
- log()
- << "Canceling oplog query because we have to choose a new sync source. Current source: "
- << source << ", OpTime " << metadata.getLastOpVisible()
- << ", its sync source index:" << metadata.getSyncSourceIndex();
+ if (_replicationCoordinator->shouldChangeSyncSource(source, replMetadata, oqMetadata)) {
+ // If OplogQueryMetadata was provided, its values were used to determine if we should
+ // change sync sources.
+ if (oqMetadata) {
+ log() << "Canceling oplog query due to OplogQueryMetadata. We have to choose a new "
+ "sync source. Current source: "
+ << source << ", OpTime " << oqMetadata->getLastOpApplied()
+ << ", its sync source index:" << oqMetadata->getSyncSourceIndex();
+
+ } else {
+ log() << "Canceling oplog query due to ReplSetMetadata. We have to choose a new sync "
+ "source. Current source: "
+ << source << ", OpTime " << replMetadata.getLastOpVisible()
+ << ", its sync source index:" << replMetadata.getSyncSourceIndex();
+ }
return true;
}
return false;