diff options
6 files changed, 228 insertions, 0 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 0b1f03cc2e1..5010be181f0 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -225,6 +225,7 @@ clientDriverEnv.Library( 'streamable_replica_set_monitor_error_handler.cpp', 'server_discovery_monitor.cpp', 'server_ping_monitor.cpp', + 'streamable_replica_set_monitor_discovery_time_processor.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/client/sdam/sdam', @@ -369,6 +370,7 @@ if wiredtiger: 'server_discovery_monitor_test.cpp', 'server_ping_monitor_test.cpp', 'streamable_replica_set_monitor_error_handler_test.cpp', + 'streamable_replica_set_monitor_discovery_time_processor_test.cpp' ], LIBDEPS=[ '$BUILD_DIR/mongo/client/sdam/sdam', diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 20f21ea9b3d..d600e46c5f3 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -40,6 +40,7 @@ #include "mongo/client/connpool.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/read_preference.h" +#include "mongo/client/streamable_replica_set_monitor_discovery_time_processor.h" #include "mongo/client/streamable_replica_set_monitor_query_processor.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/bson_extract_optime.h" @@ -190,6 +191,8 @@ StreamableReplicaSetMonitor::StreamableReplicaSetMonitor( : ReplicaSetMonitor(cleanupCallback), _errorHandler(std::make_unique<SdamErrorHandler>(uri.getSetName())), _queryProcessor(std::make_shared<StreamableReplicaSetMonitorQueryProcessor>()), + _primaryDiscoveryTimeProcessor( + std::make_shared<StreamableReplicaSetMonitorDiscoveryTimeProcessor>()), _uri(uri), _connectionManager(connectionManager), _executor(executor), @@ -267,6 +270,8 @@ void StreamableReplicaSetMonitor::init() { _eventsPublisher->registerListener(_queryProcessor); + _eventsPublisher->registerListener(_primaryDiscoveryTimeProcessor); + _isDropped.store(false); ReplicaSetMonitorManager::get()->getNotifier().onFoundSet(getName()); diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index 8f16013daa3..1557b3eec8a 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -139,12 +139,16 @@ public: bool isKnownToHaveGoodPrimary() const override; void runScanForMockReplicaSet() override; + class StreamableReplicaSetMonitorDiscoveryTimeProcessor; private: class StreamableReplicaSetMonitorQueryProcessor; using StreamableReplicaSetMontiorQueryProcessorPtr = std::shared_ptr<StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor>; + using StreamableReplicaSetMonitorDiscoveryTimeProcessorPtr = std::shared_ptr< + StreamableReplicaSetMonitor::StreamableReplicaSetMonitorDiscoveryTimeProcessor>; + struct HostQuery { HostQuery(std::shared_ptr<ReplicaSetMonitorStats> stats) : statsCollector(stats->collectGetHostAndRefreshStats()) {} @@ -297,6 +301,7 @@ private: // any outstanding queries for this RSM instance. StreamableReplicaSetMontiorQueryProcessorPtr _queryProcessor; + StreamableReplicaSetMonitorDiscoveryTimeProcessorPtr _primaryDiscoveryTimeProcessor; const MongoURI _uri; std::shared_ptr<executor::EgressTagCloser> _connectionManager; diff --git a/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.cpp b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.cpp new file mode 100644 index 00000000000..53ad894f94e --- /dev/null +++ b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.cpp @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork +#include "mongo/client/streamable_replica_set_monitor_discovery_time_processor.h" + +#include <memory> + +#include "mongo/client/global_conn_pool.h" +#include "mongo/logv2/log.h" + +namespace mongo { + +void StreamableReplicaSetMonitor::StreamableReplicaSetMonitorDiscoveryTimeProcessor:: + onTopologyDescriptionChangedEvent(sdam::TopologyDescriptionPtr previousDescription, + sdam::TopologyDescriptionPtr newDescription) { + + + const auto oldPrimary = previousDescription->getPrimary(); + const auto oldHost = oldPrimary ? (*oldPrimary)->getAddress().toString() : "Unknown"; + + const auto newPrimary = newDescription->getPrimary(); + const auto newHost = newPrimary ? (*newPrimary)->getAddress().toString() : "Unknown"; + + if (newHost != oldHost) { + stdx::lock_guard lock(_mutex); + LOGV2(6006301, + "Replica set primary server change detected", + "replicaSet"_attr = newDescription->getSetName(), + "topologyType"_attr = newDescription->getType(), + "primary"_attr = newHost, + "durationMillis"_attr = _elapsedTime.millis()); + _elapsedTime.reset(); + } +} +const Milliseconds StreamableReplicaSetMonitor::StreamableReplicaSetMonitorDiscoveryTimeProcessor:: + getPrimaryServerChangeElapsedTime() const { + stdx::lock_guard lock(_mutex); + return Milliseconds(_elapsedTime.millis()); +} + +}; // namespace mongo diff --git a/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.h b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.h new file mode 100644 index 00000000000..ab786a5cb73 --- /dev/null +++ b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor.h @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/client/sdam/sdam.h" +#include "mongo/client/streamable_replica_set_monitor.h" + +namespace mongo { +class StreamableReplicaSetMonitor::StreamableReplicaSetMonitorDiscoveryTimeProcessor final + : public sdam::TopologyListener { +public: + void onTopologyDescriptionChangedEvent(sdam::TopologyDescriptionPtr previousDescription, + sdam::TopologyDescriptionPtr newDescription) override; + + const Milliseconds getPrimaryServerChangeElapsedTime() const; + +private: + mutable Mutex _mutex = + MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), + "StreamableReplicaSetMonitorDiscoveryTimeProcessor::_mutex"); + Timer _elapsedTime; +}; +} // namespace mongo diff --git a/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor_test.cpp b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor_test.cpp new file mode 100644 index 00000000000..caff43580f0 --- /dev/null +++ b/src/mongo/client/streamable_replica_set_monitor_discovery_time_processor_test.cpp @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include <boost/optional/optional_io.hpp> + +#include "mongo/client/sdam/sdam_test_base.h" +#include "mongo/client/sdam/server_description_builder.h" +#include "mongo/client/sdam/topology_description.h" +#include "mongo/client/sdam/topology_manager.h" +#include "mongo/client/streamable_replica_set_monitor_discovery_time_processor.h" + +namespace mongo { + +class PrimaryServerDiscoveryTime : public SdamTestFixture { +public: + const SdamConfiguration sdamConfiguration; + + PrimaryServerDiscoveryTime() : sdamConfiguration(SdamConfiguration({{HostAndPort("s0")}})) {} + static inline const OID kOidOne{"000000000000000000000001"}; + SdamServerSelector selector = SdamServerSelector(sdamConfiguration); + StreamableReplicaSetMonitor::StreamableReplicaSetMonitorDiscoveryTimeProcessor + _rsmTimeProcessor; +}; +TEST_F(PrimaryServerDiscoveryTime, ShouldFilterByLastWriteTime2) { + TopologyStateMachine stateMachine(sdamConfiguration); + auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration); + + const auto s0 = ServerDescriptionBuilder() + .withAddress(HostAndPort("s0")) + .withType(ServerType::kRSPrimary) + .withRtt(sdamConfiguration.getLocalThreshold()) + .withSetName("set") + .withHost(HostAndPort("s0")) + .withHost(HostAndPort("s1")) + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withElectionId(kOidOne) + .withSetVersion(100) + .instance(); + stateMachine.onServerDescription(*topologyDescription, s0); + + auto newTopology = TopologyDescription::clone(*topologyDescription); + const auto s1 = ServerDescriptionBuilder() + .withAddress(HostAndPort("s0")) + .withType(ServerType::kRSSecondary) + .withRtt(sdamConfiguration.getLocalThreshold()) + .withSetName("set") + .withHost(HostAndPort("s0")) + .withHost(HostAndPort("s1")) + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withElectionId(kOidOne) + .withSetVersion(100) + .instance(); + + stateMachine.onServerDescription(*newTopology, s1); + + // no timer reset because primary server didn't change + auto beforeElapsedDuration = _rsmTimeProcessor.getPrimaryServerChangeElapsedTime(); + sleepFor(Milliseconds(100)); + _rsmTimeProcessor.onTopologyDescriptionChangedEvent(topologyDescription, topologyDescription); + auto afterElapsedDuration = _rsmTimeProcessor.getPrimaryServerChangeElapsedTime(); + ASSERT_TRUE(afterElapsedDuration > beforeElapsedDuration); + + // timer reset because of primary server change + beforeElapsedDuration = _rsmTimeProcessor.getPrimaryServerChangeElapsedTime(); + sleepFor(Milliseconds(100)); + _rsmTimeProcessor.onTopologyDescriptionChangedEvent(topologyDescription, newTopology); + afterElapsedDuration = _rsmTimeProcessor.getPrimaryServerChangeElapsedTime(); + ASSERT_TRUE(afterElapsedDuration < + beforeElapsedDuration); // afterElapsedDuration was just reset +} + +} // namespace mongo |