summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/SConscript2
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.cpp5
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.h5
-rw-r--r--src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.cpp67
-rw-r--r--src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.h49
-rw-r--r--src/mongo/client/streamable_replica_set_monitor_discovery_time_processor_test.cpp100
6 files changed, 228 insertions, 0 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index 0b1f03cc2e1..5010be181f0 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -225,6 +225,7 @@ clientDriverEnv.Library(
'streamable_replica_set_monitor_error_handler.cpp',
'server_discovery_monitor.cpp',
'server_ping_monitor.cpp',
+ 'streamable_replica_set_monitor_discovery_time_processor.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/sdam/sdam',
@@ -369,6 +370,7 @@ if wiredtiger:
'server_discovery_monitor_test.cpp',
'server_ping_monitor_test.cpp',
'streamable_replica_set_monitor_error_handler_test.cpp',
+ 'streamable_replica_set_monitor_discovery_time_processor_test.cpp'
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/sdam/sdam',
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp
index 20f21ea9b3d..d600e46c5f3 100644
--- a/src/mongo/client/streamable_replica_set_monitor.cpp
+++ b/src/mongo/client/streamable_replica_set_monitor.cpp
@@ -40,6 +40,7 @@
#include "mongo/client/connpool.h"
#include "mongo/client/global_conn_pool.h"
#include "mongo/client/read_preference.h"
+#include "mongo/client/streamable_replica_set_monitor_discovery_time_processor.h"
#include "mongo/client/streamable_replica_set_monitor_query_processor.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/bson_extract_optime.h"
@@ -190,6 +191,8 @@ StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(
: ReplicaSetMonitor(cleanupCallback),
_errorHandler(std::make_unique<SdamErrorHandler>(uri.getSetName())),
_queryProcessor(std::make_shared<StreamableReplicaSetMonitorQueryProcessor>()),
+ _primaryDiscoveryTimeProcessor(
+ std::make_shared<StreamableReplicaSetMonitorDiscoveryTimeProcessor>()),
_uri(uri),
_connectionManager(connectionManager),
_executor(executor),
@@ -267,6 +270,8 @@ void StreamableReplicaSetMonitor::init() {
_eventsPublisher->registerListener(_queryProcessor);
+ _eventsPublisher->registerListener(_primaryDiscoveryTimeProcessor);
+
_isDropped.store(false);
ReplicaSetMonitorManager::get()->getNotifier().onFoundSet(getName());
diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h
index 8f16013daa3..1557b3eec8a 100644
--- a/src/mongo/client/streamable_replica_set_monitor.h
+++ b/src/mongo/client/streamable_replica_set_monitor.h
@@ -139,12 +139,16 @@ public:
bool isKnownToHaveGoodPrimary() const override;
void runScanForMockReplicaSet() override;
+ class StreamableReplicaSetMonitorDiscoveryTimeProcessor;
private:
class StreamableReplicaSetMonitorQueryProcessor;
using StreamableReplicaSetMontiorQueryProcessorPtr =
std::shared_ptr<StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor>;
+ using StreamableReplicaSetMonitorDiscoveryTimeProcessorPtr = std::shared_ptr<
+ StreamableReplicaSetMonitor::StreamableReplicaSetMonitorDiscoveryTimeProcessor>;
+
struct HostQuery {
HostQuery(std::shared_ptr<ReplicaSetMonitorStats> stats)
: statsCollector(stats->collectGetHostAndRefreshStats()) {}
@@ -297,6 +301,7 @@ private:
// any outstanding queries for this RSM instance.
StreamableReplicaSetMontiorQueryProcessorPtr _queryProcessor;
+ StreamableReplicaSetMonitorDiscoveryTimeProcessorPtr _primaryDiscoveryTimeProcessor;
const MongoURI _uri;
std::shared_ptr<executor::EgressTagCloser> _connectionManager;
diff --git a/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.cpp b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.cpp
new file mode 100644
index 00000000000..53ad894f94e
--- /dev/null
+++ b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.cpp
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
+#include "mongo/client/streamable_replica_set_monitor_discovery_time_processor.h"
+
+#include <memory>
+
+#include "mongo/client/global_conn_pool.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+
+void StreamableReplicaSetMonitor::StreamableReplicaSetMonitorDiscoveryTimeProcessor::
+ onTopologyDescriptionChangedEvent(sdam::TopologyDescriptionPtr previousDescription,
+ sdam::TopologyDescriptionPtr newDescription) {
+
+
+ const auto oldPrimary = previousDescription->getPrimary();
+ const auto oldHost = oldPrimary ? (*oldPrimary)->getAddress().toString() : "Unknown";
+
+ const auto newPrimary = newDescription->getPrimary();
+ const auto newHost = newPrimary ? (*newPrimary)->getAddress().toString() : "Unknown";
+
+ if (newHost != oldHost) {
+ stdx::lock_guard lock(_mutex);
+ LOGV2(6006301,
+ "Replica set primary server change detected",
+ "replicaSet"_attr = newDescription->getSetName(),
+ "topologyType"_attr = newDescription->getType(),
+ "primary"_attr = newHost,
+ "durationMillis"_attr = _elapsedTime.millis());
+ _elapsedTime.reset();
+ }
+}
+const Milliseconds StreamableReplicaSetMonitor::StreamableReplicaSetMonitorDiscoveryTimeProcessor::
+ getPrimaryServerChangeElapsedTime() const {
+ stdx::lock_guard lock(_mutex);
+ return Milliseconds(_elapsedTime.millis());
+}
+
+}; // namespace mongo
diff --git a/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.h b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.h
new file mode 100644
index 00000000000..ab786a5cb73
--- /dev/null
+++ b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.h
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/client/sdam/sdam.h"
+#include "mongo/client/streamable_replica_set_monitor.h"
+
+namespace mongo {
+class StreamableReplicaSetMonitor::StreamableReplicaSetMonitorDiscoveryTimeProcessor final
+ : public sdam::TopologyListener {
+public:
+ void onTopologyDescriptionChangedEvent(sdam::TopologyDescriptionPtr previousDescription,
+ sdam::TopologyDescriptionPtr newDescription) override;
+
+ const Milliseconds getPrimaryServerChangeElapsedTime() const;
+
+private:
+ mutable Mutex _mutex =
+ MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0),
+ "StreamableReplicaSetMonitorDiscoveryTimeProcessor::_mutex");
+ Timer _elapsedTime;
+};
+} // namespace mongo
diff --git a/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor_test.cpp b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor_test.cpp
new file mode 100644
index 00000000000..caff43580f0
--- /dev/null
+++ b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor_test.cpp
@@ -0,0 +1,100 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <boost/optional/optional_io.hpp>
+
+#include "mongo/client/sdam/sdam_test_base.h"
+#include "mongo/client/sdam/server_description_builder.h"
+#include "mongo/client/sdam/topology_description.h"
+#include "mongo/client/sdam/topology_manager.h"
+#include "mongo/client/streamable_replica_set_monitor_discovery_time_processor.h"
+
+namespace mongo {
+
+class PrimaryServerDiscoveryTime : public SdamTestFixture {
+public:
+ const SdamConfiguration sdamConfiguration;
+
+ PrimaryServerDiscoveryTime() : sdamConfiguration(SdamConfiguration({{HostAndPort("s0")}})) {}
+ static inline const OID kOidOne{"000000000000000000000001"};
+ SdamServerSelector selector = SdamServerSelector(sdamConfiguration);
+ StreamableReplicaSetMonitor::StreamableReplicaSetMonitorDiscoveryTimeProcessor
+ _rsmTimeProcessor;
+};
+TEST_F(PrimaryServerDiscoveryTime, ShouldFilterByLastWriteTime2) {
+ TopologyStateMachine stateMachine(sdamConfiguration);
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+
+ const auto s0 = ServerDescriptionBuilder()
+ .withAddress(HostAndPort("s0"))
+ .withType(ServerType::kRSPrimary)
+ .withRtt(sdamConfiguration.getLocalThreshold())
+ .withSetName("set")
+ .withHost(HostAndPort("s0"))
+ .withHost(HostAndPort("s1"))
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withElectionId(kOidOne)
+ .withSetVersion(100)
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s0);
+
+ auto newTopology = TopologyDescription::clone(*topologyDescription);
+ const auto s1 = ServerDescriptionBuilder()
+ .withAddress(HostAndPort("s0"))
+ .withType(ServerType::kRSSecondary)
+ .withRtt(sdamConfiguration.getLocalThreshold())
+ .withSetName("set")
+ .withHost(HostAndPort("s0"))
+ .withHost(HostAndPort("s1"))
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withElectionId(kOidOne)
+ .withSetVersion(100)
+ .instance();
+
+ stateMachine.onServerDescription(*newTopology, s1);
+
+ // no timer reset because primary server didn't change
+ auto beforeElapsedDuration = _rsmTimeProcessor.getPrimaryServerChangeElapsedTime();
+ sleepFor(Milliseconds(100));
+ _rsmTimeProcessor.onTopologyDescriptionChangedEvent(topologyDescription, topologyDescription);
+ auto afterElapsedDuration = _rsmTimeProcessor.getPrimaryServerChangeElapsedTime();
+ ASSERT_TRUE(afterElapsedDuration > beforeElapsedDuration);
+
+ // timer reset because of primary server change
+ beforeElapsedDuration = _rsmTimeProcessor.getPrimaryServerChangeElapsedTime();
+ sleepFor(Milliseconds(100));
+ _rsmTimeProcessor.onTopologyDescriptionChangedEvent(topologyDescription, newTopology);
+ afterElapsedDuration = _rsmTimeProcessor.getPrimaryServerChangeElapsedTime();
+ ASSERT_TRUE(afterElapsedDuration <
+ beforeElapsedDuration); // afterElapsedDuration was just reset
+}
+
+} // namespace mongo