summaryrefslogtreecommitdiff
path: root/src/mongo/db/vector_clock_mongod.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/vector_clock_mongod.cpp')
-rw-r--r--src/mongo/db/vector_clock_mongod.cpp254
1 files changed, 254 insertions, 0 deletions
diff --git a/src/mongo/db/vector_clock_mongod.cpp b/src/mongo/db/vector_clock_mongod.cpp
index ab60403d149..c0c1caea299 100644
--- a/src/mongo/db/vector_clock_mongod.cpp
+++ b/src/mongo/db/vector_clock_mongod.cpp
@@ -26,12 +26,18 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
#include "mongo/platform/basic.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/repl/replica_set_aware_service.h"
+#include "mongo/db/s/persistent_task_store.h"
+#include "mongo/db/vector_clock_document_gen.h"
#include "mongo/db/vector_clock_mutable.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
namespace {
@@ -53,6 +59,12 @@ public:
LogicalTime tick(Component component, uint64_t nTicks) override;
void tickTo(Component component, LogicalTime newTime) override;
+ SharedSemiFuture<void> persist(OperationContext* opCtx) override;
+ void waitForInMemoryVectorClockToBePersisted(OperationContext* opCtx) override;
+
+ SharedSemiFuture<void> recover(OperationContext* opCtx) override;
+ void waitForVectorClockToBeRecovered(OperationContext* opCtx) override;
+
protected:
bool _gossipOutInternal(OperationContext* opCtx,
BSONObjBuilder* out,
@@ -73,6 +85,206 @@ private:
void onStepUpComplete(OperationContext* opCtx, long long term) override {}
void onStepDown() override {}
void onBecomeArbiter() override;
+
+ void _recoverComponent(OperationContext* opCtx,
+ const BSONObj& in,
+ LogicalTimeArray* newTime,
+ Component component);
+
+ void _persistComponent(OperationContext* opCtx,
+ BSONObjBuilder* out,
+ const VectorTime& time,
+ Component component) const;
+
+ static const ComponentArray<std::unique_ptr<ComponentFormat>> _vectorClockStateFormatters;
+
+ /*
+ * Manages the components persistence format, stripping field names of intitial '$' symbol.
+ */
+ class PersistenceComponentFormat : public VectorClock::ComponentFormat {
+ public:
+ using ComponentFormat::ComponentFormat;
+ virtual ~PersistenceComponentFormat() = default;
+
+ const std::string bsonFieldName = _fieldName[0] == '$' ? _fieldName.substr(1) : _fieldName;
+
+ bool out(ServiceContext* service,
+ OperationContext* opCtx,
+ bool permitRefresh,
+ BSONObjBuilder* out,
+ LogicalTime time,
+ Component component) const override {
+ out->append(bsonFieldName, time.asTimestamp());
+ return true;
+ }
+
+ LogicalTime in(ServiceContext* service,
+ OperationContext* opCtx,
+ const BSONObj& in,
+ bool couldBeUnauthenticated,
+ Component component) const override {
+ const auto componentElem(in[bsonFieldName]);
+
+ uassert(ErrorCodes::InvalidBSON,
+ str::stream() << bsonFieldName << " field not found",
+ !componentElem.eoo());
+
+ uassert(ErrorCodes::BadValue,
+ str::stream() << _fieldName << " is not a Timestamp",
+ componentElem.type() == bsonTimestamp);
+
+ return LogicalTime(componentElem.timestamp());
+ }
+ };
+
+ /*
+ * A VectorClockStateOperation represents an asynchronous operation on the VectorClockDocument
+ * guarded by a mutex. Calling threads are joining the ongoing operation or - in case no
+ * operation is in progress - scheduling a new one.
+ */
+ class VectorClockStateOperation {
+ public:
+ VectorClockStateOperation() = default;
+ ~VectorClockStateOperation() = default;
+
+ SharedSemiFuture<void> performOperation(VectorClockMongoD* vectorClock,
+ ServiceContext* serviceContext) {
+ stdx::lock_guard<Latch> lk(_opMutex);
+ _opFuture =
+ _opFuture.thenRunOn(_getExecutorPool())
+ .then([this, vectorClock, serviceContext, initialGeneration = _generation] {
+ stdx::unique_lock<Latch> lk(_opMutex);
+
+ invariant(_generation >= initialGeneration);
+ if (_generation > initialGeneration) {
+ // The last run of this operation has definitively observed the
+ // scheduling thread's in-memory state. There is no need to run
+ // this operation again.
+ return;
+ }
+ ++_generation;
+
+ lk.unlock();
+
+ ThreadClient tc("VectorClockStateOperation", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillable(lk);
+ }
+
+ const auto opCtx = tc->makeOperationContext();
+ execute(vectorClock, opCtx.get());
+ })
+ .onError([=](const Status status) {
+ LOGV2(4924402,
+ "Error while performing a VectorClockStateOperation",
+ "opName"_attr = getOperationName(),
+ "error"_attr = status);
+ return status;
+ })
+ .share();
+
+ return _opFuture;
+ }
+
+ virtual std::string getOperationName() = 0;
+
+ void waitForCompletion() {
+ _opFuture.get();
+ }
+
+ protected:
+ virtual void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) = 0;
+
+ private:
+ std::shared_ptr<ThreadPool> _getExecutorPool() {
+ static Mutex mutex = MONGO_MAKE_LATCH("VectorClockStateOperation::_executorMutex");
+ static std::shared_ptr<ThreadPool> executor;
+
+ stdx::lock_guard<Latch> lg(mutex);
+ if (!executor) {
+ ThreadPool::Options options;
+ options.poolName = "VectorClockStateOperation";
+ options.minThreads = 0;
+ options.maxThreads = 1;
+ executor = std::make_shared<ThreadPool>(std::move(options));
+ executor->startup();
+ }
+
+ return executor;
+ }
+
+ Mutex _opMutex = MONGO_MAKE_LATCH();
+ SharedSemiFuture<void> _opFuture = SharedSemiFuture<void>();
+ size_t _generation = 0;
+ };
+
+ /*
+ * VectorClockStateOperation persisting configTime and topologyTime in the VectorClockDocument.
+ */
+ class PersistOperation : public VectorClockStateOperation {
+ void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override {
+ const auto time = vectorClock->getTime();
+
+ NamespaceString nss(NamespaceString::kVectorClockNamespace);
+ PersistentTaskStore<VectorClockDocument> store(nss);
+
+ BSONObjBuilder bob;
+ VectorClockDocument vcd;
+
+ vectorClock->_persistComponent(opCtx, &bob, time, Component::ConfigTime);
+ vectorClock->_persistComponent(opCtx, &bob, time, Component::TopologyTime);
+
+ auto obj = bob.done();
+ vcd.setVectorClock(obj);
+
+ store.update(opCtx,
+ VectorClock::kStateQuery(),
+ vcd.toBSON(),
+ WriteConcerns::kMajorityWriteConcern,
+ true);
+ }
+
+ std::string getOperationName() override {
+ return "persist";
+ }
+ };
+
+ /*
+ * VectorClockStateOperation recovering configTime and topologyTime from the
+ * VectorClockDocument.
+ */
+ class RecoverOperation : public VectorClockStateOperation {
+ void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override {
+ NamespaceString nss(NamespaceString::kVectorClockNamespace);
+ PersistentTaskStore<VectorClockDocument> store(nss);
+
+ int nDocuments = store.count(opCtx, VectorClock::kStateQuery());
+ if (nDocuments == 0) {
+ LOGV2_DEBUG(4924403, 2, "No VectorClockDocument to recover");
+ return;
+ }
+ fassert(4924404, nDocuments == 1);
+
+ store.forEach(opCtx, VectorClock::kStateQuery(), [&](const VectorClockDocument& vcd) {
+ BSONObj obj = vcd.getVectorClock();
+
+ LogicalTimeArray newTime;
+ vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::ConfigTime);
+ vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::TopologyTime);
+ vectorClock->_advanceTime(std::move(newTime));
+
+ return true;
+ });
+ }
+
+ std::string getOperationName() override {
+ return "recover";
+ }
+ };
+
+ PersistOperation _persistOperation;
+ RecoverOperation _recoverOperation;
};
const auto vectorClockMongoDDecoration = ServiceContext::declareDecoration<VectorClockMongoD>();
@@ -80,6 +292,16 @@ const auto vectorClockMongoDDecoration = ServiceContext::declareDecoration<Vecto
const ReplicaSetAwareServiceRegistry::Registerer<VectorClockMongoD> vectorClockMongoDRegisterer(
"VectorClockMongoD-ReplicaSetAwareServiceRegistration");
+
+const VectorClock::ComponentArray<std::unique_ptr<VectorClock::ComponentFormat>>
+ VectorClockMongoD::_vectorClockStateFormatters{
+ std::make_unique<VectorClockMongoD::PersistenceComponentFormat>(
+ VectorClock::kClusterTimeFieldName),
+ std::make_unique<VectorClockMongoD::PersistenceComponentFormat>(
+ VectorClock::kConfigTimeFieldName),
+ std::make_unique<VectorClockMongoD::PersistenceComponentFormat>(
+ VectorClock::kTopologyTimeFieldName)};
+
VectorClockMongoD* VectorClockMongoD::get(ServiceContext* serviceContext) {
return &vectorClockMongoDDecoration(serviceContext);
}
@@ -191,5 +413,37 @@ void VectorClockMongoD::tickTo(Component component, LogicalTime newTime) {
MONGO_UNREACHABLE;
}
+void VectorClockMongoD::_persistComponent(OperationContext* opCtx,
+ BSONObjBuilder* out,
+ const VectorTime& time,
+ Component component) const {
+ _vectorClockStateFormatters[component]->out(
+ _service, opCtx, true, out, time[component], component);
+}
+
+void VectorClockMongoD::_recoverComponent(OperationContext* opCtx,
+ const BSONObj& in,
+ LogicalTimeArray* newTime,
+ Component component) {
+ (*newTime)[component] = _vectorClockStateFormatters[component]->in(
+ _service, opCtx, in, true /*couldBeUnauthenticated*/, component);
+}
+
+SharedSemiFuture<void> VectorClockMongoD::persist(OperationContext* opCtx) {
+ return _persistOperation.performOperation(this, opCtx->getServiceContext());
+}
+
+void VectorClockMongoD::waitForInMemoryVectorClockToBePersisted(OperationContext* opCtx) {
+ _persistOperation.waitForCompletion();
+}
+
+SharedSemiFuture<void> VectorClockMongoD::recover(OperationContext* opCtx) {
+ return _recoverOperation.performOperation(this, opCtx->getServiceContext());
+}
+
+void VectorClockMongoD::waitForVectorClockToBeRecovered(OperationContext* opCtx) {
+ _recoverOperation.waitForCompletion();
+};
+
} // namespace
} // namespace mongo