author     Benety Goh <benety@mongodb.com>    2017-01-24 14:06:46 -0500
committer  Benety Goh <benety@mongodb.com>    2017-01-24 17:09:46 -0500
commit     4a6efad4d422b9a06ff0b7e98bfc9b7cc63b5864 (patch)
tree       4db363699df81580a546eb41b3e2bc43e431c177 /src
parent     76af3d246d482d62520b386e5c1f0b777c367fc6 (diff)
download   mongo-4a6efad4d422b9a06ff0b7e98bfc9b7cc63b5864.tar.gz
SERVER-27123 Only update the commit point as a secondary from oplog queries against your sync source
(cherry picked from commit 87f49488f1b5c872daa71fd2fd9b5d744409a817)

SERVER-27680 Merge the stopOplogFetcher and pauseRsBgSyncProducer failpoints into a single stopReplProducer failpoint
(cherry picked from commit 21948042b6da5fb5bf15897f9808a70551f5af09)

SERVER-27053 Don't acknowledge writes if the term has changed.
(cherry picked from commit 8347e322cd46e8ee847e1730a7e94ea8e3981c53)
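[Editorial note] The three changes above share one policy: a node may only advance its commit point through the channel appropriate to its role, and a primary stops acknowledging writes once its term has moved on. A minimal sketch of the commit-point half of that policy, as a standalone predicate (hypothetical helper, not code from this commit):

    // Hypothetical sketch of the policy in SERVER-27123: data-bearing
    // secondaries advance the commit point only from metadata on oplog query
    // (find/getMore) responses from their sync source; arbiters, which run no
    // oplog fetcher, advance it only from heartbeat metadata.
    bool shouldAdvanceCommitPoint(bool metadataCameFromHeartbeat, bool nodeIsArbiter) {
        if (nodeIsArbiter) {
            return metadataCameFromHeartbeat;
        }
        return !metadataCameFromHeartbeat;
    }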
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/repl/bgsync.cpp                                           |  79
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h                            |  13
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp                     |  60
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h                       |  28
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp           |  49
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp   |   2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp                | 367
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp                     |   3
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h                       |   3
-rw-r--r--  src/mongo/db/repl/replset_commands.cpp                                 |   2
-rw-r--r--  src/mongo/db/write_concern.cpp                                         |   5
-rw-r--r--  src/mongo/shell/assert.js                                              |  27
12 files changed, 444 insertions, 194 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 37ba22bf1a7..75a91db536d 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -117,7 +117,7 @@ Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperat
} // namespace
-MONGO_FP_DECLARE(rsBgSyncProduce);
+MONGO_FP_DECLARE(stopReplProducer);
BackgroundSync* BackgroundSync::s_instance = 0;
stdx::mutex BackgroundSync::s_mutex;
@@ -133,6 +133,9 @@ static ServerStatusMetricField<Counter64> displayOpsRead("repl.network.ops", &op
static Counter64 networkByteStats;
static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes", &networkByteStats);
+// Failpoint which causes rollback to hang before starting.
+MONGO_FP_DECLARE(rollbackHangBeforeStart);
+
// The count of items in the buffer
static Counter64 bufferCountGauge;
static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count",
@@ -262,6 +265,21 @@ void BackgroundSync::_producerThread() {
}
void BackgroundSync::_produce(OperationContext* txn) {
+ if (MONGO_FAIL_POINT(stopReplProducer)) {
+ // This log output is used in js tests so please leave it.
+ log() << "bgsync - stopReplProducer fail point "
+ "enabled. Blocking until fail point is disabled.";
+
+ // TODO(SERVER-27120): Remove the return statement and uncomment the while loop.
+ // Currently we cannot block here or we prevent primaries from being fully elected since
+ // we'll never call _signalNoNewDataForApplier.
+ // while (MONGO_FAIL_POINT(stopReplProducer) && !inShutdown()) {
+ // mongo::sleepsecs(1);
+ // }
+ mongo::sleepsecs(1);
+ return;
+ }
+
// this oplog reader does not do a handshake because we don't want the server it's syncing
// from to track how far it has synced
{
@@ -280,10 +298,6 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
- while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
- sleepmillis(0);
- }
-
// find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
@@ -478,26 +492,6 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
bool syncSourceHasSyncSource = false;
OpTime sourcesLastOp;
- // Forward metadata (containing liveness information) to replication coordinator.
- bool receivedMetadata =
- queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
- if (receivedMetadata) {
- auto metadataResult =
- rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata);
- if (!metadataResult.isOK()) {
- error() << "invalid replication metadata from sync source " << source << ": "
- << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
- return;
- }
- const auto& metadata = metadataResult.getValue();
- _replCoord->processReplSetMetadata(metadata);
- if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
- _replCoord->cancelAndRescheduleElectionTimeout();
- }
- syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
- sourcesLastOp = metadata.getLastOpVisible();
- }
-
const auto& documents = queryResponse.documents;
auto firstDocToApply = documents.cbegin();
auto lastDocToApply = documents.cend();
@@ -576,6 +570,32 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
return;
}
+ if (MONGO_FAIL_POINT(stopReplProducer)) {
+ return;
+ }
+
+ // Process replset metadata. It is important that this happen after we've validated the
+ // first batch, so we don't progress our knowledge of the commit point from a
+ // response that triggers a rollback.
+ bool receivedMetadata =
+ queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
+ if (receivedMetadata) {
+ auto metadataResult =
+ rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata);
+ if (!metadataResult.isOK()) {
+ error() << "invalid replication metadata from sync source " << source << ": "
+ << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
+ return;
+ }
+ const auto& metadata = metadataResult.getValue();
+ _replCoord->processReplSetMetadata(metadata, true /*advance commit point*/);
+ if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
+ _replCoord->cancelAndRescheduleElectionTimeout();
+ }
+ syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
+ sourcesLastOp = metadata.getLastOpVisible();
+ }
+
// The count of the bytes of the documents read off the network.
int networkDocumentBytes = 0;
Timestamp lastTS;
@@ -741,6 +761,15 @@ void BackgroundSync::_rollback(OperationContext* txn,
const HostAndPort& source,
boost::optional<int> requiredRBID,
stdx::function<DBClientBase*()> getConnection) {
+ if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) {
+ // This log output is used in js tests so please leave it.
+ log() << "rollback - rollbackHangBeforeStart fail point "
+ "enabled. Blocking until fail point is disabled.";
+ while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) {
+ mongo::sleepsecs(1);
+ }
+ }
+
// Set state to ROLLBACK while we are in this function. This prevents serving reads, even from
// the oplog. This can fail if we are elected PRIMARY, in which case we better not do any
// rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or
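[Editorial note] Two failpoint consumption styles appear in this file: _produce() sleeps once and returns (the SERVER-27120 TODO above explains why it cannot block yet), while _rollback() blocks in a loop. A self-contained sketch of both patterns, assuming the mongo failpoint macros are in scope; the function names here are hypothetical:

    // Non-blocking style (as in _produce above): yield briefly and bail out so
    // the producer loop re-enters; an electing primary can still observe
    // "no new data" and finish its election.
    void produceOnce() {
        if (MONGO_FAIL_POINT(stopReplProducer)) {
            mongo::sleepsecs(1);
            return;
        }
        // ... fetch the next batch from the sync source ...
    }

    // Blocking style (as in _rollback above): safe here because rollback has
    // no liveness obligation while the failpoint is enabled.
    void rollbackEntry() {
        while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) {
            mongo::sleepsecs(1);
        }
        // ... proceed with rollback ...
    }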
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 18ccf45a224..57a2c426817 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -466,14 +466,13 @@ public:
virtual void processReplSetGetConfig(BSONObjBuilder* result) = 0;
/**
- * Processes the ReplSetMetadata returned from a command run against another replica set
- * member and updates protocol version 1 information (most recent optime that is committed,
- * member id of the current PRIMARY, the current config version and the current term).
- *
- * TODO(dannenberg): Move this method to be testing only if it does not end up being used
- * to process the find and getmore metadata responses from the DataReplicator.
+ * Processes the ReplSetMetadata returned from a command run against another
+ * replica set member and so long as the config version in the metadata matches the replica set
+ * config version this node currently has, updates the current term and optionally updates
+ * this node's notion of the commit point.
*/
- virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) = 0;
+ virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint) = 0;
/**
* Elections under protocol version 1 are triggered by a timer.
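[Editorial note] The contract documented above, condensed: metadata carrying a stale config version is ignored wholesale, the term is always considered, and the commit point moves only when the caller opts in. A hedged sketch; it mirrors _processReplSetMetadata_incallback later in this commit, trimmed for illustration:

    if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
        return;  // config mismatch: ignore the metadata entirely
    }
    if (advanceCommitPoint) {
        _setLastCommittedOpTime(replMetadata.getLastOpCommitted());  // opt-in only
    }
    _updateTerm_incallback(replMetadata.getTerm());  // term is always processed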
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 7c30cd983dd..67102a30033 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1445,10 +1445,38 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl
return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
}
- if (replMode == modeReplSet && !_memberState.primary()) {
- return StatusAndDuration(
- Status(ErrorCodes::NotMaster, "Not master while waiting for replication"),
- Milliseconds(timer->millis()));
+ auto checkForStepDown = [&]() -> Status {
+ if (replMode == modeReplSet && !_memberState.primary()) {
+ return {ErrorCodes::NotMaster, "Primary stepped down while waiting for replication"};
+ }
+
+ // Relax term checking under 3.2 because some commands (eg. createIndexes) might not return
+ // a term in the response metadata to mongos which may pass the no-term OpTime back to
+ // mongod eventually.
+ if (opTime.getTerm() != OpTime::kUninitializedTerm &&
+ _cachedTerm != OpTime::kUninitializedTerm && opTime.getTerm() != _cachedTerm) {
+ return {
+ ErrorCodes::NotMaster,
+ str::stream() << "Term changed from " << opTime.getTerm() << " to " << _cachedTerm
+ << " while waiting for replication, indicating that this node must "
+ "have stepped down."};
+ }
+
+ if (_stepDownPending) {
+ return {ErrorCodes::NotMaster,
+ "Received stepdown request while waiting for replication"};
+ }
+ return Status::OK();
+ };
+
+ Status stepdownStatus = checkForStepDown();
+ if (!stepdownStatus.isOK()) {
+ return StatusAndDuration(stepdownStatus, Milliseconds(timer->millis()));
+ }
+
+ auto interruptStatus = txn->checkForInterruptNoAssert();
+ if (!interruptStatus.isOK()) {
+ return StatusAndDuration(interruptStatus, Milliseconds(timer->millis()));
}
if (writeConcern.wMode.empty()) {
@@ -1470,6 +1498,7 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl
return StatusAndDuration(interruptedStatus, elapsed);
}
+
if (!waitInfo.master) {
return StatusAndDuration(Status(ErrorCodes::NotMaster,
"Not master anymore while waiting for replication"
@@ -1506,6 +1535,11 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl
} else {
condVar.wait_for(*lock, waitTime);
}
+
+ stepdownStatus = checkForStepDown();
+ if (!stepdownStatus.isOK()) {
+ return StatusAndDuration(stepdownStatus, elapsed);
+ }
}
Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
@@ -2001,10 +2035,11 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result)
result->append("config", _rsConfig.toBSON());
}
-void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {
+void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint) {
EventHandle evh;
- _scheduleWorkAndWaitForCompletion([this, &evh, &replMetadata](const CallbackArgs& args) {
- evh = _processReplSetMetadata_incallback(replMetadata);
+ _scheduleWorkAndWaitForCompletion([&](const CallbackArgs& args) {
+ evh = _processReplSetMetadata_incallback(replMetadata, advanceCommitPoint);
});
if (evh.isValid()) {
_replExecutor.waitForEvent(evh);
@@ -2017,11 +2052,13 @@ void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() {
}
EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_incallback(
- const rpc::ReplSetMetadata& replMetadata) {
+ const rpc::ReplSetMetadata& replMetadata, bool advanceCommitPoint) {
if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
return EventHandle();
}
- _setLastCommittedOpTime(replMetadata.getLastOpCommitted());
+ if (advanceCommitPoint) {
+ _setLastCommittedOpTime(replMetadata.getLastOpCommitted());
+ }
return _updateTerm_incallback(replMetadata.getTerm());
}
@@ -2527,6 +2564,7 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
info->condVar->notify_all();
}
_canAcceptNonLocalWrites = false;
+ _stepDownPending = false;
result = kActionCloseAllConnections;
} else {
result = kActionFollowerModeStateChange;
@@ -3103,7 +3141,7 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& curre
}
void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() {
- if (!_getMemberState_inlock().primary()) {
+ if (!_getMemberState_inlock().primary() || _stepDownPending) {
return;
}
@@ -3509,7 +3547,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_incallback(
if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) {
log() << "stepping down from primary, because a new term has begun: " << term;
_topCoord->prepareForStepDown();
- return _stepDownStart();
+ return _stepDownStart(false);
}
return EventHandle();
}
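[Editorial note] The checkForStepDown lambda above adds a term comparison with a deliberate escape hatch for 3.2 mongos traffic. Restated as a standalone predicate (sketch only; the assumption that OpTime::kUninitializedTerm is -1 and marks a term-less OpTime is mine):

    // Returns true when a term mismatch proves this node stepped down since
    // the awaited write was performed.
    bool termIndicatesStepDown(long long awaitedTerm, long long cachedTerm) {
        const long long kUninitializedTerm = -1;  // assumed sentinel for "no term"
        if (awaitedTerm == kUninitializedTerm || cachedTerm == kUninitializedTerm) {
            // 3.2 relaxation: a term-less OpTime (e.g. forwarded by mongos from
            // a command that omits the term) cannot be compared, so don't fail.
            return false;
        }
        return awaitedTerm != cachedTerm;
    }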
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 3c6bbaefdbb..e2b2bea1298 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -217,7 +217,8 @@ public:
virtual void processReplSetGetConfig(BSONObjBuilder* result) override;
- virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override;
+ virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint) override;
virtual void cancelAndRescheduleElectionTimeout() override;
@@ -1024,7 +1025,10 @@ private:
*/
void _requestRemotePrimaryStepdown(const HostAndPort& target);
- ReplicationExecutor::EventHandle _stepDownStart();
+ /**
+ * Schedules stepdown to run with the global exclusive lock.
+ */
+ ReplicationExecutor::EventHandle _stepDownStart(bool hasMutex);
/**
* Completes a step-down of the current node. Must be run with a global
@@ -1063,9 +1067,11 @@ private:
* Utility method that schedules or performs actions specified by a HeartbeatResponseAction
* returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given
* value of "responseStatus".
+ * 'hasMutex' is true if the caller is holding _mutex. TODO(SERVER-27083): Remove this.
*/
void _handleHeartbeatResponseAction(const HeartbeatResponseAction& action,
- const StatusWith<ReplSetHeartbeatResponse>& responseStatus);
+ const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
+ bool hasMutex);
/**
* Bottom half of processHeartbeat(), which runs in the replication executor.
@@ -1115,11 +1121,13 @@ private:
/**
* Callback that processes the ReplSetMetadata returned from a command run against another
- * replica set member and updates protocol version 1 information (most recent optime that is
- * committed, member id of the current PRIMARY, the current config version and the current term)
+ * replica set member and so long as the config version in the metadata matches the replica set
+ * config version this node currently has, updates the current term and optionally updates
+ * this node's notion of the commit point.
* Returns the finish event which is invalid if the process has already finished.
*/
- EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata);
+ EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint);
/**
* Blesses a snapshot to be used for new committed reads.
@@ -1281,6 +1289,14 @@ private:
// TODO: ideally this should only change on rollbacks NOT on mongod restarts also.
int _rbid; // (M)
+ // Indicates that we've received a request to step down from PRIMARY (likely via a heartbeat).
+ // TODO(SERVER-27083): This bool is redundant with the same-named bool in TopologyCoordinatorImpl,
+ // but due to mutex ordering between _mutex and _topoMutex we can't inspect the
+ // TopologyCoordinator field in awaitReplication() where this bool is used. Once we get rid
+ // of topoMutex and start guarding access to the TopologyCoordinator via _mutex we should
+ // consolidate the two bools.
+ bool _stepDownPending = false; // (M)
+
// list of information about clients waiting on replication. Does *not* own the WaiterInfos.
std::vector<WaiterInfo*> _replicationWaiterList; // (M)
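[Editorial note] The hasMutex flag threaded through _stepDownStart() and _handleHeartbeatResponseAction() exists because of the _mutex/_topoMutex ordering problem noted above: the callee must take _mutex only when the caller does not already hold it. The idiom, as it appears in _stepDownStart later in this commit:

    // Acquire _mutex only if the caller is not already holding it.
    // TODO(SERVER-27083) removes the need for this once _topoMutex goes away.
    boost::optional<stdx::lock_guard<stdx::mutex>> lk;
    if (!hasMutex) {
        lk.emplace(_mutex);
    }
    _stepDownPending = true;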
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 4c23bab4f5b..f89f1592170 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -64,6 +64,8 @@ typedef ReplicationExecutor::CallbackHandle CBHandle;
using executor::RemoteCommandRequest;
+MONGO_FP_DECLARE(blockHeartbeatStepdown);
+
void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData,
const HostAndPort& target,
int targetIndex) {
@@ -152,7 +154,9 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
if (replMetadata.isOK()) {
// Asynchronous stepdown could happen, but it will be queued in executor after
// this function, so we cannot and don't need to wait for it to finish.
- _processReplSetMetadata_incallback(replMetadata.getValue());
+ // Arbiters are the only nodes allowed to advance their commit point via heartbeats.
+ bool advanceCommitPoint = getMemberState().arbiter();
+ _processReplSetMetadata_incallback(replMetadata.getValue(), advanceCommitPoint);
}
}
const Date_t now = _replExecutor.now();
@@ -164,10 +168,11 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
networkTime = cbData.response.getValue().elapsedMillis;
// TODO(sz) Because the term is duplicated in ReplSetMetaData, we can get rid of this
// and update tests.
- _updateTerm_incallback(hbStatusResponse.getValue().getTerm());
- // Postpone election timeout if we have a successful heartbeat response from the primary.
const auto& hbResponse = hbStatusResponse.getValue();
- if (hbResponse.hasState() && hbResponse.getState().primary()) {
+ _updateTerm_incallback(hbResponse.getTerm());
+ // Postpone election timeout if we have a successful heartbeat response from the primary.
+ if (hbResponse.hasState() && hbResponse.getState().primary() &&
+ hbResponse.getTerm() == _topCoord->getTerm()) {
cancelAndRescheduleElectionTimeout();
}
} else {
@@ -206,7 +211,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
_scheduleHeartbeatToTarget(
target, targetIndex, std::max(now, action.getNextHeartbeatStartDate()));
- _handleHeartbeatResponseAction(action, hbStatusResponse);
+ _handleHeartbeatResponseAction(action, hbStatusResponse, false /*we're not holding _mutex*/);
}
void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIndex,
@@ -226,11 +231,13 @@ void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIn
void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
const HeartbeatResponseAction& action,
- const StatusWith<ReplSetHeartbeatResponse>& responseStatus) {
+ const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
+ bool hasMutex) {
switch (action.getAction()) {
case HeartbeatResponseAction::NoAction:
// Update the cached member state if different than the current topology member state
if (_memberState != _topCoord->getMemberState()) {
+ invariant(!hasMutex);
stdx::unique_lock<stdx::mutex> lk(_mutex);
const PostMemberStateUpdateAction postUpdateAction =
_updateMemberStateFromTopologyCoordinator_inlock();
@@ -250,7 +257,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
log() << "Stepping down from primary in response to heartbeat";
_topCoord->prepareForStepDown();
// Don't need to wait for stepdown to finish.
- _stepDownStart();
+ _stepDownStart(hasMutex);
break;
case HeartbeatResponseAction::StepDownRemotePrimary: {
invariant(action.getPrimaryConfigIndex() != _selfIndex);
@@ -304,11 +311,19 @@ void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort
}
}
-ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() {
+ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart(bool hasMutex) {
+ {
+ boost::optional<stdx::lock_guard<stdx::mutex>> lk;
+ if (!hasMutex) {
+ lk.emplace(_mutex);
+ }
+ _stepDownPending = true;
+ }
auto finishEvent = _makeEvent();
if (!finishEvent) {
return finishEvent;
}
+
_replExecutor.scheduleWorkWithGlobalExclusiveLock(stdx::bind(
&ReplicationCoordinatorImpl::_stepDownFinish, this, stdx::placeholders::_1, finishEvent));
return finishEvent;
@@ -321,6 +336,20 @@ void ReplicationCoordinatorImpl::_stepDownFinish(
return;
}
invariant(cbData.txn);
+
+ if (MONGO_FAIL_POINT(blockHeartbeatStepdown)) {
+ // Must reschedule rather than block so we don't take up threads in the replication
+ // executor.
+ sleepmillis(10);
+ _replExecutor.scheduleWorkWithGlobalExclusiveLock(
+ stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish,
+ this,
+ stdx::placeholders::_1,
+ finishedEvent));
+
+ return;
+ }
+
// TODO Add invariant that we've got global shared or global exclusive lock, when supported
// by lock manager.
stdx::unique_lock<stdx::mutex> lk(_mutex);
@@ -623,7 +652,9 @@ void ReplicationCoordinatorImpl::_handleLivenessTimeout(
_topCoord->setMemberAsDown(now, memberIndex, _getMyLastDurableOpTime_inlock());
// Don't mind potential asynchronous stepdown as this is the last step of
// liveness check.
- _handleHeartbeatResponseAction(action, makeStatusWith<ReplSetHeartbeatResponse>());
+ _handleHeartbeatResponseAction(action,
+ makeStatusWith<ReplSetHeartbeatResponse>(),
+ true /*we're holding _mutex*/);
}
}
}
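[Editorial note] Two rules from this file are easy to misread in diff form: only a heartbeat from a primary in the node's current term postpones the election timeout, and only arbiters may advance the commit point from heartbeat metadata. As standalone predicates (sketch; names are hypothetical):

    // A stale primary (lower term) must not keep resetting our election timer,
    // or this node could never call the election that unseats it.
    bool shouldPostponeElectionTimeout(bool responderIsPrimary,
                                       long long responderTerm,
                                       long long myTerm) {
        return responderIsPrimary && responderTerm == myTerm;
    }

    // Arbiters run no oplog fetcher, so heartbeats are their only path to the
    // commit point; every other member waits for its sync source's oplog metadata.
    bool advanceCommitPointFromHeartbeat(bool isArbiter) {
        return isArbiter;
    }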
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 2233e21cc21..40a30c48e5a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -351,7 +351,7 @@ TEST_F(ReplCoordHBV1Test, ArbiterRecordsCommittedOpTimeFromHeartbeatMetadata) {
<< 1 << "primaryIndex" << 1 << "term"
<< committedOpTime.getTerm() << "syncSourceIndex" << 1)));
ASSERT_OK(metadata.getStatus());
- getReplCoord()->processReplSetMetadata(metadata.getValue());
+ getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
ASSERT_EQ(getReplCoord()->getMyLastAppliedOpTime().getTimestamp(), expected.getTimestamp());
};
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 9dfd6f11045..2a52f004700 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/read_concern_response.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/repl_set_request_votes_args.h"
@@ -81,15 +82,15 @@ using executor::RemoteCommandResponse;
typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs;
Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted");
-// Helper class to wrap Timestamp as an OpTime with term 0.
-struct OpTimeWithTermZero {
- OpTimeWithTermZero(unsigned int sec, unsigned int i) : timestamp(sec, i) {}
+// Helper class to wrap Timestamp as an OpTime with term 1.
+struct OpTimeWithTermOne {
+ OpTimeWithTermOne(unsigned int sec, unsigned int i) : timestamp(sec, i) {}
operator OpTime() const {
- return OpTime(timestamp, 0);
+ return OpTime(timestamp, 1);
}
operator boost::optional<OpTime>() const {
- return OpTime(timestamp, 0);
+ return OpTime(timestamp, 1);
}
OpTime asOpTime() const {
@@ -674,7 +675,7 @@ TEST_F(ReplCoordTest, RollBackIDShouldIncreaseByOneWhenIncrementRollbackIDIsCall
TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAStandaloneNode) {
init("");
OperationContextNoop txn;
- OpTimeWithTermZero time(100, 1);
+ OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -692,7 +693,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAMas
settings.setMaster(true);
init(settings);
OperationContextNoop txn;
- OpTimeWithTermZero time(100, 1);
+ OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -719,7 +720,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec
HostAndPort("node1", 12345));
OperationContextNoop txn;
- OpTimeWithTermZero time(100, 1);
+ OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -732,7 +733,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec
ASSERT_EQUALS(ErrorCodes::NotMaster, statusAndDur.status);
}
-TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWZero) {
+TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWTermOne) {
assertStartSuccess(BSON("_id"
<< "mySet"
<< "version" << 2 << "members"
@@ -747,7 +748,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith
HostAndPort("node1", 12345));
OperationContextNoop txn;
- OpTimeWithTermZero time(100, 1);
+ OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -756,8 +757,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith
// Become primary.
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ASSERT(getReplCoord()->getMemberState().primary());
@@ -787,12 +788,12 @@ TEST_F(ReplCoordTest,
<< "_id" << 3))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -860,12 +861,12 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes
<< "_id" << 3))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -1056,6 +1057,8 @@ TEST_F(
// another name if we didn't get a high enough one.
}
+ auto zeroOpTimeInCurrentTerm = OpTime(Timestamp(0, 0), 1);
+ ReplClientInfo::forClient(txn.getClient()).setLastOp(zeroOpTimeInCurrentTerm);
statusAndDur = getReplCoord()->awaitReplicationOfLastOpForClient(&txn, majorityWriteConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
statusAndDur = getReplCoord()->awaitReplicationOfLastOpForClient(&txn, multiDCWriteConcern);
@@ -1158,14 +1161,14 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), &txn);
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1217,14 +1220,14 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), &txn);
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = 50;
@@ -1258,14 +1261,14 @@ TEST_F(ReplCoordTest,
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), &txn);
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1300,14 +1303,14 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), &txn);
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1340,14 +1343,14 @@ TEST_F(ReplCoordTest,
<< "node3"))),
HostAndPort("node1"));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), &txn);
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1523,7 +1526,7 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha
TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
OperationContextReplMock txn;
- OpTimeWithTermZero optime1(100, 1);
+ OpTimeWithTermOne optime1(100, 1);
// All nodes are caught up
getReplCoord()->setMyLastAppliedOpTime(optime1);
getReplCoord()->setMyLastDurableOpTime(optime1);
@@ -1538,7 +1541,7 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
TEST_F(StepDownTest,
NodeReturnsExceededTimeLimitWhenStepDownFailsToObtainTheGlobalLockWithinTheAllottedTime) {
OperationContextReplMock txn;
- OpTimeWithTermZero optime1(100, 1);
+ OpTimeWithTermOne optime1(100, 1);
// All nodes are caught up
getReplCoord()->setMyLastAppliedOpTime(optime1);
getReplCoord()->setMyLastDurableOpTime(optime1);
@@ -1759,8 +1762,8 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle
TEST_F(StepDownTest,
NodeReturnsExceededTimeLimitWhenNoSecondaryIsCaughtUpWithinStepDownsSecondaryCatchUpPeriod) {
OperationContextReplMock txn;
- OpTimeWithTermZero optime1(100, 1);
- OpTimeWithTermZero optime2(100, 2);
+ OpTimeWithTermOne optime1(100, 1);
+ OpTimeWithTermOne optime2(100, 2);
// No secondary is caught up
auto repl = getReplCoord();
repl->setMyLastAppliedOpTime(optime2);
@@ -1941,8 +1944,8 @@ TEST_F(StepDownTest,
TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) {
const unsigned int opID = 100;
OperationContextReplMock txn{opID};
- OpTimeWithTermZero optime1(100, 1);
- OpTimeWithTermZero optime2(100, 2);
+ OpTimeWithTermOne optime1(100, 1);
+ OpTimeWithTermOne optime2(100, 2);
// No secondary is caught up
auto repl = getReplCoord();
repl->setMyLastAppliedOpTime(optime2);
@@ -2092,9 +2095,9 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand
<< "test2:1234") << BSON("_id" << 2 << "host"
<< "test3:1234"))),
HostAndPort("test1", 1234));
- OpTimeWithTermZero optime1(100, 1);
- OpTimeWithTermZero optime2(100, 2);
- OpTimeWithTermZero optime3(2, 1);
+ OpTimeWithTermOne optime1(100, 1);
+ OpTimeWithTermOne optime2(100, 2);
+ OpTimeWithTermOne optime3(2, 1);
getReplCoord()->setMyLastAppliedOpTime(optime1);
getReplCoord()->setMyLastDurableOpTime(optime1);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime2));
@@ -2124,7 +2127,7 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand
ASSERT_EQUALS(2, memberId);
ASSERT_EQUALS(optime3.timestamp, entry["optime"]["ts"].timestamp());
}
- ASSERT_EQUALS(0, entry["optime"]["t"].Number());
+ ASSERT_EQUALS(1, entry["optime"]["t"].Number());
}
ASSERT_EQUALS(3U, memberIds.size()); // Make sure we saw all 3 nodes
}
@@ -2144,8 +2147,8 @@ TEST_F(ReplCoordTest,
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// Can't unset maintenance mode if it was never set to begin with.
Status status = getReplCoord()->setMaintenanceMode(false);
@@ -2168,8 +2171,8 @@ TEST_F(ReplCoordTest,
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// valid set
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
ASSERT_TRUE(getReplCoord()->getMemberState().recovering());
@@ -2197,8 +2200,8 @@ TEST_F(ReplCoordTest, AllowAsManyUnsetMaintenanceModesAsThereHaveBeenSetMaintena
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// Can set multiple times
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
@@ -2228,8 +2231,8 @@ TEST_F(ReplCoordTest, SettingAndUnsettingMaintenanceModeShouldNotAffectRollbackS
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// From rollback, entering and exiting maintenance mode doesn't change perceived
// state.
@@ -2267,8 +2270,8 @@ TEST_F(ReplCoordTest, DoNotAllowMaintenanceModeWhilePrimary) {
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// Can't modify maintenance mode when PRIMARY
simulateSuccessfulV1Election();
@@ -2300,8 +2303,8 @@ TEST_F(ReplCoordTest, DoNotAllowSettingMaintenanceModeWhileConductingAnElection)
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// TODO this election shouldn't have to happen.
simulateSuccessfulV1Election();
@@ -2360,8 +2363,8 @@ TEST_F(ReplCoordTest,
HostAndPort("node1", 12345));
OperationContextNoop txn;
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
getReplCoord()->setMyLastAppliedOpTime(time2);
getReplCoord()->setMyLastDurableOpTime(time2);
@@ -2404,8 +2407,8 @@ TEST_F(ReplCoordTest,
HostAndPort("node1", 12345));
OperationContextNoop txn;
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
getReplCoord()->setMyLastAppliedOpTime(time2);
getReplCoord()->setMyLastDurableOpTime(time2);
@@ -2434,8 +2437,8 @@ TEST_F(ReplCoordTest, NodeDoesNotIncludeItselfWhenRunningGetHostsWrittenToInMast
OperationContextNoop txn;
OID client = OID::gen();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
getExternalState()->setClientHostAndPort(clientHost);
HandshakeArgs handshake;
@@ -2584,12 +2587,12 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) {
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTime time1({100, 1}, 2);
- OpTime time2({100, 2}, 2);
+ OpTime time1({100, 1}, 1);
+ OpTime time2({100, 2}, 1);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2630,13 +2633,13 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
- OpTimeWithTermZero staleTime(10, 0);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
+ OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2674,12 +2677,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect)
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTime time1({100, 1}, 3);
- OpTime time2({100, 2}, 3);
+ OpTime time1({100, 1}, 1);
+ OpTime time2({100, 2}, 1);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2719,13 +2722,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionWhenItsConfigVersionIsIncorre
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
- OpTimeWithTermZero staleTime(10, 0);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
+ OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2762,12 +2765,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTime time1({100, 1}, 2);
- OpTime time2({100, 2}, 2);
+ OpTime time1({100, 1}, 1);
+ OpTime time2({100, 2}, 1);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2805,13 +2808,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionOfMembersWhoseIdsAreNotInTheC
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
- OpTimeWithTermZero staleTime(10, 0);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
+ OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2847,13 +2850,13 @@ TEST_F(ReplCoordTest,
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
- OpTimeWithTermZero staleTime(10, 0);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
+ OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2912,11 +2915,11 @@ TEST_F(ReplCoordTest, AwaitReplicationShouldResolveAsNormalDuringAReconfig) {
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time(100, 2);
+ OpTimeWithTermOne time(100, 2);
// 3 nodes waiting for time
WriteConcernOptions writeConcern;
@@ -2994,11 +2997,11 @@ TEST_F(
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time(100, 2);
+ OpTimeWithTermOne time(100, 2);
// 3 nodes waiting for time
WriteConcernOptions writeConcern;
@@ -3055,8 +3058,8 @@ TEST_F(ReplCoordTest,
<< "_id" << 4))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 1));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 1));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
simulateSuccessfulV1Election();
OpTime time(Timestamp(100, 2), 1);
@@ -3227,13 +3230,13 @@ TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDurin
<< "_id" << 0))),
HostAndPort("node1", 12345));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0));
shutdown();
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_TRUE(result.didWait());
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus());
@@ -3248,13 +3251,13 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte
<< "_id" << 0))),
HostAndPort("node1", 12345));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0));
txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test"));
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_TRUE(result.didWait());
ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus());
@@ -3284,10 +3287,10 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
<< "_id" << 0))),
HostAndPort("node1", 12345));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_TRUE(result.didWait());
ASSERT_OK(result.getStatus());
@@ -3303,7 +3306,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
HostAndPort("node1", 12345));
- OpTimeWithTermZero time(100, 0);
+ OpTimeWithTermOne time(100, 0);
getReplCoord()->setMyLastAppliedOpTime(time);
getReplCoord()->setMyLastDurableOpTime(time);
auto result = getReplCoord()->waitUntilOpTime(
@@ -3318,7 +3321,7 @@ TEST_F(ReplCoordTest,
init(ReplSettings());
OperationContextNoop txn;
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_FALSE(result.didWait());
ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus());
@@ -3505,7 +3508,7 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc
"lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible"
<< BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 1
<< "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata.getValue());
+ getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
// higher configVersion
@@ -3515,7 +3518,7 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc
<< BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible"
<< BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 100
<< "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata2.getValue());
+ getReplCoord()->processReplSetMetadata(metadata2.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
}
@@ -3550,7 +3553,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet
"lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "lastOpVisible"
<< BSON("ts" << Timestamp(10, 0) << "t" << 1) << "configVersion" << 2
<< "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata.getValue());
+ getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime());
@@ -3560,7 +3563,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet
"lastOpCommitted" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "lastOpVisible"
<< BSON("ts" << Timestamp(9, 0) << "t" << 1) << "configVersion" << 2
<< "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata2.getValue());
+ getReplCoord()->processReplSetMetadata(metadata2.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime());
}
@@ -3591,7 +3594,7 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr
"lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible"
<< BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion" << 2
<< "primaryIndex" << 2 << "term" << 3 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata.getValue());
+ getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 3), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
@@ -3602,7 +3605,7 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr
"lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible"
<< BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2
<< "primaryIndex" << 1 << "term" << 2 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata2.getValue());
+ getReplCoord()->processReplSetMetadata(metadata2.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
@@ -3613,14 +3616,14 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr
"lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible"
<< BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2
<< "primaryIndex" << 1 << "term" << 3 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata3.getValue());
+ getReplCoord()->processReplSetMetadata(metadata3.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
}
TEST_F(ReplCoordTest,
- TermAndLastCommittedOpTimeUpdateWhenHeartbeatResponseWithMetadataHasFresherValues) {
+ LastCommittedOpTimeNotUpdatedEvenWhenHeartbeatResponseWithMetadataHasFresherValues) {
// Ensure that the metadata is processed if it is contained in a heartbeat response.
assertStartSuccess(BSON("_id"
<< "mySet"
@@ -3640,7 +3643,61 @@ TEST_F(ReplCoordTest,
auto replCoord = getReplCoord();
auto config = replCoord->getConfig();
- // Higher term - should update term and lastCommittedOpTime.
+ // Higher term - should update term but not last committed optime.
+ StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
+ rpc::kReplSetMetadataFieldName
+ << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible"
+ << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion"
+ << config.getConfigVersion() << "primaryIndex" << 1 << "term" << 3
+ << "syncSourceIndex" << 1)));
+ BSONObjBuilder metadataBuilder;
+ ASSERT_OK(metadata.getValue().writeToMetadata(&metadataBuilder));
+ auto metadataObj = metadataBuilder.obj();
+
+ auto net = getNet();
+ net->enterNetwork();
+
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto noi = net->getNextReadyRequest();
+ const auto& request = noi->getRequest();
+ ASSERT_EQUALS(HostAndPort("node2", 12345), request.target);
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setConfigVersion(config.getConfigVersion());
+ hbResp.setSetName(config.getReplSetName());
+ hbResp.setState(MemberState::RS_SECONDARY);
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true), metadataObj));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
+ ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
+ ASSERT_EQUALS(3, getReplCoord()->getTerm());
+ ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
+}
+
+TEST_F(ReplCoordTest, TermAndLastCommittedOpTimeUpdatedFromHeartbeatWhenArbiter) {
+ // Ensure that the metadata is processed if it is contained in a heartbeat response.
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0 << "arbiterOnly" << true)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1)) << "protocolVersion" << 1),
+ HostAndPort("node1", 12345));
+ ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
+ OperationContextNoop txn;
+ getReplCoord()->updateTerm(&txn, 1);
+ ASSERT_EQUALS(1, getReplCoord()->getTerm());
+
+ auto replCoord = getReplCoord();
+ auto config = replCoord->getConfig();
+
+ // Higher term - should update term and lastCommittedOpTime since arbiters learn of the
+ // commit point via heartbeats.
StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
rpc::kReplSetMetadataFieldName
<< BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible"
@@ -3836,7 +3893,7 @@ TEST_F(ReplCoordTest,
}
TEST_F(ReplCoordTest,
- CancelAndRescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimary) {
+ RescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimaryInSameTerm) {
assertStartSuccess(BSON("_id"
<< "mySet"
<< "protocolVersion" << 1 << "version" << 2 << "members"
@@ -3868,6 +3925,8 @@ TEST_F(ReplCoordTest,
ReplSetHeartbeatResponse hbResp;
hbResp.setSetName("mySet");
hbResp.setState(MemberState::RS_PRIMARY);
+ hbResp.setTerm(replCoord->getTerm());
+
// Heartbeat response is scheduled with a delay so that we can be sure that
// the election was rescheduled due to the heartbeat response.
auto heartbeatWhen = net->now() + Seconds(1);
@@ -3882,6 +3941,54 @@ TEST_F(ReplCoordTest,
}
TEST_F(ReplCoordTest,
+ DontRescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimaryInDifferentTerm) {
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "protocolVersion" << 1 << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1))),
+ HostAndPort("node1", 12345));
+
+ ReplicationCoordinatorImpl* replCoord = getReplCoord();
+ ASSERT_TRUE(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+
+ auto electionTimeoutWhen = replCoord->getElectionTimeout_forTest();
+ ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen);
+
+ auto net = getNet();
+ net->enterNetwork();
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto noi = net->getNextReadyRequest();
+ auto&& request = noi->getRequest();
+ log() << "processing " << request.cmdObj;
+ ASSERT_EQUALS(HostAndPort("node2", 12345), request.target);
+
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ // Respond to node1's heartbeat command to indicate that node2 is PRIMARY, but in an older term.
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName("mySet");
+ hbResp.setState(MemberState::RS_PRIMARY);
+ hbResp.setTerm(replCoord->getTerm() - 1);
+
+ // Heartbeat response is scheduled with a delay so that we can verify that the election
+ // timeout is not rescheduled by a heartbeat response from a primary in a stale term.
+ auto heartbeatWhen = net->now() + Seconds(1);
+ net->scheduleResponse(noi, heartbeatWhen, makeResponseStatus(hbResp.toBSON(true)));
+ net->runUntil(heartbeatWhen);
+ ASSERT_EQUALS(heartbeatWhen, net->now());
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
+ ASSERT_GREATER_THAN(heartbeatWhen + replCoord->getConfig().getElectionTimeoutPeriod(),
+ replCoord->getElectionTimeout_forTest());
+}
+
+TEST_F(ReplCoordTest,
CancelAndRescheduleElectionTimeoutWhenProcessingHeartbeatResponseWithoutState) {
assertStartSuccess(BSON("_id"
<< "mySet"
@@ -4241,7 +4348,7 @@ TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) {
// Set last committed optime via metadata.
rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1);
- getReplCoord()->processReplSetMetadata(syncSourceMetadata);
+ getReplCoord()->processReplSetMetadata(syncSourceMetadata, true);
getReplCoord()->onSnapshotCreate(optime, SnapshotName(1));
BSONObjBuilder cmdBuilder;
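The tests above pin down when the commit point may move: a secondary no longer advances it from heartbeat metadata, while an arbiter still does. A minimal shell sketch of how a js test might observe a node's commit point; the connection handle conn and the optimes.lastCommittedOpTime field layout of replSetGetStatus are assumptions for this branch:

    var status = conn.adminCommand({replSetGetStatus: 1});
    assert.commandWorked(status);
    // The node's current commit point, reported as an {ts, t} optime document.
    printjson(status.optimes.lastCommittedOpTime);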
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index cb763bed4ff..df4a83c6f07 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -236,7 +236,8 @@ void ReplicationCoordinatorMock::processReplSetGetConfig(BSONObjBuilder* result)
// TODO
}
-void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {}
+void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint) {}
void ReplicationCoordinatorMock::cancelAndRescheduleElectionTimeout() {}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 861799f00d2..24eba706cfe 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -150,7 +150,8 @@ public:
virtual void processReplSetGetConfig(BSONObjBuilder* result);
- virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata);
+ void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint) override;
virtual void cancelAndRescheduleElectionTimeout() override;
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index e6d90aa82ec..0c202e906a1 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -681,7 +681,7 @@ public:
// New style update position command has metadata, which may inform the
// upstream of a higher term.
auto metadata = metadataResult.getValue();
- replCoord->processReplSetMetadata(metadata);
+ replCoord->processReplSetMetadata(metadata, false /*don't advance the commit point*/);
}
// In the case of an update from a member with an invalid replica set config,
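Passing false here means metadata arriving via replSetUpdatePosition, which flows from downstream members rather than from the sync source, can still raise the term but can no longer move the commit point. For orientation, a rough sketch of the metadata's wire shape, mirroring the test fixtures above; field values are illustrative and $replData is assumed to be the value of rpc::kReplSetMetadataFieldName:

    var replSetMetadata = {
        $replData: {
            term: NumberLong(3),
            lastOpCommitted: {ts: Timestamp(10, 0), t: NumberLong(3)},
            lastOpVisible: {ts: Timestamp(10, 0), t: NumberLong(3)},
            configVersion: 2,
            primaryIndex: 1,
            syncSourceIndex: 1
        }
    };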
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp
index 3c9086ca39a..9dc04670dc4 100644
--- a/src/mongo/db/write_concern.cpp
+++ b/src/mongo/db/write_concern.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/rpc/protocol.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -72,6 +73,8 @@ namespace {
const std::string kLocalDB = "local";
} // namespace
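+// Failpoint which causes waitForWriteConcern() to hang just before it begins waiting.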
+MONGO_FP_DECLARE(hangBeforeWaitingForWriteConcern);
+
StatusWith<WriteConcernOptions> extractWriteConcern(OperationContext* txn,
const BSONObj& cmdObj,
const std::string& dbName) {
@@ -234,6 +237,8 @@ Status waitForWriteConcern(OperationContext* txn,
// This check does not hold for writes done through dbeval because it runs with a global X lock.
dassert(!txn->lockState()->isLocked() || txn->getClient()->isInDirectClient());
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForWriteConcern);
+
// Next handle blocking on disk
Timer syncTimer;
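A minimal sketch of driving the new failpoint from a js test, so that a write can be parked just before it waits for write concern; the connection handle conn and the parallel-shell write are illustrative:

    assert.commandWorked(conn.adminCommand(
        {configureFailPoint: "hangBeforeWaitingForWriteConcern", mode: "alwaysOn"}));
    // Issue a write with {writeConcern: {w: "majority"}} from a parallel shell here;
    // it pauses before waiting for write concern until the failpoint is turned off.
    assert.commandWorked(conn.adminCommand(
        {configureFailPoint: "hangBeforeWaitingForWriteConcern", mode: "off"}));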
diff --git a/src/mongo/shell/assert.js b/src/mongo/shell/assert.js
index 0d1e225a990..ba9333fb9cf 100644
--- a/src/mongo/shell/assert.js
+++ b/src/mongo/shell/assert.js
@@ -452,16 +452,33 @@ assert.writeOK = function(res, msg) {
};
assert.writeError = function(res, msg) {
+ return assert.writeErrorWithCode(res, null, msg);
+};
+
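+// Asserts that 'res' contains a write error or a write concern error; when 'expectedCode'
+// is non-null, also asserts that the reported error code equals 'expectedCode'.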
+assert.writeErrorWithCode = function(res, expectedCode, msg) {
var errMsg = null;
+ var foundCode = null;
if (res instanceof WriteResult) {
- if (!res.hasWriteError() && !res.hasWriteConcernError()) {
+ if (res.hasWriteError()) {
+ foundCode = res.getWriteError().code;
+ } else if (res.hasWriteConcernError()) {
+ foundCode = res.getWriteConcernError().code;
+ } else {
errMsg = "no write error: " + tojson(res);
}
} else if (res instanceof BulkWriteResult) {
// Can only happen with bulk inserts
- if (!res.hasWriteErrors() && !res.hasWriteConcernError()) {
+ if (res.hasWriteErrors()) {
+ if (res.getWriteErrorCount() > 1 && expectedCode != null) {
+ errMsg = "can't check for specific code when there was more than one write error";
+ } else {
+ foundCode = res.getWriteErrorAt(0).code;
+ }
+ } else if (res.hasWriteConcernError()) {
+ foundCode = res.getWriteConcernError().code;
+ } else {
errMsg = "no write errors: " + tojson(res);
}
} else if (res instanceof WriteCommandError) {
@@ -473,6 +490,12 @@ assert.writeError = function(res, msg) {
}
}
+ if (!errMsg && expectedCode) {
+ if (foundCode != expectedCode) {
+ errMsg = "found code " + foundCode + " does not match expected code " + expectedCode;
+ }
+ }
+
if (errMsg) {
if (msg)
errMsg = errMsg + ": " + msg;
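A minimal usage sketch for the new assertion; the collection name and the duplicate-key error code 11000 are illustrative:

    var coll = db.write_error_with_code;
    coll.drop();
    assert.writeOK(coll.insert({_id: 1}));
    // The second insert of the same _id fails; assert both that it fails and with which code.
    assert.writeErrorWithCode(coll.insert({_id: 1}), 11000);
    // The old form is unchanged and asserts only that some write error occurred.
    assert.writeError(coll.insert({_id: 1}));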