author     Jason Chan <jason.chan@10gen.com>  2020-04-09 20:59:30 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-04-23 19:26:49 +0000
commit     3064008dd3830bee2c18cea531fff565e26d47e5 (patch)
tree       4288e9377b2455855b032035b3a262e74da6d296
parent     ad478267c27b2b5f36cb39ad8c150081eaec9644 (diff)
download   mongo-3064008dd3830bee2c18cea531fff565e26d47e5.tar.gz
SERVER-46893 Allow streamable isMaster to wait on removed/uninitialized nodes
(cherry picked from commit 2bad13a63315132a2793194d8d89f28dd7534928)

SERVER-47638 Ensure isMaster is waiting before calling replSetInitiate in AwaitableIsMasterOnNodeWithUninitializedConfigInvalidHorizon

(cherry picked from commit 523b9c9f92db20062ad6e3f42ceb80292e1a23f3)
-rw-r--r--  jstests/replsets/awaitable_ismaster_errors_on_horizon_change.js  2
-rw-r--r--  jstests/replsets/awaitable_ismaster_on_nodes_with_invalid_configs.js  172
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp  201
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h  46
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp  424
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_noop.cpp  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_noop.h  4
-rw-r--r--  src/mongo/embedded/replication_coordinator_embedded.cpp  4
-rw-r--r--  src/mongo/embedded/replication_coordinator_embedded.h  4
12 files changed, 750 insertions, 123 deletions
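
For context: the streamable ("awaitable") isMaster protocol that this change extends to removed and uninitialized nodes has the client echo back the server's topologyVersion together with a maxAwaitTimeMS; the server then holds the request until a topology change occurs (replSetInitiate, reconfig, election, removal) or the wait time elapses. The mongo shell sketch below only illustrates the request shape exercised by the new jstest; it is not part of this patch, and the 10-second wait is an arbitrary assumption.

// Minimal sketch of an awaitable isMaster exchange (illustrative, not part of this commit).
// Assumes `db` is connected to any mongod, e.g. one started by ReplSetTest.
const first = assert.commandWorked(db.runCommand({isMaster: 1}));
// Echo the server's topologyVersion back. The server waits up to maxAwaitTimeMS for a
// topology change before replying, instead of answering immediately.
const awaited = assert.commandWorked(db.runCommand({
    isMaster: 1,
    topologyVersion: first.topologyVersion,  // {processId: <ObjectId>, counter: <NumberLong>}
    maxAwaitTimeMS: 10000,
}));
// The response always carries the server's current topologyVersion; the counter only
// increases, and it is incremented when a topology change woke the waiting request.
assert.gte(awaited.topologyVersion.counter, first.topologyVersion.counter);
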
diff --git a/jstests/replsets/awaitable_ismaster_errors_on_horizon_change.js b/jstests/replsets/awaitable_ismaster_errors_on_horizon_change.js
index e48a1e1f917..d7c2c3ca4bc 100644
--- a/jstests/replsets/awaitable_ismaster_errors_on_horizon_change.js
+++ b/jstests/replsets/awaitable_ismaster_errors_on_horizon_change.js
@@ -26,7 +26,7 @@ function runAwaitableIsMasterBeforeHorizonChange(topologyVersionField) {
}),
ErrorCodes.SplitHorizonChange);
- assert.eq(result.errmsg, "Received a reconfig that changed the horizon parameters.");
+ assert.eq(result.errmsg, "Received a reconfig that changed the horizon mappings.");
}
function runAwaitableIsMaster(topologyVersionField) {
diff --git a/jstests/replsets/awaitable_ismaster_on_nodes_with_invalid_configs.js b/jstests/replsets/awaitable_ismaster_on_nodes_with_invalid_configs.js
new file mode 100644
index 00000000000..f39720e2ed5
--- /dev/null
+++ b/jstests/replsets/awaitable_ismaster_on_nodes_with_invalid_configs.js
@@ -0,0 +1,172 @@
+/**
+ * Tests the streamable isMaster protocol against nodes with invalid replica set configs.
+ * @tags: [requires_fcv_44]
+ */
+(function() {
+"use strict";
+load("jstests/libs/parallel_shell_helpers.js");
+load("jstests/libs/fail_point_util.js");
+
+const replTest = new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0, votes: 0}}]});
+// Start the replica set but don't initiate yet.
+replTest.startSet();
+
+const dbName = "awaitable_ismaster_horizon_change";
+const node0 = replTest.nodes[0];
+const node1 = replTest.nodes[1];
+const dbNode0 = node0.getDB(dbName);
+const dbNode1 = node1.getDB(dbName);
+
+let responseNode0 = assert.commandWorked(dbNode0.runCommand({isMaster: 1}));
+let responseNode1 = assert.commandWorked(dbNode1.runCommand({isMaster: 1}));
+let topologyVersionNode0 = responseNode0.topologyVersion;
+let topologyVersionNode1 = responseNode1.topologyVersion;
+
+function runAwaitableIsMaster(topologyVersionField) {
+ const result = assert.commandWorked(db.runCommand({
+ isMaster: 1,
+ topologyVersion: topologyVersionField,
+ maxAwaitTimeMS: 99999999,
+ }));
+ assert.eq(topologyVersionField.counter + 1, result.topologyVersion.counter, result);
+}
+
+// Waiting isMasters should error when a node rejoins a replica set.
+function runAwaitableIsMasterOnRejoiningSet(topologyVersionField) {
+ const result = assert.commandFailedWithCode(db.runCommand({
+ isMaster: 1,
+ topologyVersion: topologyVersionField,
+ maxAwaitTimeMS: 99999999,
+ }),
+ ErrorCodes.SplitHorizonChange);
+ assert.eq(result.errmsg, "Received a reconfig that changed the horizon mappings.");
+}
+
+// A failpoint signalling that the servers have received the isMaster request and are waiting for a
+// topology change.
+let node0FailPoint = configureFailPoint(node0, "waitForIsMasterResponse");
+let node1FailPoint = configureFailPoint(node1, "waitForIsMasterResponse");
+// Send an awaitable isMaster request. This will block until there is a topology change.
+const firstAwaitInitiateOnNode0 =
+ startParallelShell(funWithArgs(runAwaitableIsMaster, topologyVersionNode0), node0.port);
+const firstAwaitInitiateOnNode1 =
+ startParallelShell(funWithArgs(runAwaitableIsMaster, topologyVersionNode1), node1.port);
+node0FailPoint.wait();
+node1FailPoint.wait();
+
+// Each node has one isMaster request waiting on a topology change.
+let numAwaitingTopologyChangeOnNode0 = dbNode0.serverStatus().connections.awaitingTopologyChanges;
+let numAwaitingTopologyChangeOnNode1 = dbNode1.serverStatus().connections.awaitingTopologyChanges;
+assert.eq(1, numAwaitingTopologyChangeOnNode0);
+assert.eq(1, numAwaitingTopologyChangeOnNode1);
+
+// Reconfigure the failpoint to refresh the number of times the failpoint has been entered.
+node0FailPoint = configureFailPoint(node0, "waitForIsMasterResponse");
+node1FailPoint = configureFailPoint(node1, "waitForIsMasterResponse");
+const secondAwaitInitiateOnNode0 =
+ startParallelShell(funWithArgs(runAwaitableIsMaster, topologyVersionNode0), node0.port);
+const secondAwaitInitiateOnNode1 =
+ startParallelShell(funWithArgs(runAwaitableIsMaster, topologyVersionNode1), node1.port);
+node0FailPoint.wait();
+node1FailPoint.wait();
+
+// Each node has two isMaster requests waiting on a topology change.
+numAwaitingTopologyChangeOnNode0 = dbNode0.serverStatus().connections.awaitingTopologyChanges;
+numAwaitingTopologyChangeOnNode1 = dbNode1.serverStatus().connections.awaitingTopologyChanges;
+assert.eq(2, numAwaitingTopologyChangeOnNode0);
+assert.eq(2, numAwaitingTopologyChangeOnNode1);
+
+// Doing a replSetInitiate should respond to all waiting isMasters.
+replTest.initiate();
+firstAwaitInitiateOnNode0();
+firstAwaitInitiateOnNode1();
+secondAwaitInitiateOnNode0();
+secondAwaitInitiateOnNode1();
+
+numAwaitingTopologyChangeOnNode0 = dbNode0.serverStatus().connections.awaitingTopologyChanges;
+numAwaitingTopologyChangeOnNode1 = dbNode1.serverStatus().connections.awaitingTopologyChanges;
+assert.eq(0, numAwaitingTopologyChangeOnNode0);
+assert.eq(0, numAwaitingTopologyChangeOnNode1);
+
+let primary = replTest.getPrimary();
+let secondary = replTest.getSecondary();
+let primaryDB = primary.getDB('admin');
+let secondaryDB = secondary.getDB('admin');
+const primaryRespAfterInitiate = assert.commandWorked(primaryDB.runCommand({isMaster: 1}));
+let primaryTopologyVersion = primaryRespAfterInitiate.topologyVersion;
+
+// Reconfigure the failpoint to refresh the number of times the failpoint has been entered.
+let primaryFailPoint = configureFailPoint(primary, "waitForIsMasterResponse");
+const awaitPrimaryIsMasterBeforeNodeRemoval =
+ startParallelShell(funWithArgs(runAwaitableIsMaster, primaryTopologyVersion), primary.port);
+primaryFailPoint.wait();
+
+// The primary has one isMaster request waiting on a topology change.
+let numAwaitingTopologyChangeOnPrimary =
+ primaryDB.serverStatus().connections.awaitingTopologyChanges;
+assert.eq(1, numAwaitingTopologyChangeOnPrimary);
+
+// Doing a reconfig that removes the secondary should respond to all waiting isMasters.
+let config = replTest.getReplSetConfig();
+config.members.splice(1, 1);
+config.version = replTest.getReplSetConfigFromNode().version + 1;
+assert.commandWorked(primaryDB.runCommand({replSetReconfig: config}));
+awaitPrimaryIsMasterBeforeNodeRemoval();
+
+// Wait for secondary to realize it is removed.
+assert.soonNoExcept(
+ () => assert.commandFailedWithCode(secondaryDB.adminCommand({replSetGetStatus: 1}),
+ ErrorCodes.InvalidReplicaSetConfig));
+
+const primaryRespAfterRemoval = assert.commandWorked(primaryDB.runCommand({isMaster: 1}));
+const secondaryRespAfterRemoval = assert.commandWorked(secondaryDB.runCommand({isMaster: 1}));
+primaryTopologyVersion = primaryRespAfterRemoval.topologyVersion;
+let secondaryTopologyVersion = secondaryRespAfterRemoval.topologyVersion;
+assert.eq(false, secondaryRespAfterRemoval.ismaster, secondaryRespAfterRemoval);
+assert.eq(false, secondaryRespAfterRemoval.secondary, secondaryRespAfterRemoval);
+assert.eq("Does not have a valid replica set config",
+ secondaryRespAfterRemoval.info,
+ secondaryRespAfterRemoval);
+
+// Reconfigure the failpoint to refresh the number of times the failpoint has been entered.
+primaryFailPoint = configureFailPoint(primary, "waitForIsMasterResponse");
+let secondaryFailPoint = configureFailPoint(secondary, "waitForIsMasterResponse");
+const awaitPrimaryIsMasterBeforeReadding =
+ startParallelShell(funWithArgs(runAwaitableIsMaster, primaryTopologyVersion), primary.port);
+const firstAwaitSecondaryIsMasterBeforeRejoining = startParallelShell(
+ funWithArgs(runAwaitableIsMasterOnRejoiningSet, secondaryTopologyVersion), secondary.port);
+primaryFailPoint.wait();
+secondaryFailPoint.wait();
+
+numAwaitingTopologyChangeOnPrimary = primaryDB.serverStatus().connections.awaitingTopologyChanges;
+let numAwaitingTopologyChangeOnSecondary =
+ secondaryDB.serverStatus().connections.awaitingTopologyChanges;
+assert.eq(1, numAwaitingTopologyChangeOnPrimary);
+assert.eq(1, numAwaitingTopologyChangeOnSecondary);
+
+// Send a second isMaster to the removed secondary.
+secondaryFailPoint = configureFailPoint(secondary, "waitForIsMasterResponse");
+const secondAwaitSecondaryIsMasterBeforeRejoining = startParallelShell(
+ funWithArgs(runAwaitableIsMasterOnRejoiningSet, secondaryTopologyVersion), secondary.port);
+secondaryFailPoint.wait();
+
+numAwaitingTopologyChangeOnSecondary =
+ secondaryDB.serverStatus().connections.awaitingTopologyChanges;
+assert.eq(2, numAwaitingTopologyChangeOnSecondary);
+
+// Have the secondary rejoin the set. This should respond to waiting isMasters on both nodes.
+config = replTest.getReplSetConfig();
+config.version = replTest.getReplSetConfigFromNode().version + 1;
+assert.commandWorked(primaryDB.runCommand({replSetReconfig: config}));
+awaitPrimaryIsMasterBeforeReadding();
+firstAwaitSecondaryIsMasterBeforeRejoining();
+secondAwaitSecondaryIsMasterBeforeRejoining();
+
+numAwaitingTopologyChangeOnPrimary = primaryDB.serverStatus().connections.awaitingTopologyChanges;
+numAwaitingTopologyChangeOnSecondary =
+ secondaryDB.serverStatus().connections.awaitingTopologyChanges;
+assert.eq(0, numAwaitingTopologyChangeOnPrimary);
+assert.eq(0, numAwaitingTopologyChangeOnSecondary);
+
+replTest.stopSet();
+})();
\ No newline at end of file
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index fca99dfad5f..afa0fbadff0 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -984,7 +984,7 @@ public:
OperationContext* opCtx,
const SplitHorizon::Parameters& horizonParams,
boost::optional<TopologyVersion> clientTopologyVersion,
- boost::optional<Date_t> deadline) const = 0;
+ boost::optional<Date_t> deadline) = 0;
/**
* The futurized version of `awaitIsMasterResponse()`:
@@ -993,7 +993,7 @@ public:
*/
virtual SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> getIsMasterResponseFuture(
const SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion) const = 0;
+ boost::optional<TopologyVersion> clientTopologyVersion) = 0;
/**
* Returns the OpTime that consists of the timestamp of the latest oplog entry and the current
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 51983a92dce..0da209a79e0 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -536,9 +536,9 @@ void ReplicationCoordinatorImpl::_createHorizonTopologyChangePromiseMapping(With
auto horizonMappings = _rsConfig.getMemberAt(_selfIndex).getHorizonMappings();
// Create a new horizon to promise mapping since it is possible for the horizons
// to change after a replica set reconfig.
- _horizonToPromiseMap.clear();
+ _horizonToTopologyChangePromiseMap.clear();
for (auto const& [horizon, hostAndPort] : horizonMappings) {
- _horizonToPromiseMap.emplace(
+ _horizonToTopologyChangePromiseMap.emplace(
horizon, std::make_shared<SharedPromise<std::shared_ptr<const IsMasterResponse>>>());
}
}
@@ -2089,12 +2089,22 @@ void ReplicationCoordinatorImpl::updateAndLogStateTransitionMetrics(
}
std::shared_ptr<IsMasterResponse> ReplicationCoordinatorImpl::_makeIsMasterResponse(
- const StringData horizonString, WithLock lock) const {
+ boost::optional<StringData> horizonString, WithLock lock, const bool hasValidConfig) const {
+ if (!hasValidConfig) {
+ auto response = std::make_shared<IsMasterResponse>();
+ response->setTopologyVersion(_topCoord->getTopologyVersion());
+ response->markAsNoConfig();
+ return response;
+ }
+
+ // horizonString must be passed in if we are a valid member of the config.
+ invariant(horizonString);
auto response = std::make_shared<IsMasterResponse>();
invariant(getSettings().usingReplSets());
- _topCoord->fillIsMasterForReplSet(response, horizonString);
+ _topCoord->fillIsMasterForReplSet(response, *horizonString);
OpTime lastOpTime = _getMyLastAppliedOpTime_inlock();
+
response->setLastWrite(lastOpTime, lastOpTime.getTimestamp().getSecs());
if (_currentCommittedSnapshot) {
response->setLastMajorityWrite(_currentCommittedSnapshot->opTime,
@@ -2118,43 +2128,23 @@ SharedSemiFuture<ReplicationCoordinatorImpl::SharedIsMasterResponse>
ReplicationCoordinatorImpl::_getIsMasterResponseFuture(
WithLock lk,
const SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion) const {
+ boost::optional<StringData> horizonString,
+ boost::optional<TopologyVersion> clientTopologyVersion) {
- const MemberState myState = _topCoord->getMemberState();
- if (!_rsConfig.isInitialized() || myState.removed()) {
- // It is possible the SplitHorizon mappings have not been initialized yet for a member
- // config. We also clear the horizon mappings for nodes that are no longer part of the
- // config.
- auto response = std::make_shared<IsMasterResponse>();
- response->setTopologyVersion(_topCoord->getTopologyVersion());
- response->markAsNoConfig();
- return SharedSemiFuture<SharedIsMasterResponse>(
- SharedIsMasterResponse(std::move(response)));
- }
+ const bool hasValidConfig = horizonString != boost::none;
- const auto& self = _rsConfig.getMemberAt(_selfIndex);
- // determineHorizon falls back to kDefaultHorizon if the server does not know of the given
- // horizon.
- const StringData horizonString = self.determineHorizon(horizonParams);
if (!clientTopologyVersion) {
// The client is not using awaitable isMaster so we respond immediately.
return SharedSemiFuture<SharedIsMasterResponse>(
- SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk)));
+ SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk, hasValidConfig)));
}
- // Each awaitable isMaster will wait on their specific horizon. We always expect horizonString
- // to exist in _horizonToPromiseMap.
- auto horizonIter = _horizonToPromiseMap.find(horizonString);
- invariant(horizonIter != end(_horizonToPromiseMap));
- SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> future =
- horizonIter->second->getFuture();
-
const TopologyVersion topologyVersion = _topCoord->getTopologyVersion();
if (clientTopologyVersion->getProcessId() != topologyVersion.getProcessId()) {
// Getting a different process id indicates that the server has restarted so we return
// immediately with the updated process id.
return SharedSemiFuture<SharedIsMasterResponse>(
- SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk)));
+ SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk, hasValidConfig)));
}
auto prevCounter = clientTopologyVersion->getCounter();
@@ -2169,38 +2159,64 @@ ReplicationCoordinatorImpl::_getIsMasterResponseFuture(
// The received isMaster command contains a stale topology version so we respond
// immediately with a more current topology version.
return SharedSemiFuture<SharedIsMasterResponse>(
- SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk)));
+ SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk, hasValidConfig)));
}
- return future;
+ if (!hasValidConfig) {
+ // An empty SNI will correspond to kDefaultHorizon.
+ const auto sni = horizonParams.sniName ? *horizonParams.sniName : "";
+ auto sniIter =
+ _sniToValidConfigPromiseMap
+ .emplace(sni,
+ std::make_shared<SharedPromise<std::shared_ptr<const IsMasterResponse>>>())
+ .first;
+ return sniIter->second->getFuture();
+ }
+ // Each awaitable isMaster will wait on their specific horizon. We always expect horizonString
+ // to exist in _horizonToTopologyChangePromiseMap.
+ auto horizonIter = _horizonToTopologyChangePromiseMap.find(*horizonString);
+ invariant(horizonIter != end(_horizonToTopologyChangePromiseMap));
+ return horizonIter->second->getFuture();
}
SharedSemiFuture<ReplicationCoordinatorImpl::SharedIsMasterResponse>
ReplicationCoordinatorImpl::getIsMasterResponseFuture(
const SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion) const {
+ boost::optional<TopologyVersion> clientTopologyVersion) {
stdx::lock_guard lk(_mutex);
- return _getIsMasterResponseFuture(lk, horizonParams, clientTopologyVersion);
+ const auto horizonString = _getHorizonString(lk, horizonParams);
+ return _getIsMasterResponseFuture(lk, horizonParams, horizonString, clientTopologyVersion);
+}
+
+boost::optional<StringData> ReplicationCoordinatorImpl::_getHorizonString(
+ WithLock, const SplitHorizon::Parameters& horizonParams) const {
+ const auto myState = _topCoord->getMemberState();
+ const bool hasValidConfig = _rsConfig.isInitialized() && !myState.removed();
+ boost::optional<StringData> horizonString;
+ if (hasValidConfig) {
+ const auto& self = _rsConfig.getMemberAt(_selfIndex);
+ horizonString = self.determineHorizon(horizonParams);
+ }
+ // A horizonString that is boost::none indicates that we do not have a valid config.
+ return horizonString;
}
std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMasterResponse(
OperationContext* opCtx,
const SplitHorizon::Parameters& horizonParams,
boost::optional<TopologyVersion> clientTopologyVersion,
- boost::optional<Date_t> deadline) const {
+ boost::optional<Date_t> deadline) {
stdx::unique_lock lk(_mutex);
- auto future = _getIsMasterResponseFuture(lk, horizonParams, clientTopologyVersion);
+ const auto horizonString = _getHorizonString(lk, horizonParams);
+ auto future =
+ _getIsMasterResponseFuture(lk, horizonParams, horizonString, clientTopologyVersion);
if (future.isReady()) {
return future.get();
}
// If clientTopologyVersion is not none, deadline must also be not none.
invariant(deadline);
- const auto myState = _topCoord->getMemberState();
- invariant(_rsConfig.isInitialized() && !myState.removed());
- const auto& self = _rsConfig.getMemberAt(_selfIndex);
- const StringData horizonString = self.determineHorizon(horizonParams);
const TopologyVersion topologyVersion = _topCoord->getTopologyVersion();
lk.unlock();
@@ -2233,7 +2249,10 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste
// a topology change.
stdx::lock_guard lk(_mutex);
IsMasterMetrics::get(opCtx)->decrementNumAwaitingTopologyChanges();
- return _makeIsMasterResponse(horizonString, lk);
+ // A topology change has not occurred within the deadline so horizonString is still a good
+ // indicator of whether we have a valid config.
+ const bool hasValidConfig = horizonString != boost::none;
+ return _makeIsMasterResponse(horizonString, lk, hasValidConfig);
}
// A topology change has happened so we return an IsMasterResponse with the updated
@@ -3620,7 +3639,6 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt
_shouldSetStableTimestamp = true;
_setStableTimestampForStorage(lk);
}
-
_finishReplSetInitiate(opCtx, newConfig, myIndex.getValue());
// A configuration passed to replSetInitiate() with the current node as an arbiter
@@ -3651,30 +3669,82 @@ void ReplicationCoordinatorImpl::_setConfigState_inlock(ConfigState newState) {
}
}
-bool ReplicationCoordinatorImpl::_haveHorizonsChanged(const ReplSetConfig& oldConfig,
- const ReplSetConfig& newConfig,
- int oldIndex,
- int newIndex) {
- if (oldIndex < 0 || newIndex < 0) {
- // It's possible for index to be -1 if we are performing a reconfig via heartbeat.
- return false;
+void ReplicationCoordinatorImpl::_errorOnPromisesIfHorizonChanged(WithLock lk,
+ OperationContext* opCtx,
+ const ReplSetConfig& oldConfig,
+ const ReplSetConfig& newConfig,
+ int oldIndex,
+ int newIndex) {
+ if (newIndex < 0) {
+ // When a node is removed, always return an isMaster response indicating the server has no
+ // config set.
+ return;
+ }
+
+ // We were previously removed but are now rejoining the replica set.
+ if (_memberState.removed()) {
+ // Reply with an error to isMaster requests received while the node had an invalid config.
+ invariant(_horizonToTopologyChangePromiseMap.empty());
+
+ for (const auto& [sni, promise] : _sniToValidConfigPromiseMap) {
+ promise->setError({ErrorCodes::SplitHorizonChange,
+ "Received a reconfig that changed the horizon mappings."});
+ }
+ _sniToValidConfigPromiseMap.clear();
+ IsMasterMetrics::get(opCtx)->resetNumAwaitingTopologyChanges();
+ }
+
+ if (oldIndex >= 0 && newIndex >= 0) {
+ invariant(_sniToValidConfigPromiseMap.empty());
+
+ const auto oldHorizonMappings = oldConfig.getMemberAt(oldIndex).getHorizonMappings();
+ const auto newHorizonMappings = newConfig.getMemberAt(newIndex).getHorizonMappings();
+ if (oldHorizonMappings != newHorizonMappings) {
+ for (const auto& [horizon, promise] : _horizonToTopologyChangePromiseMap) {
+ promise->setError({ErrorCodes::SplitHorizonChange,
+ "Received a reconfig that changed the horizon mappings."});
+ }
+ _createHorizonTopologyChangePromiseMapping(lk);
+ IsMasterMetrics::get(opCtx)->resetNumAwaitingTopologyChanges();
+ }
}
- const auto oldHorizonMappings = oldConfig.getMemberAt(oldIndex).getHorizonMappings();
- const auto newHorizonMappings = newConfig.getMemberAt(newIndex).getHorizonMappings();
- return oldHorizonMappings != newHorizonMappings;
}
void ReplicationCoordinatorImpl::_fulfillTopologyChangePromise(WithLock lock) {
_topCoord->incrementTopologyVersion();
_cachedTopologyVersionCounter.store(_topCoord->getTopologyVersion().getCounter());
+ const auto myState = _topCoord->getMemberState();
+ const bool hasValidConfig = _rsConfig.isInitialized() && !myState.removed();
// Create an isMaster response for each horizon the server is knowledgeable about.
- for (auto iter = _horizonToPromiseMap.begin(); iter != _horizonToPromiseMap.end(); iter++) {
- auto response = _makeIsMasterResponse(iter->first, lock);
+ for (auto iter = _horizonToTopologyChangePromiseMap.begin();
+ iter != _horizonToTopologyChangePromiseMap.end();
+ iter++) {
+ StringData horizonString = iter->first;
+ auto response = _makeIsMasterResponse(horizonString, lock, hasValidConfig);
// Fulfill the promise and replace with a new one for future waiters.
iter->second->emplaceValue(response);
iter->second = std::make_shared<SharedPromise<std::shared_ptr<const IsMasterResponse>>>();
}
-
+ if (_selfIndex >= 0 && !_sniToValidConfigPromiseMap.empty()) {
+ // We are joining the replica set for the first time. Send back an error to isMaster
+ // requests that are waiting on a horizon that does not exist in the new config. Otherwise,
+ // reply with an updated isMaster response.
+ const auto& reverseHostMappings =
+ _rsConfig.getMemberAt(_selfIndex).getHorizonReverseHostMappings();
+ for (const auto& [sni, promise] : _sniToValidConfigPromiseMap) {
+ const auto iter = reverseHostMappings.find(sni);
+ if (!sni.empty() && iter == end(reverseHostMappings)) {
+ promise->setError({ErrorCodes::SplitHorizonChange,
+ "The original request horizon parameter does not exist in the "
+ "current replica set config"});
+ } else {
+ const auto horizon = sni.empty() ? SplitHorizon::kDefaultHorizon : iter->second;
+ const auto response = _makeIsMasterResponse(horizon, lock, hasValidConfig);
+ promise->emplaceValue(response);
+ }
+ }
+ _sniToValidConfigPromiseMap.clear();
+ }
IsMasterMetrics::get(getGlobalServiceContext())->resetNumAwaitingTopologyChanges();
}
@@ -4172,7 +4242,8 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk,
LOGV2_OPTIONS(21391, {logv2::LogTag::kStartupWarnings}, "");
}
- const bool horizonsChanged = _haveHorizonsChanged(oldConfig, newConfig, _selfIndex, myIndex);
+ // If the SplitHorizon has changed, reply to all waiting isMasters with an error.
+ _errorOnPromisesIfHorizonChanged(lk, opCtx, oldConfig, newConfig, _selfIndex, myIndex);
LOGV2_OPTIONS(21392,
{logv2::LogTag::kRS},
@@ -4189,18 +4260,6 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk,
LOGV2(21394, "This node is not a member of the config");
}
- if (horizonsChanged) {
- for (auto iter = _horizonToPromiseMap.begin(); iter != _horizonToPromiseMap.end(); iter++) {
- iter->second->setError({ErrorCodes::SplitHorizonChange,
- "Received a reconfig that changed the horizon parameters."});
- IsMasterMetrics::get(opCtx)->resetNumAwaitingTopologyChanges();
- }
- if (_selfIndex >= 0) {
- // Only create a new horizon promise mapping if the node exists in the new config.
- _createHorizonTopologyChangePromiseMapping(lk);
- }
- }
-
// Wake up writeConcern waiters that are no longer satisfiable due to the rsConfig change.
_replicationWaiterList.setValueIf_inlock(
[this](const OpTime& opTime, const SharedWaiterHandle& waiter) {
@@ -4223,15 +4282,15 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk,
// nodes in the set will contact us.
_startHeartbeats_inlock();
- if (_horizonToPromiseMap.empty()) {
+ if (_horizonToTopologyChangePromiseMap.empty()) {
// We should only create a new horizon-to-promise mapping for nodes that are members of
// the config.
_createHorizonTopologyChangePromiseMapping(lk);
}
} else {
- // Clear the horizon promise mappings of removed nodes so they can be recreated if the node
- // later rejoins the set.
- _horizonToPromiseMap.clear();
+ // Clear the horizon promise mappings of removed nodes so they can be recreated if the
+ // node later rejoins the set.
+ _horizonToTopologyChangePromiseMap.clear();
// If we're still REMOVED, clear the seedList.
_seedList.clear();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 12e787b8112..82cf33ae8df 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -355,13 +355,13 @@ public:
virtual SharedSemiFuture<SharedIsMasterResponse> getIsMasterResponseFuture(
const SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion) const override;
+ boost::optional<TopologyVersion> clientTopologyVersion) override;
virtual std::shared_ptr<const IsMasterResponse> awaitIsMasterResponse(
OperationContext* opCtx,
const SplitHorizon::Parameters& horizonParams,
boost::optional<TopologyVersion> clientTopologyVersion,
- boost::optional<Date_t> deadline) const override;
+ boost::optional<Date_t> deadline) override;
virtual OpTime getLatestWriteOpTime(OperationContext* opCtx) const override;
@@ -1018,10 +1018,12 @@ private:
/**
* Returns true if the horizon mappings between the oldConfig and newConfig are different.
*/
- bool _haveHorizonsChanged(const ReplSetConfig& oldConfig,
- const ReplSetConfig& newConfig,
- int oldIndex,
- int newIndex);
+ void _errorOnPromisesIfHorizonChanged(WithLock lk,
+ OperationContext* opCtx,
+ const ReplSetConfig& oldConfig,
+ const ReplSetConfig& newConfig,
+ int oldIndex,
+ int newIndex);
/**
* Fulfills the promises that are waited on by awaitable isMaster requests. This increments the
@@ -1168,17 +1170,28 @@ private:
StatusWith<int> myIndex);
/**
- * Fills an IsMasterResponse with the appropriate replication related fields.
+ * Fills an IsMasterResponse with the appropriate replication related fields. horizonString
+ * should be passed in if hasValidConfig is true.
*/
- std::shared_ptr<IsMasterResponse> _makeIsMasterResponse(const StringData horizonString,
- WithLock) const;
+ std::shared_ptr<IsMasterResponse> _makeIsMasterResponse(
+ boost::optional<StringData> horizonString, WithLock, const bool hasValidConfig) const;
+
/**
- * Creates a semi-future for isMasterResponse.
+ * Creates a semi-future for isMasterResponse. horizonString should be passed in if and only if
+ * the server is a valid member of the config.
*/
virtual SharedSemiFuture<SharedIsMasterResponse> _getIsMasterResponseFuture(
WithLock,
const SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion) const;
+ boost::optional<StringData> horizonString,
+ boost::optional<TopologyVersion> clientTopologyVersion);
+
+ /**
+ * Returns the horizon string by parsing horizonParams if the node is a valid member of the
+ * replica set. Otherwise, return boost::none.
+ */
+ boost::optional<StringData> _getHorizonString(
+ WithLock, const SplitHorizon::Parameters& horizonParams) const;
/**
* Utility method that schedules or performs actions specified by a HeartbeatResponseAction
@@ -1472,8 +1485,15 @@ private:
// Waiters in this list are checked and notified on self's lastApplied opTime updates.
WaiterList _opTimeWaiterList; // (M)
- // Maps a horizon name to the promise waited on by exhaust isMaster requests.
- StringMap<std::shared_ptr<SharedPromiseOfIsMasterResponse>> _horizonToPromiseMap; // (M)
+ // Maps a horizon name to the promise waited on by awaitable isMaster requests when the node
+ // has an initialized replica set config and is an active member of the replica set.
+ StringMap<std::shared_ptr<SharedPromiseOfIsMasterResponse>>
+ _horizonToTopologyChangePromiseMap; // (M)
+
+ // Maps a requested SNI to the promise waited on by awaitable isMaster requests when the node
+ // has an uninitialized replica set config or is removed. An empty SNI will map to a promise on
+ // the default horizon.
+ StringMap<std::shared_ptr<SharedPromiseOfIsMasterResponse>> _sniToValidConfigPromiseMap; // (M)
// Set to true when we are in the process of shutting down replication.
bool _inShutdown; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 9dd165ec9e0..e220a9c8bda 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -3183,6 +3183,7 @@ TEST_F(ReplCoordTest, AwaitIsMasterResponseReturnsOnStepDown) {
auto waitForIsMasterFailPoint = globalFailPointRegistry().find("waitForIsMasterResponse");
auto timesEnteredFailPoint = waitForIsMasterFailPoint->setMode(FailPoint::alwaysOn, 0);
+ ON_BLOCK_EXIT([&] { waitForIsMasterFailPoint->setMode(FailPoint::off, 0); });
// awaitIsMasterResponse blocks and waits on a future when the request TopologyVersion equals
// the current TopologyVersion of the server.
@@ -3325,6 +3326,7 @@ TEST_F(ReplCoordTest, AwaitIsMasterResponseReturnsErrorOnHorizonChange) {
auto waitForIsMasterFailPoint = globalFailPointRegistry().find("waitForIsMasterResponse");
auto timesEnteredFailPoint = waitForIsMasterFailPoint->setMode(FailPoint::alwaysOn, 0);
+ ON_BLOCK_EXIT([&] { waitForIsMasterFailPoint->setMode(FailPoint::off, 0); });
// awaitIsMasterResponse blocks and waits on a future when the request TopologyVersion equals
// the current TopologyVersion of the server.
@@ -3374,6 +3376,242 @@ TEST_F(ReplCoordTest, AwaitIsMasterResponseReturnsErrorOnHorizonChange) {
getIsMasterThread.join();
}
+TEST_F(ReplCoordTest, NonAwaitableIsMasterReturnsNoConfigsOnNodeWithUninitializedConfig) {
+ start();
+ auto opCtx = makeOperationContext();
+
+ const auto response = getReplCoord()->awaitIsMasterResponse(opCtx.get(), {}, {}, {});
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_FALSE(response->isConfigSet());
+}
+
+TEST_F(ReplCoordTest, AwaitableIsMasterOnNodeWithUninitializedConfig) {
+ init("mySet");
+ start(HostAndPort("node1", 12345));
+ auto opCtx = makeOperationContext();
+
+ auto maxAwaitTime = Milliseconds(5000);
+ auto halfwayToDeadline = getNet()->now() + maxAwaitTime / 2;
+ auto deadline = getNet()->now() + maxAwaitTime;
+
+ bool isMasterReturned = false;
+ stdx::thread awaitIsMasterTimeout([&] {
+ const auto expectedTopologyVersion = getTopoCoord().getTopologyVersion();
+ const auto response = getReplCoord()->awaitIsMasterResponse(
+ opCtx.get(), {}, expectedTopologyVersion, deadline);
+ isMasterReturned = true;
+ auto responseTopologyVersion = response->getTopologyVersion();
+ ASSERT_EQUALS(expectedTopologyVersion.getProcessId(),
+ responseTopologyVersion->getProcessId());
+ ASSERT_EQUALS(expectedTopologyVersion.getCounter(), responseTopologyVersion->getCounter());
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_FALSE(response->isConfigSet());
+ });
+
+ getNet()->enterNetwork();
+ getNet()->advanceTime(halfwayToDeadline);
+ ASSERT_EQUALS(halfwayToDeadline, getNet()->now());
+ ASSERT_FALSE(isMasterReturned);
+
+ getNet()->advanceTime(deadline);
+ ASSERT_EQUALS(deadline, getNet()->now());
+ awaitIsMasterTimeout.join();
+ ASSERT_TRUE(isMasterReturned);
+ getNet()->exitNetwork();
+
+ auto waitForIsMasterFailPoint = globalFailPointRegistry().find("waitForIsMasterResponse");
+ auto timesEnteredFailPoint = waitForIsMasterFailPoint->setMode(FailPoint::alwaysOn, 0);
+ ON_BLOCK_EXIT([&] { waitForIsMasterFailPoint->setMode(FailPoint::off, 0); });
+
+ deadline = getNet()->now() + maxAwaitTime;
+ stdx::thread awaitIsMasterInitiate([&] {
+ const auto topologyVersion = getTopoCoord().getTopologyVersion();
+ const auto response =
+ getReplCoord()->awaitIsMasterResponse(opCtx.get(), {}, topologyVersion, deadline);
+ auto responseTopologyVersion = response->getTopologyVersion();
+ ASSERT_EQUALS(topologyVersion.getProcessId(), responseTopologyVersion->getProcessId());
+ ASSERT_EQUALS(topologyVersion.getCounter() + 1, responseTopologyVersion->getCounter());
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_TRUE(response->isConfigSet());
+ });
+
+ // Ensure that awaitIsMasterResponse() is called before initiating.
+ waitForIsMasterFailPoint->waitForTimesEntered(timesEnteredFailPoint + 1);
+
+ BSONObjBuilder result;
+ auto status =
+ getReplCoord()->processReplSetInitiate(opCtx.get(),
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345"))),
+ &result);
+ ASSERT_OK(status);
+ awaitIsMasterInitiate.join();
+}
+
+TEST_F(ReplCoordTest, AwaitableIsMasterOnNodeWithUninitializedConfigDifferentTopologyVersion) {
+ start();
+ auto opCtx = makeOperationContext();
+
+ auto maxAwaitTime = Milliseconds(5000);
+ auto deadline = getNet()->now() + maxAwaitTime;
+ const auto currentTopologyVersion = getTopoCoord().getTopologyVersion();
+
+ // A request with a future TopologyVersion should error.
+ const auto futureTopologyVersion = TopologyVersion(currentTopologyVersion.getProcessId(),
+ currentTopologyVersion.getCounter() + 1);
+ ASSERT_GREATER_THAN(futureTopologyVersion.getCounter(), currentTopologyVersion.getCounter());
+ ASSERT_THROWS_CODE(
+ getReplCoord()->awaitIsMasterResponse(opCtx.get(), {}, futureTopologyVersion, deadline),
+ AssertionException,
+ 31382);
+
+ // A request with a stale TopologyVersion should return immediately with the current server
+ // TopologyVersion.
+ const auto staleTopologyVersion = TopologyVersion(currentTopologyVersion.getProcessId(),
+ currentTopologyVersion.getCounter() - 1);
+ ASSERT_LESS_THAN(staleTopologyVersion.getCounter(), currentTopologyVersion.getCounter());
+ auto response =
+ getReplCoord()->awaitIsMasterResponse(opCtx.get(), {}, staleTopologyVersion, deadline);
+ auto responseTopologyVersion = response->getTopologyVersion();
+ ASSERT_EQUALS(responseTopologyVersion->getCounter(), currentTopologyVersion.getCounter());
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_FALSE(response->isConfigSet());
+
+ // A request with a different processId should return immediately with the server processId.
+ const auto differentPid = OID::gen();
+ ASSERT_NOT_EQUALS(differentPid, currentTopologyVersion.getProcessId());
+ auto topologyVersionWithDifferentProcessId =
+ TopologyVersion(differentPid, currentTopologyVersion.getCounter());
+ response = getReplCoord()->awaitIsMasterResponse(
+ opCtx.get(), {}, topologyVersionWithDifferentProcessId, deadline);
+ responseTopologyVersion = response->getTopologyVersion();
+ ASSERT_EQUALS(responseTopologyVersion->getProcessId(), currentTopologyVersion.getProcessId());
+ ASSERT_EQUALS(responseTopologyVersion->getCounter(), currentTopologyVersion.getCounter());
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_FALSE(response->isConfigSet());
+
+ // A request with a future TopologyVersion but different processId should still return
+ // immediately.
+ topologyVersionWithDifferentProcessId =
+ TopologyVersion(differentPid, currentTopologyVersion.getCounter() + 1);
+ ASSERT_GREATER_THAN(topologyVersionWithDifferentProcessId.getCounter(),
+ currentTopologyVersion.getCounter());
+ response = getReplCoord()->awaitIsMasterResponse(
+ opCtx.get(), {}, topologyVersionWithDifferentProcessId, deadline);
+ responseTopologyVersion = response->getTopologyVersion();
+ ASSERT_EQUALS(responseTopologyVersion->getProcessId(), currentTopologyVersion.getProcessId());
+ ASSERT_EQUALS(responseTopologyVersion->getCounter(), currentTopologyVersion.getCounter());
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_FALSE(response->isConfigSet());
+}
+
+TEST_F(ReplCoordTest, AwaitableIsMasterOnNodeWithUninitializedConfigInvalidHorizon) {
+ init("mySet");
+ start(HostAndPort("node1", 12345));
+ auto opCtx = makeOperationContext();
+
+ auto maxAwaitTime = Milliseconds(5000);
+ auto deadline = getNet()->now() + maxAwaitTime;
+
+ const std::string horizonSniName = "horizon.com";
+ const auto horizonParam = SplitHorizon::Parameters(horizonSniName);
+
+ // Send a non-awaitable isMaster.
+ const auto initialResponse = getReplCoord()->awaitIsMasterResponse(opCtx.get(), {}, {}, {});
+ ASSERT_FALSE(initialResponse->isMaster());
+ ASSERT_FALSE(initialResponse->isSecondary());
+ ASSERT_FALSE(initialResponse->isConfigSet());
+
+ auto waitForIsMasterFailPoint = globalFailPointRegistry().find("waitForIsMasterResponse");
+ auto timesEnteredFailPoint = waitForIsMasterFailPoint->setMode(FailPoint::alwaysOn, 0);
+ ON_BLOCK_EXIT([&] { waitForIsMasterFailPoint->setMode(FailPoint::off, 0); });
+
+ stdx::thread awaitIsMasterInitiate([&] {
+ const auto topologyVersion = getTopoCoord().getTopologyVersion();
+ ASSERT_THROWS_CODE(getReplCoord()->awaitIsMasterResponse(
+ opCtx.get(), horizonParam, topologyVersion, deadline),
+ AssertionException,
+ ErrorCodes::SplitHorizonChange);
+ });
+
+ // Ensure that the isMaster request has started waiting before initiating.
+ waitForIsMasterFailPoint->waitForTimesEntered(timesEnteredFailPoint + 1);
+
+ // Call replSetInitiate with no horizon configured. This should return an error to the isMaster
+ // request that is currently waiting on a horizonParam that doesn't exist in the config.
+ BSONObjBuilder result;
+ auto status =
+ getReplCoord()->processReplSetInitiate(opCtx.get(),
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345"))),
+ &result);
+ ASSERT_OK(status);
+ awaitIsMasterInitiate.join();
+}
+
+TEST_F(ReplCoordTest, AwaitableIsMasterOnNodeWithUninitializedConfigSpecifiedHorizon) {
+ init("mySet");
+ start(HostAndPort("node1", 12345));
+ auto opCtx = makeOperationContext();
+
+ auto maxAwaitTime = Milliseconds(5000);
+ auto deadline = getNet()->now() + maxAwaitTime;
+
+ const std::string horizonSniName = "horizon.com";
+ const auto horizonParam = SplitHorizon::Parameters(horizonSniName);
+
+ auto waitForIsMasterFailPoint = globalFailPointRegistry().find("waitForIsMasterResponse");
+ auto timesEnteredFailPoint = waitForIsMasterFailPoint->setMode(FailPoint::alwaysOn, 0);
+ ON_BLOCK_EXIT([&] { waitForIsMasterFailPoint->setMode(FailPoint::off, 0); });
+
+ const std::string horizonOneSniName = "horizon1.com";
+ const auto horizonOne = SplitHorizon::Parameters(horizonOneSniName);
+ const auto horizonOneView = HostAndPort("horizon1.com:12345");
+ stdx::thread awaitIsMasterInitiate([&] {
+ const auto topologyVersion = getTopoCoord().getTopologyVersion();
+ const auto response = getReplCoord()->awaitIsMasterResponse(
+ opCtx.get(), horizonOne, topologyVersion, deadline);
+ auto responseTopologyVersion = response->getTopologyVersion();
+ const auto hosts = response->getHosts();
+ ASSERT_EQUALS(hosts[0], horizonOneView);
+ ASSERT_EQUALS(topologyVersion.getProcessId(), responseTopologyVersion->getProcessId());
+ ASSERT_EQUALS(topologyVersion.getCounter() + 1, responseTopologyVersion->getCounter());
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_TRUE(response->isConfigSet());
+ });
+
+ waitForIsMasterFailPoint->waitForTimesEntered(timesEnteredFailPoint + 1);
+
+ // Call replSetInitiate with a horizon configured.
+ BSONObjBuilder result;
+ auto status = getReplCoord()->processReplSetInitiate(
+ opCtx.get(),
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345"
+ << "horizons"
+ << BSON("horizon1"
+ << "horizon1.com:12345")))),
+ &result);
+ ASSERT_OK(status);
+ awaitIsMasterInitiate.join();
+}
+
TEST_F(ReplCoordTest, AwaitIsMasterUsesDefaultHorizonWhenRequestedHorizonNotFound) {
init();
const auto nodeOneHostName = "node1:12345";
@@ -3561,6 +3799,147 @@ TEST_F(ReplCoordTest, AwaitIsMasterRespondsWithNewHorizon) {
getNet()->exitNetwork();
}
+TEST_F(ReplCoordTest, IsMasterOnRemovedNode) {
+ init();
+ const auto nodeOneHostName = "node1:12345";
+ const auto nodeTwoHostName = "node2:12345";
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("host" << nodeOneHostName << "_id" << 1)
+ << BSON("host" << nodeTwoHostName << "_id" << 2))),
+ HostAndPort(nodeOneHostName));
+
+ ReplicationCoordinatorImpl* replCoord = getReplCoord();
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+
+ getReplCoord()->cancelAndRescheduleElectionTimeout();
+
+ auto net = getNet();
+ enterNetwork();
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto noi = net->getNextReadyRequest();
+ auto&& request = noi->getRequest();
+ ASSERT_EQUALS(HostAndPort(nodeTwoHostName), request.target);
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ // Receive a config that excludes node1 and with node2 having a configured horizon.
+ ReplSetHeartbeatResponse hbResp;
+ ReplSetConfig removedFromConfig;
+ ASSERT_OK(removedFromConfig.initialize(
+ BSON("_id"
+ << "mySet"
+ << "protocolVersion" << 1 << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host" << nodeTwoHostName << "_id" << 2 << "horizons"
+ << BSON("horizon1"
+ << "testhorizon.com:100"))))));
+ hbResp.setConfig(removedFromConfig);
+ hbResp.setConfigVersion(2);
+ hbResp.setSetName("mySet");
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setAppliedOpTimeAndWallTime({OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100)});
+ hbResp.setDurableOpTimeAndWallTime({OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100)});
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON()));
+ net->runReadyNetworkOperations();
+ exitNetwork();
+
+ // node1 no longer exists in the replica set config.
+ ASSERT_OK(getReplCoord()->waitForMemberState(MemberState::RS_REMOVED, Seconds(1)));
+ ASSERT_EQUALS(removedFromConfig.getConfigVersion(),
+ getReplCoord()->getConfig().getConfigVersion());
+
+ const auto maxAwaitTime = Milliseconds(5000);
+ auto deadline = net->now() + maxAwaitTime;
+ auto opCtx = makeOperationContext();
+ const auto currentTopologyVersion = getTopoCoord().getTopologyVersion();
+
+ // Non-awaitable isMaster requests should return immediately.
+ auto response = getReplCoord()->awaitIsMasterResponse(opCtx.get(), {}, {}, {});
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_FALSE(response->isConfigSet());
+
+ // A request with a future TopologyVersion should error.
+ const auto futureTopologyVersion = TopologyVersion(currentTopologyVersion.getProcessId(),
+ currentTopologyVersion.getCounter() + 1);
+ ASSERT_GREATER_THAN(futureTopologyVersion.getCounter(), currentTopologyVersion.getCounter());
+ ASSERT_THROWS_CODE(
+ getReplCoord()->awaitIsMasterResponse(opCtx.get(), {}, futureTopologyVersion, deadline),
+ AssertionException,
+ 31382);
+
+ // A request with a stale TopologyVersion should return immediately.
+ const auto staleTopologyVersion = TopologyVersion(currentTopologyVersion.getProcessId(),
+ currentTopologyVersion.getCounter() - 1);
+ ASSERT_LESS_THAN(staleTopologyVersion.getCounter(), currentTopologyVersion.getCounter());
+ response =
+ getReplCoord()->awaitIsMasterResponse(opCtx.get(), {}, staleTopologyVersion, deadline);
+ auto responseTopologyVersion = response->getTopologyVersion();
+ ASSERT_EQUALS(responseTopologyVersion->getCounter(), currentTopologyVersion.getCounter());
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_FALSE(response->isConfigSet());
+
+ // A request with a different processId should return immediately with the server processId.
+ const auto differentPid = OID::gen();
+ ASSERT_NOT_EQUALS(differentPid, currentTopologyVersion.getProcessId());
+ auto topologyVersionWithDifferentProcessId =
+ TopologyVersion(differentPid, currentTopologyVersion.getCounter());
+ response = getReplCoord()->awaitIsMasterResponse(
+ opCtx.get(), {}, topologyVersionWithDifferentProcessId, deadline);
+ responseTopologyVersion = response->getTopologyVersion();
+ ASSERT_EQUALS(responseTopologyVersion->getProcessId(), currentTopologyVersion.getProcessId());
+ ASSERT_EQUALS(responseTopologyVersion->getCounter(), currentTopologyVersion.getCounter());
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_FALSE(response->isConfigSet());
+
+ // A request with a future TopologyVersion but different processId should still return
+ // immediately.
+ topologyVersionWithDifferentProcessId =
+ TopologyVersion(differentPid, currentTopologyVersion.getCounter() + 1);
+ ASSERT_GREATER_THAN(topologyVersionWithDifferentProcessId.getCounter(),
+ currentTopologyVersion.getCounter());
+ response = getReplCoord()->awaitIsMasterResponse(
+ opCtx.get(), {}, topologyVersionWithDifferentProcessId, deadline);
+ responseTopologyVersion = response->getTopologyVersion();
+ ASSERT_EQUALS(responseTopologyVersion->getProcessId(), currentTopologyVersion.getProcessId());
+ ASSERT_EQUALS(responseTopologyVersion->getCounter(), currentTopologyVersion.getCounter());
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_FALSE(response->isConfigSet());
+
+ bool isMasterReturned = false;
+ // A request with an equal TopologyVersion should wait and timeout once the deadline is reached.
+ const auto halfwayToDeadline = getNet()->now() + maxAwaitTime / 2;
+ stdx::thread getIsMasterThread([&] {
+ // Sending an isMaster request on a removed node should wait.
+ response = getReplCoord()->awaitIsMasterResponse(
+ opCtx.get(), {}, currentTopologyVersion, deadline);
+ isMasterReturned = true;
+ responseTopologyVersion = response->getTopologyVersion();
+ ASSERT_EQUALS(responseTopologyVersion->getCounter(), currentTopologyVersion.getCounter());
+ ASSERT_FALSE(response->isMaster());
+ ASSERT_FALSE(response->isSecondary());
+ ASSERT_FALSE(response->isConfigSet());
+ });
+
+ deadline = net->now() + maxAwaitTime;
+ net->enterNetwork();
+ // Set the network clock to a time before the deadline of the isMaster request. The request
+ // should still be waiting.
+ net->advanceTime(halfwayToDeadline);
+ ASSERT_EQUALS(halfwayToDeadline, net->now());
+ ASSERT_FALSE(isMasterReturned);
+
+ // Set the network clock to the deadline.
+ net->advanceTime(deadline);
+ ASSERT_EQUALS(deadline, net->now());
+ getIsMasterThread.join();
+ ASSERT_TRUE(isMasterReturned);
+ net->exitNetwork();
+}
+
TEST_F(ReplCoordTest, AwaitIsMasterRespondsCorrectlyWhenNodeRemovedAndReadded) {
init();
const auto nodeOneHostName = "node1:12345";
@@ -3584,13 +3963,17 @@ TEST_F(ReplCoordTest, AwaitIsMasterRespondsCorrectlyWhenNodeRemovedAndReadded) {
auto waitForIsMasterFailPoint = globalFailPointRegistry().find("waitForIsMasterResponse");
auto timesEnteredFailPoint = waitForIsMasterFailPoint->setMode(FailPoint::alwaysOn, 0);
+ ON_BLOCK_EXIT([&] { waitForIsMasterFailPoint->setMode(FailPoint::off, 0); });
stdx::thread getIsMasterWaitingForRemovedNodeThread([&] {
- const auto expectedTopologyVersion = getTopoCoord().getTopologyVersion();
+ const auto topologyVersion = getTopoCoord().getTopologyVersion();
// The isMaster response should indicate that the node does not have a valid replica set
// config.
- const auto response = getReplCoord()->awaitIsMasterResponse(
- opCtx.get(), {}, expectedTopologyVersion, deadline);
+ const auto response =
+ getReplCoord()->awaitIsMasterResponse(opCtx.get(), {}, topologyVersion, deadline);
+ const auto responseTopologyVersion = response->getTopologyVersion();
+ ASSERT_EQUALS(responseTopologyVersion->getProcessId(), topologyVersion.getProcessId());
+ ASSERT_EQUALS(responseTopologyVersion->getCounter(), topologyVersion.getCounter() + 1);
ASSERT_FALSE(response->isMaster());
ASSERT_FALSE(response->isSecondary());
ASSERT_FALSE(response->isConfigSet());
@@ -3631,32 +4014,19 @@ TEST_F(ReplCoordTest, AwaitIsMasterRespondsCorrectlyWhenNodeRemovedAndReadded) {
ASSERT_EQUALS(removedFromConfig.getConfigVersion(),
getReplCoord()->getConfig().getConfigVersion());
getIsMasterWaitingForRemovedNodeThread.join();
+ const std::string newHorizonSniName = "newhorizon.com";
+ auto newHorizon = SplitHorizon::Parameters(newHorizonSniName);
- const std::string testHorizonSniName = "testhorizon.com";
- auto newHorizon = SplitHorizon::Parameters(testHorizonSniName);
stdx::thread getIsMasterThread([&] {
const auto expectedTopologyVersion = getTopoCoord().getTopologyVersion();
- // Sending an isMaster request on a removed node should still indicate that the node does
- // not have a valid replica set reconfig.
- const auto response = getReplCoord()->awaitIsMasterResponse(
- opCtx.get(), newHorizon, expectedTopologyVersion, deadline);
- auto topologyVersion = response->getTopologyVersion();
- ASSERT_FALSE(response->isMaster());
- ASSERT_FALSE(response->isSecondary());
- ASSERT_FALSE(response->isConfigSet());
+ // Wait for the node to be readded to the set. This should return an error.
+ ASSERT_THROWS_CODE(getReplCoord()->awaitIsMasterResponse(
+ opCtx.get(), {}, expectedTopologyVersion, deadline),
+ AssertionException,
+ ErrorCodes::SplitHorizonChange);
});
+ waitForIsMasterFailPoint->waitForTimesEntered(timesEnteredFailPoint + 2);
- deadline = net->now() + maxAwaitTime;
- // Set the network clock to the timeout deadline of awaitIsMasterResponse.
- net->enterNetwork();
- net->advanceTime(deadline);
- ASSERT_EQUALS(deadline, net->now());
- getIsMasterThread.join();
- ASSERT_FALSE(net->hasReadyRequests());
- net->exitNetwork();
-
- const std::string newHorizonSniName = "newhorizon.com";
- newHorizon = SplitHorizon::Parameters(newHorizonSniName);
const auto newHorizonNodeOne = "newhorizon.com:100";
const auto newHorizonNodeTwo = "newhorizon.com:200";
@@ -3682,6 +4052,7 @@ TEST_F(ReplCoordTest, AwaitIsMasterRespondsCorrectlyWhenNodeRemovedAndReadded) {
replyToReceivedHeartbeatV1();
reconfigThread.join();
ASSERT_OK(getReplCoord()->waitForMemberState(MemberState::RS_SECONDARY, Seconds(1)));
+ getIsMasterThread.join();
stdx::thread getIsMasterThreadNewHorizon([&] {
const auto expectedTopologyVersion = getTopoCoord().getTopologyVersion();
@@ -3739,6 +4110,7 @@ TEST_F(ReplCoordTest, AwaitIsMasterResponseReturnsOnElectionTimeout) {
auto waitForIsMasterFailPoint = globalFailPointRegistry().find("waitForIsMasterResponse");
auto timesEnteredFailPoint = waitForIsMasterFailPoint->setMode(FailPoint::alwaysOn, 0);
+ ON_BLOCK_EXIT([&] { waitForIsMasterFailPoint->setMode(FailPoint::off, 0); });
// awaitIsMasterResponse blocks and waits on a future when the request TopologyVersion equals
// the current TopologyVersion of the server.
@@ -3803,6 +4175,7 @@ TEST_F(ReplCoordTest, AwaitIsMasterResponseReturnsOnElectionWin) {
auto waitForIsMasterFailPoint = globalFailPointRegistry().find("waitForIsMasterResponse");
auto timesEnteredFailPoint = waitForIsMasterFailPoint->setMode(FailPoint::alwaysOn, 0);
+ ON_BLOCK_EXIT([&] { waitForIsMasterFailPoint->setMode(FailPoint::off, 0); });
// awaitIsMasterResponse blocks and waits on a future when the request TopologyVersion equals
// the current TopologyVersion of the server.
@@ -3896,6 +4269,7 @@ TEST_F(ReplCoordTest, AwaitIsMasterResponseReturnsOnElectionWinWithReconfig) {
globalFailPointRegistry().find("hangAfterReconfigOnDrainComplete");
auto timesEnteredFailPoint = waitForIsMasterFailPoint->setMode(FailPoint::alwaysOn);
+ ON_BLOCK_EXIT([&] { waitForIsMasterFailPoint->setMode(FailPoint::off, 0); });
// awaitIsMasterResponse blocks and waits on a future when the request TopologyVersion equals
// the current TopologyVersion of the server.
@@ -4384,6 +4758,7 @@ TEST_F(ReplCoordTest, AwaitIsMasterResponseReturnsOnReplSetReconfig) {
auto waitForIsMasterFailPoint = globalFailPointRegistry().find("waitForIsMasterResponse");
auto timesEnteredFailPoint = waitForIsMasterFailPoint->setMode(FailPoint::alwaysOn, 0);
+ ON_BLOCK_EXIT([&] { waitForIsMasterFailPoint->setMode(FailPoint::off, 0); });
// awaitIsMasterResponse blocks and waits on a future when the request TopologyVersion equals
// the current TopologyVersion of the server.
@@ -4534,6 +4909,7 @@ TEST_F(ReplCoordTest, AwaitIsMasterResponseReturnsOnReplSetReconfigOnSecondary)
auto waitForIsMasterFailPoint = globalFailPointRegistry().find("waitForIsMasterResponse");
auto timesEnteredFailPoint = waitForIsMasterFailPoint->setMode(FailPoint::alwaysOn, 0);
+ ON_BLOCK_EXIT([&] { waitForIsMasterFailPoint->setMode(FailPoint::off, 0); });
// awaitIsMasterResponse blocks and waits on a future when the request TopologyVersion equals
// the current TopologyVersion of the server.
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 9d8e94643fc..e30c8f0b417 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -577,7 +577,7 @@ void ReplicationCoordinatorMock::incrementTopologyVersion() {
SharedSemiFuture<std::shared_ptr<const IsMasterResponse>>
ReplicationCoordinatorMock::getIsMasterResponseFuture(
const SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion) const {
+ boost::optional<TopologyVersion> clientTopologyVersion) {
auto response =
awaitIsMasterResponse(nullptr, horizonParams, clientTopologyVersion, Date_t::now());
return SharedSemiFuture<std::shared_ptr<const IsMasterResponse>>(
@@ -588,7 +588,7 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorMock::awaitIsMaste
OperationContext* opCtx,
const SplitHorizon::Parameters& horizonParams,
boost::optional<TopologyVersion> clientTopologyVersion,
- boost::optional<Date_t> deadline) const {
+ boost::optional<Date_t> deadline) {
auto response = std::make_shared<IsMasterResponse>();
response->setReplSetVersion(_getConfigReturnValue.getConfigVersion());
response->setIsMaster(true);
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 055069cdb24..9b9aab0017f 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -336,11 +336,11 @@ public:
OperationContext* opCtx,
const SplitHorizon::Parameters& horizonParams,
boost::optional<TopologyVersion> clientTopologyVersion,
- boost::optional<Date_t> deadline) const override;
+ boost::optional<Date_t> deadline) override;
virtual SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> getIsMasterResponseFuture(
const SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion) const override;
+ boost::optional<TopologyVersion> clientTopologyVersion) override;
virtual OpTime getLatestWriteOpTime(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp
index 48969b8a365..902dd065fb8 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.cpp
+++ b/src/mongo/db/repl/replication_coordinator_noop.cpp
@@ -488,7 +488,7 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorNoOp::awaitIsMaste
OperationContext* opCtx,
const SplitHorizon::Parameters& horizonParams,
boost::optional<TopologyVersion> clientTopologyVersion,
- boost::optional<Date_t> deadline) const {
+ boost::optional<Date_t> deadline) {
MONGO_UNREACHABLE;
}
@@ -496,7 +496,7 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorNoOp::awaitIsMaste
SharedSemiFuture<std::shared_ptr<const IsMasterResponse>>
ReplicationCoordinatorNoOp::getIsMasterResponseFuture(
const SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion) const {
+ boost::optional<TopologyVersion> clientTopologyVersion) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h
index 4686deca20f..e672a820757 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.h
+++ b/src/mongo/db/repl/replication_coordinator_noop.h
@@ -271,11 +271,11 @@ public:
OperationContext* opCtx,
const SplitHorizon::Parameters& horizonParams,
boost::optional<TopologyVersion> clientTopologyVersion,
- boost::optional<Date_t> deadline) const final;
+ boost::optional<Date_t> deadline) final;
SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> getIsMasterResponseFuture(
const SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion) const final;
+ boost::optional<TopologyVersion> clientTopologyVersion) final;
OpTime getLatestWriteOpTime(OperationContext* opCtx) const override;
diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp
index 1f569487739..cee4e8a51f3 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.cpp
+++ b/src/mongo/embedded/replication_coordinator_embedded.cpp
@@ -515,14 +515,14 @@ std::shared_ptr<const repl::IsMasterResponse> ReplicationCoordinatorEmbedded::aw
OperationContext* opCtx,
const repl::SplitHorizon::Parameters& horizonParams,
boost::optional<TopologyVersion> previous,
- boost::optional<Date_t> deadline) const {
+ boost::optional<Date_t> deadline) {
UASSERT_NOT_IMPLEMENTED;
};
SharedSemiFuture<std::shared_ptr<const IsMasterResponse>>
ReplicationCoordinatorEmbedded::getIsMasterResponseFuture(
const SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion) const {
+ boost::optional<TopologyVersion> clientTopologyVersion) {
UASSERT_NOT_IMPLEMENTED;
}
diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h
index 862e3da1fc4..ff32320a720 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.h
+++ b/src/mongo/embedded/replication_coordinator_embedded.h
@@ -279,11 +279,11 @@ public:
OperationContext* opCtx,
const repl::SplitHorizon::Parameters& horizonParams,
boost::optional<TopologyVersion> previous,
- boost::optional<Date_t> deadline) const override;
+ boost::optional<Date_t> deadline) override;
virtual SharedSemiFuture<std::shared_ptr<const repl::IsMasterResponse>>
getIsMasterResponseFuture(const repl::SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion) const;
+ boost::optional<TopologyVersion> clientTopologyVersion);
repl::OpTime getLatestWriteOpTime(OperationContext* opCtx) const override;