diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2020-08-12 06:44:01 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-17 16:31:03 +0000 |
commit | f37ab825928d30d911f419b9995658866bbe43e4 (patch) | |
tree | 0b5a4ff6e741e48836e43f2a644215e1da4e80d6 /src/mongo/db/vector_clock_mongod.cpp | |
parent | e1daee88b3f4ef640e677143d210432e02a8fac2 (diff) | |
download | mongo-f37ab825928d30d911f419b9995658866bbe43e4.tar.gz |
SERVER-49921 Optimised persistence and recovery of the VectorClock
Diffstat (limited to 'src/mongo/db/vector_clock_mongod.cpp')
-rw-r--r-- | src/mongo/db/vector_clock_mongod.cpp | 527 |
1 files changed, 265 insertions, 262 deletions
diff --git a/src/mongo/db/vector_clock_mongod.cpp b/src/mongo/db/vector_clock_mongod.cpp index 015a4a55b17..5f4bf1be052 100644 --- a/src/mongo/db/vector_clock_mongod.cpp +++ b/src/mongo/db/vector_clock_mongod.cpp @@ -26,6 +26,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ + #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault #include "mongo/platform/basic.h" @@ -37,6 +38,7 @@ #include "mongo/db/s/sharding_state.h" #include "mongo/db/vector_clock_document_gen.h" #include "mongo/db/vector_clock_mutable.h" +#include "mongo/executor/scoped_task_executor.h" #include "mongo/logv2/log.h" #include "mongo/s/grid.h" @@ -54,12 +56,6 @@ public: VectorClockMongoD(); virtual ~VectorClockMongoD(); - SharedSemiFuture<void> persist() override; - void waitForInMemoryVectorClockToBePersisted() override; - - SharedSemiFuture<void> recover() override; - void waitForVectorClockToBeRecovered() override; - private: // VectorClock methods implementation @@ -81,232 +77,84 @@ private: // VectorClockMutable methods implementation + SharedSemiFuture<void> waitForDurableConfigTime() override; + SharedSemiFuture<void> waitForDurableTopologyTime() override; + SharedSemiFuture<void> waitForDurable() override; + SharedSemiFuture<void> recover() override; + LogicalTime _tick(Component component, uint64_t nTicks) override; void _tickTo(Component component, LogicalTime newTime) override; // ReplicaSetAwareService methods implementation void onStartup(OperationContext* opCtx) override {} - void onStepUpBegin(OperationContext* opCtx, long long term) override {} + void onStepUpBegin(OperationContext* opCtx, long long term) override; void onStepUpComplete(OperationContext* opCtx, long long term) override {} - void onStepDown() override {} + void onStepDown() override; void onBecomeArbiter() override; - void _recoverComponent(OperationContext* opCtx, - const BSONObj& in, - LogicalTimeArray* newTime, - Component component); - - void _persistComponent(OperationContext* opCtx, - BSONObjBuilder* out, - const VectorTime& time, - Component component) const; - - /* - * Manages the components persistence format, stripping field names of intitial '$' symbol. - */ - class PersistenceComponentFormat : public VectorClock::ComponentFormat { - public: - using ComponentFormat::ComponentFormat; - virtual ~PersistenceComponentFormat() = default; - - const std::string bsonFieldName = _fieldName[0] == '$' ? _fieldName.substr(1) : _fieldName; - - bool out(ServiceContext* service, - OperationContext* opCtx, - bool permitRefresh, - BSONObjBuilder* out, - LogicalTime time, - Component component) const override { - out->append(bsonFieldName, time.asTimestamp()); - return true; - } - - LogicalTime in(ServiceContext* service, - OperationContext* opCtx, - const BSONObj& in, - bool couldBeUnauthenticated, - Component component) const override { - const auto componentElem(in[bsonFieldName]); - - uassert(ErrorCodes::InvalidBSON, - str::stream() << bsonFieldName << " field not found", - !componentElem.eoo()); - - uassert(ErrorCodes::BadValue, - str::stream() << _fieldName << " is not a Timestamp", - componentElem.type() == bsonTimestamp); - - return LogicalTime(componentElem.timestamp()); - } - }; - - /* - * A VectorClockStateOperation represents an asynchronous operation on the VectorClockDocument - * guarded by a mutex. Calling threads are joining the ongoing operation or - in case no - * operation is in progress - scheduling a new one. - */ - class VectorClockStateOperation { - public: - VectorClockStateOperation() = default; - ~VectorClockStateOperation() = default; - - SharedSemiFuture<void> performOperation(VectorClockMongoD* vectorClock, - ServiceContext* serviceContext) { - stdx::lock_guard<Latch> lk(_opMutex); - _opFuture = - _opFuture - .thenRunOn(Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor()) - .then([this, vectorClock, serviceContext, initialGeneration = _generation] { - stdx::unique_lock<Latch> lk(_opMutex); - - invariant(_generation >= initialGeneration); - if (_generation > initialGeneration) { - // The last run of this operation has definitively observed the - // scheduling thread's in-memory state. There is no need to run - // this operation again. - return; - } - ++_generation; - - lk.unlock(); - - ThreadClient tc("VectorClockStateOperation", serviceContext); - { - stdx::lock_guard<Client> lk(*tc.get()); - tc->setSystemOperationKillable(lk); - } - - const auto opCtx = tc->makeOperationContext(); - execute(vectorClock, opCtx.get()); - }) - .onError([=](const Status status) { - LOGV2(4924402, - "Error while performing a VectorClockStateOperation", - "opName"_attr = getOperationName(), - "error"_attr = status); - return status; - }) - .share(); - - return _opFuture; - } - - virtual std::string getOperationName() = 0; - - void waitForCompletion() { - _opFuture.get(); - } - - private: - virtual void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) = 0; - - Mutex _opMutex = MONGO_MAKE_LATCH("VectorClockStateOperation::_opMutex"); - - SharedSemiFuture<void> _opFuture; - - size_t _generation = 0; - }; - - /* - * VectorClockStateOperation persisting configTime and topologyTime in the VectorClockDocument. + /** + * Structure used as keys for the map of waiters for VectorClock durability. */ - class PersistOperation : public VectorClockStateOperation { - private: - void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override { - const auto time = vectorClock->getTime(); - - NamespaceString nss(NamespaceString::kVectorClockNamespace); - PersistentTaskStore<VectorClockDocument> store(nss); - - BSONObjBuilder bob; - VectorClockDocument vcd; - - vectorClock->_persistComponent(opCtx, &bob, time, Component::ConfigTime); - vectorClock->_persistComponent(opCtx, &bob, time, Component::TopologyTime); - - auto obj = bob.done(); - vcd.setVectorClock(obj); - - store.update(opCtx, - VectorClock::stateQuery(), - vcd.toBSON(), - WriteConcerns::kMajorityWriteConcern, - true); + struct ComparableVectorTime { + bool operator<(const ComparableVectorTime& other) const { + return vt.configTime() < other.vt.configTime() || + vt.topologyTime() < other.vt.topologyTime(); } - - std::string getOperationName() override { - return "localPersist"; + bool operator>(const ComparableVectorTime& other) const { + return vt.configTime() > other.vt.configTime() || + vt.topologyTime() > other.vt.topologyTime(); } - }; - - /* - * VectorClockStateOperation invoking PersistOperation on a shard server's primary. - */ - class RemotePersistOperation : public VectorClockStateOperation { - private: - void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override { - auto const shardingState = ShardingState::get(opCtx); - invariant(shardingState->enabled()); - - auto selfShard = uassertStatusOK( - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardingState->shardId())); - - auto cmdResponse = uassertStatusOK(selfShard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - NamespaceString::kVectorClockNamespace.toString(), - BSON("_vectorClockPersist" << 1), - Seconds{30}, - Shard::RetryPolicy::kIdempotent)); - - uassertStatusOK(cmdResponse.commandStatus); + bool operator==(const ComparableVectorTime& other) const { + return vt.configTime() == other.vt.configTime() && + vt.topologyTime() == other.vt.topologyTime(); } - std::string getOperationName() override { - return "remotePersist"; - } + VectorTime vt; }; - /* - * VectorClockStateOperation recovering configTime and topologyTime from the - * VectorClockDocument. + /** + * The way the VectorClock durability works is by maintaining an `_queue` of callers, which wait + * for a particular VectorTime to become durable. + * + * When the queue is empty, there is no persistence activity going on. The first caller, who + * finds `_loopScheduled` to be false, will set it to true, indicating it will schedule the + * asynchronous persistence task. The asynchronous persistence task is effectively the following + * loop: + * + * while (!_queue.empty()) { + * timeToPersist = getTime(); + * persistTime(timeToPersist); + * _durableTime = timeToPersist; + * // Notify entries in _queue, whose time is <= _durableTime and remove them + * } */ - class RecoverOperation : public VectorClockStateOperation { - private: - void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override { - NamespaceString nss(NamespaceString::kVectorClockNamespace); - PersistentTaskStore<VectorClockDocument> store(nss); - - int nDocuments = store.count(opCtx, VectorClock::stateQuery()); - if (nDocuments == 0) { - LOGV2_DEBUG(4924403, 2, "No VectorClockDocument to recover"); - return; - } - fassert(4924404, nDocuments == 1); + SharedSemiFuture<void> _enqueueWaiterAndScheduleLoopIfNeeded(stdx::unique_lock<Mutex> ul, + VectorTime time); + Future<void> _doWhileQueueNotEmptyOrError(ServiceContext* service); - store.forEach(opCtx, VectorClock::stateQuery(), [&](const VectorClockDocument& vcd) { - BSONObj obj = vcd.getVectorClock(); + // Protects the shared state below + Mutex _mutex = MONGO_MAKE_LATCH("VectorClockMongoD::_mutex"); - LogicalTimeArray newTime; - vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::ConfigTime); - vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::TopologyTime); - vectorClock->_advanceTime(std::move(newTime)); + // This value is incremented every time the node changes its role between primary and secondary + uint64_t _generation{0}; - return true; - }); - } + // If set to true, means that another operation already scheduled the `_queue` draining loop, if + // false it means that this operation must do it + bool _loopScheduled{false}; - std::string getOperationName() override { - return "recover"; - } - }; + // This value is only boost::none once, just after the object is constructuted. From the moment, + // the first operation schedules the `_queue`-draining loop, it will be set to a future, which + // will be signaled when the previously-scheduled `_queue` draining loop completes. + boost::optional<Future<void>> _currentWhileLoop; - static const ComponentArray<std::unique_ptr<ComponentFormat>> _vectorClockStateFormatters; + // If boost::none, means the durable time needs to be recovered from disk, otherwise contains + // the latest-known durable time + boost::optional<VectorTime> _durableTime; - PersistOperation _persistOperation; - RemotePersistOperation _remotePersistOperation; - RecoverOperation _recoverOperation; + // Queue ordered in increasing order of the VectorTimes, which are waiting to be persisted + using Queue = std::map<ComparableVectorTime, std::unique_ptr<SharedPromise<void>>>; + Queue _queue; }; const auto vectorClockMongoDDecoration = ServiceContext::declareDecoration<VectorClockMongoD>(); @@ -323,15 +171,6 @@ const ServiceContext::ConstructorActionRegisterer vectorClockMongoDRegisterer( }, {}); -const VectorClock::ComponentArray<std::unique_ptr<VectorClock::ComponentFormat>> - VectorClockMongoD::_vectorClockStateFormatters{ - std::make_unique<VectorClockMongoD::PersistenceComponentFormat>( - VectorClock::kClusterTimeFieldName), - std::make_unique<VectorClockMongoD::PersistenceComponentFormat>( - VectorClock::kConfigTimeFieldName), - std::make_unique<VectorClockMongoD::PersistenceComponentFormat>( - VectorClock::kTopologyTimeFieldName)}; - VectorClockMongoD* VectorClockMongoD::get(ServiceContext* serviceContext) { return &vectorClockMongoDDecoration(serviceContext); } @@ -340,14 +179,222 @@ VectorClockMongoD::VectorClockMongoD() = default; VectorClockMongoD::~VectorClockMongoD() = default; +void VectorClockMongoD::onStepUpBegin(OperationContext* opCtx, long long term) { + stdx::lock_guard lg(_mutex); + ++_generation; + _durableTime.reset(); +} + +void VectorClockMongoD::onStepDown() { + stdx::lock_guard lg(_mutex); + ++_generation; + _durableTime.reset(); +} + void VectorClockMongoD::onBecomeArbiter() { // The node has become an arbiter, hence will not need logical clock for external operations. _disable(); + if (auto validator = LogicalTimeValidator::get(_service)) { validator->stopKeyManager(); } } +SharedSemiFuture<void> VectorClockMongoD::waitForDurableConfigTime() { + auto time = getTime(); + + stdx::unique_lock ul(_mutex); + if (_durableTime && _durableTime->configTime() >= time.configTime()) + return SharedSemiFuture<void>(); + + return _enqueueWaiterAndScheduleLoopIfNeeded(std::move(ul), std::move(time)); +} + +SharedSemiFuture<void> VectorClockMongoD::waitForDurableTopologyTime() { + auto time = getTime(); + + stdx::unique_lock ul(_mutex); + if (_durableTime && _durableTime->topologyTime() >= time.topologyTime()) + return SharedSemiFuture<void>(); + + return _enqueueWaiterAndScheduleLoopIfNeeded(std::move(ul), std::move(time)); +} + +SharedSemiFuture<void> VectorClockMongoD::waitForDurable() { + auto time = getTime(); + + stdx::unique_lock ul(_mutex); + if (_durableTime && _durableTime->configTime() >= time.configTime() && + _durableTime->topologyTime() >= time.topologyTime()) + return SharedSemiFuture<void>(); + + return _enqueueWaiterAndScheduleLoopIfNeeded(std::move(ul), std::move(time)); +} + +SharedSemiFuture<void> VectorClockMongoD::recover() { + stdx::unique_lock ul(_mutex); + if (_durableTime) + return SharedSemiFuture<void>(); + + return _enqueueWaiterAndScheduleLoopIfNeeded(std::move(ul), VectorTime()); +} + +SharedSemiFuture<void> VectorClockMongoD::_enqueueWaiterAndScheduleLoopIfNeeded( + stdx::unique_lock<Mutex> ul, VectorTime time) { + auto [it, unusedEmplaced] = + _queue.try_emplace({std::move(time)}, std::make_unique<SharedPromise<void>>()); + + if (!_loopScheduled) { + _loopScheduled = true; + + auto joinPreviousLoop(_currentWhileLoop ? std::move(*_currentWhileLoop) + : Future<void>::makeReady()); + + _currentWhileLoop.emplace(std::move(joinPreviousLoop).onCompletion([this](auto) { + return _doWhileQueueNotEmptyOrError(vectorClockMongoDDecoration.owner(this)); + })); + } + + return it->second->getFuture(); +} + +Future<void> VectorClockMongoD::_doWhileQueueNotEmptyOrError(ServiceContext* service) { + auto [p, f] = makePromiseFuture<std::pair<uint64_t, VectorTime>>(); + auto future = std::move(f) + .then([this](std::pair<uint64_t, VectorTime> newDurableTime) { + stdx::unique_lock ul(_mutex); + uassert(ErrorCodes::InterruptedDueToReplStateChange, + "VectorClock failed to persist due to stepdown", + newDurableTime.first == _generation); + + _durableTime.emplace(newDurableTime.second); + + ComparableVectorTime time{*_durableTime}; + + std::vector<Queue::value_type::second_type> promises; + for (auto it = _queue.begin(); it != _queue.end();) { + if (it->first > time) + break; + promises.emplace_back(std::move(it->second)); + it = _queue.erase(it); + } + ul.unlock(); + + // Make sure the VectorClock advances at least up to the just recovered + // durable time + _advanceTime({newDurableTime.second.clusterTime(), + newDurableTime.second.configTime(), + newDurableTime.second.topologyTime()}); + + for (auto& p : promises) + p->emplaceValue(); + }) + .onError([this](Status status) { + stdx::unique_lock ul(_mutex); + std::vector<Queue::value_type::second_type> promises; + for (auto it = _queue.begin(); it != _queue.end();) { + promises.emplace_back(std::move(it->second)); + it = _queue.erase(it); + } + ul.unlock(); + + for (auto& p : promises) + p->setError(status); + }) + .onCompletion([this, service](auto) { + { + stdx::lock_guard lg(_mutex); + if (_queue.empty()) { + _loopScheduled = false; + return Future<void>::makeReady(); + } + } + return _doWhileQueueNotEmptyOrError(service); + }); + + // Blocking work to recover and/or persist the current vector time + ExecutorFuture<void>(Grid::get(service)->getExecutorPool()->getFixedExecutor()) + .then([this, service] { + auto [generation, mustRecoverDurableTime] = [&] { + stdx::lock_guard lg(_mutex); + return std::make_pair(_generation, !_durableTime); + }(); + + ThreadClient tc("VectorClockStateOperation", service); + + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillable(lk); + } + + const auto opCtxHolder = tc->makeOperationContext(); + auto* const opCtx = opCtxHolder.get(); + + if (mustRecoverDurableTime) { + VectorClockDocument durableVectorClock; + + PersistentTaskStore<VectorClockDocument> store( + NamespaceString::kVectorClockNamespace); + store.forEach( + opCtx, + QUERY(VectorClockDocument::k_idFieldName << durableVectorClock.get_id()), + [&, numDocsFound = 0](const auto& doc) mutable { + invariant(++numDocsFound == 1); + durableVectorClock = doc; + return true; + }); + + return std::make_pair( + generation, + VectorTime({LogicalTime(Timestamp(0)), + LogicalTime(durableVectorClock.getConfigTime()), + LogicalTime(durableVectorClock.getTopologyTime())})); + } else { + auto vectorTime = getTime(); + + auto* const replCoord = repl::ReplicationCoordinator::get(opCtx); + if (replCoord->getMemberState().primary()) { + // Persist as primary + const VectorClockDocument vcd(vectorTime.configTime().asTimestamp(), + vectorTime.topologyTime().asTimestamp()); + + PersistentTaskStore<VectorClockDocument> store( + NamespaceString::kVectorClockNamespace); + store.update(opCtx, + QUERY(VectorClockDocument::k_idFieldName << vcd.get_id()), + vcd.toBSON(), + WriteConcerns::kMajorityWriteConcern, + true /* upsert */); + } else { + // Persist as secondary, by asking the primary + auto const shardingState = ShardingState::get(opCtx); + invariant(shardingState->enabled()); + + auto selfShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard( + opCtx, shardingState->shardId())); + + auto cmdResponse = uassertStatusOK(selfShard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + NamespaceString::kVectorClockNamespace.toString(), + BSON("_vectorClockPersist" << 1), + Seconds{30}, + Shard::RetryPolicy::kIdempotent)); + + uassertStatusOK(cmdResponse.commandStatus); + } + + return std::make_pair(generation, vectorTime); + } + }) + .getAsync([this, promise = std::move(p)]( + StatusWith<std::pair<uint64_t, VectorTime>> swResult) mutable { + promise.setFromStatusWith(std::move(swResult)); + }); + + return future; +} + bool VectorClockMongoD::_gossipOutInternal(OperationContext* opCtx, BSONObjBuilder* out, const LogicalTimeArray& time) const { @@ -430,49 +477,5 @@ void VectorClockMongoD::_tickTo(Component component, LogicalTime newTime) { MONGO_UNREACHABLE; } -void VectorClockMongoD::_persistComponent(OperationContext* opCtx, - BSONObjBuilder* out, - const VectorTime& time, - Component component) const { - _vectorClockStateFormatters[component]->out( - _service, opCtx, true, out, time[component], component); -} - -void VectorClockMongoD::_recoverComponent(OperationContext* opCtx, - const BSONObj& in, - LogicalTimeArray* newTime, - Component component) { - (*newTime)[component] = _vectorClockStateFormatters[component]->in( - _service, opCtx, in, true /*couldBeUnauthenticated*/, component); -} - -SharedSemiFuture<void> VectorClockMongoD::persist() { - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - auto serviceContext = vectorClockMongoDDecoration.owner(this); - - const auto replCoord = repl::ReplicationCoordinator::get(serviceContext); - if (replCoord->getMemberState().primary()) { - return _persistOperation.performOperation(this, serviceContext); - } - - return _remotePersistOperation.performOperation(this, serviceContext); - } - - return SharedSemiFuture<void>(); -} - -void VectorClockMongoD::waitForInMemoryVectorClockToBePersisted() { - _persistOperation.waitForCompletion(); -} - -SharedSemiFuture<void> VectorClockMongoD::recover() { - auto serviceContext = vectorClockMongoDDecoration.owner(this); - return _recoverOperation.performOperation(this, serviceContext); -} - -void VectorClockMongoD::waitForVectorClockToBeRecovered() { - _recoverOperation.waitForCompletion(); -} - } // namespace } // namespace mongo |