From 97fd38d397f586d2d296714cfe3a27dafb37b26a Mon Sep 17 00:00:00 2001 From: Pierlauro Sciarelli Date: Wed, 15 Jul 2020 19:46:11 +0200 Subject: SERVER-48717 Implement the persist/recover functionalities of the VectorClock --- src/mongo/db/SConscript | 2 + src/mongo/db/namespace_string.cpp | 5 +- src/mongo/db/namespace_string.h | 3 + src/mongo/db/vector_clock.cpp | 1 + src/mongo/db/vector_clock.h | 26 +++ src/mongo/db/vector_clock_mongod.cpp | 254 ++++++++++++++++++++++++++++++ src/mongo/db/vector_clock_mongod_test.cpp | 195 ++++++++++++++++++----- 7 files changed, 445 insertions(+), 41 deletions(-) diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 6c6b8ce2301..a649e04031d 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1613,6 +1613,8 @@ env.Library( 'vector_clock_mutable', ], LIBDEPS_PRIVATE=[ + 'dbdirectclient', + 'rw_concern_d', 'repl/replica_set_aware_service', 'server_options_core', ], diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 92c46ef6652..8b23b28045c 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -87,6 +87,9 @@ const NamespaceString NamespaceString::kRangeDeletionNamespace(NamespaceString:: "rangeDeletions"); const NamespaceString NamespaceString::kConfigSettingsNamespace(NamespaceString::kConfigDb, "settings"); +const NamespaceString NamespaceString::kVectorClockNamespace(NamespaceString::kConfigDb, + "vectorClock"); + bool NamespaceString::isListCollectionsCursorNS() const { return coll() == listCollectionsCursorCol; @@ -217,7 +220,7 @@ bool NamespaceString::isNamespaceAlwaysUnsharded() const { // Certain config collections can never be sharded if (ns() == kSessionTransactionsTableNamespace.ns() || ns() == kRangeDeletionNamespace.ns() || - ns() == kTransactionCoordinatorsNamespace.ns() || + ns() == kTransactionCoordinatorsNamespace.ns() || ns() == kVectorClockNamespace.ns() || ns() == kMigrationCoordinatorsNamespace.ns() || ns() == kIndexBuildEntryNamespace.ns()) return true; diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index c80d9eb3bb1..566b9b159ad 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -121,6 +121,9 @@ public: // Namespace for balancer settings and default read and write concerns. static const NamespaceString kConfigSettingsNamespace; + // Namespace for vector clock state. + static const NamespaceString kVectorClockNamespace; + /** * Constructs an empty NamespaceString. */ diff --git a/src/mongo/db/vector_clock.cpp b/src/mongo/db/vector_clock.cpp index 5ea6b280bed..2168c852c33 100644 --- a/src/mongo/db/vector_clock.cpp +++ b/src/mongo/db/vector_clock.cpp @@ -32,6 +32,7 @@ #include "mongo/db/vector_clock.h" #include "mongo/bson/util/bson_extract.h" +#include "mongo/client/query.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/logical_clock_gen.h" #include "mongo/db/logical_time_validator.h" diff --git a/src/mongo/db/vector_clock.h b/src/mongo/db/vector_clock.h index 6434cd3aa6b..750f3a9133c 100644 --- a/src/mongo/db/vector_clock.h +++ b/src/mongo/db/vector_clock.h @@ -31,11 +31,14 @@ #include +#include "mongo/client/query.h" #include "mongo/db/logical_time.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" +#include "mongo/db/vector_clock_document_gen.h" #include "mongo/platform/mutex.h" #include "mongo/transport/session.h" +#include "mongo/util/static_immortal.h" namespace mongo { @@ -132,9 +135,32 @@ public: */ bool isEnabled() const; + /* + * Methods to save/recover the the vector clock to/from persistent storage. Subclasses are + * eventually expected to override those method to provide persistence mechanisms. Default + * implementations result in a NOP. + */ + virtual SharedSemiFuture persist(OperationContext* opCtx) { + return SharedSemiFuture(); + } + virtual SharedSemiFuture recover(OperationContext* opCtx) { + return SharedSemiFuture(); + } + virtual void waitForInMemoryVectorClockToBePersisted(OperationContext* opCtx) {} + virtual void waitForVectorClockToBeRecovered(OperationContext* opCtx){}; + void resetVectorClock_forTest(); void advanceTime_forTest(Component component, LogicalTime newTime); + // Query to use when reading/writing the vector clock state document. + static const Query& kStateQuery() { + static StaticImmortal q{QUERY(VectorClockDocument::k_idFieldName << kDocIdKey)}; + return *q; + } + + // The _id value of the vector clock singleton document. + static constexpr StringData kDocIdKey = "vectorClockState"_sd; + protected: class ComponentFormat { diff --git a/src/mongo/db/vector_clock_mongod.cpp b/src/mongo/db/vector_clock_mongod.cpp index ab60403d149..c0c1caea299 100644 --- a/src/mongo/db/vector_clock_mongod.cpp +++ b/src/mongo/db/vector_clock_mongod.cpp @@ -26,12 +26,18 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault #include "mongo/platform/basic.h" #include "mongo/db/logical_time_validator.h" #include "mongo/db/repl/replica_set_aware_service.h" +#include "mongo/db/s/persistent_task_store.h" +#include "mongo/db/vector_clock_document_gen.h" #include "mongo/db/vector_clock_mutable.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/logv2/log.h" +#include "mongo/util/concurrency/thread_pool.h" namespace mongo { namespace { @@ -53,6 +59,12 @@ public: LogicalTime tick(Component component, uint64_t nTicks) override; void tickTo(Component component, LogicalTime newTime) override; + SharedSemiFuture persist(OperationContext* opCtx) override; + void waitForInMemoryVectorClockToBePersisted(OperationContext* opCtx) override; + + SharedSemiFuture recover(OperationContext* opCtx) override; + void waitForVectorClockToBeRecovered(OperationContext* opCtx) override; + protected: bool _gossipOutInternal(OperationContext* opCtx, BSONObjBuilder* out, @@ -73,6 +85,206 @@ private: void onStepUpComplete(OperationContext* opCtx, long long term) override {} void onStepDown() override {} void onBecomeArbiter() override; + + void _recoverComponent(OperationContext* opCtx, + const BSONObj& in, + LogicalTimeArray* newTime, + Component component); + + void _persistComponent(OperationContext* opCtx, + BSONObjBuilder* out, + const VectorTime& time, + Component component) const; + + static const ComponentArray> _vectorClockStateFormatters; + + /* + * Manages the components persistence format, stripping field names of intitial '$' symbol. + */ + class PersistenceComponentFormat : public VectorClock::ComponentFormat { + public: + using ComponentFormat::ComponentFormat; + virtual ~PersistenceComponentFormat() = default; + + const std::string bsonFieldName = _fieldName[0] == '$' ? _fieldName.substr(1) : _fieldName; + + bool out(ServiceContext* service, + OperationContext* opCtx, + bool permitRefresh, + BSONObjBuilder* out, + LogicalTime time, + Component component) const override { + out->append(bsonFieldName, time.asTimestamp()); + return true; + } + + LogicalTime in(ServiceContext* service, + OperationContext* opCtx, + const BSONObj& in, + bool couldBeUnauthenticated, + Component component) const override { + const auto componentElem(in[bsonFieldName]); + + uassert(ErrorCodes::InvalidBSON, + str::stream() << bsonFieldName << " field not found", + !componentElem.eoo()); + + uassert(ErrorCodes::BadValue, + str::stream() << _fieldName << " is not a Timestamp", + componentElem.type() == bsonTimestamp); + + return LogicalTime(componentElem.timestamp()); + } + }; + + /* + * A VectorClockStateOperation represents an asynchronous operation on the VectorClockDocument + * guarded by a mutex. Calling threads are joining the ongoing operation or - in case no + * operation is in progress - scheduling a new one. + */ + class VectorClockStateOperation { + public: + VectorClockStateOperation() = default; + ~VectorClockStateOperation() = default; + + SharedSemiFuture performOperation(VectorClockMongoD* vectorClock, + ServiceContext* serviceContext) { + stdx::lock_guard lk(_opMutex); + _opFuture = + _opFuture.thenRunOn(_getExecutorPool()) + .then([this, vectorClock, serviceContext, initialGeneration = _generation] { + stdx::unique_lock lk(_opMutex); + + invariant(_generation >= initialGeneration); + if (_generation > initialGeneration) { + // The last run of this operation has definitively observed the + // scheduling thread's in-memory state. There is no need to run + // this operation again. + return; + } + ++_generation; + + lk.unlock(); + + ThreadClient tc("VectorClockStateOperation", serviceContext); + { + stdx::lock_guard lk(*tc.get()); + tc->setSystemOperationKillable(lk); + } + + const auto opCtx = tc->makeOperationContext(); + execute(vectorClock, opCtx.get()); + }) + .onError([=](const Status status) { + LOGV2(4924402, + "Error while performing a VectorClockStateOperation", + "opName"_attr = getOperationName(), + "error"_attr = status); + return status; + }) + .share(); + + return _opFuture; + } + + virtual std::string getOperationName() = 0; + + void waitForCompletion() { + _opFuture.get(); + } + + protected: + virtual void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) = 0; + + private: + std::shared_ptr _getExecutorPool() { + static Mutex mutex = MONGO_MAKE_LATCH("VectorClockStateOperation::_executorMutex"); + static std::shared_ptr executor; + + stdx::lock_guard lg(mutex); + if (!executor) { + ThreadPool::Options options; + options.poolName = "VectorClockStateOperation"; + options.minThreads = 0; + options.maxThreads = 1; + executor = std::make_shared(std::move(options)); + executor->startup(); + } + + return executor; + } + + Mutex _opMutex = MONGO_MAKE_LATCH(); + SharedSemiFuture _opFuture = SharedSemiFuture(); + size_t _generation = 0; + }; + + /* + * VectorClockStateOperation persisting configTime and topologyTime in the VectorClockDocument. + */ + class PersistOperation : public VectorClockStateOperation { + void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override { + const auto time = vectorClock->getTime(); + + NamespaceString nss(NamespaceString::kVectorClockNamespace); + PersistentTaskStore store(nss); + + BSONObjBuilder bob; + VectorClockDocument vcd; + + vectorClock->_persistComponent(opCtx, &bob, time, Component::ConfigTime); + vectorClock->_persistComponent(opCtx, &bob, time, Component::TopologyTime); + + auto obj = bob.done(); + vcd.setVectorClock(obj); + + store.update(opCtx, + VectorClock::kStateQuery(), + vcd.toBSON(), + WriteConcerns::kMajorityWriteConcern, + true); + } + + std::string getOperationName() override { + return "persist"; + } + }; + + /* + * VectorClockStateOperation recovering configTime and topologyTime from the + * VectorClockDocument. + */ + class RecoverOperation : public VectorClockStateOperation { + void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override { + NamespaceString nss(NamespaceString::kVectorClockNamespace); + PersistentTaskStore store(nss); + + int nDocuments = store.count(opCtx, VectorClock::kStateQuery()); + if (nDocuments == 0) { + LOGV2_DEBUG(4924403, 2, "No VectorClockDocument to recover"); + return; + } + fassert(4924404, nDocuments == 1); + + store.forEach(opCtx, VectorClock::kStateQuery(), [&](const VectorClockDocument& vcd) { + BSONObj obj = vcd.getVectorClock(); + + LogicalTimeArray newTime; + vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::ConfigTime); + vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::TopologyTime); + vectorClock->_advanceTime(std::move(newTime)); + + return true; + }); + } + + std::string getOperationName() override { + return "recover"; + } + }; + + PersistOperation _persistOperation; + RecoverOperation _recoverOperation; }; const auto vectorClockMongoDDecoration = ServiceContext::declareDecoration(); @@ -80,6 +292,16 @@ const auto vectorClockMongoDDecoration = ServiceContext::declareDecoration vectorClockMongoDRegisterer( "VectorClockMongoD-ReplicaSetAwareServiceRegistration"); + +const VectorClock::ComponentArray> + VectorClockMongoD::_vectorClockStateFormatters{ + std::make_unique( + VectorClock::kClusterTimeFieldName), + std::make_unique( + VectorClock::kConfigTimeFieldName), + std::make_unique( + VectorClock::kTopologyTimeFieldName)}; + VectorClockMongoD* VectorClockMongoD::get(ServiceContext* serviceContext) { return &vectorClockMongoDDecoration(serviceContext); } @@ -191,5 +413,37 @@ void VectorClockMongoD::tickTo(Component component, LogicalTime newTime) { MONGO_UNREACHABLE; } +void VectorClockMongoD::_persistComponent(OperationContext* opCtx, + BSONObjBuilder* out, + const VectorTime& time, + Component component) const { + _vectorClockStateFormatters[component]->out( + _service, opCtx, true, out, time[component], component); +} + +void VectorClockMongoD::_recoverComponent(OperationContext* opCtx, + const BSONObj& in, + LogicalTimeArray* newTime, + Component component) { + (*newTime)[component] = _vectorClockStateFormatters[component]->in( + _service, opCtx, in, true /*couldBeUnauthenticated*/, component); +} + +SharedSemiFuture VectorClockMongoD::persist(OperationContext* opCtx) { + return _persistOperation.performOperation(this, opCtx->getServiceContext()); +} + +void VectorClockMongoD::waitForInMemoryVectorClockToBePersisted(OperationContext* opCtx) { + _persistOperation.waitForCompletion(); +} + +SharedSemiFuture VectorClockMongoD::recover(OperationContext* opCtx) { + return _recoverOperation.performOperation(this, opCtx->getServiceContext()); +} + +void VectorClockMongoD::waitForVectorClockToBeRecovered(OperationContext* opCtx) { + _recoverOperation.waitForCompletion(); +}; + } // namespace } // namespace mongo diff --git a/src/mongo/db/vector_clock_mongod_test.cpp b/src/mongo/db/vector_clock_mongod_test.cpp index 711b1904c13..6e07080d591 100644 --- a/src/mongo/db/vector_clock_mongod_test.cpp +++ b/src/mongo/db/vector_clock_mongod_test.cpp @@ -32,7 +32,9 @@ #include "mongo/db/keys_collection_client_direct.h" #include "mongo/db/keys_collection_manager.h" #include "mongo/db/logical_time_validator.h" +#include "mongo/db/s/persistent_task_store.h" #include "mongo/db/s/sharding_mongod_test_fixture.h" +#include "mongo/db/vector_clock_document_gen.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/unittest/death_test.h" #include "mongo/util/clock_source_mock.h" @@ -47,6 +49,8 @@ namespace { */ class VectorClockMongoDTest : public ShardingMongodTestFixture { protected: + using Component = VectorClock::Component; + void setUp() override { ShardingMongodTestFixture::setUp(); @@ -56,7 +60,7 @@ protected: auto keysCollectionClient = std::make_unique(); VectorClockMutable::get(getServiceContext()) - ->tickTo(VectorClock::Component::ClusterTime, LogicalTime(Timestamp(1, 0))); + ->tickTo(Component::ClusterTime, LogicalTime(Timestamp(1, 0))); _keyManager = std::make_shared( "dummy", std::move(keysCollectionClient), Seconds(1000)); @@ -87,17 +91,17 @@ TEST_F(VectorClockMongoDTest, TickClusterTime) { auto vc = VectorClockMutable::get(sc); const auto t0 = vc->getTime(); - ASSERT_EQ(LogicalTime(Timestamp(1, 0)), t0[VectorClock::Component::ClusterTime]); + ASSERT_EQ(LogicalTime(Timestamp(1, 0)), t0[Component::ClusterTime]); - const auto r1 = vc->tick(VectorClock::Component::ClusterTime, 1); + const auto r1 = vc->tick(Component::ClusterTime, 1); const auto t1 = vc->getTime(); - ASSERT_EQ(r1, t1[VectorClock::Component::ClusterTime]); - ASSERT_GT(r1, t0[VectorClock::Component::ClusterTime]); + ASSERT_EQ(r1, t1[Component::ClusterTime]); + ASSERT_GT(r1, t0[Component::ClusterTime]); - const auto r2 = vc->tick(VectorClock::Component::ClusterTime, 2); + const auto r2 = vc->tick(Component::ClusterTime, 2); const auto t2 = vc->getTime(); ASSERT_GT(r2, r1); - ASSERT_GT(t2[VectorClock::Component::ClusterTime], r1); + ASSERT_GT(t2[Component::ClusterTime], r1); } TEST_F(VectorClockMongoDTest, TickToClusterTime) { @@ -105,43 +109,43 @@ TEST_F(VectorClockMongoDTest, TickToClusterTime) { auto vc = VectorClockMutable::get(sc); const auto t0 = vc->getTime(); - ASSERT_EQ(LogicalTime(Timestamp(1, 0)), t0[VectorClock::Component::ClusterTime]); + ASSERT_EQ(LogicalTime(Timestamp(1, 0)), t0[Component::ClusterTime]); - vc->tickTo(VectorClock::Component::ClusterTime, LogicalTime(Timestamp(1, 1))); + vc->tickTo(Component::ClusterTime, LogicalTime(Timestamp(1, 1))); const auto t1 = vc->getTime(); - ASSERT_EQ(LogicalTime(Timestamp(1, 1)), t1[VectorClock::Component::ClusterTime]); + ASSERT_EQ(LogicalTime(Timestamp(1, 1)), t1[Component::ClusterTime]); - vc->tickTo(VectorClock::Component::ClusterTime, LogicalTime(Timestamp(3, 3))); + vc->tickTo(Component::ClusterTime, LogicalTime(Timestamp(3, 3))); const auto t2 = vc->getTime(); - ASSERT_EQ(LogicalTime(Timestamp(3, 3)), t2[VectorClock::Component::ClusterTime]); + ASSERT_EQ(LogicalTime(Timestamp(3, 3)), t2[Component::ClusterTime]); - vc->tickTo(VectorClock::Component::ClusterTime, LogicalTime(Timestamp(2, 2))); + vc->tickTo(Component::ClusterTime, LogicalTime(Timestamp(2, 2))); const auto t3 = vc->getTime(); - ASSERT_EQ(LogicalTime(Timestamp(3, 3)), t3[VectorClock::Component::ClusterTime]); + ASSERT_EQ(LogicalTime(Timestamp(3, 3)), t3[Component::ClusterTime]); } DEATH_TEST_F(VectorClockMongoDTest, CannotTickConfigTime, "Hit a MONGO_UNREACHABLE") { auto sc = getServiceContext(); auto vc = VectorClockMutable::get(sc); - vc->tick(VectorClock::Component::ConfigTime, 1); + vc->tick(Component::ConfigTime, 1); } DEATH_TEST_F(VectorClockMongoDTest, CannotTickToConfigTime, "Hit a MONGO_UNREACHABLE") { auto sc = getServiceContext(); auto vc = VectorClockMutable::get(sc); - vc->tickTo(VectorClock::Component::ConfigTime, LogicalTime()); + vc->tickTo(Component::ConfigTime, LogicalTime()); } DEATH_TEST_F(VectorClockMongoDTest, CannotTickTopologyTime, "Hit a MONGO_UNREACHABLE") { auto sc = getServiceContext(); auto vc = VectorClockMutable::get(sc); - vc->tick(VectorClock::Component::TopologyTime, 1); + vc->tick(Component::TopologyTime, 1); } DEATH_TEST_F(VectorClockMongoDTest, CannotTickToTopologyTime, "Hit a MONGO_UNREACHABLE") { auto sc = getServiceContext(); auto vc = VectorClockMutable::get(sc); - vc->tickTo(VectorClock::Component::TopologyTime, LogicalTime()); + vc->tickTo(Component::TopologyTime, LogicalTime()); } TEST_F(VectorClockMongoDTest, GossipOutInternal) { @@ -151,7 +155,7 @@ TEST_F(VectorClockMongoDTest, GossipOutInternal) { LogicalTimeValidator::get(getServiceContext())->enableKeyGenerator(operationContext(), true); refreshKeyManager(); - const auto clusterTime = vc->tick(VectorClock::Component::ClusterTime, 1); + const auto clusterTime = vc->tick(Component::ClusterTime, 1); BSONObjBuilder bob; vc->gossipOut(nullptr, &bob, transport::Session::kInternalClient); @@ -172,7 +176,7 @@ TEST_F(VectorClockMongoDTest, GossipOutExternal) { LogicalTimeValidator::get(getServiceContext())->enableKeyGenerator(operationContext(), true); refreshKeyManager(); - const auto clusterTime = vc->tick(VectorClock::Component::ClusterTime, 1); + const auto clusterTime = vc->tick(Component::ClusterTime, 1); BSONObjBuilder bob; vc->gossipOut(nullptr, &bob); @@ -190,7 +194,7 @@ TEST_F(VectorClockMongoDTest, GossipInInternal) { auto sc = getServiceContext(); auto vc = VectorClockMutable::get(sc); - vc->tick(VectorClock::Component::ClusterTime, 1); + vc->tick(Component::ClusterTime, 1); auto dummySignature = BSON("hash" << BSONBinData("\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1", 20, BinDataGeneral) @@ -205,9 +209,9 @@ TEST_F(VectorClockMongoDTest, GossipInInternal) { // On plain replset servers, gossip in from internal clients should update $clusterTime, but not // $configTime or $topologyTime. auto afterTime = vc->getTime(); - ASSERT_EQ(afterTime[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(2, 2)); - ASSERT_EQ(afterTime[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); - ASSERT_EQ(afterTime[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime[Component::ClusterTime].asTimestamp(), Timestamp(2, 2)); + ASSERT_EQ(afterTime[Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime[Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); vc->gossipIn(nullptr, BSON("$clusterTime" @@ -217,9 +221,9 @@ TEST_F(VectorClockMongoDTest, GossipInInternal) { transport::Session::kInternalClient); auto afterTime2 = vc->getTime(); - ASSERT_EQ(afterTime2[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(2, 2)); - ASSERT_EQ(afterTime2[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); - ASSERT_EQ(afterTime2[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime2[Component::ClusterTime].asTimestamp(), Timestamp(2, 2)); + ASSERT_EQ(afterTime2[Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime2[Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); vc->gossipIn(nullptr, BSON("$clusterTime" @@ -229,16 +233,16 @@ TEST_F(VectorClockMongoDTest, GossipInInternal) { transport::Session::kInternalClient); auto afterTime3 = vc->getTime(); - ASSERT_EQ(afterTime3[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(3, 3)); - ASSERT_EQ(afterTime3[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); - ASSERT_EQ(afterTime3[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime3[Component::ClusterTime].asTimestamp(), Timestamp(3, 3)); + ASSERT_EQ(afterTime3[Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime3[Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); } TEST_F(VectorClockMongoDTest, GossipInExternal) { auto sc = getServiceContext(); auto vc = VectorClockMutable::get(sc); - vc->tick(VectorClock::Component::ClusterTime, 1); + vc->tick(Component::ClusterTime, 1); auto dummySignature = BSON("hash" << BSONBinData("\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1", 20, BinDataGeneral) @@ -252,9 +256,9 @@ TEST_F(VectorClockMongoDTest, GossipInExternal) { // On plain replset servers, gossip in from external clients should update $clusterTime, but not // $configTime or $topologyTime. auto afterTime = vc->getTime(); - ASSERT_EQ(afterTime[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(2, 2)); - ASSERT_EQ(afterTime[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); - ASSERT_EQ(afterTime[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime[Component::ClusterTime].asTimestamp(), Timestamp(2, 2)); + ASSERT_EQ(afterTime[Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime[Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); vc->gossipIn(nullptr, BSON("$clusterTime" @@ -263,9 +267,9 @@ TEST_F(VectorClockMongoDTest, GossipInExternal) { false); auto afterTime2 = vc->getTime(); - ASSERT_EQ(afterTime2[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(2, 2)); - ASSERT_EQ(afterTime2[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); - ASSERT_EQ(afterTime2[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime2[Component::ClusterTime].asTimestamp(), Timestamp(2, 2)); + ASSERT_EQ(afterTime2[Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime2[Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); vc->gossipIn(nullptr, BSON("$clusterTime" @@ -274,9 +278,120 @@ TEST_F(VectorClockMongoDTest, GossipInExternal) { false); auto afterTime3 = vc->getTime(); - ASSERT_EQ(afterTime3[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(3, 3)); - ASSERT_EQ(afterTime2[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); - ASSERT_EQ(afterTime3[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime3[Component::ClusterTime].asTimestamp(), Timestamp(3, 3)); + ASSERT_EQ(afterTime2[Component::ConfigTime].asTimestamp(), Timestamp(0, 0)); + ASSERT_EQ(afterTime3[Component::TopologyTime].asTimestamp(), Timestamp(0, 0)); +} + +TEST_F(VectorClockMongoDTest, PersistVectorClockDocument) { + auto sc = getServiceContext(); + auto opCtx = operationContext(); + + auto vc = VectorClockMutable::get(sc); + vc->advanceTime_forTest(Component::ConfigTime, LogicalTime()); + vc->advanceTime_forTest(Component::TopologyTime, LogicalTime()); + + NamespaceString nss(NamespaceString::kVectorClockNamespace); + PersistentTaskStore store(nss); + + // Check that no vectorClockState document is present + ASSERT_EQUALS(store.count(opCtx, VectorClock::kStateQuery()), 0); + + // Persist and check that the vectorClockState document has been persisted + auto future = vc->persist(opCtx); + future.get(); + ASSERT_EQUALS(store.count(opCtx, VectorClock::kStateQuery()), 1); + + // Check that the vectorClockState document is still one after more persist calls + future = vc->persist(opCtx); + vc->waitForInMemoryVectorClockToBePersisted(opCtx); + ASSERT_EQUALS(store.count(opCtx, VectorClock::kStateQuery()), 1); +} + +TEST_F(VectorClockMongoDTest, RecoverVectorClockDocument) { + auto sc = getServiceContext(); + auto opCtx = operationContext(); + const auto configTime = LogicalTime(Timestamp(3, 3)); + const auto topologyTime = LogicalTime(Timestamp(4, 4)); + + auto vc = VectorClockMutable::get(sc); + vc->advanceTime_forTest(Component::ConfigTime, configTime); + vc->advanceTime_forTest(Component::TopologyTime, topologyTime); + + // Persist the vector clock, then reset its components + auto future = vc->persist(opCtx); + future.get(opCtx); + vc->resetVectorClock_forTest(); + + NamespaceString nss(NamespaceString::kVectorClockNamespace); + PersistentTaskStore store(nss); + + future = vc->recover(opCtx); + vc->waitForVectorClockToBeRecovered(opCtx); + + auto time = vc->getTime(); + auto actualConfTime = time[Component::ConfigTime]; + auto actualTopologyTime = time[Component::TopologyTime]; + + ASSERT_EQUALS(actualConfTime, configTime); + ASSERT_EQUALS(actualTopologyTime, topologyTime); +} + +TEST_F(VectorClockMongoDTest, RecoverNotExistingVectorClockDocument) { + auto sc = getServiceContext(); + auto opCtx = operationContext(); + auto vc = VectorClockMutable::get(sc); + + const auto configTime = LogicalTime(Timestamp(3, 3)); + const auto topologyTime = LogicalTime(Timestamp(4, 4)); + vc->advanceTime_forTest(Component::ConfigTime, configTime); + vc->advanceTime_forTest(Component::TopologyTime, topologyTime); + + NamespaceString nss(NamespaceString::kVectorClockNamespace); + PersistentTaskStore store(nss); + + // Check that no recovery document is stored and call recovery + int nDocuments = store.count(opCtx, VectorClock::kStateQuery()); + ASSERT_EQUALS(nDocuments, 0); + + auto future = vc->recover(opCtx); + vc->waitForVectorClockToBeRecovered(opCtx); + + // Verify that times didn't change after an unsuccessful recovery + auto time = vc->getTime(); + auto actualConfTime = time[Component::ConfigTime]; + auto actualTopologyTime = time[Component::TopologyTime]; + + ASSERT_EQUALS(actualConfTime, configTime); + ASSERT_EQUALS(actualTopologyTime, topologyTime); +} + +TEST_F(VectorClockMongoDTest, SubsequentPersistRecoverVectorClockDocument) { + auto sc = getServiceContext(); + auto opCtx = operationContext(); + auto vc = VectorClockMutable::get(sc); + + for (int i = 1; i < 10; i++) { + auto newTime = LogicalTime(Timestamp(i, i)); + vc->advanceTime_forTest(Component::ClusterTime, newTime); + vc->advanceTime_forTest(Component::ConfigTime, newTime); + vc->advanceTime_forTest(Component::TopologyTime, newTime); + + // Persist the vector clock, then reset its components + auto future = vc->persist(opCtx); + future.get(opCtx); + vc->resetVectorClock_forTest(); + + future = vc->recover(opCtx); + vc->waitForVectorClockToBeRecovered(opCtx); + + auto time = vc->getTime(); + auto actualConfTime = time[Component::ConfigTime]; + auto actualTopologyTime = time[Component::TopologyTime]; + + ASSERT_EQUALS(actualConfTime, newTime); + ASSERT_EQUALS(actualTopologyTime, newTime); + } } } // namespace -- cgit v1.2.1