diff options
Diffstat (limited to 'src/mongo/db/vector_clock_mongod.cpp')
-rw-r--r-- | src/mongo/db/vector_clock_mongod.cpp | 254 |
1 files changed, 254 insertions, 0 deletions
diff --git a/src/mongo/db/vector_clock_mongod.cpp b/src/mongo/db/vector_clock_mongod.cpp index ab60403d149..c0c1caea299 100644 --- a/src/mongo/db/vector_clock_mongod.cpp +++ b/src/mongo/db/vector_clock_mongod.cpp @@ -26,12 +26,18 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault #include "mongo/platform/basic.h" #include "mongo/db/logical_time_validator.h" #include "mongo/db/repl/replica_set_aware_service.h" +#include "mongo/db/s/persistent_task_store.h" +#include "mongo/db/vector_clock_document_gen.h" #include "mongo/db/vector_clock_mutable.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/logv2/log.h" +#include "mongo/util/concurrency/thread_pool.h" namespace mongo { namespace { @@ -53,6 +59,12 @@ public: LogicalTime tick(Component component, uint64_t nTicks) override; void tickTo(Component component, LogicalTime newTime) override; + SharedSemiFuture<void> persist(OperationContext* opCtx) override; + void waitForInMemoryVectorClockToBePersisted(OperationContext* opCtx) override; + + SharedSemiFuture<void> recover(OperationContext* opCtx) override; + void waitForVectorClockToBeRecovered(OperationContext* opCtx) override; + protected: bool _gossipOutInternal(OperationContext* opCtx, BSONObjBuilder* out, @@ -73,6 +85,206 @@ private: void onStepUpComplete(OperationContext* opCtx, long long term) override {} void onStepDown() override {} void onBecomeArbiter() override; + + void _recoverComponent(OperationContext* opCtx, + const BSONObj& in, + LogicalTimeArray* newTime, + Component component); + + void _persistComponent(OperationContext* opCtx, + BSONObjBuilder* out, + const VectorTime& time, + Component component) const; + + static const ComponentArray<std::unique_ptr<ComponentFormat>> _vectorClockStateFormatters; + + /* + * Manages the components persistence format, stripping field names of intitial '$' symbol. + */ + class PersistenceComponentFormat : public VectorClock::ComponentFormat { + public: + using ComponentFormat::ComponentFormat; + virtual ~PersistenceComponentFormat() = default; + + const std::string bsonFieldName = _fieldName[0] == '$' ? _fieldName.substr(1) : _fieldName; + + bool out(ServiceContext* service, + OperationContext* opCtx, + bool permitRefresh, + BSONObjBuilder* out, + LogicalTime time, + Component component) const override { + out->append(bsonFieldName, time.asTimestamp()); + return true; + } + + LogicalTime in(ServiceContext* service, + OperationContext* opCtx, + const BSONObj& in, + bool couldBeUnauthenticated, + Component component) const override { + const auto componentElem(in[bsonFieldName]); + + uassert(ErrorCodes::InvalidBSON, + str::stream() << bsonFieldName << " field not found", + !componentElem.eoo()); + + uassert(ErrorCodes::BadValue, + str::stream() << _fieldName << " is not a Timestamp", + componentElem.type() == bsonTimestamp); + + return LogicalTime(componentElem.timestamp()); + } + }; + + /* + * A VectorClockStateOperation represents an asynchronous operation on the VectorClockDocument + * guarded by a mutex. Calling threads are joining the ongoing operation or - in case no + * operation is in progress - scheduling a new one. + */ + class VectorClockStateOperation { + public: + VectorClockStateOperation() = default; + ~VectorClockStateOperation() = default; + + SharedSemiFuture<void> performOperation(VectorClockMongoD* vectorClock, + ServiceContext* serviceContext) { + stdx::lock_guard<Latch> lk(_opMutex); + _opFuture = + _opFuture.thenRunOn(_getExecutorPool()) + .then([this, vectorClock, serviceContext, initialGeneration = _generation] { + stdx::unique_lock<Latch> lk(_opMutex); + + invariant(_generation >= initialGeneration); + if (_generation > initialGeneration) { + // The last run of this operation has definitively observed the + // scheduling thread's in-memory state. There is no need to run + // this operation again. + return; + } + ++_generation; + + lk.unlock(); + + ThreadClient tc("VectorClockStateOperation", serviceContext); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillable(lk); + } + + const auto opCtx = tc->makeOperationContext(); + execute(vectorClock, opCtx.get()); + }) + .onError([=](const Status status) { + LOGV2(4924402, + "Error while performing a VectorClockStateOperation", + "opName"_attr = getOperationName(), + "error"_attr = status); + return status; + }) + .share(); + + return _opFuture; + } + + virtual std::string getOperationName() = 0; + + void waitForCompletion() { + _opFuture.get(); + } + + protected: + virtual void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) = 0; + + private: + std::shared_ptr<ThreadPool> _getExecutorPool() { + static Mutex mutex = MONGO_MAKE_LATCH("VectorClockStateOperation::_executorMutex"); + static std::shared_ptr<ThreadPool> executor; + + stdx::lock_guard<Latch> lg(mutex); + if (!executor) { + ThreadPool::Options options; + options.poolName = "VectorClockStateOperation"; + options.minThreads = 0; + options.maxThreads = 1; + executor = std::make_shared<ThreadPool>(std::move(options)); + executor->startup(); + } + + return executor; + } + + Mutex _opMutex = MONGO_MAKE_LATCH(); + SharedSemiFuture<void> _opFuture = SharedSemiFuture<void>(); + size_t _generation = 0; + }; + + /* + * VectorClockStateOperation persisting configTime and topologyTime in the VectorClockDocument. + */ + class PersistOperation : public VectorClockStateOperation { + void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override { + const auto time = vectorClock->getTime(); + + NamespaceString nss(NamespaceString::kVectorClockNamespace); + PersistentTaskStore<VectorClockDocument> store(nss); + + BSONObjBuilder bob; + VectorClockDocument vcd; + + vectorClock->_persistComponent(opCtx, &bob, time, Component::ConfigTime); + vectorClock->_persistComponent(opCtx, &bob, time, Component::TopologyTime); + + auto obj = bob.done(); + vcd.setVectorClock(obj); + + store.update(opCtx, + VectorClock::kStateQuery(), + vcd.toBSON(), + WriteConcerns::kMajorityWriteConcern, + true); + } + + std::string getOperationName() override { + return "persist"; + } + }; + + /* + * VectorClockStateOperation recovering configTime and topologyTime from the + * VectorClockDocument. + */ + class RecoverOperation : public VectorClockStateOperation { + void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override { + NamespaceString nss(NamespaceString::kVectorClockNamespace); + PersistentTaskStore<VectorClockDocument> store(nss); + + int nDocuments = store.count(opCtx, VectorClock::kStateQuery()); + if (nDocuments == 0) { + LOGV2_DEBUG(4924403, 2, "No VectorClockDocument to recover"); + return; + } + fassert(4924404, nDocuments == 1); + + store.forEach(opCtx, VectorClock::kStateQuery(), [&](const VectorClockDocument& vcd) { + BSONObj obj = vcd.getVectorClock(); + + LogicalTimeArray newTime; + vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::ConfigTime); + vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::TopologyTime); + vectorClock->_advanceTime(std::move(newTime)); + + return true; + }); + } + + std::string getOperationName() override { + return "recover"; + } + }; + + PersistOperation _persistOperation; + RecoverOperation _recoverOperation; }; const auto vectorClockMongoDDecoration = ServiceContext::declareDecoration<VectorClockMongoD>(); @@ -80,6 +292,16 @@ const auto vectorClockMongoDDecoration = ServiceContext::declareDecoration<Vecto const ReplicaSetAwareServiceRegistry::Registerer<VectorClockMongoD> vectorClockMongoDRegisterer( "VectorClockMongoD-ReplicaSetAwareServiceRegistration"); + +const VectorClock::ComponentArray<std::unique_ptr<VectorClock::ComponentFormat>> + VectorClockMongoD::_vectorClockStateFormatters{ + std::make_unique<VectorClockMongoD::PersistenceComponentFormat>( + VectorClock::kClusterTimeFieldName), + std::make_unique<VectorClockMongoD::PersistenceComponentFormat>( + VectorClock::kConfigTimeFieldName), + std::make_unique<VectorClockMongoD::PersistenceComponentFormat>( + VectorClock::kTopologyTimeFieldName)}; + VectorClockMongoD* VectorClockMongoD::get(ServiceContext* serviceContext) { return &vectorClockMongoDDecoration(serviceContext); } @@ -191,5 +413,37 @@ void VectorClockMongoD::tickTo(Component component, LogicalTime newTime) { MONGO_UNREACHABLE; } +void VectorClockMongoD::_persistComponent(OperationContext* opCtx, + BSONObjBuilder* out, + const VectorTime& time, + Component component) const { + _vectorClockStateFormatters[component]->out( + _service, opCtx, true, out, time[component], component); +} + +void VectorClockMongoD::_recoverComponent(OperationContext* opCtx, + const BSONObj& in, + LogicalTimeArray* newTime, + Component component) { + (*newTime)[component] = _vectorClockStateFormatters[component]->in( + _service, opCtx, in, true /*couldBeUnauthenticated*/, component); +} + +SharedSemiFuture<void> VectorClockMongoD::persist(OperationContext* opCtx) { + return _persistOperation.performOperation(this, opCtx->getServiceContext()); +} + +void VectorClockMongoD::waitForInMemoryVectorClockToBePersisted(OperationContext* opCtx) { + _persistOperation.waitForCompletion(); +} + +SharedSemiFuture<void> VectorClockMongoD::recover(OperationContext* opCtx) { + return _recoverOperation.performOperation(this, opCtx->getServiceContext()); +} + +void VectorClockMongoD::waitForVectorClockToBeRecovered(OperationContext* opCtx) { + _recoverOperation.waitForCompletion(); +}; + } // namespace } // namespace mongo |