summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-07-14 19:36:25 -0400
committerBenety Goh <benety@mongodb.com>2016-07-16 15:37:15 -0400
commitd3a4a9d72bbd99f3f5947fe52851c1fac17073d5 (patch)
tree4d650b47d1ae8e8e5306e0256cb6b9f8a58cc15f
parent31d1c5473e1c5e21646267bd81cb1afbab677672 (diff)
downloadmongo-d3a4a9d72bbd99f3f5947fe52851c1fac17073d5.tar.gz
SERVER-25081 DataReplicator should update the global timestamp at the end of each batch and when initial sync completes
-rw-r--r--src/mongo/db/repl/data_replicator.cpp3
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp8
2 files changed, 7 insertions, 4 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 731d5114220..d314850c060 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -718,6 +718,7 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn)
_storage->clearInitialSyncFlag(txn);
_storage->setMinValid(txn, _lastApplied.opTime, DurableRequirement::Strong);
+ _opts.setMyLastOptime(_lastApplied.opTime);
log() << "initial sync done; took " << t.millis() << " milliseconds.";
return _lastApplied;
}
@@ -1078,7 +1079,7 @@ void DataReplicator::_onApplyBatchFinish(const StatusWith<Timestamp>& ts,
_lastApplied = uassertStatusOK(opTimeWithHashStatus);
lk.unlock();
- _opts.setMyLastOptime(OpTime(ts.getValue(), OpTime::kUninitializedTerm));
+ _opts.setMyLastOptime(_lastApplied.opTime);
lk.lock();
if (_reporter) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 6e6f1de4b66..d810ee1674f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -206,7 +206,8 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings&
return ReplicationCoordinator::modeNone;
}
-DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCoord) {
+DataReplicatorOptions createDataReplicatorOptions(
+ ReplicationCoordinator* replCoord, ReplicationCoordinatorExternalState* externalState) {
DataReplicatorOptions options;
options.rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&) -> Status {
return Status::OK();
@@ -217,8 +218,9 @@ DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCo
return replCoord->prepareReplSetUpdatePositionCommand(commandStyle);
};
options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); };
- options.setMyLastOptime = [replCoord](const OpTime& opTime) {
+ options.setMyLastOptime = [replCoord, externalState](const OpTime& opTime) {
replCoord->setMyLastAppliedOpTime(opTime);
+ externalState->setGlobalTimestamp(opTime.getTimestamp());
};
options.setFollowerMode = [replCoord](const MemberState& newState) {
return replCoord->setFollowerMode(newState);
@@ -508,7 +510,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn) {
if (_externalState->shouldUseDataReplicatorInitialSync()) {
_externalState->runOnInitialSyncThread([this](OperationContext* txn) {
DataReplicator dr(
- createDataReplicatorOptions(this),
+ createDataReplicatorOptions(this, _externalState.get()),
stdx::make_unique<DataReplicatorExternalStateImpl>(this, _externalState.get()),
_storage);
const auto status = dr.doInitialSync(txn);