summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergi Mateo Bellido <sergi.mateo-bellido@mongodb.com>2022-05-12 17:03:11 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-13 13:33:42 +0000
commitf9bf6f179994afeaff255568a6ab0c28dc3f646c (patch)
tree4a985f288400179323299cc44e0ed4eefb39ec99
parent94ebb3709d96e4d991294c0c1db021f83d9b2c58 (diff)
downloadmongo-f9bf6f179994afeaff255568a6ab0c28dc3f646c.tar.gz
SERVER-64433 Recovering the topology tick points on startup/init sync
(cherry picked from commit cc33146088335da2bc08edf4eeec7d6b9fd724f0)
-rw-r--r--src/mongo/db/SConscript3
-rw-r--r--src/mongo/db/namespace_string.cpp10
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/s/SConscript3
-rw-r--r--src/mongo/db/s/config_server_op_observer.cpp107
-rw-r--r--src/mongo/db/s/config_server_op_observer.h6
-rw-r--r--src/mongo/db/s/topology_time_ticker.cpp139
-rw-r--r--src/mongo/db/s/topology_time_ticker.h99
-rw-r--r--src/mongo/db/s/topology_time_ticker_test.cpp182
-rw-r--r--src/mongo/db/vector_clock_mongod.cpp34
10 files changed, 495 insertions, 91 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 446061814da..8e5de43635d 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2062,7 +2062,8 @@ env.Library(
env.Library(
target='vector_clock_mongod',
source=[
- 'vector_clock_mongod.cpp',
+ 's/topology_time_ticker.cpp',
+ 'vector_clock_mongod.cpp'
],
LIBDEPS=[
'vector_clock_mutable',
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index d6bc8ced42f..13729330ee1 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -165,6 +165,10 @@ const NamespaceString NamespaceString::kCompactStructuredEncryptionCoordinatorNa
const NamespaceString NamespaceString::kClusterParametersNamespace(NamespaceString::kConfigDb,
"clusterParameters");
+// TODO (SERVER-66431): replace all usages of ShardType::ConfigNS by kConfigsvrShardsNamespace
+const NamespaceString NamespaceString::kConfigsvrShardsNamespace(NamespaceString::kConfigDb,
+ "shards");
+
bool NamespaceString::isListCollectionsCursorNS() const {
return coll() == listCollectionsCursorCol;
}
@@ -234,12 +238,16 @@ bool NamespaceString::isLegalClientSystemNS(
*
* Process updates to config.tenantMigrationRecipients individually so they serialize after inserts
* into config.donatedFiles.<migrationId>.
+ *
+ * Oplog entries on 'config.shards' should be processed one at a time, otherwise the in-memory state
+ * that its kept on the TopologyTimeTicker might be wrong.
+ *
*/
bool NamespaceString::mustBeAppliedInOwnOplogBatch() const {
return isSystemDotViews() || isServerConfigurationCollection() || isPrivilegeCollection() ||
_ns == kDonorReshardingOperationsNamespace.ns() ||
_ns == kForceOplogBatchBoundaryNamespace.ns() ||
- _ns == kTenantMigrationRecipientsNamespace.ns();
+ _ns == kTenantMigrationRecipientsNamespace.ns() || _ns == kConfigsvrShardsNamespace.ns();
}
NamespaceString NamespaceString::makeListCollectionsNSS(StringData dbName) {
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index c4bbc52fcbd..0822cb21d4d 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -219,6 +219,9 @@ public:
// Namespace used for storing cluster wide parameters.
static const NamespaceString kClusterParametersNamespace;
+ // Namespace used for storing the list of shards on the CSRS
+ static const NamespaceString kConfigsvrShardsNamespace;
+
/**
* Constructs an empty NamespaceString.
*/
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 750fd0dab37..fa1e774f18e 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -669,9 +669,10 @@ env.CppUnitTest(
'resharding/resharding_util_test.cpp',
'sharding_ddl_util_test.cpp',
'sharding_util_refresh_test.cpp',
+ 'topology_time_ticker_test.cpp',
'type_lockpings_test.cpp',
'type_locks_test.cpp',
- 'vector_clock_config_server_test.cpp',
+ 'vector_clock_config_server_test.cpp'
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
diff --git a/src/mongo/db/s/config_server_op_observer.cpp b/src/mongo/db/s/config_server_op_observer.cpp
index fe73090a366..477f304d3a8 100644
--- a/src/mongo/db/s/config_server_op_observer.cpp
+++ b/src/mongo/db/s/config_server_op_observer.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/s/config_server_op_observer.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/db/s/topology_time_ticker.h"
#include "mongo/db/update/update_oplog_entry_serialization.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/logv2/log.h"
@@ -43,16 +44,6 @@
namespace mongo {
-namespace {
-
-// The number of pending topologyTime tick points (stored in the _topologyTimeTickPoints vector) is
-// generally expected to be small (since shard topology operations should be infrequent, relative to
-// any config server replication lag). If the size of this vector exceeds this constant (when a
-// tick point is added), then a warning (with id 4740600) will be logged.
-constexpr size_t kPossiblyExcessiveNumTopologyTimeTickPoints = 3;
-
-} // namespace
-
ConfigServerOpObserver::ConfigServerOpObserver() = default;
ConfigServerOpObserver::~ConfigServerOpObserver() = default;
@@ -102,6 +93,13 @@ void ConfigServerOpObserver::_onReplicationRollback(OperationContext* opCtx,
ShardingCatalogManager::get(opCtx)->discardCachedConfigDatabaseInitializationState();
ClusterIdentityLoader::get(opCtx)->discardCachedClusterId();
}
+
+ if (rbInfo.rollbackNamespaces.find(ShardType::ConfigNS) != rbInfo.rollbackNamespaces.end()) {
+ // If some entries were rollbacked from config.shards we might need to discard some tick
+ // points from the TopologyTimeTicker
+ const auto lastApplied = repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime();
+ TopologyTimeTicker::get(opCtx).onReplicationRollback(lastApplied);
+ }
}
void ConfigServerOpObserver::onInserts(OperationContext* opCtx,
@@ -114,21 +112,25 @@ void ConfigServerOpObserver::onInserts(OperationContext* opCtx,
return;
}
- boost::optional<Timestamp> maxTopologyTime;
- for (auto it = begin; it != end; it++) {
- Timestamp newTopologyTime = it->doc[ShardType::topologyTime.name()].timestamp();
- if (newTopologyTime != Timestamp()) {
- if (!maxTopologyTime || newTopologyTime > *maxTopologyTime) {
- maxTopologyTime = newTopologyTime;
+ if (!topology_time_ticker_utils::inRecoveryMode(opCtx)) {
+ boost::optional<Timestamp> maxTopologyTime;
+ for (auto it = begin; it != end; it++) {
+ Timestamp newTopologyTime = it->doc[ShardType::topologyTime.name()].timestamp();
+ if (newTopologyTime != Timestamp()) {
+ if (!maxTopologyTime || newTopologyTime > *maxTopologyTime) {
+ maxTopologyTime = newTopologyTime;
+ }
}
}
- }
- if (maxTopologyTime) {
- opCtx->recoveryUnit()->onCommit(
- [this, maxTopologyTime](boost::optional<Timestamp> unusedCommitTime) mutable {
- _registerTopologyTimeTickPoint(*maxTopologyTime);
- });
+ if (maxTopologyTime) {
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx, maxTopologyTime](boost::optional<Timestamp> commitTime) mutable {
+ invariant(commitTime);
+ TopologyTimeTicker::get(opCtx).onNewLocallyCommittedTopologyTimeAvailable(
+ *commitTime, *maxTopologyTime);
+ });
+ }
}
}
@@ -139,6 +141,10 @@ void ConfigServerOpObserver::onApplyOps(OperationContext* opCtx,
return;
}
+ if (topology_time_ticker_utils::inRecoveryMode(opCtx)) {
+ return;
+ }
+
if (applyOpCmd.firstElementFieldNameStringData() != "applyOps") {
return;
}
@@ -189,61 +195,15 @@ void ConfigServerOpObserver::onApplyOps(OperationContext* opCtx,
auto newTopologyTime =
update_oplog_entry::extractNewValueForField(updateObj, ShardType::topologyTime())
.timestamp();
- if (newTopologyTime == Timestamp()) {
- return;
- }
opCtx->recoveryUnit()->onCommit(
- [this, newTopologyTime](boost::optional<Timestamp> unusedCommitTime) mutable {
- _registerTopologyTimeTickPoint(newTopologyTime);
+ [opCtx, newTopologyTime](boost::optional<Timestamp> commitTime) mutable {
+ invariant(commitTime);
+ TopologyTimeTicker::get(opCtx).onNewLocallyCommittedTopologyTimeAvailable(
+ *commitTime, newTopologyTime);
});
}
-void ConfigServerOpObserver::_registerTopologyTimeTickPoint(Timestamp newTopologyTime) {
- std::vector<Timestamp>::size_type numTickPoints = 0;
- {
- stdx::lock_guard lg(_mutex);
- _topologyTimeTickPoints.push_back(newTopologyTime);
- numTickPoints = _topologyTimeTickPoints.size();
- }
- if (numTickPoints >= kPossiblyExcessiveNumTopologyTimeTickPoints) {
- LOGV2_WARNING(4740600,
- "possibly excessive number of topologyTime tick points",
- "numTickPoints"_attr = numTickPoints,
- "kPossiblyExcessiveNumTopologyTimeTickPoints"_attr =
- kPossiblyExcessiveNumTopologyTimeTickPoints);
- }
-}
-
-void ConfigServerOpObserver::_tickTopologyTimeIfNecessary(ServiceContext* service,
- Timestamp newCommitPointTime) {
- stdx::lock_guard lg(_mutex);
-
- if (_topologyTimeTickPoints.empty()) {
- return;
- }
-
- // Find and remove the topologyTime tick points that have been passed.
- boost::optional<Timestamp> maxPassedTime;
- auto it = _topologyTimeTickPoints.begin();
- while (it != _topologyTimeTickPoints.end()) {
- const auto& topologyTimeTickPoint = *it;
- if (newCommitPointTime >= topologyTimeTickPoint) {
- if (!maxPassedTime || topologyTimeTickPoint > *maxPassedTime) {
- maxPassedTime = topologyTimeTickPoint;
- }
- it = _topologyTimeTickPoints.erase(it);
- } else {
- it++;
- }
- }
-
- // If any tick points were passed, tick TopologyTime to the largest one.
- if (maxPassedTime) {
- VectorClockMutable::get(service)->tickTopologyTimeTo(LogicalTime(*maxPassedTime));
- }
-}
-
void ConfigServerOpObserver::onMajorityCommitPointUpdate(ServiceContext* service,
const repl::OpTime& newCommitPoint) {
Timestamp newCommitPointTime = newCommitPoint.getTimestamp();
@@ -252,7 +212,8 @@ void ConfigServerOpObserver::onMajorityCommitPointUpdate(ServiceContext* service
// ConfigTime is done first.
VectorClockMutable::get(service)->tickConfigTimeTo(LogicalTime(newCommitPointTime));
- _tickTopologyTimeIfNecessary(service, newCommitPointTime);
+ // Letting the TopologyTimeTicker know that the majority commit point was advanced
+ TopologyTimeTicker::get(service).onMajorityCommitPointUpdate(service, newCommitPoint);
}
} // namespace mongo
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index f9414d4f49e..18e83a4b994 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -218,13 +218,7 @@ public:
private:
void _onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo);
- void _registerTopologyTimeTickPoint(Timestamp newTopologyTime);
void _tickTopologyTimeIfNecessary(ServiceContext* service, Timestamp newCommitPointTime);
-
- // Guards access to the instance variables below.
- Mutex _mutex = MONGO_MAKE_LATCH("ConfigServerOpObserver");
-
- std::vector<Timestamp> _topologyTimeTickPoints;
};
} // namespace mongo
diff --git a/src/mongo/db/s/topology_time_ticker.cpp b/src/mongo/db/s/topology_time_ticker.cpp
new file mode 100644
index 00000000000..0275951f46e
--- /dev/null
+++ b/src/mongo/db/s/topology_time_ticker.cpp
@@ -0,0 +1,139 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/topology_time_ticker.h"
+
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/vector_clock_mutable.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+
+namespace {
+
+// The number of pending topologyTime tick points (stored in the _topologyTimeTickPoints vector) is
+// generally expected to be small (since shard topology operations should be infrequent, relative to
+// any config server replication lag). If the size of this vector exceeds this constant (when a
+// tick point is added), then a warning (with id 4740600) will be logged.
+constexpr size_t kPossiblyExcessiveNumTopologyTimeTickPoints = 3;
+
+const auto serviceDecorator = ServiceContext::declareDecoration<TopologyTimeTicker>();
+
+} // namespace
+
+namespace topology_time_ticker_utils {
+
+bool inRecoveryMode(OperationContext* opCtx) {
+ invariant(opCtx->lockState()->isRSTLLocked());
+
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->isReplEnabled()) {
+ return false;
+ }
+
+ const auto memberState = replCoord->getMemberState();
+ return memberState.startup2() || memberState.rollback();
+}
+
+} // namespace topology_time_ticker_utils
+
+TopologyTimeTicker& TopologyTimeTicker::get(ServiceContext* serviceContext) {
+ return serviceDecorator(serviceContext);
+}
+
+TopologyTimeTicker& TopologyTimeTicker::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+void TopologyTimeTicker::onNewLocallyCommittedTopologyTimeAvailable(Timestamp commitTime,
+ Timestamp topologyTime) {
+ const auto numTickPoints = [&] {
+ stdx::lock_guard lg(_mutex);
+ invariant(_topologyTimeTickPoints.size() == 0 ||
+ _topologyTimeTickPoints.back().commitTime < commitTime);
+ _topologyTimeTickPoints.push_back({commitTime, topologyTime});
+ return _topologyTimeTickPoints.size();
+ }();
+
+ if (numTickPoints >= kPossiblyExcessiveNumTopologyTimeTickPoints) {
+ LOGV2_WARNING(4740600,
+ "possibly excessive number of topologyTime tick points",
+ "numTickPoints"_attr = numTickPoints,
+ "kPossiblyExcessiveNumTopologyTimeTickPoints"_attr =
+ kPossiblyExcessiveNumTopologyTimeTickPoints);
+ }
+}
+
+void TopologyTimeTicker::onMajorityCommitPointUpdate(ServiceContext* service,
+ const repl::OpTime& newCommitPoint) {
+ Timestamp newMajorityTimestamp = newCommitPoint.getTimestamp();
+ stdx::lock_guard lg(_mutex);
+ if (_topologyTimeTickPoints.empty())
+ return;
+
+ // Looking for the first tick point that is not majority committed
+ auto itFirstTickPointNonMajorityCommitted =
+ std::find_if(_topologyTimeTickPoints.begin(),
+ _topologyTimeTickPoints.end(),
+ [newMajorityTimestamp](const TopologyTimeTickPoint& tick) {
+ return tick.commitTime > newMajorityTimestamp;
+ });
+
+ if (itFirstTickPointNonMajorityCommitted != _topologyTimeTickPoints.begin()) {
+ // If some ticks were majority committed, advance the TopologyTime to the most recent one
+ const auto maxMajorityCommittedTopologyTime =
+ (itFirstTickPointNonMajorityCommitted - 1)->topologyTime;
+
+ VectorClockMutable::get(service)->tickTopologyTimeTo(
+ LogicalTime(maxMajorityCommittedTopologyTime));
+
+ _topologyTimeTickPoints.erase(_topologyTimeTickPoints.begin(),
+ itFirstTickPointNonMajorityCommitted);
+ }
+}
+
+void TopologyTimeTicker::onReplicationRollback(const repl::OpTime& lastAppliedOpTime) {
+ Timestamp newestTimestamp = lastAppliedOpTime.getTimestamp();
+
+ stdx::lock_guard lg(_mutex);
+ auto itFirstElemToBeRemoved =
+ std::find_if(_topologyTimeTickPoints.begin(),
+ _topologyTimeTickPoints.end(),
+ [newestTimestamp](const TopologyTimeTickPoint& tick) {
+ return newestTimestamp < tick.commitTime;
+ });
+
+ _topologyTimeTickPoints.erase(itFirstElemToBeRemoved, _topologyTimeTickPoints.end());
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/topology_time_ticker.h b/src/mongo/db/s/topology_time_ticker.h
new file mode 100644
index 00000000000..8e20ae2e754
--- /dev/null
+++ b/src/mongo/db/s/topology_time_ticker.h
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/bson/timestamp.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/platform/mutex.h"
+
+namespace mongo {
+
+namespace topology_time_ticker_utils {
+
+bool inRecoveryMode(OperationContext* opCtx);
+
+}
+
+/**
+ * This object is responsible for ticking the topology time of the Vector Clock when needed. It only
+ * exists on the CSRS nodes and it is associated with the mongod instance.
+ *
+ * Every time there is a local change on the topology of a sharded cluster this class registers a
+ * new TopologyTimeTickPoint. Once the oplog entry associated to that change is majority committed,
+ * we can advance the topology time.
+ * Since the tick points represent non-majority committed changes, this class has to handle what
+ * happens on rollback.
+ */
+class TopologyTimeTicker {
+public:
+ TopologyTimeTicker() = default;
+
+ static TopologyTimeTicker& get(ServiceContext* serviceContext);
+ static TopologyTimeTicker& get(OperationContext* opCtx);
+
+ /**
+ * Registers a new tick point <commitTime, topologyTime> on the _topologyTimeTickPoints vector.
+ * This method is invoked from two places:
+ * - From the OpObservers when writes modify config.shards
+ * - As part of the initialization of the VectorClock on the CSRS.
+ */
+ void onNewLocallyCommittedTopologyTimeAvailable(Timestamp commitTime, Timestamp topologyTime);
+
+ /**
+ * Advances the topology time if one or more tick points have been majority committed.
+ */
+ void onMajorityCommitPointUpdate(ServiceContext* service, const repl::OpTime& newCommitPoint);
+
+ /**
+ * Removes from the _topologyTimeTickPoints vector any topology change that was rollbacked.
+ */
+ void onReplicationRollback(const repl::OpTime& lastAppliedOpTime);
+
+private:
+ Mutex _mutex = MONGO_MAKE_LATCH("TopologyTimeTicker");
+ /**
+ * This class tracks some time-related information about a topology change in a sharded cluster.
+ * More specifically, the vector clock should start gossiping the new topologyTime once the
+ * commitTime is majority committed.
+ */
+ struct TopologyTimeTickPoint {
+ // The time at which the write on config.shards was locally committed. Do not gossip it!
+ Timestamp commitTime;
+ // The time that was stored in config.shards. It will be gossiped once the commitTime is
+ // majority committed.
+ Timestamp topologyTime;
+ };
+
+ std::vector<TopologyTimeTickPoint> _topologyTimeTickPoints;
+};
+} // namespace mongo
diff --git a/src/mongo/db/s/topology_time_ticker_test.cpp b/src/mongo/db/s/topology_time_ticker_test.cpp
new file mode 100644
index 00000000000..d676ee35b40
--- /dev/null
+++ b/src/mongo/db/s/topology_time_ticker_test.cpp
@@ -0,0 +1,182 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/service_context.h"
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/keys_collection_client_sharded.h"
+#include "mongo/db/keys_collection_manager.h"
+#include "mongo/db/logical_time_validator.h"
+#include "mongo/db/s/config/config_server_test_fixture.h"
+#include "mongo/db/s/topology_time_ticker.h"
+#include "mongo/db/vector_clock_mutable.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
+
+namespace mongo {
+namespace {
+
+struct TickPoint {
+ Timestamp topologyTime;
+ Timestamp commitTime;
+};
+
+class TopologyTimeTickerConfigServer : public ConfigServerTestFixture {
+protected:
+ TopologyTimeTickerConfigServer() : ConfigServerTestFixture(Options{}.useMockClock(true)) {}
+
+ void setUp() override {
+ ConfigServerTestFixture::setUp();
+ }
+
+ void tearDown() override {
+ ConfigServerTestFixture::tearDown();
+ }
+
+ /**
+ * This function registers three tick points into the TopologyTimeTicker. Those three tick
+ * points are also stored in the _ticks vector.
+ */
+ void registerThreeTickPoints(ServiceContext* sc) {
+ auto& topologyTimeTicker = TopologyTimeTicker::get(sc);
+ for (int i = 0; i < 3; ++i) {
+ TickPoint newTick = {Timestamp(1, 2 * i), Timestamp(1, 2 * i + 1)};
+ _ticks.push_back(newTick);
+ topologyTimeTicker.onNewLocallyCommittedTopologyTimeAvailable(newTick.commitTime,
+ newTick.topologyTime);
+ }
+ }
+
+ void checkTopologyTimeAfterAdvancingMajorityCommitPoint(ServiceContext* sc,
+ const Timestamp& majorityTime,
+ const Timestamp& expectedTopologyTime) {
+ auto& topologyTimeTicker = TopologyTimeTicker::get(sc);
+ auto vc = VectorClockMutable::get(sc);
+ topologyTimeTicker.onMajorityCommitPointUpdate(sc, repl::OpTime(majorityTime, /*term*/ -1));
+ const auto time = vc->getTime();
+ ASSERT_EQ(LogicalTime(expectedTopologyTime), time.topologyTime());
+ }
+
+
+ static const Timestamp kCommitTimePreTicks;
+ static const Timestamp kCommitTimePostTicks;
+ // The commit time of the different ticks must belong to the range (kCommitTimePreTicks,
+ // kCommitTimePostTicks)
+ std::vector<TickPoint> _ticks;
+};
+
+const Timestamp TopologyTimeTickerConfigServer::kCommitTimePreTicks = Timestamp(0, 1);
+const Timestamp TopologyTimeTickerConfigServer::kCommitTimePostTicks = Timestamp(10, 10);
+
+TEST_F(TopologyTimeTickerConfigServer, GossipingNewTopologyTimesWhenMajorityCommitted1) {
+ auto sc = getServiceContext();
+
+ registerThreeTickPoints(sc);
+
+ // Checking initial topologyTime value
+ checkTopologyTimeAfterAdvancingMajorityCommitPoint(sc, kCommitTimePreTicks, Timestamp(0, 1));
+
+ // Checking that after advancing the commit point we also advance the topologyTime
+ for (const auto& tick : _ticks) {
+ checkTopologyTimeAfterAdvancingMajorityCommitPoint(sc, tick.commitTime, tick.topologyTime);
+ }
+
+ // The majority commit point is advanced a bit more but there are no tick points: the
+ // topology time shouldn't change.
+ checkTopologyTimeAfterAdvancingMajorityCommitPoint(
+ sc, kCommitTimePostTicks, _ticks.back().topologyTime);
+}
+
+TEST_F(TopologyTimeTickerConfigServer, GossipingNewTopologyTimesWhenMajorityCommitted2) {
+ auto sc = getServiceContext();
+
+ registerThreeTickPoints(sc);
+
+ // Checking initial topologyTime value
+ checkTopologyTimeAfterAdvancingMajorityCommitPoint(sc, kCommitTimePreTicks, Timestamp(0, 1));
+
+ // The majority commit point is advanced to a point that includes all the tick points. The
+ // topology time should be the greatest one.
+ checkTopologyTimeAfterAdvancingMajorityCommitPoint(
+ sc, kCommitTimePostTicks, _ticks.back().topologyTime);
+}
+
+DEATH_TEST_F(TopologyTimeTickerConfigServer,
+ InvalidonNewLocallyCommittedTopologyTimeAvailable,
+ "invariant") {
+ // This test verifies that the internal elements on the tick point vector are sorted.
+ auto sc = getServiceContext();
+ Timestamp topologyTime1(0, 8);
+ Timestamp commitTime1(0, 10);
+ auto& topologyTimeTicker = TopologyTimeTicker::get(sc);
+ topologyTimeTicker.onNewLocallyCommittedTopologyTimeAvailable(commitTime1, topologyTime1);
+
+ Timestamp topologyTime2(0, 5);
+ Timestamp commitTime2(0, 6);
+ // Newer tick points must have a greater commit time than older ones
+ topologyTimeTicker.onNewLocallyCommittedTopologyTimeAvailable(commitTime2, topologyTime2);
+}
+
+TEST_F(TopologyTimeTickerConfigServer, RollbackingAllTickPoints) {
+ auto sc = getServiceContext();
+
+ registerThreeTickPoints(sc);
+
+ // Checking initial topologyTime value
+ checkTopologyTimeAfterAdvancingMajorityCommitPoint(sc, kCommitTimePreTicks, Timestamp(0, 1));
+
+ // We rollback all tick points
+ auto& topologyTimeTicker = TopologyTimeTicker::get(sc);
+ topologyTimeTicker.onReplicationRollback(repl::OpTime(kCommitTimePreTicks, /*term*/ -1));
+
+ // The majority commit point is advanced to a point that would have included all our tick
+ // points, but because they were rollbacked the topology time should still be Timestamp(0, 1).
+ checkTopologyTimeAfterAdvancingMajorityCommitPoint(sc, kCommitTimePostTicks, Timestamp(0, 1));
+}
+
+TEST_F(TopologyTimeTickerConfigServer, PartialRollbackingTickPoints) {
+ auto sc = getServiceContext();
+
+ registerThreeTickPoints(sc);
+
+ // We rollback the last two tick points
+ auto& topologyTimeTicker = TopologyTimeTicker::get(sc);
+ topologyTimeTicker.onReplicationRollback(repl::OpTime(_ticks.front().commitTime, /*term*/ -1));
+
+ // The majority commit point is advanced to a point that includes all the tick points. The
+ // topology time should be the one from the first tick since the other two were rollbacked.
+ checkTopologyTimeAfterAdvancingMajorityCommitPoint(
+ sc, kCommitTimePostTicks, _ticks.front().topologyTime);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/vector_clock_mongod.cpp b/src/mongo/db/vector_clock_mongod.cpp
index 8f791e87e3f..6ef97de2b27 100644
--- a/src/mongo/db/vector_clock_mongod.cpp
+++ b/src/mongo/db/vector_clock_mongod.cpp
@@ -35,7 +35,9 @@
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/repl/replica_set_aware_service.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/topology_time_ticker.h"
#include "mongo/db/vector_clock_document_gen.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/executor/scoped_task_executor.h"
@@ -92,7 +94,7 @@ private:
// ReplicaSetAwareService methods implementation
void onStartup(OperationContext* opCtx) override {}
- void onInitialDataAvailable(OperationContext* opCtx, bool isMajorityDataAvailable) override {}
+ void onInitialDataAvailable(OperationContext* opCtx, bool isMajorityDataAvailable) override;
void onShutdown() override {}
void onStepUpBegin(OperationContext* opCtx, long long term) override;
void onStepUpComplete(OperationContext* opCtx, long long term) override {}
@@ -185,9 +187,15 @@ VectorClockMongoD::~VectorClockMongoD() = default;
void VectorClockMongoD::onStepUpBegin(OperationContext* opCtx, long long term) {
stdx::lock_guard lg(_mutex);
_durableTime.reset();
+}
+
+void VectorClockMongoD::onStepDown() {
+ stdx::lock_guard lg(_mutex);
+ _durableTime.reset();
+}
- // Initialize the config server's topology time to the maximum topology time from
- // `config.shards` collection instead of using `Timestamp(0 ,0)`.
+void VectorClockMongoD::onInitialDataAvailable(OperationContext* opCtx,
+ bool isMajorityDataAvailable) {
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
const auto maxTopologyTime{[&opCtx]() -> boost::optional<Timestamp> {
DBDirectClient client{opCtx};
@@ -206,16 +214,24 @@ void VectorClockMongoD::onStepUpBegin(OperationContext* opCtx, long long term) {
}()};
if (maxTopologyTime) {
- _advanceComponentTimeTo(Component::TopologyTime, LogicalTime(*maxTopologyTime));
+ if (isMajorityDataAvailable) {
+ // The maxTopologyTime is majority committed. Thus, we can start gossiping it.
+ _advanceComponentTimeTo(Component::TopologyTime, LogicalTime(*maxTopologyTime));
+ } else {
+ // There is no guarantee that the maxTopologyTime is majority committed and we don't
+ // have a way to obtain the commit time associated with it (init sync scenario).
+ // The only guarantee that we have at this point is that any majority read
+ // that comes afterwards will read, at least, from the initialDataTimestamp. Thus,
+ // we introduce an artificial tick point <initialDataTimestamp, maxTopologyTime>.
+ const auto initialDataTimestamp =
+ repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime();
+ TopologyTimeTicker::get(opCtx).onNewLocallyCommittedTopologyTimeAvailable(
+ initialDataTimestamp.getTimestamp(), *maxTopologyTime);
+ }
}
}
}
-void VectorClockMongoD::onStepDown() {
- stdx::lock_guard lg(_mutex);
- _durableTime.reset();
-}
-
void VectorClockMongoD::onBecomeArbiter() {
// The node has become an arbiter, hence will not need logical clock for external operations.
_disable();