summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>2020-07-15 19:46:11 +0200
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-16 21:34:39 +0000
commit97fd38d397f586d2d296714cfe3a27dafb37b26a (patch)
tree4d93577501dadbd70ef885631362de58e0b6d17d
parentdb63f4a342f836eb7c9e39a5a05e33af4b19dd4f (diff)
downloadmongo-97fd38d397f586d2d296714cfe3a27dafb37b26a.tar.gz
SERVER-48717 Implement the persist/recover functionalities of the VectorClock
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/namespace_string.cpp5
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/vector_clock.cpp1
-rw-r--r--src/mongo/db/vector_clock.h26
-rw-r--r--src/mongo/db/vector_clock_mongod.cpp254
-rw-r--r--src/mongo/db/vector_clock_mongod_test.cpp195
7 files changed, 445 insertions, 41 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 6c6b8ce2301..a649e04031d 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1613,6 +1613,8 @@ env.Library(
'vector_clock_mutable',
],
LIBDEPS_PRIVATE=[
+ 'dbdirectclient',
+ 'rw_concern_d',
'repl/replica_set_aware_service',
'server_options_core',
],
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 92c46ef6652..8b23b28045c 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -87,6 +87,9 @@ const NamespaceString NamespaceString::kRangeDeletionNamespace(NamespaceString::
"rangeDeletions");
const NamespaceString NamespaceString::kConfigSettingsNamespace(NamespaceString::kConfigDb,
"settings");
+const NamespaceString NamespaceString::kVectorClockNamespace(NamespaceString::kConfigDb,
+ "vectorClock");
+
bool NamespaceString::isListCollectionsCursorNS() const {
return coll() == listCollectionsCursorCol;
@@ -217,7 +220,7 @@ bool NamespaceString::isNamespaceAlwaysUnsharded() const {
// Certain config collections can never be sharded
if (ns() == kSessionTransactionsTableNamespace.ns() || ns() == kRangeDeletionNamespace.ns() ||
- ns() == kTransactionCoordinatorsNamespace.ns() ||
+ ns() == kTransactionCoordinatorsNamespace.ns() || ns() == kVectorClockNamespace.ns() ||
ns() == kMigrationCoordinatorsNamespace.ns() || ns() == kIndexBuildEntryNamespace.ns())
return true;
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index c80d9eb3bb1..566b9b159ad 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -121,6 +121,9 @@ public:
// Namespace for balancer settings and default read and write concerns.
static const NamespaceString kConfigSettingsNamespace;
+ // Namespace for vector clock state.
+ static const NamespaceString kVectorClockNamespace;
+
/**
* Constructs an empty NamespaceString.
*/
diff --git a/src/mongo/db/vector_clock.cpp b/src/mongo/db/vector_clock.cpp
index 5ea6b280bed..2168c852c33 100644
--- a/src/mongo/db/vector_clock.cpp
+++ b/src/mongo/db/vector_clock.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/vector_clock.h"
#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/query.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/logical_clock_gen.h"
#include "mongo/db/logical_time_validator.h"
diff --git a/src/mongo/db/vector_clock.h b/src/mongo/db/vector_clock.h
index 6434cd3aa6b..750f3a9133c 100644
--- a/src/mongo/db/vector_clock.h
+++ b/src/mongo/db/vector_clock.h
@@ -31,11 +31,14 @@
#include <array>
+#include "mongo/client/query.h"
#include "mongo/db/logical_time.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/vector_clock_document_gen.h"
#include "mongo/platform/mutex.h"
#include "mongo/transport/session.h"
+#include "mongo/util/static_immortal.h"
namespace mongo {
@@ -132,9 +135,32 @@ public:
*/
bool isEnabled() const;
+ /*
+ * Methods to save/recover the the vector clock to/from persistent storage. Subclasses are
+ * eventually expected to override those method to provide persistence mechanisms. Default
+ * implementations result in a NOP.
+ */
+ virtual SharedSemiFuture<void> persist(OperationContext* opCtx) {
+ return SharedSemiFuture<void>();
+ }
+ virtual SharedSemiFuture<void> recover(OperationContext* opCtx) {
+ return SharedSemiFuture<void>();
+ }
+ virtual void waitForInMemoryVectorClockToBePersisted(OperationContext* opCtx) {}
+ virtual void waitForVectorClockToBeRecovered(OperationContext* opCtx){};
+
void resetVectorClock_forTest();
void advanceTime_forTest(Component component, LogicalTime newTime);
+ // Query to use when reading/writing the vector clock state document.
+ static const Query& kStateQuery() {
+ static StaticImmortal<Query> q{QUERY(VectorClockDocument::k_idFieldName << kDocIdKey)};
+ return *q;
+ }
+
+ // The _id value of the vector clock singleton document.
+ static constexpr StringData kDocIdKey = "vectorClockState"_sd;
+
protected:
class ComponentFormat {
diff --git a/src/mongo/db/vector_clock_mongod.cpp b/src/mongo/db/vector_clock_mongod.cpp
index ab60403d149..c0c1caea299 100644
--- a/src/mongo/db/vector_clock_mongod.cpp
+++ b/src/mongo/db/vector_clock_mongod.cpp
@@ -26,12 +26,18 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
#include "mongo/platform/basic.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/repl/replica_set_aware_service.h"
+#include "mongo/db/s/persistent_task_store.h"
+#include "mongo/db/vector_clock_document_gen.h"
#include "mongo/db/vector_clock_mutable.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
namespace {
@@ -53,6 +59,12 @@ public:
LogicalTime tick(Component component, uint64_t nTicks) override;
void tickTo(Component component, LogicalTime newTime) override;
+ SharedSemiFuture<void> persist(OperationContext* opCtx) override;
+ void waitForInMemoryVectorClockToBePersisted(OperationContext* opCtx) override;
+
+ SharedSemiFuture<void> recover(OperationContext* opCtx) override;
+ void waitForVectorClockToBeRecovered(OperationContext* opCtx) override;
+
protected:
bool _gossipOutInternal(OperationContext* opCtx,
BSONObjBuilder* out,
@@ -73,6 +85,206 @@ private:
void onStepUpComplete(OperationContext* opCtx, long long term) override {}
void onStepDown() override {}
void onBecomeArbiter() override;
+
+ void _recoverComponent(OperationContext* opCtx,
+ const BSONObj& in,
+ LogicalTimeArray* newTime,
+ Component component);
+
+ void _persistComponent(OperationContext* opCtx,
+ BSONObjBuilder* out,
+ const VectorTime& time,
+ Component component) const;
+
+ static const ComponentArray<std::unique_ptr<ComponentFormat>> _vectorClockStateFormatters;
+
+ /*
+ * Manages the components persistence format, stripping field names of intitial '$' symbol.
+ */
+ class PersistenceComponentFormat : public VectorClock::ComponentFormat {
+ public:
+ using ComponentFormat::ComponentFormat;
+ virtual ~PersistenceComponentFormat() = default;
+
+ const std::string bsonFieldName = _fieldName[0] == '$' ? _fieldName.substr(1) : _fieldName;
+
+ bool out(ServiceContext* service,
+ OperationContext* opCtx,
+ bool permitRefresh,
+ BSONObjBuilder* out,
+ LogicalTime time,
+ Component component) const override {
+ out->append(bsonFieldName, time.asTimestamp());
+ return true;
+ }
+
+ LogicalTime in(ServiceContext* service,
+ OperationContext* opCtx,
+ const BSONObj& in,
+ bool couldBeUnauthenticated,
+ Component component) const override {
+ const auto componentElem(in[bsonFieldName]);
+
+ uassert(ErrorCodes::InvalidBSON,
+ str::stream() << bsonFieldName << " field not found",
+ !componentElem.eoo());
+
+ uassert(ErrorCodes::BadValue,
+ str::stream() << _fieldName << " is not a Timestamp",
+ componentElem.type() == bsonTimestamp);
+
+ return LogicalTime(componentElem.timestamp());
+ }
+ };
+
+ /*
+ * A VectorClockStateOperation represents an asynchronous operation on the VectorClockDocument
+ * guarded by a mutex. Calling threads are joining the ongoing operation or - in case no
+ * operation is in progress - scheduling a new one.
+ */
+ class VectorClockStateOperation {
+ public:
+ VectorClockStateOperation() = default;
+ ~VectorClockStateOperation() = default;
+
+ SharedSemiFuture<void> performOperation(VectorClockMongoD* vectorClock,
+ ServiceContext* serviceContext) {
+ stdx::lock_guard<Latch> lk(_opMutex);
+ _opFuture =
+ _opFuture.thenRunOn(_getExecutorPool())
+ .then([this, vectorClock, serviceContext, initialGeneration = _generation] {
+ stdx::unique_lock<Latch> lk(_opMutex);
+
+ invariant(_generation >= initialGeneration);
+ if (_generation > initialGeneration) {
+ // The last run of this operation has definitively observed the
+ // scheduling thread's in-memory state. There is no need to run
+ // this operation again.
+ return;
+ }
+ ++_generation;
+
+ lk.unlock();
+
+ ThreadClient tc("VectorClockStateOperation", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillable(lk);
+ }
+
+ const auto opCtx = tc->makeOperationContext();
+ execute(vectorClock, opCtx.get());
+ })
+ .onError([=](const Status status) {
+ LOGV2(4924402,
+ "Error while performing a VectorClockStateOperation",
+ "opName"_attr = getOperationName(),
+ "error"_attr = status);
+ return status;
+ })
+ .share();
+
+ return _opFuture;
+ }
+
+ virtual std::string getOperationName() = 0;
+
+ void waitForCompletion() {
+ _opFuture.get();
+ }
+
+ protected:
+ virtual void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) = 0;
+
+ private:
+ std::shared_ptr<ThreadPool> _getExecutorPool() {
+ static Mutex mutex = MONGO_MAKE_LATCH("VectorClockStateOperation::_executorMutex");
+ static std::shared_ptr<ThreadPool> executor;
+
+ stdx::lock_guard<Latch> lg(mutex);
+ if (!executor) {
+ ThreadPool::Options options;
+ options.poolName = "VectorClockStateOperation";
+ options.minThreads = 0;
+ options.maxThreads = 1;
+ executor = std::make_shared<ThreadPool>(std::move(options));
+ executor->startup();
+ }
+
+ return executor;
+ }
+
+ Mutex _opMutex = MONGO_MAKE_LATCH();
+ SharedSemiFuture<void> _opFuture = SharedSemiFuture<void>();
+ size_t _generation = 0;
+ };
+
+ /*
+ * VectorClockStateOperation persisting configTime and topologyTime in the VectorClockDocument.
+ */
+ class PersistOperation : public VectorClockStateOperation {
+ void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override {
+ const auto time = vectorClock->getTime();
+
+ NamespaceString nss(NamespaceString::kVectorClockNamespace);
+ PersistentTaskStore<VectorClockDocument> store(nss);
+
+ BSONObjBuilder bob;
+ VectorClockDocument vcd;
+
+ vectorClock->_persistComponent(opCtx, &bob, time, Component::ConfigTime);
+ vectorClock->_persistComponent(opCtx, &bob, time, Component::TopologyTime);
+
+ auto obj = bob.done();
+ vcd.setVectorClock(obj);
+
+ store.update(opCtx,
+ VectorClock::kStateQuery(),
+ vcd.toBSON(),
+ WriteConcerns::kMajorityWriteConcern,
+ true);
+ }
+
+ std::string getOperationName() override {
+ return "persist";
+ }
+ };
+
+ /*
+ * VectorClockStateOperation recovering configTime and topologyTime from the
+ * VectorClockDocument.
+ */
+ class RecoverOperation : public VectorClockStateOperation {
+ void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override {
+ NamespaceString nss(NamespaceString::kVectorClockNamespace);
+ PersistentTaskStore<VectorClockDocument> store(nss);
+
+ int nDocuments = store.count(opCtx, VectorClock::kStateQuery());
+ if (nDocuments == 0) {
+ LOGV2_DEBUG(4924403, 2, "No VectorClockDocument to recover");
+ return;
+ }
+ fassert(4924404, nDocuments == 1);
+
+ store.forEach(opCtx, VectorClock::kStateQuery(), [&](const VectorClockDocument& vcd) {
+ BSONObj obj = vcd.getVectorClock();
+
+ LogicalTimeArray newTime;
+ vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::ConfigTime);
+ vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::TopologyTime);
+ vectorClock->_advanceTime(std::move(newTime));
+
+ return true;
+ });
+ }
+
+ std::string getOperationName() override {
+ return "recover";
+ }
+ };
+
+ PersistOperation _persistOperation;
+ RecoverOperation _recoverOperation;
};
const auto vectorClockMongoDDecoration = ServiceContext::declareDecoration<VectorClockMongoD>();
@@ -80,6 +292,16 @@ const auto vectorClockMongoDDecoration = ServiceContext::declareDecoration<Vecto
const ReplicaSetAwareServiceRegistry::Registerer<VectorClockMongoD> vectorClockMongoDRegisterer(
"VectorClockMongoD-ReplicaSetAwareServiceRegistration");
+
+const VectorClock::ComponentArray<std::unique_ptr<VectorClock::ComponentFormat>>
+ VectorClockMongoD::_vectorClockStateFormatters{
+ std::make_unique<VectorClockMongoD::PersistenceComponentFormat>(
+ VectorClock::kClusterTimeFieldName),
+ std::make_unique<VectorClockMongoD::PersistenceComponentFormat>(
+ VectorClock::kConfigTimeFieldName),
+ std::make_unique<VectorClockMongoD::PersistenceComponentFormat>(
+ VectorClock::kTopologyTimeFieldName)};
+
VectorClockMongoD* VectorClockMongoD::get(ServiceContext* serviceContext) {
return &vectorClockMongoDDecoration(serviceContext);
}
@@ -191,5 +413,37 @@ void VectorClockMongoD::tickTo(Component component, LogicalTime newTime) {
MONGO_UNREACHABLE;
}
+void VectorClockMongoD::_persistComponent(OperationContext* opCtx,
+ BSONObjBuilder* out,
+ const VectorTime& time,
+ Component component) const {
+ _vectorClockStateFormatters[component]->out(
+ _service, opCtx, true, out, time[component], component);
+}
+
+void VectorClockMongoD::_recoverComponent(OperationContext* opCtx,
+ const BSONObj& in,
+ LogicalTimeArray* newTime,
+ Component component) {
+ (*newTime)[component] = _vectorClockStateFormatters[component]->in(
+ _service, opCtx, in, true /*couldBeUnauthenticated*/, component);
+}
+
+SharedSemiFuture<void> VectorClockMongoD::persist(OperationContext* opCtx) {
+ return _persistOperation.performOperation(this, opCtx->getServiceContext());
+}
+
+void VectorClockMongoD::waitForInMemoryVectorClockToBePersisted(OperationContext* opCtx) {
+ _persistOperation.waitForCompletion();
+}
+
+SharedSemiFuture<void> VectorClockMongoD::recover(OperationContext* opCtx) {
+ return _recoverOperation.performOperation(this, opCtx->getServiceContext());
+}
+
+void VectorClockMongoD::waitForVectorClockToBeRecovered(OperationContext* opCtx) {
+ _recoverOperation.waitForCompletion();
+};
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/vector_clock_mongod_test.cpp b/src/mongo/db/vector_clock_mongod_test.cpp
index 711b1904c13..6e07080d591 100644
--- a/src/mongo/db/vector_clock_mongod_test.cpp
+++ b/src/mongo/db/vector_clock_mongod_test.cpp
@@ -32,7 +32,9 @@
#include "mongo/db/keys_collection_client_direct.h"
#include "mongo/db/keys_collection_manager.h"
#include "mongo/db/logical_time_validator.h"
+#include "mongo/db/s/persistent_task_store.h"
#include "mongo/db/s/sharding_mongod_test_fixture.h"
+#include "mongo/db/vector_clock_document_gen.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/unittest/death_test.h"
#include "mongo/util/clock_source_mock.h"
@@ -47,6 +49,8 @@ namespace {
*/
class VectorClockMongoDTest : public ShardingMongodTestFixture {
protected:
+ using Component = VectorClock::Component;
+
void setUp() override {
ShardingMongodTestFixture::setUp();
@@ -56,7 +60,7 @@ protected:
auto keysCollectionClient = std::make_unique<KeysCollectionClientDirect>();
VectorClockMutable::get(getServiceContext())
- ->tickTo(VectorClock::Component::ClusterTime, LogicalTime(Timestamp(1, 0)));
+ ->tickTo(Component::ClusterTime, LogicalTime(Timestamp(1, 0)));
_keyManager = std::make_shared<KeysCollectionManager>(
"dummy", std::move(keysCollectionClient), Seconds(1000));
@@ -87,17 +91,17 @@ TEST_F(VectorClockMongoDTest, TickClusterTime) {
auto vc = VectorClockMutable::get(sc);
const auto t0 = vc->getTime();
- ASSERT_EQ(LogicalTime(Timestamp(1, 0)), t0[VectorClock::Component::ClusterTime]);
+ ASSERT_EQ(LogicalTime(Timestamp(1, 0)), t0[Component::ClusterTime]);
- const auto r1 = vc->tick(VectorClock::Component::ClusterTime, 1);
+ const auto r1 = vc->tick(Component::ClusterTime, 1);
const auto t1 = vc->getTime();
- ASSERT_EQ(r1, t1[VectorClock::Component::ClusterTime]);
- ASSERT_GT(r1, t0[VectorClock::Component::ClusterTime]);
+ ASSERT_EQ(r1, t1[Component::ClusterTime]);
+ ASSERT_GT(r1, t0[Component::ClusterTime]);
- const auto r2 = vc->tick(VectorClock::Component::ClusterTime, 2);
+ const auto r2 = vc->tick(Component::ClusterTime, 2);
const auto t2 = vc->getTime();
ASSERT_GT(r2, r1);
- ASSERT_GT(t2[VectorClock::Component::ClusterTime], r1);
+ ASSERT_GT(t2[Component::ClusterTime], r1);
}
TEST_F(VectorClockMongoDTest, TickToClusterTime) {
@@ -105,43 +109,43 @@ TEST_F(VectorClockMongoDTest, TickToClusterTime) {
auto vc = VectorClockMutable::get(sc);
const auto t0 = vc->getTime();
- ASSERT_EQ(LogicalTime(Timestamp(1, 0)), t0[VectorClock::Component::ClusterTime]);
+ ASSERT_EQ(LogicalTime(Timestamp(1, 0)), t0[Component::ClusterTime]);
- vc->tickTo(VectorClock::Component::ClusterTime, LogicalTime(Timestamp(1, 1)));
+ vc->tickTo(Component::ClusterTime, LogicalTime(Timestamp(1, 1)));
const auto t1 = vc->getTime();
- ASSERT_EQ(LogicalTime(Timestamp(1, 1)), t1[VectorClock::Component::ClusterTime]);
+ ASSERT_EQ(LogicalTime(Timestamp(1, 1)), t1[Component::ClusterTime]);
- vc->tickTo(VectorClock::Component::ClusterTime, LogicalTime(Timestamp(3, 3)));
+ vc->tickTo(Component::ClusterTime, LogicalTime(Timestamp(3, 3)));
const auto t2 = vc->getTime();
- ASSERT_EQ(LogicalTime(Timestamp(3, 3)), t2[VectorClock::Component::ClusterTime]);
+ ASSERT_EQ(LogicalTime(Timestamp(3, 3)), t2[Component::ClusterTime]);
- vc->tickTo(VectorClock::Component::ClusterTime, LogicalTime(Timestamp(2, 2)));
+ vc->tickTo(Component::ClusterTime, LogicalTime(Timestamp(2, 2)));
const auto t3 = vc->getTime();
- ASSERT_EQ(LogicalTime(Timestamp(3, 3)), t3[VectorClock::Component::ClusterTime]);
+ ASSERT_EQ(LogicalTime(Timestamp(3, 3)), t3[Component::ClusterTime]);
}
DEATH_TEST_F(VectorClockMongoDTest, CannotTickConfigTime, "Hit a MONGO_UNREACHABLE") {
auto sc = getServiceContext();
auto vc = VectorClockMutable::get(sc);
- vc->tick(VectorClock::Component::ConfigTime, 1);
+ vc->tick(Component::ConfigTime, 1);
}
DEATH_TEST_F(VectorClockMongoDTest, CannotTickToConfigTime, "Hit a MONGO_UNREACHABLE") {
auto sc = getServiceContext();
auto vc = VectorClockMutable::get(sc);
- vc->tickTo(VectorClock::Component::ConfigTime, LogicalTime());
+ vc->tickTo(Component::ConfigTime, LogicalTime());
}
DEATH_TEST_F(VectorClockMongoDTest, CannotTickTopologyTime, "Hit a MONGO_UNREACHABLE") {
auto sc = getServiceContext();
auto vc = VectorClockMutable::get(sc);
- vc->tick(VectorClock::Component::TopologyTime, 1);
+ vc->tick(Component::TopologyTime, 1);
}
DEATH_TEST_F(VectorClockMongoDTest, CannotTickToTopologyTime, "Hit a MONGO_UNREACHABLE") {
auto sc = getServiceContext();
auto vc = VectorClockMutable::get(sc);
- vc->tickTo(VectorClock::Component::TopologyTime, LogicalTime());
+ vc->tickTo(Component::TopologyTime, LogicalTime());
}
TEST_F(VectorClockMongoDTest, GossipOutInternal) {
@@ -151,7 +155,7 @@ TEST_F(VectorClockMongoDTest, GossipOutInternal) {
LogicalTimeValidator::get(getServiceContext())->enableKeyGenerator(operationContext(), true);
refreshKeyManager();
- const auto clusterTime = vc->tick(VectorClock::Component::ClusterTime, 1);
+ const auto clusterTime = vc->tick(Component::ClusterTime, 1);
BSONObjBuilder bob;
vc->gossipOut(nullptr, &bob, transport::Session::kInternalClient);
@@ -172,7 +176,7 @@ TEST_F(VectorClockMongoDTest, GossipOutExternal) {
LogicalTimeValidator::get(getServiceContext())->enableKeyGenerator(operationContext(), true);
refreshKeyManager();
- const auto clusterTime = vc->tick(VectorClock::Component::ClusterTime, 1);
+ const auto clusterTime = vc->tick(Component::ClusterTime, 1);
BSONObjBuilder bob;
vc->gossipOut(nullptr, &bob);
@@ -190,7 +194,7 @@ TEST_F(VectorClockMongoDTest, GossipInInternal) {
auto sc = getServiceContext();
auto vc = VectorClockMutable::get(sc);
- vc->tick(VectorClock::Component::ClusterTime, 1);
+ vc->tick(Component::ClusterTime, 1);
auto dummySignature =
BSON("hash" << BSONBinData("\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1", 20, BinDataGeneral)
@@ -205,9 +209,9 @@ TEST_F(VectorClockMongoDTest, GossipInInternal) {
// On plain replset servers, gossip in from internal clients should update $clusterTime, but not
// $configTime or $topologyTime.
auto afterTime = vc->getTime();
- ASSERT_EQ(afterTime[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(2, 2));
- ASSERT_EQ(afterTime[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
- ASSERT_EQ(afterTime[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime[Component::ClusterTime].asTimestamp(), Timestamp(2, 2));
+ ASSERT_EQ(afterTime[Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime[Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
vc->gossipIn(nullptr,
BSON("$clusterTime"
@@ -217,9 +221,9 @@ TEST_F(VectorClockMongoDTest, GossipInInternal) {
transport::Session::kInternalClient);
auto afterTime2 = vc->getTime();
- ASSERT_EQ(afterTime2[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(2, 2));
- ASSERT_EQ(afterTime2[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
- ASSERT_EQ(afterTime2[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime2[Component::ClusterTime].asTimestamp(), Timestamp(2, 2));
+ ASSERT_EQ(afterTime2[Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime2[Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
vc->gossipIn(nullptr,
BSON("$clusterTime"
@@ -229,16 +233,16 @@ TEST_F(VectorClockMongoDTest, GossipInInternal) {
transport::Session::kInternalClient);
auto afterTime3 = vc->getTime();
- ASSERT_EQ(afterTime3[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(3, 3));
- ASSERT_EQ(afterTime3[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
- ASSERT_EQ(afterTime3[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime3[Component::ClusterTime].asTimestamp(), Timestamp(3, 3));
+ ASSERT_EQ(afterTime3[Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime3[Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
}
TEST_F(VectorClockMongoDTest, GossipInExternal) {
auto sc = getServiceContext();
auto vc = VectorClockMutable::get(sc);
- vc->tick(VectorClock::Component::ClusterTime, 1);
+ vc->tick(Component::ClusterTime, 1);
auto dummySignature =
BSON("hash" << BSONBinData("\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1\1", 20, BinDataGeneral)
@@ -252,9 +256,9 @@ TEST_F(VectorClockMongoDTest, GossipInExternal) {
// On plain replset servers, gossip in from external clients should update $clusterTime, but not
// $configTime or $topologyTime.
auto afterTime = vc->getTime();
- ASSERT_EQ(afterTime[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(2, 2));
- ASSERT_EQ(afterTime[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
- ASSERT_EQ(afterTime[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime[Component::ClusterTime].asTimestamp(), Timestamp(2, 2));
+ ASSERT_EQ(afterTime[Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime[Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
vc->gossipIn(nullptr,
BSON("$clusterTime"
@@ -263,9 +267,9 @@ TEST_F(VectorClockMongoDTest, GossipInExternal) {
false);
auto afterTime2 = vc->getTime();
- ASSERT_EQ(afterTime2[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(2, 2));
- ASSERT_EQ(afterTime2[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
- ASSERT_EQ(afterTime2[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime2[Component::ClusterTime].asTimestamp(), Timestamp(2, 2));
+ ASSERT_EQ(afterTime2[Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime2[Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
vc->gossipIn(nullptr,
BSON("$clusterTime"
@@ -274,9 +278,120 @@ TEST_F(VectorClockMongoDTest, GossipInExternal) {
false);
auto afterTime3 = vc->getTime();
- ASSERT_EQ(afterTime3[VectorClock::Component::ClusterTime].asTimestamp(), Timestamp(3, 3));
- ASSERT_EQ(afterTime2[VectorClock::Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
- ASSERT_EQ(afterTime3[VectorClock::Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime3[Component::ClusterTime].asTimestamp(), Timestamp(3, 3));
+ ASSERT_EQ(afterTime2[Component::ConfigTime].asTimestamp(), Timestamp(0, 0));
+ ASSERT_EQ(afterTime3[Component::TopologyTime].asTimestamp(), Timestamp(0, 0));
+}
+
+TEST_F(VectorClockMongoDTest, PersistVectorClockDocument) {
+ auto sc = getServiceContext();
+ auto opCtx = operationContext();
+
+ auto vc = VectorClockMutable::get(sc);
+ vc->advanceTime_forTest(Component::ConfigTime, LogicalTime());
+ vc->advanceTime_forTest(Component::TopologyTime, LogicalTime());
+
+ NamespaceString nss(NamespaceString::kVectorClockNamespace);
+ PersistentTaskStore<VectorClockDocument> store(nss);
+
+ // Check that no vectorClockState document is present
+ ASSERT_EQUALS(store.count(opCtx, VectorClock::kStateQuery()), 0);
+
+ // Persist and check that the vectorClockState document has been persisted
+ auto future = vc->persist(opCtx);
+ future.get();
+ ASSERT_EQUALS(store.count(opCtx, VectorClock::kStateQuery()), 1);
+
+ // Check that the vectorClockState document is still one after more persist calls
+ future = vc->persist(opCtx);
+ vc->waitForInMemoryVectorClockToBePersisted(opCtx);
+ ASSERT_EQUALS(store.count(opCtx, VectorClock::kStateQuery()), 1);
+}
+
+TEST_F(VectorClockMongoDTest, RecoverVectorClockDocument) {
+ auto sc = getServiceContext();
+ auto opCtx = operationContext();
+ const auto configTime = LogicalTime(Timestamp(3, 3));
+ const auto topologyTime = LogicalTime(Timestamp(4, 4));
+
+ auto vc = VectorClockMutable::get(sc);
+ vc->advanceTime_forTest(Component::ConfigTime, configTime);
+ vc->advanceTime_forTest(Component::TopologyTime, topologyTime);
+
+ // Persist the vector clock, then reset its components
+ auto future = vc->persist(opCtx);
+ future.get(opCtx);
+ vc->resetVectorClock_forTest();
+
+ NamespaceString nss(NamespaceString::kVectorClockNamespace);
+ PersistentTaskStore<VectorClockDocument> store(nss);
+
+ future = vc->recover(opCtx);
+ vc->waitForVectorClockToBeRecovered(opCtx);
+
+ auto time = vc->getTime();
+ auto actualConfTime = time[Component::ConfigTime];
+ auto actualTopologyTime = time[Component::TopologyTime];
+
+ ASSERT_EQUALS(actualConfTime, configTime);
+ ASSERT_EQUALS(actualTopologyTime, topologyTime);
+}
+
+TEST_F(VectorClockMongoDTest, RecoverNotExistingVectorClockDocument) {
+ auto sc = getServiceContext();
+ auto opCtx = operationContext();
+ auto vc = VectorClockMutable::get(sc);
+
+ const auto configTime = LogicalTime(Timestamp(3, 3));
+ const auto topologyTime = LogicalTime(Timestamp(4, 4));
+ vc->advanceTime_forTest(Component::ConfigTime, configTime);
+ vc->advanceTime_forTest(Component::TopologyTime, topologyTime);
+
+ NamespaceString nss(NamespaceString::kVectorClockNamespace);
+ PersistentTaskStore<VectorClockDocument> store(nss);
+
+ // Check that no recovery document is stored and call recovery
+ int nDocuments = store.count(opCtx, VectorClock::kStateQuery());
+ ASSERT_EQUALS(nDocuments, 0);
+
+ auto future = vc->recover(opCtx);
+ vc->waitForVectorClockToBeRecovered(opCtx);
+
+ // Verify that times didn't change after an unsuccessful recovery
+ auto time = vc->getTime();
+ auto actualConfTime = time[Component::ConfigTime];
+ auto actualTopologyTime = time[Component::TopologyTime];
+
+ ASSERT_EQUALS(actualConfTime, configTime);
+ ASSERT_EQUALS(actualTopologyTime, topologyTime);
+}
+
+TEST_F(VectorClockMongoDTest, SubsequentPersistRecoverVectorClockDocument) {
+ auto sc = getServiceContext();
+ auto opCtx = operationContext();
+ auto vc = VectorClockMutable::get(sc);
+
+ for (int i = 1; i < 10; i++) {
+ auto newTime = LogicalTime(Timestamp(i, i));
+ vc->advanceTime_forTest(Component::ClusterTime, newTime);
+ vc->advanceTime_forTest(Component::ConfigTime, newTime);
+ vc->advanceTime_forTest(Component::TopologyTime, newTime);
+
+ // Persist the vector clock, then reset its components
+ auto future = vc->persist(opCtx);
+ future.get(opCtx);
+ vc->resetVectorClock_forTest();
+
+ future = vc->recover(opCtx);
+ vc->waitForVectorClockToBeRecovered(opCtx);
+
+ auto time = vc->getTime();
+ auto actualConfTime = time[Component::ConfigTime];
+ auto actualTopologyTime = time[Component::TopologyTime];
+
+ ASSERT_EQUALS(actualConfTime, newTime);
+ ASSERT_EQUALS(actualTopologyTime, newTime);
+ }
}
} // namespace