diff options
author | Sergi Mateo Bellido <sergi.mateo-bellido@mongodb.com> | 2022-05-12 17:03:11 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-13 13:33:42 +0000 |
commit | f9bf6f179994afeaff255568a6ab0c28dc3f646c (patch) | |
tree | 4a985f288400179323299cc44e0ed4eefb39ec99 | |
parent | 94ebb3709d96e4d991294c0c1db021f83d9b2c58 (diff) | |
download | mongo-f9bf6f179994afeaff255568a6ab0c28dc3f646c.tar.gz |
SERVER-64433 Recovering the topology tick points on startup/init sync
(cherry picked from commit cc33146088335da2bc08edf4eeec7d6b9fd724f0)
-rw-r--r-- | src/mongo/db/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/namespace_string.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/s/config_server_op_observer.cpp | 107 | ||||
-rw-r--r-- | src/mongo/db/s/config_server_op_observer.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/topology_time_ticker.cpp | 139 | ||||
-rw-r--r-- | src/mongo/db/s/topology_time_ticker.h | 99 | ||||
-rw-r--r-- | src/mongo/db/s/topology_time_ticker_test.cpp | 182 | ||||
-rw-r--r-- | src/mongo/db/vector_clock_mongod.cpp | 34 |
10 files changed, 495 insertions, 91 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 446061814da..8e5de43635d 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2062,7 +2062,8 @@ env.Library( env.Library( target='vector_clock_mongod', source=[ - 'vector_clock_mongod.cpp', + 's/topology_time_ticker.cpp', + 'vector_clock_mongod.cpp' ], LIBDEPS=[ 'vector_clock_mutable', diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index d6bc8ced42f..13729330ee1 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -165,6 +165,10 @@ const NamespaceString NamespaceString::kCompactStructuredEncryptionCoordinatorNa const NamespaceString NamespaceString::kClusterParametersNamespace(NamespaceString::kConfigDb, "clusterParameters"); +// TODO (SERVER-66431): replace all usages of ShardType::ConfigNS by kConfigsvrShardsNamespace +const NamespaceString NamespaceString::kConfigsvrShardsNamespace(NamespaceString::kConfigDb, + "shards"); + bool NamespaceString::isListCollectionsCursorNS() const { return coll() == listCollectionsCursorCol; } @@ -234,12 +238,16 @@ bool NamespaceString::isLegalClientSystemNS( * * Process updates to config.tenantMigrationRecipients individually so they serialize after inserts * into config.donatedFiles.<migrationId>. + * + * Oplog entries on 'config.shards' should be processed one at a time, otherwise the in-memory state + * that its kept on the TopologyTimeTicker might be wrong. + * */ bool NamespaceString::mustBeAppliedInOwnOplogBatch() const { return isSystemDotViews() || isServerConfigurationCollection() || isPrivilegeCollection() || _ns == kDonorReshardingOperationsNamespace.ns() || _ns == kForceOplogBatchBoundaryNamespace.ns() || - _ns == kTenantMigrationRecipientsNamespace.ns(); + _ns == kTenantMigrationRecipientsNamespace.ns() || _ns == kConfigsvrShardsNamespace.ns(); } NamespaceString NamespaceString::makeListCollectionsNSS(StringData dbName) { diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index c4bbc52fcbd..0822cb21d4d 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -219,6 +219,9 @@ public: // Namespace used for storing cluster wide parameters. static const NamespaceString kClusterParametersNamespace; + // Namespace used for storing the list of shards on the CSRS + static const NamespaceString kConfigsvrShardsNamespace; + /** * Constructs an empty NamespaceString. */ diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 750fd0dab37..fa1e774f18e 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -669,9 +669,10 @@ env.CppUnitTest( 'resharding/resharding_util_test.cpp', 'sharding_ddl_util_test.cpp', 'sharding_util_refresh_test.cpp', + 'topology_time_ticker_test.cpp', 'type_lockpings_test.cpp', 'type_locks_test.cpp', - 'vector_clock_config_server_test.cpp', + 'vector_clock_config_server_test.cpp' ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', diff --git a/src/mongo/db/s/config_server_op_observer.cpp b/src/mongo/db/s/config_server_op_observer.cpp index fe73090a366..477f304d3a8 100644 --- a/src/mongo/db/s/config_server_op_observer.cpp +++ b/src/mongo/db/s/config_server_op_observer.cpp @@ -34,6 +34,7 @@ #include "mongo/db/s/config_server_op_observer.h" #include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/db/s/topology_time_ticker.h" #include "mongo/db/update/update_oplog_entry_serialization.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/logv2/log.h" @@ -43,16 +44,6 @@ namespace mongo { -namespace { - -// The number of pending topologyTime tick points (stored in the _topologyTimeTickPoints vector) is -// generally expected to be small (since shard topology operations should be infrequent, relative to -// any config server replication lag). If the size of this vector exceeds this constant (when a -// tick point is added), then a warning (with id 4740600) will be logged. -constexpr size_t kPossiblyExcessiveNumTopologyTimeTickPoints = 3; - -} // namespace - ConfigServerOpObserver::ConfigServerOpObserver() = default; ConfigServerOpObserver::~ConfigServerOpObserver() = default; @@ -102,6 +93,13 @@ void ConfigServerOpObserver::_onReplicationRollback(OperationContext* opCtx, ShardingCatalogManager::get(opCtx)->discardCachedConfigDatabaseInitializationState(); ClusterIdentityLoader::get(opCtx)->discardCachedClusterId(); } + + if (rbInfo.rollbackNamespaces.find(ShardType::ConfigNS) != rbInfo.rollbackNamespaces.end()) { + // If some entries were rollbacked from config.shards we might need to discard some tick + // points from the TopologyTimeTicker + const auto lastApplied = repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime(); + TopologyTimeTicker::get(opCtx).onReplicationRollback(lastApplied); + } } void ConfigServerOpObserver::onInserts(OperationContext* opCtx, @@ -114,21 +112,25 @@ void ConfigServerOpObserver::onInserts(OperationContext* opCtx, return; } - boost::optional<Timestamp> maxTopologyTime; - for (auto it = begin; it != end; it++) { - Timestamp newTopologyTime = it->doc[ShardType::topologyTime.name()].timestamp(); - if (newTopologyTime != Timestamp()) { - if (!maxTopologyTime || newTopologyTime > *maxTopologyTime) { - maxTopologyTime = newTopologyTime; + if (!topology_time_ticker_utils::inRecoveryMode(opCtx)) { + boost::optional<Timestamp> maxTopologyTime; + for (auto it = begin; it != end; it++) { + Timestamp newTopologyTime = it->doc[ShardType::topologyTime.name()].timestamp(); + if (newTopologyTime != Timestamp()) { + if (!maxTopologyTime || newTopologyTime > *maxTopologyTime) { + maxTopologyTime = newTopologyTime; + } } } - } - if (maxTopologyTime) { - opCtx->recoveryUnit()->onCommit( - [this, maxTopologyTime](boost::optional<Timestamp> unusedCommitTime) mutable { - _registerTopologyTimeTickPoint(*maxTopologyTime); - }); + if (maxTopologyTime) { + opCtx->recoveryUnit()->onCommit( + [opCtx, maxTopologyTime](boost::optional<Timestamp> commitTime) mutable { + invariant(commitTime); + TopologyTimeTicker::get(opCtx).onNewLocallyCommittedTopologyTimeAvailable( + *commitTime, *maxTopologyTime); + }); + } } } @@ -139,6 +141,10 @@ void ConfigServerOpObserver::onApplyOps(OperationContext* opCtx, return; } + if (topology_time_ticker_utils::inRecoveryMode(opCtx)) { + return; + } + if (applyOpCmd.firstElementFieldNameStringData() != "applyOps") { return; } @@ -189,61 +195,15 @@ void ConfigServerOpObserver::onApplyOps(OperationContext* opCtx, auto newTopologyTime = update_oplog_entry::extractNewValueForField(updateObj, ShardType::topologyTime()) .timestamp(); - if (newTopologyTime == Timestamp()) { - return; - } opCtx->recoveryUnit()->onCommit( - [this, newTopologyTime](boost::optional<Timestamp> unusedCommitTime) mutable { - _registerTopologyTimeTickPoint(newTopologyTime); + [opCtx, newTopologyTime](boost::optional<Timestamp> commitTime) mutable { + invariant(commitTime); + TopologyTimeTicker::get(opCtx).onNewLocallyCommittedTopologyTimeAvailable( + *commitTime, newTopologyTime); }); } -void ConfigServerOpObserver::_registerTopologyTimeTickPoint(Timestamp newTopologyTime) { - std::vector<Timestamp>::size_type numTickPoints = 0; - { - stdx::lock_guard lg(_mutex); - _topologyTimeTickPoints.push_back(newTopologyTime); - numTickPoints = _topologyTimeTickPoints.size(); - } - if (numTickPoints >= kPossiblyExcessiveNumTopologyTimeTickPoints) { - LOGV2_WARNING(4740600, - "possibly excessive number of topologyTime tick points", - "numTickPoints"_attr = numTickPoints, - "kPossiblyExcessiveNumTopologyTimeTickPoints"_attr = - kPossiblyExcessiveNumTopologyTimeTickPoints); - } -} - -void ConfigServerOpObserver::_tickTopologyTimeIfNecessary(ServiceContext* service, - Timestamp newCommitPointTime) { - stdx::lock_guard lg(_mutex); - - if (_topologyTimeTickPoints.empty()) { - return; - } - - // Find and remove the topologyTime tick points that have been passed. - boost::optional<Timestamp> maxPassedTime; - auto it = _topologyTimeTickPoints.begin(); - while (it != _topologyTimeTickPoints.end()) { - const auto& topologyTimeTickPoint = *it; - if (newCommitPointTime >= topologyTimeTickPoint) { - if (!maxPassedTime || topologyTimeTickPoint > *maxPassedTime) { - maxPassedTime = topologyTimeTickPoint; - } - it = _topologyTimeTickPoints.erase(it); - } else { - it++; - } - } - - // If any tick points were passed, tick TopologyTime to the largest one. - if (maxPassedTime) { - VectorClockMutable::get(service)->tickTopologyTimeTo(LogicalTime(*maxPassedTime)); - } -} - void ConfigServerOpObserver::onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint) { Timestamp newCommitPointTime = newCommitPoint.getTimestamp(); @@ -252,7 +212,8 @@ void ConfigServerOpObserver::onMajorityCommitPointUpdate(ServiceContext* service // ConfigTime is done first. VectorClockMutable::get(service)->tickConfigTimeTo(LogicalTime(newCommitPointTime)); - _tickTopologyTimeIfNecessary(service, newCommitPointTime); + // Letting the TopologyTimeTicker know that the majority commit point was advanced + TopologyTimeTicker::get(service).onMajorityCommitPointUpdate(service, newCommitPoint); } } // namespace mongo diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index f9414d4f49e..18e83a4b994 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -218,13 +218,7 @@ public: private: void _onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo); - void _registerTopologyTimeTickPoint(Timestamp newTopologyTime); void _tickTopologyTimeIfNecessary(ServiceContext* service, Timestamp newCommitPointTime); - - // Guards access to the instance variables below. - Mutex _mutex = MONGO_MAKE_LATCH("ConfigServerOpObserver"); - - std::vector<Timestamp> _topologyTimeTickPoints; }; } // namespace mongo diff --git a/src/mongo/db/s/topology_time_ticker.cpp b/src/mongo/db/s/topology_time_ticker.cpp new file mode 100644 index 00000000000..0275951f46e --- /dev/null +++ b/src/mongo/db/s/topology_time_ticker.cpp @@ -0,0 +1,139 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/topology_time_ticker.h" + +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/vector_clock_mutable.h" +#include "mongo/logv2/log.h" + +namespace mongo { + +namespace { + +// The number of pending topologyTime tick points (stored in the _topologyTimeTickPoints vector) is +// generally expected to be small (since shard topology operations should be infrequent, relative to +// any config server replication lag). If the size of this vector exceeds this constant (when a +// tick point is added), then a warning (with id 4740600) will be logged. +constexpr size_t kPossiblyExcessiveNumTopologyTimeTickPoints = 3; + +const auto serviceDecorator = ServiceContext::declareDecoration<TopologyTimeTicker>(); + +} // namespace + +namespace topology_time_ticker_utils { + +bool inRecoveryMode(OperationContext* opCtx) { + invariant(opCtx->lockState()->isRSTLLocked()); + + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->isReplEnabled()) { + return false; + } + + const auto memberState = replCoord->getMemberState(); + return memberState.startup2() || memberState.rollback(); +} + +} // namespace topology_time_ticker_utils + +TopologyTimeTicker& TopologyTimeTicker::get(ServiceContext* serviceContext) { + return serviceDecorator(serviceContext); +} + +TopologyTimeTicker& TopologyTimeTicker::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +void TopologyTimeTicker::onNewLocallyCommittedTopologyTimeAvailable(Timestamp commitTime, + Timestamp topologyTime) { + const auto numTickPoints = [&] { + stdx::lock_guard lg(_mutex); + invariant(_topologyTimeTickPoints.size() == 0 || + _topologyTimeTickPoints.back().commitTime < commitTime); + _topologyTimeTickPoints.push_back({commitTime, topologyTime}); + return _topologyTimeTickPoints.size(); + }(); + + if (numTickPoints >= kPossiblyExcessiveNumTopologyTimeTickPoints) { + LOGV2_WARNING(4740600, + "possibly excessive number of topologyTime tick points", + "numTickPoints"_attr = numTickPoints, + "kPossiblyExcessiveNumTopologyTimeTickPoints"_attr = + kPossiblyExcessiveNumTopologyTimeTickPoints); + } +} + +void TopologyTimeTicker::onMajorityCommitPointUpdate(ServiceContext* service, + const repl::OpTime& newCommitPoint) { + Timestamp newMajorityTimestamp = newCommitPoint.getTimestamp(); + stdx::lock_guard lg(_mutex); + if (_topologyTimeTickPoints.empty()) + return; + + // Looking for the first tick point that is not majority committed + auto itFirstTickPointNonMajorityCommitted = + std::find_if(_topologyTimeTickPoints.begin(), + _topologyTimeTickPoints.end(), + [newMajorityTimestamp](const TopologyTimeTickPoint& tick) { + return tick.commitTime > newMajorityTimestamp; + }); + + if (itFirstTickPointNonMajorityCommitted != _topologyTimeTickPoints.begin()) { + // If some ticks were majority committed, advance the TopologyTime to the most recent one + const auto maxMajorityCommittedTopologyTime = + (itFirstTickPointNonMajorityCommitted - 1)->topologyTime; + + VectorClockMutable::get(service)->tickTopologyTimeTo( + LogicalTime(maxMajorityCommittedTopologyTime)); + + _topologyTimeTickPoints.erase(_topologyTimeTickPoints.begin(), + itFirstTickPointNonMajorityCommitted); + } +} + +void TopologyTimeTicker::onReplicationRollback(const repl::OpTime& lastAppliedOpTime) { + Timestamp newestTimestamp = lastAppliedOpTime.getTimestamp(); + + stdx::lock_guard lg(_mutex); + auto itFirstElemToBeRemoved = + std::find_if(_topologyTimeTickPoints.begin(), + _topologyTimeTickPoints.end(), + [newestTimestamp](const TopologyTimeTickPoint& tick) { + return newestTimestamp < tick.commitTime; + }); + + _topologyTimeTickPoints.erase(itFirstElemToBeRemoved, _topologyTimeTickPoints.end()); +} + +} // namespace mongo diff --git a/src/mongo/db/s/topology_time_ticker.h b/src/mongo/db/s/topology_time_ticker.h new file mode 100644 index 00000000000..8e20ae2e754 --- /dev/null +++ b/src/mongo/db/s/topology_time_ticker.h @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/bson/timestamp.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/platform/mutex.h" + +namespace mongo { + +namespace topology_time_ticker_utils { + +bool inRecoveryMode(OperationContext* opCtx); + +} + +/** + * This object is responsible for ticking the topology time of the Vector Clock when needed. It only + * exists on the CSRS nodes and it is associated with the mongod instance. + * + * Every time there is a local change on the topology of a sharded cluster this class registers a + * new TopologyTimeTickPoint. Once the oplog entry associated to that change is majority committed, + * we can advance the topology time. + * Since the tick points represent non-majority committed changes, this class has to handle what + * happens on rollback. + */ +class TopologyTimeTicker { +public: + TopologyTimeTicker() = default; + + static TopologyTimeTicker& get(ServiceContext* serviceContext); + static TopologyTimeTicker& get(OperationContext* opCtx); + + /** + * Registers a new tick point <commitTime, topologyTime> on the _topologyTimeTickPoints vector. + * This method is invoked from two places: + * - From the OpObservers when writes modify config.shards + * - As part of the initialization of the VectorClock on the CSRS. + */ + void onNewLocallyCommittedTopologyTimeAvailable(Timestamp commitTime, Timestamp topologyTime); + + /** + * Advances the topology time if one or more tick points have been majority committed. + */ + void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint); + + /** + * Removes from the _topologyTimeTickPoints vector any topology change that was rollbacked. + */ + void onReplicationRollback(const repl::OpTime& lastAppliedOpTime); + +private: + Mutex _mutex = MONGO_MAKE_LATCH("TopologyTimeTicker"); + /** + * This class tracks some time-related information about a topology change in a sharded cluster. + * More specifically, the vector clock should start gossiping the new topologyTime once the + * commitTime is majority committed. + */ + struct TopologyTimeTickPoint { + // The time at which the write on config.shards was locally committed. Do not gossip it! + Timestamp commitTime; + // The time that was stored in config.shards. It will be gossiped once the commitTime is + // majority committed. + Timestamp topologyTime; + }; + + std::vector<TopologyTimeTickPoint> _topologyTimeTickPoints; +}; +} // namespace mongo diff --git a/src/mongo/db/s/topology_time_ticker_test.cpp b/src/mongo/db/s/topology_time_ticker_test.cpp new file mode 100644 index 00000000000..d676ee35b40 --- /dev/null +++ b/src/mongo/db/s/topology_time_ticker_test.cpp @@ -0,0 +1,182 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/service_context.h" +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include "mongo/db/keys_collection_client_sharded.h" +#include "mongo/db/keys_collection_manager.h" +#include "mongo/db/logical_time_validator.h" +#include "mongo/db/s/config/config_server_test_fixture.h" +#include "mongo/db/s/topology_time_ticker.h" +#include "mongo/db/vector_clock_mutable.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" + +namespace mongo { +namespace { + +struct TickPoint { + Timestamp topologyTime; + Timestamp commitTime; +}; + +class TopologyTimeTickerConfigServer : public ConfigServerTestFixture { +protected: + TopologyTimeTickerConfigServer() : ConfigServerTestFixture(Options{}.useMockClock(true)) {} + + void setUp() override { + ConfigServerTestFixture::setUp(); + } + + void tearDown() override { + ConfigServerTestFixture::tearDown(); + } + + /** + * This function registers three tick points into the TopologyTimeTicker. Those three tick + * points are also stored in the _ticks vector. + */ + void registerThreeTickPoints(ServiceContext* sc) { + auto& topologyTimeTicker = TopologyTimeTicker::get(sc); + for (int i = 0; i < 3; ++i) { + TickPoint newTick = {Timestamp(1, 2 * i), Timestamp(1, 2 * i + 1)}; + _ticks.push_back(newTick); + topologyTimeTicker.onNewLocallyCommittedTopologyTimeAvailable(newTick.commitTime, + newTick.topologyTime); + } + } + + void checkTopologyTimeAfterAdvancingMajorityCommitPoint(ServiceContext* sc, + const Timestamp& majorityTime, + const Timestamp& expectedTopologyTime) { + auto& topologyTimeTicker = TopologyTimeTicker::get(sc); + auto vc = VectorClockMutable::get(sc); + topologyTimeTicker.onMajorityCommitPointUpdate(sc, repl::OpTime(majorityTime, /*term*/ -1)); + const auto time = vc->getTime(); + ASSERT_EQ(LogicalTime(expectedTopologyTime), time.topologyTime()); + } + + + static const Timestamp kCommitTimePreTicks; + static const Timestamp kCommitTimePostTicks; + // The commit time of the different ticks must belong to the range (kCommitTimePreTicks, + // kCommitTimePostTicks) + std::vector<TickPoint> _ticks; +}; + +const Timestamp TopologyTimeTickerConfigServer::kCommitTimePreTicks = Timestamp(0, 1); +const Timestamp TopologyTimeTickerConfigServer::kCommitTimePostTicks = Timestamp(10, 10); + +TEST_F(TopologyTimeTickerConfigServer, GossipingNewTopologyTimesWhenMajorityCommitted1) { + auto sc = getServiceContext(); + + registerThreeTickPoints(sc); + + // Checking initial topologyTime value + checkTopologyTimeAfterAdvancingMajorityCommitPoint(sc, kCommitTimePreTicks, Timestamp(0, 1)); + + // Checking that after advancing the commit point we also advance the topologyTime + for (const auto& tick : _ticks) { + checkTopologyTimeAfterAdvancingMajorityCommitPoint(sc, tick.commitTime, tick.topologyTime); + } + + // The majority commit point is advanced a bit more but there are no tick points: the + // topology time shouldn't change. + checkTopologyTimeAfterAdvancingMajorityCommitPoint( + sc, kCommitTimePostTicks, _ticks.back().topologyTime); +} + +TEST_F(TopologyTimeTickerConfigServer, GossipingNewTopologyTimesWhenMajorityCommitted2) { + auto sc = getServiceContext(); + + registerThreeTickPoints(sc); + + // Checking initial topologyTime value + checkTopologyTimeAfterAdvancingMajorityCommitPoint(sc, kCommitTimePreTicks, Timestamp(0, 1)); + + // The majority commit point is advanced to a point that includes all the tick points. The + // topology time should be the greatest one. + checkTopologyTimeAfterAdvancingMajorityCommitPoint( + sc, kCommitTimePostTicks, _ticks.back().topologyTime); +} + +DEATH_TEST_F(TopologyTimeTickerConfigServer, + InvalidonNewLocallyCommittedTopologyTimeAvailable, + "invariant") { + // This test verifies that the internal elements on the tick point vector are sorted. + auto sc = getServiceContext(); + Timestamp topologyTime1(0, 8); + Timestamp commitTime1(0, 10); + auto& topologyTimeTicker = TopologyTimeTicker::get(sc); + topologyTimeTicker.onNewLocallyCommittedTopologyTimeAvailable(commitTime1, topologyTime1); + + Timestamp topologyTime2(0, 5); + Timestamp commitTime2(0, 6); + // Newer tick points must have a greater commit time than older ones + topologyTimeTicker.onNewLocallyCommittedTopologyTimeAvailable(commitTime2, topologyTime2); +} + +TEST_F(TopologyTimeTickerConfigServer, RollbackingAllTickPoints) { + auto sc = getServiceContext(); + + registerThreeTickPoints(sc); + + // Checking initial topologyTime value + checkTopologyTimeAfterAdvancingMajorityCommitPoint(sc, kCommitTimePreTicks, Timestamp(0, 1)); + + // We rollback all tick points + auto& topologyTimeTicker = TopologyTimeTicker::get(sc); + topologyTimeTicker.onReplicationRollback(repl::OpTime(kCommitTimePreTicks, /*term*/ -1)); + + // The majority commit point is advanced to a point that would have included all our tick + // points, but because they were rollbacked the topology time should still be Timestamp(0, 1). + checkTopologyTimeAfterAdvancingMajorityCommitPoint(sc, kCommitTimePostTicks, Timestamp(0, 1)); +} + +TEST_F(TopologyTimeTickerConfigServer, PartialRollbackingTickPoints) { + auto sc = getServiceContext(); + + registerThreeTickPoints(sc); + + // We rollback the last two tick points + auto& topologyTimeTicker = TopologyTimeTicker::get(sc); + topologyTimeTicker.onReplicationRollback(repl::OpTime(_ticks.front().commitTime, /*term*/ -1)); + + // The majority commit point is advanced to a point that includes all the tick points. The + // topology time should be the one from the first tick since the other two were rollbacked. + checkTopologyTimeAfterAdvancingMajorityCommitPoint( + sc, kCommitTimePostTicks, _ticks.front().topologyTime); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/vector_clock_mongod.cpp b/src/mongo/db/vector_clock_mongod.cpp index 8f791e87e3f..6ef97de2b27 100644 --- a/src/mongo/db/vector_clock_mongod.cpp +++ b/src/mongo/db/vector_clock_mongod.cpp @@ -35,7 +35,9 @@ #include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/replica_set_aware_service.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/topology_time_ticker.h" #include "mongo/db/vector_clock_document_gen.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/executor/scoped_task_executor.h" @@ -92,7 +94,7 @@ private: // ReplicaSetAwareService methods implementation void onStartup(OperationContext* opCtx) override {} - void onInitialDataAvailable(OperationContext* opCtx, bool isMajorityDataAvailable) override {} + void onInitialDataAvailable(OperationContext* opCtx, bool isMajorityDataAvailable) override; void onShutdown() override {} void onStepUpBegin(OperationContext* opCtx, long long term) override; void onStepUpComplete(OperationContext* opCtx, long long term) override {} @@ -185,9 +187,15 @@ VectorClockMongoD::~VectorClockMongoD() = default; void VectorClockMongoD::onStepUpBegin(OperationContext* opCtx, long long term) { stdx::lock_guard lg(_mutex); _durableTime.reset(); +} + +void VectorClockMongoD::onStepDown() { + stdx::lock_guard lg(_mutex); + _durableTime.reset(); +} - // Initialize the config server's topology time to the maximum topology time from - // `config.shards` collection instead of using `Timestamp(0 ,0)`. +void VectorClockMongoD::onInitialDataAvailable(OperationContext* opCtx, + bool isMajorityDataAvailable) { if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { const auto maxTopologyTime{[&opCtx]() -> boost::optional<Timestamp> { DBDirectClient client{opCtx}; @@ -206,16 +214,24 @@ void VectorClockMongoD::onStepUpBegin(OperationContext* opCtx, long long term) { }()}; if (maxTopologyTime) { - _advanceComponentTimeTo(Component::TopologyTime, LogicalTime(*maxTopologyTime)); + if (isMajorityDataAvailable) { + // The maxTopologyTime is majority committed. Thus, we can start gossiping it. + _advanceComponentTimeTo(Component::TopologyTime, LogicalTime(*maxTopologyTime)); + } else { + // There is no guarantee that the maxTopologyTime is majority committed and we don't + // have a way to obtain the commit time associated with it (init sync scenario). + // The only guarantee that we have at this point is that any majority read + // that comes afterwards will read, at least, from the initialDataTimestamp. Thus, + // we introduce an artificial tick point <initialDataTimestamp, maxTopologyTime>. + const auto initialDataTimestamp = + repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime(); + TopologyTimeTicker::get(opCtx).onNewLocallyCommittedTopologyTimeAvailable( + initialDataTimestamp.getTimestamp(), *maxTopologyTime); + } } } } -void VectorClockMongoD::onStepDown() { - stdx::lock_guard lg(_mutex); - _durableTime.reset(); -} - void VectorClockMongoD::onBecomeArbiter() { // The node has become an arbiter, hence will not need logical clock for external operations. _disable(); |