diff options
author | Benety Goh <benety@mongodb.com> | 2017-01-24 14:06:46 -0500 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2017-01-24 17:09:46 -0500 |
commit | 4a6efad4d422b9a06ff0b7e98bfc9b7cc63b5864 (patch) | |
tree | 4db363699df81580a546eb41b3e2bc43e431c177 /src | |
parent | 76af3d246d482d62520b386e5c1f0b777c367fc6 (diff) | |
download | mongo-4a6efad4d422b9a06ff0b7e98bfc9b7cc63b5864.tar.gz |
SERVER-27123 Only update the commit point as a secondary from oplog queries against your sync source
(cherry picked from commit 87f49488f1b5c872daa71fd2fd9b5d744409a817)
SERVER-27680 Merge stopOplogFetcher and pauseRsBgSyncProducer failpoint into single stopReplProducer failpoint
(cherry picked from commit 21948042b6da5fb5bf15897f9808a70551f5af09)
SERVER-27053 Don't acknowledge writes if the term has changed.
(cherry picked from commit 8347e322cd46e8ee847e1730a7e94ea8e3981c53)
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 79 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 60 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 367 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replset_commands.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/write_concern.cpp | 5 | ||||
-rw-r--r-- | src/mongo/shell/assert.js | 27 |
12 files changed, 444 insertions, 194 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 37ba22bf1a7..75a91db536d 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -117,7 +117,7 @@ Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperat } // namespace -MONGO_FP_DECLARE(rsBgSyncProduce); +MONGO_FP_DECLARE(stopReplProducer); BackgroundSync* BackgroundSync::s_instance = 0; stdx::mutex BackgroundSync::s_mutex; @@ -133,6 +133,9 @@ static ServerStatusMetricField<Counter64> displayOpsRead("repl.network.ops", &op static Counter64 networkByteStats; static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes", &networkByteStats); +// Failpoint which causes rollback to hang before starting. +MONGO_FP_DECLARE(rollbackHangBeforeStart); + // The count of items in the buffer static Counter64 bufferCountGauge; static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count", @@ -262,6 +265,21 @@ void BackgroundSync::_producerThread() { } void BackgroundSync::_produce(OperationContext* txn) { + if (MONGO_FAIL_POINT(stopReplProducer)) { + // This log output is used in js tests so please leave it. + log() << "bgsync - stopReplProducer fail point " + "enabled. Blocking until fail point is disabled."; + + // TODO(SERVER-27120): Remove the return statement and uncomment the while loop. + // Currently we cannot block here or we prevent primaries from being fully elected since + // we'll never call _signalNoNewDataForApplier. + // while (MONGO_FAIL_POINT(stopReplProducer) && !inShutdown()) { + // mongo::sleepsecs(1); + // } + mongo::sleepsecs(1); + return; + } + // this oplog reader does not do a handshake because we don't want the server it's syncing // from to track how far it has synced { @@ -280,10 +298,6 @@ void BackgroundSync::_produce(OperationContext* txn) { } } - while (MONGO_FAIL_POINT(rsBgSyncProduce)) { - sleepmillis(0); - } - // find a target to sync from the last optime fetched OpTime lastOpTimeFetched; @@ -478,26 +492,6 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& bool syncSourceHasSyncSource = false; OpTime sourcesLastOp; - // Forward metadata (containing liveness information) to replication coordinator. - bool receivedMetadata = - queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); - if (receivedMetadata) { - auto metadataResult = - rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata); - if (!metadataResult.isOK()) { - error() << "invalid replication metadata from sync source " << source << ": " - << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata; - return; - } - const auto& metadata = metadataResult.getValue(); - _replCoord->processReplSetMetadata(metadata); - if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) { - _replCoord->cancelAndRescheduleElectionTimeout(); - } - syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; - sourcesLastOp = metadata.getLastOpVisible(); - } - const auto& documents = queryResponse.documents; auto firstDocToApply = documents.cbegin(); auto lastDocToApply = documents.cend(); @@ -576,6 +570,32 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& return; } + if (MONGO_FAIL_POINT(stopReplProducer)) { + return; + } + + // Process replset metadata. It is important that this happen after we've validated the + // first batch, so we don't progress our knowledge of the commit point from a + // response that triggers a rollback. + bool receivedMetadata = + queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); + if (receivedMetadata) { + auto metadataResult = + rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata); + if (!metadataResult.isOK()) { + error() << "invalid replication metadata from sync source " << source << ": " + << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata; + return; + } + const auto& metadata = metadataResult.getValue(); + _replCoord->processReplSetMetadata(metadata, true /*advance commit point*/); + if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) { + _replCoord->cancelAndRescheduleElectionTimeout(); + } + syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; + sourcesLastOp = metadata.getLastOpVisible(); + } + // The count of the bytes of the documents read off the network. int networkDocumentBytes = 0; Timestamp lastTS; @@ -741,6 +761,15 @@ void BackgroundSync::_rollback(OperationContext* txn, const HostAndPort& source, boost::optional<int> requiredRBID, stdx::function<DBClientBase*()> getConnection) { + if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) { + // This log output is used in js tests so please leave it. + log() << "rollback - rollbackHangBeforeStart fail point " + "enabled. Blocking until fail point is disabled."; + while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) { + mongo::sleepsecs(1); + } + } + // Set state to ROLLBACK while we are in this function. This prevents serving reads, even from // the oplog. This can fail if we are elected PRIMARY, in which case we better not do any // rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 18ccf45a224..57a2c426817 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -466,14 +466,13 @@ public: virtual void processReplSetGetConfig(BSONObjBuilder* result) = 0; /** - * Processes the ReplSetMetadata returned from a command run against another replica set - * member and updates protocol version 1 information (most recent optime that is committed, - * member id of the current PRIMARY, the current config version and the current term). - * - * TODO(dannenberg): Move this method to be testing only if it does not end up being used - * to process the find and getmore metadata responses from the DataReplicator. + * Processes the ReplSetMetadata returned from a command run against another + * replica set member and so long as the config version in the metadata matches the replica set + * config version this node currently has, updates the current term and optionally updates + * this node's notion of the commit point. */ - virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) = 0; + virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) = 0; /** * Elections under protocol version 1 are triggered by a timer. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 7c30cd983dd..67102a30033 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1445,10 +1445,38 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); } - if (replMode == modeReplSet && !_memberState.primary()) { - return StatusAndDuration( - Status(ErrorCodes::NotMaster, "Not master while waiting for replication"), - Milliseconds(timer->millis())); + auto checkForStepDown = [&]() -> Status { + if (replMode == modeReplSet && !_memberState.primary()) { + return {ErrorCodes::NotMaster, "Primary stepped down while waiting for replication"}; + } + + // Relax term checking under 3.2 because some commands (eg. createIndexes) might not return + // a term in the response metadata to mongos which may pass the no-term OpTime back to + // mongod eventually. + if (opTime.getTerm() != OpTime::kUninitializedTerm && + _cachedTerm != OpTime::kUninitializedTerm && opTime.getTerm() != _cachedTerm) { + return { + ErrorCodes::NotMaster, + str::stream() << "Term changed from " << opTime.getTerm() << " to " << _cachedTerm + << " while waiting for replication, indicating that this node must " + "have stepped down."}; + } + + if (_stepDownPending) { + return {ErrorCodes::NotMaster, + "Received stepdown request while waiting for replication"}; + } + return Status::OK(); + }; + + Status stepdownStatus = checkForStepDown(); + if (!stepdownStatus.isOK()) { + return StatusAndDuration(stepdownStatus, Milliseconds(timer->millis())); + } + + auto interruptStatus = txn->checkForInterruptNoAssert(); + if (!interruptStatus.isOK()) { + return StatusAndDuration(interruptStatus, Milliseconds(timer->millis())); } if (writeConcern.wMode.empty()) { @@ -1470,6 +1498,7 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl return StatusAndDuration(interruptedStatus, elapsed); } + if (!waitInfo.master) { return StatusAndDuration(Status(ErrorCodes::NotMaster, "Not master anymore while waiting for replication" @@ -1506,6 +1535,11 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl } else { condVar.wait_for(*lock, waitTime); } + + stepdownStatus = checkForStepDown(); + if (!stepdownStatus.isOK()) { + return StatusAndDuration(stepdownStatus, elapsed); + } } Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); @@ -2001,10 +2035,11 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) result->append("config", _rsConfig.toBSON()); } -void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) { +void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) { EventHandle evh; - _scheduleWorkAndWaitForCompletion([this, &evh, &replMetadata](const CallbackArgs& args) { - evh = _processReplSetMetadata_incallback(replMetadata); + _scheduleWorkAndWaitForCompletion([&](const CallbackArgs& args) { + evh = _processReplSetMetadata_incallback(replMetadata, advanceCommitPoint); }); if (evh.isValid()) { _replExecutor.waitForEvent(evh); @@ -2017,11 +2052,13 @@ void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() { } EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_incallback( - const rpc::ReplSetMetadata& replMetadata) { + const rpc::ReplSetMetadata& replMetadata, bool advanceCommitPoint) { if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) { return EventHandle(); } - _setLastCommittedOpTime(replMetadata.getLastOpCommitted()); + if (advanceCommitPoint) { + _setLastCommittedOpTime(replMetadata.getLastOpCommitted()); + } return _updateTerm_incallback(replMetadata.getTerm()); } @@ -2527,6 +2564,7 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { info->condVar->notify_all(); } _canAcceptNonLocalWrites = false; + _stepDownPending = false; result = kActionCloseAllConnections; } else { result = kActionFollowerModeStateChange; @@ -3103,7 +3141,7 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& curre } void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { - if (!_getMemberState_inlock().primary()) { + if (!_getMemberState_inlock().primary() || _stepDownPending) { return; } @@ -3509,7 +3547,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_incallback( if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) { log() << "stepping down from primary, because a new term has begun: " << term; _topCoord->prepareForStepDown(); - return _stepDownStart(); + return _stepDownStart(false); } return EventHandle(); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 3c6bbaefdbb..e2b2bea1298 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -217,7 +217,8 @@ public: virtual void processReplSetGetConfig(BSONObjBuilder* result) override; - virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override; + virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) override; virtual void cancelAndRescheduleElectionTimeout() override; @@ -1024,7 +1025,10 @@ private: */ void _requestRemotePrimaryStepdown(const HostAndPort& target); - ReplicationExecutor::EventHandle _stepDownStart(); + /** + * Schedules stepdown to run with the global exclusive lock. + */ + ReplicationExecutor::EventHandle _stepDownStart(bool hasMutex); /** * Completes a step-down of the current node. Must be run with a global @@ -1063,9 +1067,11 @@ private: * Utility method that schedules or performs actions specified by a HeartbeatResponseAction * returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given * value of "responseStatus". + * 'hasMutex' is true if the caller is holding _mutex. TODO(SERVER-27083): Remove this. */ void _handleHeartbeatResponseAction(const HeartbeatResponseAction& action, - const StatusWith<ReplSetHeartbeatResponse>& responseStatus); + const StatusWith<ReplSetHeartbeatResponse>& responseStatus, + bool hasMutex); /** * Bottom half of processHeartbeat(), which runs in the replication executor. @@ -1115,11 +1121,13 @@ private: /** * Callback that processes the ReplSetMetadata returned from a command run against another - * replica set member and updates protocol version 1 information (most recent optime that is - * committed, member id of the current PRIMARY, the current config version and the current term) + * replica set member and so long as the config version in the metadata matches the replica set + * config version this node currently has, updates the current term and optionally updates + * this node's notion of the commit point. * Returns the finish event which is invalid if the process has already finished. */ - EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata); + EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint); /** * Blesses a snapshot to be used for new committed reads. @@ -1281,6 +1289,14 @@ private: // TODO: ideally this should only change on rollbacks NOT on mongod restarts also. int _rbid; // (M) + // Indicates that we've received a request to stepdown from PRIMARY (likely via a heartbeat) + // TODO(SERVER-27083): This bool is redundant of the same-named bool in TopologyCoordinatorImpl, + // but due to mutex ordering between _mutex and _topoMutex we can't inspect the + // TopologyCoordinator field in awaitReplication() where this bool is used. Once we get rid + // of topoMutex and start guarding access to the TopologyCoordinator via _mutex we should + // consolidate the two bools. + bool _stepDownPending = false; // (M) + // list of information about clients waiting on replication. Does *not* own the WaiterInfos. std::vector<WaiterInfo*> _replicationWaiterList; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 4c23bab4f5b..f89f1592170 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -64,6 +64,8 @@ typedef ReplicationExecutor::CallbackHandle CBHandle; using executor::RemoteCommandRequest; +MONGO_FP_DECLARE(blockHeartbeatStepdown); + void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData, const HostAndPort& target, int targetIndex) { @@ -152,7 +154,9 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( if (replMetadata.isOK()) { // Asynchronous stepdown could happen, but it will be queued in executor after // this function, so we cannot and don't need to wait for it to finish. - _processReplSetMetadata_incallback(replMetadata.getValue()); + // Arbiters are the only nodes allowed to advance their commit point via heartbeats. + bool advanceCommitPoint = getMemberState().arbiter(); + _processReplSetMetadata_incallback(replMetadata.getValue(), advanceCommitPoint); } } const Date_t now = _replExecutor.now(); @@ -164,10 +168,11 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( networkTime = cbData.response.getValue().elapsedMillis; // TODO(sz) Because the term is duplicated in ReplSetMetaData, we can get rid of this // and update tests. - _updateTerm_incallback(hbStatusResponse.getValue().getTerm()); - // Postpone election timeout if we have a successful heartbeat response from the primary. const auto& hbResponse = hbStatusResponse.getValue(); - if (hbResponse.hasState() && hbResponse.getState().primary()) { + _updateTerm_incallback(hbResponse.getTerm()); + // Postpone election timeout if we have a successful heartbeat response from the primary. + if (hbResponse.hasState() && hbResponse.getState().primary() && + hbResponse.getTerm() == _topCoord->getTerm()) { cancelAndRescheduleElectionTimeout(); } } else { @@ -206,7 +211,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( _scheduleHeartbeatToTarget( target, targetIndex, std::max(now, action.getNextHeartbeatStartDate())); - _handleHeartbeatResponseAction(action, hbStatusResponse); + _handleHeartbeatResponseAction(action, hbStatusResponse, false /*we're not holding _mutex*/); } void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIndex, @@ -226,11 +231,13 @@ void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIn void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction( const HeartbeatResponseAction& action, - const StatusWith<ReplSetHeartbeatResponse>& responseStatus) { + const StatusWith<ReplSetHeartbeatResponse>& responseStatus, + bool hasMutex) { switch (action.getAction()) { case HeartbeatResponseAction::NoAction: // Update the cached member state if different than the current topology member state if (_memberState != _topCoord->getMemberState()) { + invariant(!hasMutex); stdx::unique_lock<stdx::mutex> lk(_mutex); const PostMemberStateUpdateAction postUpdateAction = _updateMemberStateFromTopologyCoordinator_inlock(); @@ -250,7 +257,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction( log() << "Stepping down from primary in response to heartbeat"; _topCoord->prepareForStepDown(); // Don't need to wait for stepdown to finish. - _stepDownStart(); + _stepDownStart(hasMutex); break; case HeartbeatResponseAction::StepDownRemotePrimary: { invariant(action.getPrimaryConfigIndex() != _selfIndex); @@ -304,11 +311,19 @@ void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort } } -ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() { +ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart(bool hasMutex) { + { + boost::optional<stdx::lock_guard<stdx::mutex>> lk; + if (!hasMutex) { + lk.emplace(_mutex); + } + _stepDownPending = true; + } auto finishEvent = _makeEvent(); if (!finishEvent) { return finishEvent; } + _replExecutor.scheduleWorkWithGlobalExclusiveLock(stdx::bind( &ReplicationCoordinatorImpl::_stepDownFinish, this, stdx::placeholders::_1, finishEvent)); return finishEvent; @@ -321,6 +336,20 @@ void ReplicationCoordinatorImpl::_stepDownFinish( return; } invariant(cbData.txn); + + if (MONGO_FAIL_POINT(blockHeartbeatStepdown)) { + // Must reschedule rather than block so we don't take up threads in the replication + // executor. + sleepmillis(10); + _replExecutor.scheduleWorkWithGlobalExclusiveLock( + stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish, + this, + stdx::placeholders::_1, + finishedEvent)); + + return; + } + // TODO Add invariant that we've got global shared or global exclusive lock, when supported // by lock manager. stdx::unique_lock<stdx::mutex> lk(_mutex); @@ -623,7 +652,9 @@ void ReplicationCoordinatorImpl::_handleLivenessTimeout( _topCoord->setMemberAsDown(now, memberIndex, _getMyLastDurableOpTime_inlock()); // Don't mind potential asynchronous stepdown as this is the last step of // liveness check. - _handleHeartbeatResponseAction(action, makeStatusWith<ReplSetHeartbeatResponse>()); + _handleHeartbeatResponseAction(action, + makeStatusWith<ReplSetHeartbeatResponse>(), + true /*we're holding _mutex*/); } } } diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 2233e21cc21..40a30c48e5a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -351,7 +351,7 @@ TEST_F(ReplCoordHBV1Test, ArbiterRecordsCommittedOpTimeFromHeartbeatMetadata) { << 1 << "primaryIndex" << 1 << "term" << committedOpTime.getTerm() << "syncSourceIndex" << 1))); ASSERT_OK(metadata.getStatus()); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQ(getReplCoord()->getMyLastAppliedOpTime().getTimestamp(), expected.getTimestamp()); }; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 9dfd6f11045..2a52f004700 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -44,6 +44,7 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/read_concern_response.h" +#include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/repl_set_request_votes_args.h" @@ -81,15 +82,15 @@ using executor::RemoteCommandResponse; typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted"); -// Helper class to wrap Timestamp as an OpTime with term 0. -struct OpTimeWithTermZero { - OpTimeWithTermZero(unsigned int sec, unsigned int i) : timestamp(sec, i) {} +// Helper class to wrap Timestamp as an OpTime with term 1. +struct OpTimeWithTermOne { + OpTimeWithTermOne(unsigned int sec, unsigned int i) : timestamp(sec, i) {} operator OpTime() const { - return OpTime(timestamp, 0); + return OpTime(timestamp, 1); } operator boost::optional<OpTime>() const { - return OpTime(timestamp, 0); + return OpTime(timestamp, 1); } OpTime asOpTime() const { @@ -674,7 +675,7 @@ TEST_F(ReplCoordTest, RollBackIDShouldIncreaseByOneWhenIncrementRollbackIDIsCall TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAStandaloneNode) { init(""); OperationContextNoop txn; - OpTimeWithTermZero time(100, 1); + OpTimeWithTermOne time(100, 1); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -692,7 +693,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAMas settings.setMaster(true); init(settings); OperationContextNoop txn; - OpTimeWithTermZero time(100, 1); + OpTimeWithTermOne time(100, 1); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -719,7 +720,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec HostAndPort("node1", 12345)); OperationContextNoop txn; - OpTimeWithTermZero time(100, 1); + OpTimeWithTermOne time(100, 1); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -732,7 +733,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec ASSERT_EQUALS(ErrorCodes::NotMaster, statusAndDur.status); } -TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWZero) { +TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWTermOne) { assertStartSuccess(BSON("_id" << "mySet" << "version" << 2 << "members" @@ -747,7 +748,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith HostAndPort("node1", 12345)); OperationContextNoop txn; - OpTimeWithTermZero time(100, 1); + OpTimeWithTermOne time(100, 1); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -756,8 +757,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith // Become primary. ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ASSERT(getReplCoord()->getMemberState().primary()); @@ -787,12 +788,12 @@ TEST_F(ReplCoordTest, << "_id" << 3))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -860,12 +861,12 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes << "_id" << 3))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -1056,6 +1057,8 @@ TEST_F( // another name if we didn't get a high enough one. } + auto zeroOpTimeInCurrentTerm = OpTime(Timestamp(0, 0), 1); + ReplClientInfo::forClient(txn.getClient()).setLastOp(zeroOpTimeInCurrentTerm); statusAndDur = getReplCoord()->awaitReplicationOfLastOpForClient(&txn, majorityWriteConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); statusAndDur = getReplCoord()->awaitReplicationOfLastOpForClient(&txn, multiDCWriteConcern); @@ -1158,14 +1161,14 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), &txn); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoTimeout; @@ -1217,14 +1220,14 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), &txn); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = 50; @@ -1258,14 +1261,14 @@ TEST_F(ReplCoordTest, << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), &txn); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoTimeout; @@ -1300,14 +1303,14 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), &txn); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoTimeout; @@ -1340,14 +1343,14 @@ TEST_F(ReplCoordTest, << "node3"))), HostAndPort("node1")); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), &txn); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoTimeout; @@ -1523,7 +1526,7 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) { OperationContextReplMock txn; - OpTimeWithTermZero optime1(100, 1); + OpTimeWithTermOne optime1(100, 1); // All nodes are caught up getReplCoord()->setMyLastAppliedOpTime(optime1); getReplCoord()->setMyLastDurableOpTime(optime1); @@ -1538,7 +1541,7 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) { TEST_F(StepDownTest, NodeReturnsExceededTimeLimitWhenStepDownFailsToObtainTheGlobalLockWithinTheAllottedTime) { OperationContextReplMock txn; - OpTimeWithTermZero optime1(100, 1); + OpTimeWithTermOne optime1(100, 1); // All nodes are caught up getReplCoord()->setMyLastAppliedOpTime(optime1); getReplCoord()->setMyLastDurableOpTime(optime1); @@ -1759,8 +1762,8 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle TEST_F(StepDownTest, NodeReturnsExceededTimeLimitWhenNoSecondaryIsCaughtUpWithinStepDownsSecondaryCatchUpPeriod) { OperationContextReplMock txn; - OpTimeWithTermZero optime1(100, 1); - OpTimeWithTermZero optime2(100, 2); + OpTimeWithTermOne optime1(100, 1); + OpTimeWithTermOne optime2(100, 2); // No secondary is caught up auto repl = getReplCoord(); repl->setMyLastAppliedOpTime(optime2); @@ -1941,8 +1944,8 @@ TEST_F(StepDownTest, TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) { const unsigned int opID = 100; OperationContextReplMock txn{opID}; - OpTimeWithTermZero optime1(100, 1); - OpTimeWithTermZero optime2(100, 2); + OpTimeWithTermOne optime1(100, 1); + OpTimeWithTermOne optime2(100, 2); // No secondary is caught up auto repl = getReplCoord(); repl->setMyLastAppliedOpTime(optime2); @@ -2092,9 +2095,9 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand << "test2:1234") << BSON("_id" << 2 << "host" << "test3:1234"))), HostAndPort("test1", 1234)); - OpTimeWithTermZero optime1(100, 1); - OpTimeWithTermZero optime2(100, 2); - OpTimeWithTermZero optime3(2, 1); + OpTimeWithTermOne optime1(100, 1); + OpTimeWithTermOne optime2(100, 2); + OpTimeWithTermOne optime3(2, 1); getReplCoord()->setMyLastAppliedOpTime(optime1); getReplCoord()->setMyLastDurableOpTime(optime1); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime2)); @@ -2124,7 +2127,7 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand ASSERT_EQUALS(2, memberId); ASSERT_EQUALS(optime3.timestamp, entry["optime"]["ts"].timestamp()); } - ASSERT_EQUALS(0, entry["optime"]["t"].Number()); + ASSERT_EQUALS(1, entry["optime"]["t"].Number()); } ASSERT_EQUALS(3U, memberIds.size()); // Make sure we saw all 3 nodes } @@ -2144,8 +2147,8 @@ TEST_F(ReplCoordTest, HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // Can't unset maintenance mode if it was never set to begin with. Status status = getReplCoord()->setMaintenanceMode(false); @@ -2168,8 +2171,8 @@ TEST_F(ReplCoordTest, HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // valid set ASSERT_OK(getReplCoord()->setMaintenanceMode(true)); ASSERT_TRUE(getReplCoord()->getMemberState().recovering()); @@ -2197,8 +2200,8 @@ TEST_F(ReplCoordTest, AllowAsManyUnsetMaintenanceModesAsThereHaveBeenSetMaintena HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // Can set multiple times ASSERT_OK(getReplCoord()->setMaintenanceMode(true)); ASSERT_OK(getReplCoord()->setMaintenanceMode(true)); @@ -2228,8 +2231,8 @@ TEST_F(ReplCoordTest, SettingAndUnsettingMaintenanceModeShouldNotAffectRollbackS HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // From rollback, entering and exiting maintenance mode doesn't change perceived // state. @@ -2267,8 +2270,8 @@ TEST_F(ReplCoordTest, DoNotAllowMaintenanceModeWhilePrimary) { HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // Can't modify maintenance mode when PRIMARY simulateSuccessfulV1Election(); @@ -2300,8 +2303,8 @@ TEST_F(ReplCoordTest, DoNotAllowSettingMaintenanceModeWhileConductingAnElection) HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // TODO this election shouldn't have to happen. simulateSuccessfulV1Election(); @@ -2360,8 +2363,8 @@ TEST_F(ReplCoordTest, HostAndPort("node1", 12345)); OperationContextNoop txn; - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); getReplCoord()->setMyLastAppliedOpTime(time2); getReplCoord()->setMyLastDurableOpTime(time2); @@ -2404,8 +2407,8 @@ TEST_F(ReplCoordTest, HostAndPort("node1", 12345)); OperationContextNoop txn; - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); getReplCoord()->setMyLastAppliedOpTime(time2); getReplCoord()->setMyLastDurableOpTime(time2); @@ -2434,8 +2437,8 @@ TEST_F(ReplCoordTest, NodeDoesNotIncludeItselfWhenRunningGetHostsWrittenToInMast OperationContextNoop txn; OID client = OID::gen(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); getExternalState()->setClientHostAndPort(clientHost); HandshakeArgs handshake; @@ -2584,12 +2587,12 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) { << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTime time1({100, 1}, 2); - OpTime time2({100, 2}, 2); + OpTime time1({100, 1}, 1); + OpTime time2({100, 2}, 1); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2630,13 +2633,13 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); - OpTimeWithTermZero staleTime(10, 0); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); + OpTimeWithTermOne staleTime(10, 0); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2674,12 +2677,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect) << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTime time1({100, 1}, 3); - OpTime time2({100, 2}, 3); + OpTime time1({100, 1}, 1); + OpTime time2({100, 2}, 1); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2719,13 +2722,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionWhenItsConfigVersionIsIncorre << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); - OpTimeWithTermZero staleTime(10, 0); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); + OpTimeWithTermOne staleTime(10, 0); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2762,12 +2765,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTime time1({100, 1}, 2); - OpTime time2({100, 2}, 2); + OpTime time1({100, 1}, 1); + OpTime time2({100, 2}, 1); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2805,13 +2808,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionOfMembersWhoseIdsAreNotInTheC << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); - OpTimeWithTermZero staleTime(10, 0); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); + OpTimeWithTermOne staleTime(10, 0); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2847,13 +2850,13 @@ TEST_F(ReplCoordTest, << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); - OpTimeWithTermZero staleTime(10, 0); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); + OpTimeWithTermOne staleTime(10, 0); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2912,11 +2915,11 @@ TEST_F(ReplCoordTest, AwaitReplicationShouldResolveAsNormalDuringAReconfig) { << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time(100, 2); + OpTimeWithTermOne time(100, 2); // 3 nodes waiting for time WriteConcernOptions writeConcern; @@ -2994,11 +2997,11 @@ TEST_F( << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time(100, 2); + OpTimeWithTermOne time(100, 2); // 3 nodes waiting for time WriteConcernOptions writeConcern; @@ -3055,8 +3058,8 @@ TEST_F(ReplCoordTest, << "_id" << 4))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 1)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 1)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); simulateSuccessfulV1Election(); OpTime time(Timestamp(100, 2), 1); @@ -3227,13 +3230,13 @@ TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDurin << "_id" << 0))), HostAndPort("node1", 12345)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0)); shutdown(); auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern)); ASSERT_TRUE(result.didWait()); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus()); @@ -3248,13 +3251,13 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte << "_id" << 0))), HostAndPort("node1", 12345)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0)); txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test")); auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern)); ASSERT_TRUE(result.didWait()); ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus()); @@ -3284,10 +3287,10 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi << "_id" << 0))), HostAndPort("node1", 12345)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern)); ASSERT_TRUE(result.didWait()); ASSERT_OK(result.getStatus()); @@ -3303,7 +3306,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi HostAndPort("node1", 12345)); - OpTimeWithTermZero time(100, 0); + OpTimeWithTermOne time(100, 0); getReplCoord()->setMyLastAppliedOpTime(time); getReplCoord()->setMyLastDurableOpTime(time); auto result = getReplCoord()->waitUntilOpTime( @@ -3318,7 +3321,7 @@ TEST_F(ReplCoordTest, init(ReplSettings()); OperationContextNoop txn; auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern)); ASSERT_FALSE(result.didWait()); ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus()); @@ -3505,7 +3508,7 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 1 << "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); // higher configVersion @@ -3515,7 +3518,7 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 100 << "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue()); + getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); } @@ -3550,7 +3553,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "lastOpVisible" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "configVersion" << 2 << "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime()); @@ -3560,7 +3563,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet "lastOpCommitted" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "lastOpVisible" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "configVersion" << 2 << "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue()); + getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime()); } @@ -3591,7 +3594,7 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion" << 2 << "primaryIndex" << 2 << "term" << 3 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); @@ -3602,7 +3605,7 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2 << "primaryIndex" << 1 << "term" << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue()); + getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); @@ -3613,14 +3616,14 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2 << "primaryIndex" << 1 << "term" << 3 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata3.getValue()); + getReplCoord()->processReplSetMetadata(metadata3.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); } TEST_F(ReplCoordTest, - TermAndLastCommittedOpTimeUpdateWhenHeartbeatResponseWithMetadataHasFresherValues) { + LastCommittedOpTimeNotUpdatedEvenWhenHeartbeatResponseWithMetadataHasFresherValues) { // Ensure that the metadata is processed if it is contained in a heartbeat response. assertStartSuccess(BSON("_id" << "mySet" @@ -3640,7 +3643,61 @@ TEST_F(ReplCoordTest, auto replCoord = getReplCoord(); auto config = replCoord->getConfig(); - // Higher term - should update term and lastCommittedOpTime. + // Higher term - should update term but not last committed optime. + StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( + rpc::kReplSetMetadataFieldName + << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" + << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion" + << config.getConfigVersion() << "primaryIndex" << 1 << "term" << 3 + << "syncSourceIndex" << 1))); + BSONObjBuilder metadataBuilder; + ASSERT_OK(metadata.getValue().writeToMetadata(&metadataBuilder)); + auto metadataObj = metadataBuilder.obj(); + + auto net = getNet(); + net->enterNetwork(); + + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + const auto& request = noi->getRequest(); + ASSERT_EQUALS(HostAndPort("node2", 12345), request.target); + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + ReplSetHeartbeatResponse hbResp; + hbResp.setConfigVersion(config.getConfigVersion()); + hbResp.setSetName(config.getReplSetName()); + hbResp.setState(MemberState::RS_SECONDARY); + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true), metadataObj)); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); + ASSERT_EQUALS(3, getReplCoord()->getTerm()); + ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); +} + +TEST_F(ReplCoordTest, TermAndLastCommittedOpTimeUpdatedFromHeartbeatWhenArbiter) { + // Ensure that the metadata is processed if it is contained in a heartbeat response. + assertStartSuccess(BSON("_id" + << "mySet" + << "version" << 2 << "members" + << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" << 0 << "arbiterOnly" << true) + << BSON("host" + << "node2:12345" + << "_id" << 1)) << "protocolVersion" << 1), + HostAndPort("node1", 12345)); + ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); + OperationContextNoop txn; + getReplCoord()->updateTerm(&txn, 1); + ASSERT_EQUALS(1, getReplCoord()->getTerm()); + + auto replCoord = getReplCoord(); + auto config = replCoord->getConfig(); + + // Higher term - should update term and lastCommittedOpTime since arbiters learn of the + // commit point via heartbeats. StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( rpc::kReplSetMetadataFieldName << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" @@ -3836,7 +3893,7 @@ TEST_F(ReplCoordTest, } TEST_F(ReplCoordTest, - CancelAndRescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimary) { + RescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimaryInSameTerm) { assertStartSuccess(BSON("_id" << "mySet" << "protocolVersion" << 1 << "version" << 2 << "members" @@ -3868,6 +3925,8 @@ TEST_F(ReplCoordTest, ReplSetHeartbeatResponse hbResp; hbResp.setSetName("mySet"); hbResp.setState(MemberState::RS_PRIMARY); + hbResp.setTerm(replCoord->getTerm()); + // Heartbeat response is scheduled with a delay so that we can be sure that // the election was rescheduled due to the heartbeat response. auto heartbeatWhen = net->now() + Seconds(1); @@ -3882,6 +3941,54 @@ TEST_F(ReplCoordTest, } TEST_F(ReplCoordTest, + DontRescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimaryInDiffertTerm) { + assertStartSuccess(BSON("_id" + << "mySet" + << "protocolVersion" << 1 << "version" << 2 << "members" + << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" << 0) + << BSON("host" + << "node2:12345" + << "_id" << 1))), + HostAndPort("node1", 12345)); + + ReplicationCoordinatorImpl* replCoord = getReplCoord(); + ASSERT_TRUE(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + + auto electionTimeoutWhen = replCoord->getElectionTimeout_forTest(); + ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen); + + auto net = getNet(); + net->enterNetwork(); + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + auto&& request = noi->getRequest(); + log() << "processing " << request.cmdObj; + ASSERT_EQUALS(HostAndPort("node2", 12345), request.target); + + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + // Respond to node1's heartbeat command to indicate that node2 is PRIMARY. + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName("mySet"); + hbResp.setState(MemberState::RS_PRIMARY); + hbResp.setTerm(replCoord->getTerm() - 1); + + // Heartbeat response is scheduled with a delay so that we can be sure that + // the election was rescheduled due to the heartbeat response. + auto heartbeatWhen = net->now() + Seconds(1); + net->scheduleResponse(noi, heartbeatWhen, makeResponseStatus(hbResp.toBSON(true))); + net->runUntil(heartbeatWhen); + ASSERT_EQUALS(heartbeatWhen, net->now()); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_GREATER_THAN(heartbeatWhen + replCoord->getConfig().getElectionTimeoutPeriod(), + replCoord->getElectionTimeout_forTest()); +} + +TEST_F(ReplCoordTest, CancelAndRescheduleElectionTimeoutWhenProcessingHeartbeatResponseWithoutState) { assertStartSuccess(BSON("_id" << "mySet" @@ -4241,7 +4348,7 @@ TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) { // Set last committed optime via metadata. rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1); - getReplCoord()->processReplSetMetadata(syncSourceMetadata); + getReplCoord()->processReplSetMetadata(syncSourceMetadata, true); getReplCoord()->onSnapshotCreate(optime, SnapshotName(1)); BSONObjBuilder cmdBuilder; diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index cb763bed4ff..df4a83c6f07 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -236,7 +236,8 @@ void ReplicationCoordinatorMock::processReplSetGetConfig(BSONObjBuilder* result) // TODO } -void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {} +void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) {} void ReplicationCoordinatorMock::cancelAndRescheduleElectionTimeout() {} diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 861799f00d2..24eba706cfe 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -150,7 +150,8 @@ public: virtual void processReplSetGetConfig(BSONObjBuilder* result); - virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata); + void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) override; virtual void cancelAndRescheduleElectionTimeout() override; diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index e6d90aa82ec..0c202e906a1 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -681,7 +681,7 @@ public: // New style update position command has metadata, which may inform the // upstream of a higher term. auto metadata = metadataResult.getValue(); - replCoord->processReplSetMetadata(metadata); + replCoord->processReplSetMetadata(metadata, false /*don't advance the commit point*/); } // In the case of an update from a member with an invalid replica set config, diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp index 3c9086ca39a..9dc04670dc4 100644 --- a/src/mongo/db/write_concern.cpp +++ b/src/mongo/db/write_concern.cpp @@ -45,6 +45,7 @@ #include "mongo/db/storage/storage_engine.h" #include "mongo/db/write_concern_options.h" #include "mongo/rpc/protocol.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" namespace mongo { @@ -72,6 +73,8 @@ namespace { const std::string kLocalDB = "local"; } // namespace +MONGO_FP_DECLARE(hangBeforeWaitingForWriteConcern); + StatusWith<WriteConcernOptions> extractWriteConcern(OperationContext* txn, const BSONObj& cmdObj, const std::string& dbName) { @@ -234,6 +237,8 @@ Status waitForWriteConcern(OperationContext* txn, // This check does not hold for writes done through dbeval because it runs with a global X lock. dassert(!txn->lockState()->isLocked() || txn->getClient()->isInDirectClient()); + MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForWriteConcern); + // Next handle blocking on disk Timer syncTimer; diff --git a/src/mongo/shell/assert.js b/src/mongo/shell/assert.js index 0d1e225a990..ba9333fb9cf 100644 --- a/src/mongo/shell/assert.js +++ b/src/mongo/shell/assert.js @@ -452,16 +452,33 @@ assert.writeOK = function(res, msg) { }; assert.writeError = function(res, msg) { + return assert.writeErrorWithCode(res, null, msg); +}; + +assert.writeErrorWithCode = function(res, expectedCode, msg) { var errMsg = null; + var foundCode = null; if (res instanceof WriteResult) { - if (!res.hasWriteError() && !res.hasWriteConcernError()) { + if (res.hasWriteError()) { + foundCode = res.getWriteError().code; + } else if (res.hasWriteConcernError()) { + foundCode = res.getWriteConcernError().code; + } else { errMsg = "no write error: " + tojson(res); } } else if (res instanceof BulkWriteResult) { // Can only happen with bulk inserts - if (!res.hasWriteErrors() && !res.hasWriteConcernError()) { + if (res.hasWriteErrors()) { + if (res.getWriteErrorCount() > 1 && expectedCode != null) { + errMsg = "can't check for specific code when there was more than one write error"; + } else { + foundCode = res.getWriteErrorAt(0).code; + } + } else if (res.hasWriteConcernError()) { + foundCode = res.getWriteConcernError().code; + } else { errMsg = "no write errors: " + tojson(res); } } else if (res instanceof WriteCommandError) { @@ -473,6 +490,12 @@ assert.writeError = function(res, msg) { } } + if (!errMsg && expectedCode) { + if (foundCode != expectedCode) { + errMsg = "found code " + foundCode + " does not match expected code " + expectedCode; + } + } + if (errMsg) { if (msg) errMsg = errMsg + ": " + msg; |