author    Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2020-08-12 06:44:01 -0400
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-08-17 16:31:03 +0000
commit    f37ab825928d30d911f419b9995658866bbe43e4 (patch)
tree      0b5a4ff6e741e48836e43f2a644215e1da4e80d6 /src/mongo/db/vector_clock_mongod.cpp
parent    e1daee88b3f4ef640e677143d210432e02a8fac2 (diff)
download  mongo-f37ab825928d30d911f419b9995658866bbe43e4.tar.gz
SERVER-49921 Optimised persistence and recovery of the VectorClock
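This commit replaces the explicit persist()/recover() methods and their blocking wait
counterparts with a family of future-returning waiters (waitForDurableConfigTime,
waitForDurableTopologyTime, waitForDurable, recover), all backed by a single
queue-draining persistence loop. A rough caller-side sketch of the change implied by
the removed and added declarations below (illustrative only; no call sites are part of
this diff, and the `vectorClock`/`opCtx` variables are assumed):

    // Before: schedule the write, then block on it in a separate call
    vectorClock->persist();
    vectorClock->waitForInMemoryVectorClockToBePersisted();

    // After: one future that becomes ready once the snapshotted time is durable
    vectorClock->waitForDurableConfigTime().get(opCtx);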
Diffstat (limited to 'src/mongo/db/vector_clock_mongod.cpp')
-rw-r--r--  src/mongo/db/vector_clock_mongod.cpp | 527
1 file changed, 265 insertions(+), 262 deletions(-)
diff --git a/src/mongo/db/vector_clock_mongod.cpp b/src/mongo/db/vector_clock_mongod.cpp
index 015a4a55b17..5f4bf1be052 100644
--- a/src/mongo/db/vector_clock_mongod.cpp
+++ b/src/mongo/db/vector_clock_mongod.cpp
@@ -26,6 +26,7 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
#include "mongo/platform/basic.h"
@@ -37,6 +38,7 @@
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/vector_clock_document_gen.h"
#include "mongo/db/vector_clock_mutable.h"
+#include "mongo/executor/scoped_task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/s/grid.h"
@@ -54,12 +56,6 @@ public:
VectorClockMongoD();
virtual ~VectorClockMongoD();
- SharedSemiFuture<void> persist() override;
- void waitForInMemoryVectorClockToBePersisted() override;
-
- SharedSemiFuture<void> recover() override;
- void waitForVectorClockToBeRecovered() override;
-
private:
// VectorClock methods implementation
@@ -81,232 +77,84 @@ private:
// VectorClockMutable methods implementation
+ SharedSemiFuture<void> waitForDurableConfigTime() override;
+ SharedSemiFuture<void> waitForDurableTopologyTime() override;
+ SharedSemiFuture<void> waitForDurable() override;
+ SharedSemiFuture<void> recover() override;
+
LogicalTime _tick(Component component, uint64_t nTicks) override;
void _tickTo(Component component, LogicalTime newTime) override;
// ReplicaSetAwareService methods implementation
void onStartup(OperationContext* opCtx) override {}
- void onStepUpBegin(OperationContext* opCtx, long long term) override {}
+ void onStepUpBegin(OperationContext* opCtx, long long term) override;
void onStepUpComplete(OperationContext* opCtx, long long term) override {}
- void onStepDown() override {}
+ void onStepDown() override;
void onBecomeArbiter() override;
- void _recoverComponent(OperationContext* opCtx,
- const BSONObj& in,
- LogicalTimeArray* newTime,
- Component component);
-
- void _persistComponent(OperationContext* opCtx,
- BSONObjBuilder* out,
- const VectorTime& time,
- Component component) const;
-
- /*
- * Manages the components' persistence format, stripping field names of the initial '$' symbol.
- */
- class PersistenceComponentFormat : public VectorClock::ComponentFormat {
- public:
- using ComponentFormat::ComponentFormat;
- virtual ~PersistenceComponentFormat() = default;
-
- const std::string bsonFieldName = _fieldName[0] == '$' ? _fieldName.substr(1) : _fieldName;
-
- bool out(ServiceContext* service,
- OperationContext* opCtx,
- bool permitRefresh,
- BSONObjBuilder* out,
- LogicalTime time,
- Component component) const override {
- out->append(bsonFieldName, time.asTimestamp());
- return true;
- }
-
- LogicalTime in(ServiceContext* service,
- OperationContext* opCtx,
- const BSONObj& in,
- bool couldBeUnauthenticated,
- Component component) const override {
- const auto componentElem(in[bsonFieldName]);
-
- uassert(ErrorCodes::InvalidBSON,
- str::stream() << bsonFieldName << " field not found",
- !componentElem.eoo());
-
- uassert(ErrorCodes::BadValue,
- str::stream() << _fieldName << " is not a Timestamp",
- componentElem.type() == bsonTimestamp);
-
- return LogicalTime(componentElem.timestamp());
- }
- };
-
- /*
- * A VectorClockStateOperation represents an asynchronous operation on the VectorClockDocument
- * guarded by a mutex. Calling threads are joining the ongoing operation or - in case no
- * operation is in progress - scheduling a new one.
- */
- class VectorClockStateOperation {
- public:
- VectorClockStateOperation() = default;
- ~VectorClockStateOperation() = default;
-
- SharedSemiFuture<void> performOperation(VectorClockMongoD* vectorClock,
- ServiceContext* serviceContext) {
- stdx::lock_guard<Latch> lk(_opMutex);
- _opFuture =
- _opFuture
- .thenRunOn(Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor())
- .then([this, vectorClock, serviceContext, initialGeneration = _generation] {
- stdx::unique_lock<Latch> lk(_opMutex);
-
- invariant(_generation >= initialGeneration);
- if (_generation > initialGeneration) {
- // The last run of this operation has definitively observed the
- // scheduling thread's in-memory state. There is no need to run
- // this operation again.
- return;
- }
- ++_generation;
-
- lk.unlock();
-
- ThreadClient tc("VectorClockStateOperation", serviceContext);
- {
- stdx::lock_guard<Client> lk(*tc.get());
- tc->setSystemOperationKillable(lk);
- }
-
- const auto opCtx = tc->makeOperationContext();
- execute(vectorClock, opCtx.get());
- })
- .onError([=](const Status status) {
- LOGV2(4924402,
- "Error while performing a VectorClockStateOperation",
- "opName"_attr = getOperationName(),
- "error"_attr = status);
- return status;
- })
- .share();
-
- return _opFuture;
- }
-
- virtual std::string getOperationName() = 0;
-
- void waitForCompletion() {
- _opFuture.get();
- }
-
- private:
- virtual void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) = 0;
-
- Mutex _opMutex = MONGO_MAKE_LATCH("VectorClockStateOperation::_opMutex");
-
- SharedSemiFuture<void> _opFuture;
-
- size_t _generation = 0;
- };
-
- /*
- * VectorClockStateOperation persisting configTime and topologyTime in the VectorClockDocument.
+ /**
+ * Structure used as keys for the map of waiters for VectorClock durability.
*/
- class PersistOperation : public VectorClockStateOperation {
- private:
- void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override {
- const auto time = vectorClock->getTime();
-
- NamespaceString nss(NamespaceString::kVectorClockNamespace);
- PersistentTaskStore<VectorClockDocument> store(nss);
-
- BSONObjBuilder bob;
- VectorClockDocument vcd;
-
- vectorClock->_persistComponent(opCtx, &bob, time, Component::ConfigTime);
- vectorClock->_persistComponent(opCtx, &bob, time, Component::TopologyTime);
-
- auto obj = bob.done();
- vcd.setVectorClock(obj);
-
- store.update(opCtx,
- VectorClock::stateQuery(),
- vcd.toBSON(),
- WriteConcerns::kMajorityWriteConcern,
- true);
+ struct ComparableVectorTime {
+ bool operator<(const ComparableVectorTime& other) const {
+ return vt.configTime() < other.vt.configTime() ||
+ vt.topologyTime() < other.vt.topologyTime();
}
-
- std::string getOperationName() override {
- return "localPersist";
+ bool operator>(const ComparableVectorTime& other) const {
+ return vt.configTime() > other.vt.configTime() ||
+ vt.topologyTime() > other.vt.topologyTime();
}
- };
-
- /*
- * VectorClockStateOperation invoking PersistOperation on a shard server's primary.
- */
- class RemotePersistOperation : public VectorClockStateOperation {
- private:
- void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override {
- auto const shardingState = ShardingState::get(opCtx);
- invariant(shardingState->enabled());
-
- auto selfShard = uassertStatusOK(
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardingState->shardId()));
-
- auto cmdResponse = uassertStatusOK(selfShard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- NamespaceString::kVectorClockNamespace.toString(),
- BSON("_vectorClockPersist" << 1),
- Seconds{30},
- Shard::RetryPolicy::kIdempotent));
-
- uassertStatusOK(cmdResponse.commandStatus);
+ bool operator==(const ComparableVectorTime& other) const {
+ return vt.configTime() == other.vt.configTime() &&
+ vt.topologyTime() == other.vt.topologyTime();
}
- std::string getOperationName() override {
- return "remotePersist";
- }
+ VectorTime vt;
};
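A note on the comparison above: it is componentwise, so it forms a strict weak ordering
only when the compared times advance monotonically in both components. That holds for
successive VectorClock snapshots (which is what gets enqueued), but not for arbitrary
pairs. A minimal standalone sketch with hypothetical simplified types, not the mongo
ones:

    #include <cassert>

    struct T {
        int configTime, topologyTime;
    };

    // Same shape as ComparableVectorTime::operator<
    bool lt(const T& a, const T& b) {
        return a.configTime < b.configTime || a.topologyTime < b.topologyTime;
    }

    int main() {
        T a{2, 1}, b{1, 2};
        assert(lt(a, b) && lt(b, a));   // incomparable pair: each "less than" the other
        T c{1, 1}, d{2, 2};
        assert(lt(c, d) && !lt(d, c));  // monotonic advance: ordered as expected
        return 0;
    }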
- /*
- * VectorClockStateOperation recovering configTime and topologyTime from the
- * VectorClockDocument.
+ /**
+ * The way the VectorClock durability works is by maintaining a `_queue` of callers, which wait
+ * for a particular VectorTime to become durable.
+ *
+ * When the queue is empty, there is no persistence activity going on. The first caller, who
+ * finds `_loopScheduled` to be false, will set it to true, indicating it will schedule the
+ * asynchronous persistence task. The asynchronous persistence task is effectively the following
+ * loop:
+ *
+ * while (!_queue.empty()) {
+ * timeToPersist = getTime();
+ * persistTime(timeToPersist);
+ * _durableTime = timeToPersist;
+ * // Notify entries in _queue, whose time is <= _durableTime and remove them
+ * }
*/
- class RecoverOperation : public VectorClockStateOperation {
- private:
- void execute(VectorClockMongoD* vectorClock, OperationContext* opCtx) override {
- NamespaceString nss(NamespaceString::kVectorClockNamespace);
- PersistentTaskStore<VectorClockDocument> store(nss);
-
- int nDocuments = store.count(opCtx, VectorClock::stateQuery());
- if (nDocuments == 0) {
- LOGV2_DEBUG(4924403, 2, "No VectorClockDocument to recover");
- return;
- }
- fassert(4924404, nDocuments == 1);
+ SharedSemiFuture<void> _enqueueWaiterAndScheduleLoopIfNeeded(stdx::unique_lock<Mutex> ul,
+ VectorTime time);
+ Future<void> _doWhileQueueNotEmptyOrError(ServiceContext* service);
- store.forEach(opCtx, VectorClock::stateQuery(), [&](const VectorClockDocument& vcd) {
- BSONObj obj = vcd.getVectorClock();
+ // Protects the shared state below
+ Mutex _mutex = MONGO_MAKE_LATCH("VectorClockMongoD::_mutex");
- LogicalTimeArray newTime;
- vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::ConfigTime);
- vectorClock->_recoverComponent(opCtx, obj, &newTime, Component::TopologyTime);
- vectorClock->_advanceTime(std::move(newTime));
+ // This value is incremented every time the node changes its role between primary and secondary
+ uint64_t _generation{0};
- return true;
- });
- }
+ // If true, another operation has already scheduled the `_queue`-draining loop; if false, the
+ // current operation must schedule it itself
+ bool _loopScheduled{false};
- std::string getOperationName() override {
- return "recover";
- }
- };
+ // This value is boost::none only once, just after the object is constructed. From the moment
+ // the first operation schedules the `_queue`-draining loop, it will be set to a future, which
+ // will be signaled when the previously-scheduled `_queue`-draining loop completes.
+ boost::optional<Future<void>> _currentWhileLoop;
- static const ComponentArray<std::unique_ptr<ComponentFormat>> _vectorClockStateFormatters;
+ // If boost::none, the durable time needs to be recovered from disk; otherwise contains the
+ // latest-known durable time
+ boost::optional<VectorTime> _durableTime;
- PersistOperation _persistOperation;
- RemotePersistOperation _remotePersistOperation;
- RecoverOperation _recoverOperation;
+ // Queue of waiters, ordered by the increasing VectorTimes they are waiting on to be persisted
+ using Queue = std::map<ComparableVectorTime, std::unique_ptr<SharedPromise<void>>>;
+ Queue _queue;
};
const auto vectorClockMongoDDecoration = ServiceContext::declareDecoration<VectorClockMongoD>();
@@ -323,15 +171,6 @@ const ServiceContext::ConstructorActionRegisterer vectorClockMongoDRegisterer(
},
{});
-const VectorClock::ComponentArray<std::unique_ptr<VectorClock::ComponentFormat>>
- VectorClockMongoD::_vectorClockStateFormatters{
- std::make_unique<VectorClockMongoD::PersistenceComponentFormat>(
- VectorClock::kClusterTimeFieldName),
- std::make_unique<VectorClockMongoD::PersistenceComponentFormat>(
- VectorClock::kConfigTimeFieldName),
- std::make_unique<VectorClockMongoD::PersistenceComponentFormat>(
- VectorClock::kTopologyTimeFieldName)};
-
VectorClockMongoD* VectorClockMongoD::get(ServiceContext* serviceContext) {
return &vectorClockMongoDDecoration(serviceContext);
}
@@ -340,14 +179,222 @@ VectorClockMongoD::VectorClockMongoD() = default;
VectorClockMongoD::~VectorClockMongoD() = default;
+void VectorClockMongoD::onStepUpBegin(OperationContext* opCtx, long long term) {
+ stdx::lock_guard lg(_mutex);
+ ++_generation;
+ _durableTime.reset();
+}
+
+void VectorClockMongoD::onStepDown() {
+ stdx::lock_guard lg(_mutex);
+ ++_generation;
+ _durableTime.reset();
+}
+
void VectorClockMongoD::onBecomeArbiter() {
// The node has become an arbiter, hence will not need the logical clock for external operations.
_disable();
+
if (auto validator = LogicalTimeValidator::get(_service)) {
validator->stopKeyManager();
}
}
+SharedSemiFuture<void> VectorClockMongoD::waitForDurableConfigTime() {
+ auto time = getTime();
+
+ stdx::unique_lock ul(_mutex);
+ if (_durableTime && _durableTime->configTime() >= time.configTime())
+ return SharedSemiFuture<void>();
+
+ return _enqueueWaiterAndScheduleLoopIfNeeded(std::move(ul), std::move(time));
+}
+
+SharedSemiFuture<void> VectorClockMongoD::waitForDurableTopologyTime() {
+ auto time = getTime();
+
+ stdx::unique_lock ul(_mutex);
+ if (_durableTime && _durableTime->topologyTime() >= time.topologyTime())
+ return SharedSemiFuture<void>();
+
+ return _enqueueWaiterAndScheduleLoopIfNeeded(std::move(ul), std::move(time));
+}
+
+SharedSemiFuture<void> VectorClockMongoD::waitForDurable() {
+ auto time = getTime();
+
+ stdx::unique_lock ul(_mutex);
+ if (_durableTime && _durableTime->configTime() >= time.configTime() &&
+ _durableTime->topologyTime() >= time.topologyTime())
+ return SharedSemiFuture<void>();
+
+ return _enqueueWaiterAndScheduleLoopIfNeeded(std::move(ul), std::move(time));
+}
+
+SharedSemiFuture<void> VectorClockMongoD::recover() {
+ stdx::unique_lock ul(_mutex);
+ if (_durableTime)
+ return SharedSemiFuture<void>();
+
+ return _enqueueWaiterAndScheduleLoopIfNeeded(std::move(ul), VectorTime());
+}
+
+SharedSemiFuture<void> VectorClockMongoD::_enqueueWaiterAndScheduleLoopIfNeeded(
+ stdx::unique_lock<Mutex> ul, VectorTime time) {
+ auto [it, unusedEmplaced] =
+ _queue.try_emplace({std::move(time)}, std::make_unique<SharedPromise<void>>());
+
+ if (!_loopScheduled) {
+ _loopScheduled = true;
+
+ auto joinPreviousLoop(_currentWhileLoop ? std::move(*_currentWhileLoop)
+ : Future<void>::makeReady());
+
+ _currentWhileLoop.emplace(std::move(joinPreviousLoop).onCompletion([this](auto) {
+ return _doWhileQueueNotEmptyOrError(vectorClockMongoDDecoration.owner(this));
+ }));
+ }
+
+ return it->second->getFuture();
+}
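Note that `try_emplace` coalesces waiters: a caller blocking on a VectorTime that is
already queued shares the existing SharedPromise entry, so the draining loop notifies
all such callers at once. A minimal sketch of that map behavior, with an int key
standing in for ComparableVectorTime:

    #include <cassert>
    #include <map>
    #include <memory>

    int main() {
        std::map<int, std::unique_ptr<int>> queue;
        auto [it1, emplaced1] = queue.try_emplace(5, std::make_unique<int>(0));
        auto [it2, emplaced2] = queue.try_emplace(5, std::make_unique<int>(0));
        assert(emplaced1 && !emplaced2);
        assert(it1 == it2);  // the second caller reuses the first caller's entry
        return 0;
    }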
+
+Future<void> VectorClockMongoD::_doWhileQueueNotEmptyOrError(ServiceContext* service) {
+ auto [p, f] = makePromiseFuture<std::pair<uint64_t, VectorTime>>();
+ auto future = std::move(f)
+ .then([this](std::pair<uint64_t, VectorTime> newDurableTime) {
+ stdx::unique_lock ul(_mutex);
+ uassert(ErrorCodes::InterruptedDueToReplStateChange,
+ "VectorClock failed to persist due to stepdown",
+ newDurableTime.first == _generation);
+
+ _durableTime.emplace(newDurableTime.second);
+
+ ComparableVectorTime time{*_durableTime};
+
+ std::vector<Queue::value_type::second_type> promises;
+ for (auto it = _queue.begin(); it != _queue.end();) {
+ if (it->first > time)
+ break;
+ promises.emplace_back(std::move(it->second));
+ it = _queue.erase(it);
+ }
+ ul.unlock();
+
+ // Make sure the VectorClock advances at least up to the just-recovered or
+ // just-persisted durable time
+ _advanceTime({newDurableTime.second.clusterTime(),
+ newDurableTime.second.configTime(),
+ newDurableTime.second.topologyTime()});
+
+ for (auto& p : promises)
+ p->emplaceValue();
+ })
+ .onError([this](Status status) {
+ stdx::unique_lock ul(_mutex);
+ std::vector<Queue::value_type::second_type> promises;
+ for (auto it = _queue.begin(); it != _queue.end();) {
+ promises.emplace_back(std::move(it->second));
+ it = _queue.erase(it);
+ }
+ ul.unlock();
+
+ for (auto& p : promises)
+ p->setError(status);
+ })
+ .onCompletion([this, service](auto) {
+ {
+ stdx::lock_guard lg(_mutex);
+ if (_queue.empty()) {
+ _loopScheduled = false;
+ return Future<void>::makeReady();
+ }
+ }
+ return _doWhileQueueNotEmptyOrError(service);
+ });
+
+ // Blocking work to recover and/or persist the current vector time
+ ExecutorFuture<void>(Grid::get(service)->getExecutorPool()->getFixedExecutor())
+ .then([this, service] {
+ auto [generation, mustRecoverDurableTime] = [&] {
+ stdx::lock_guard lg(_mutex);
+ return std::make_pair(_generation, !_durableTime);
+ }();
+
+ ThreadClient tc("VectorClockStateOperation", service);
+
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillable(lk);
+ }
+
+ const auto opCtxHolder = tc->makeOperationContext();
+ auto* const opCtx = opCtxHolder.get();
+
+ if (mustRecoverDurableTime) {
+ VectorClockDocument durableVectorClock;
+
+ PersistentTaskStore<VectorClockDocument> store(
+ NamespaceString::kVectorClockNamespace);
+ store.forEach(
+ opCtx,
+ QUERY(VectorClockDocument::k_idFieldName << durableVectorClock.get_id()),
+ [&, numDocsFound = 0](const auto& doc) mutable {
+ invariant(++numDocsFound == 1);
+ durableVectorClock = doc;
+ return true;
+ });
+
+ return std::make_pair(
+ generation,
+ VectorTime({LogicalTime(Timestamp(0)),
+ LogicalTime(durableVectorClock.getConfigTime()),
+ LogicalTime(durableVectorClock.getTopologyTime())}));
+ } else {
+ auto vectorTime = getTime();
+
+ auto* const replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (replCoord->getMemberState().primary()) {
+ // Persist as primary
+ const VectorClockDocument vcd(vectorTime.configTime().asTimestamp(),
+ vectorTime.topologyTime().asTimestamp());
+
+ PersistentTaskStore<VectorClockDocument> store(
+ NamespaceString::kVectorClockNamespace);
+ store.update(opCtx,
+ QUERY(VectorClockDocument::k_idFieldName << vcd.get_id()),
+ vcd.toBSON(),
+ WriteConcerns::kMajorityWriteConcern,
+ true /* upsert */);
+ } else {
+ // Persist as secondary, by asking the primary
+ auto const shardingState = ShardingState::get(opCtx);
+ invariant(shardingState->enabled());
+
+ auto selfShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(
+ opCtx, shardingState->shardId()));
+
+ auto cmdResponse = uassertStatusOK(selfShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ NamespaceString::kVectorClockNamespace.toString(),
+ BSON("_vectorClockPersist" << 1),
+ Seconds{30},
+ Shard::RetryPolicy::kIdempotent));
+
+ uassertStatusOK(cmdResponse.commandStatus);
+ }
+
+ return std::make_pair(generation, vectorTime);
+ }
+ })
+ .getAsync([this, promise = std::move(p)](
+ StatusWith<std::pair<uint64_t, VectorTime>> swResult) mutable {
+ promise.setFromStatusWith(std::move(swResult));
+ });
+
+ return future;
+}
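The function wires a promise/future pair "backwards": the continuation chain that
validates the generation, publishes `_durableTime`, and notifies waiters is attached to
the future first, and only then is the blocking recover-or-persist work scheduled on
the executor to fulfill the promise. A simplified standalone sketch of the same pattern
using std::promise and a thread in place of mongo's Future chaining and executor:

    #include <future>
    #include <iostream>
    #include <thread>
    #include <utility>

    int main() {
        std::promise<std::pair<uint64_t, int>> p;
        auto f = p.get_future();

        // Producer: stands in for the blocking persist/recover work that runs
        // on the fixed executor and reports (generation, persisted time).
        std::thread producer([&p] { p.set_value({1 /*generation*/, 42 /*time*/}); });

        // Consumer: stands in for the continuation that checks the generation
        // and publishes the durable time to the waiters.
        auto [generation, durableTime] = f.get();
        std::cout << "generation=" << generation << " durableTime=" << durableTime << "\n";

        producer.join();
        return 0;
    }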
+
bool VectorClockMongoD::_gossipOutInternal(OperationContext* opCtx,
BSONObjBuilder* out,
const LogicalTimeArray& time) const {
@@ -430,49 +477,5 @@ void VectorClockMongoD::_tickTo(Component component, LogicalTime newTime) {
MONGO_UNREACHABLE;
}
-void VectorClockMongoD::_persistComponent(OperationContext* opCtx,
- BSONObjBuilder* out,
- const VectorTime& time,
- Component component) const {
- _vectorClockStateFormatters[component]->out(
- _service, opCtx, true, out, time[component], component);
-}
-
-void VectorClockMongoD::_recoverComponent(OperationContext* opCtx,
- const BSONObj& in,
- LogicalTimeArray* newTime,
- Component component) {
- (*newTime)[component] = _vectorClockStateFormatters[component]->in(
- _service, opCtx, in, true /*couldBeUnauthenticated*/, component);
-}
-
-SharedSemiFuture<void> VectorClockMongoD::persist() {
- if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
- auto serviceContext = vectorClockMongoDDecoration.owner(this);
-
- const auto replCoord = repl::ReplicationCoordinator::get(serviceContext);
- if (replCoord->getMemberState().primary()) {
- return _persistOperation.performOperation(this, serviceContext);
- }
-
- return _remotePersistOperation.performOperation(this, serviceContext);
- }
-
- return SharedSemiFuture<void>();
-}
-
-void VectorClockMongoD::waitForInMemoryVectorClockToBePersisted() {
- _persistOperation.waitForCompletion();
-}
-
-SharedSemiFuture<void> VectorClockMongoD::recover() {
- auto serviceContext = vectorClockMongoDDecoration.owner(this);
- return _recoverOperation.performOperation(this, serviceContext);
-}
-
-void VectorClockMongoD::waitForVectorClockToBeRecovered() {
- _recoverOperation.waitForCompletion();
-}
-
} // namespace
} // namespace mongo