diff options
author | Denis Grebennicov <denis.grebennicov@mongodb.com> | 2021-11-10 10:37:00 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-11-10 11:05:06 +0000 |
commit | 7b080dde19c982aee81f2a338295119e2e751a96 (patch) | |
tree | e0a48482517257ff0e386657fb45135afbc95feb /src | |
parent | bdc2e9b2ed299f4dfbf6183eed94707afbde8478 (diff) | |
download | mongo-7b080dde19c982aee81f2a338295119e2e751a96.tar.gz |
SERVER-58691 Implement deletion of pre-images which corresponding oplog entry fell off the oplog
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/mongod_main.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 17 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp | 317 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h | 63 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_preimage.idl | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.h | 4 |
10 files changed, 488 insertions, 0 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 50079a32709..8c15cc0f22e 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2229,6 +2229,7 @@ env.Library( # please add that library as a private libdep of # mongod_initializers. '$BUILD_DIR/mongo/client/clientdriver_minimal', + '$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover', '$BUILD_DIR/mongo/s/grid', '$BUILD_DIR/mongo/s/sessions_collection_sharded', '$BUILD_DIR/mongo/scripting/scripting', diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index d8944c895f2..0fa7f615786 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -107,6 +107,7 @@ #include "mongo/db/op_observer_registry.h" #include "mongo/db/operation_context.h" #include "mongo/db/periodic_runner_job_abort_expired_transactions.h" +#include "mongo/db/pipeline/change_stream_expired_pre_image_remover.h" #include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/read_write_concern_defaults_cache_lookup_mongod.h" @@ -764,6 +765,23 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) { } } + // Start a background task to periodically remove expired pre-images from the 'system.preimages' + // collection if not in standalone mode. + const auto isStandalone = + repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() == + repl::ReplicationCoordinator::modeNone; + if (!isStandalone) { + try { + PeriodicChangeStreamExpiredPreImagesRemover::get(serviceContext)->start(); + } catch (ExceptionFor<ErrorCodes::PeriodicJobIsStopped>&) { + LOGV2_WARNING(5869107, "Not starting periodic jobs as shutdown is in progress"); + // Shutdown has already started before initialization is complete. Wait for the + // shutdown task to complete and return. + MONGO_IDLE_THREAD_BLOCK; + return waitForShutdown(); + } + } + // Set up the logical session cache LogicalSessionCacheServer kind = LogicalSessionCacheServer::kStandalone; if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -1221,6 +1239,16 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { exec->join(); } + const auto isStandalone = + repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() == + repl::ReplicationCoordinator::modeNone; + if (!isStandalone) { + LOGV2_OPTIONS(5869108, + {LogComponent::kQuery}, + "Shutting down the ChangeStreamExpiredPreImagesRemover"); + PeriodicChangeStreamExpiredPreImagesRemover::get(serviceContext)->stop(); + } + if (auto storageEngine = serviceContext->getStorageEngine()) { if (storageEngine->supportsReadConcernSnapshot()) { LOGV2(4784908, "Shutting down the PeriodicThreadToAbortExpiredTransactions"); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index b5c7309ea06..84bb5c0512f 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -33,6 +33,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/idl/idl_parser', + '$BUILD_DIR/mongo/idl/server_parameter', ], ) @@ -443,6 +444,22 @@ env.Library( ], ) +env.Library( + target='change_stream_expired_pre_image_remover', + source=[ + 'change_stream_expired_pre_image_remover.cpp' + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/db_raii', + '$BUILD_DIR/mongo/db/query_exec', + '$BUILD_DIR/mongo/db/record_id_helpers', + '$BUILD_DIR/mongo/db/repl/oplog_entry', + '$BUILD_DIR/mongo/db/repl/storage_interface', + '$BUILD_DIR/mongo/util/periodic_runner', + 'change_stream_preimage', + ] +) + env.CppUnitTest( target='db_pipeline_test', source=[ diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp new file mode 100644 index 00000000000..10ae7280df4 --- /dev/null +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp @@ -0,0 +1,317 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "change_stream_expired_pre_image_remover.h" + +#include "mongo/db/client.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/pipeline/change_stream_preimage_gen.h" +#include "mongo/db/query/internal_plans.h" +#include "mongo/db/record_id_helpers.h" +#include "mongo/db/repl/oplog_entry_gen.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/db/service_context.h" +#include "mongo/logv2/log.h" + +namespace mongo { + +namespace { +/** + * Scans the 'config.system.preimages' collection and deletes the expired pre-images from it. + * + * Pre-images are ordered by collection UUID, ie. if UUID of collection A is ordered before UUID of + * collection B, then pre-images of collection A will be stored before pre-images of collection B. + * + * While scanning the collection for expired pre-images, each pre-image timestamp is compared + * against the 'earliestOplogEntryTimestamp' value. Any pre-image that has a timestamp greater than + * the 'earliestOplogEntryTimestamp' value is not considered for deletion and the cursor seeks to + * the next UUID in the collection. + * + * Seek to the next UUID is done by setting the values of 'Timestamp' and 'ApplyOpsIndex' fields to + * max, ie. (currentPreImage.nsUUID, Timestamp::max(), ApplyOpsIndex::max()). + * + * +-------------------------+ + * | config.system.preimages | + * +------------+------------+ + * | + * +--------------------+---------+---------+-----------------------+ + * | | | | + * +-----------+-------+ +----------+--------+ +--------+----------+ +----------+--------+ + * | collA.preImageA | | collA.preImageB | | collB.preImageC | | collB.preImageD | + * +-----------+-------+ +----------+--------+ +---------+---------+ +----------+--------+ + * | timestamp: 1 | | timestamp: 10 | | timestamp: 5 | | timestamp: 9 | + * | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 1 | + * +-------------------+ +-------------------+ +-------------------+ +-------------------+ + */ +class ChangeStreamExpiredPreImageIterator { +public: + // Iterator over the expired pre-images. + struct Iterator { + using iterator_category = std::forward_iterator_tag; + using difference_type = void; + using value_type = BSONObj; + using pointer = const BSONObj*; + using reference = const BSONObj&; + + Iterator(OperationContext* opCtx, + const CollectionPtr* preImagesCollPtr, + Timestamp earliestOplogEntryTimestamp, + boost::optional<ChangeStreamPreImageId> minPreImageId = boost::none) + : _opCtx(opCtx), + _preImagesCollPtr(preImagesCollPtr), + _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp) { + setupPlanExecutor(minPreImageId); + advance(); + } + + reference operator*() const { + return _currentPreImageObj; + } + + pointer operator->() const { + return &_currentPreImageObj; + } + + Iterator& operator++() { + advance(); + return *this; + } + + // Both iterators are equal if they are both at the same pre-image. + friend bool operator==(const Iterator& a, const Iterator& b) { + return a._currentPreImageObj.woCompare(b._currentPreImageObj) == 0; + }; + + friend bool operator!=(const Iterator& a, const Iterator& b) { + return !(a == b); + }; + + void saveState() { + _planExecutor->saveState(); + } + + void restoreState() { + _planExecutor->restoreState(_preImagesCollPtr); + } + + private: + // Scans the pre-images collection and gets the next expired pre-image or sets + // 'currentPreImage' to BSONObj() in case there are no more expired pre-images left. + void advance() { + const auto getNextPreImage = [&]() -> boost::optional<ChangeStreamPreImage> { + if (_planExecutor->getNext(&_currentPreImageObj, nullptr) == PlanExecutor::IS_EOF) { + _currentPreImageObj = BSONObj(); + return boost::none; + } + + return {ChangeStreamPreImage::parse(IDLParserErrorContext("pre-image"), + _currentPreImageObj)}; + }; + + // If current collection has no more expired pre-images, fetch the first pre-image from + // the next collection that has pre-images enabled. + boost::optional<ChangeStreamPreImage> preImage; + while ((preImage = getNextPreImage()) && !isExpiredPreImage(*preImage)) { + // Set the maximum values for timestamp and apply ops fields such that we jump to + // the next collection that has the pre-images enabled. + preImage->getId().setTs(Timestamp::max()); + preImage->getId().setApplyOpsIndex(std::numeric_limits<int64_t>::max()); + setupPlanExecutor(preImage->getId()); + } + } + + // Pre-image is expired if its timestamp is smaller than the 'earliestOplogEntryTimestamp' + bool isExpiredPreImage(const ChangeStreamPreImage& preImage) const { + return preImage.getId().getTs() < _earliestOplogEntryTimestamp; + } + + // Set up the new collection scan to start from the 'minPreImageId'. + void setupPlanExecutor(boost::optional<ChangeStreamPreImageId> minPreImageId) { + const auto minRecordId = + (minPreImageId ? boost::optional<RecordId>(record_id_helpers::keyForElem( + BSON("_id" << minPreImageId->toBSON()).firstElement())) + : boost::none); + _planExecutor = + InternalPlanner::collectionScan(_opCtx, + _preImagesCollPtr, + PlanYieldPolicy::YieldPolicy::YIELD_AUTO, + InternalPlanner::Direction::FORWARD, + boost::none, + minRecordId); + } + + OperationContext* _opCtx; + const CollectionPtr* _preImagesCollPtr; + BSONObj _currentPreImageObj; + Timestamp _earliestOplogEntryTimestamp; + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _planExecutor; + }; + + ChangeStreamExpiredPreImageIterator(OperationContext* opCtx, + const CollectionPtr* preImagesCollPtr, + Timestamp earliestOplogEntryTimestamp) + : _opCtx(opCtx), + _preImagesCollPtr(preImagesCollPtr), + _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp) {} + + Iterator begin() const { + return Iterator(_opCtx, _preImagesCollPtr, _earliestOplogEntryTimestamp); + } + + Iterator end() const { + // For end iterator the collection scan 'minRecordId' has to be maximum pre-image id. + const auto maxUUID = UUID::parse("FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF").getValue(); + ChangeStreamPreImageId maxPreImageId( + maxUUID, Timestamp::max(), std::numeric_limits<int64_t>::max()); + return Iterator(_opCtx, _preImagesCollPtr, _earliestOplogEntryTimestamp, maxPreImageId); + } + +private: + OperationContext* _opCtx; + const CollectionPtr* _preImagesCollPtr; + Timestamp _earliestOplogEntryTimestamp; +}; + +void deleteExpiredChangeStreamPreImages(Client* client) { + const auto startTime = Date_t::now(); + auto opCtx = client->makeOperationContext(); + + // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection + // doesn't exist. + AutoGetCollection autoColl( + opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX); + const auto& preImagesColl = autoColl.getCollection(); + if (!preImagesColl) { + return; + } + + const bool shouldReplicateDeletes = gIsChangeStreamExpiredPreImageRemovalJobReplicating.load(); + const auto isPrimary = repl::ReplicationCoordinator::get(opCtx.get()) + ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb); + + // Do not run the job on secondaries if we should explicitly replicate the deletes. + if (!isPrimary && shouldReplicateDeletes) { + return; + } + + // Get the timestamp of the ealiest oplog entry. + const auto currentEarliestOplogEntryTs = + repl::StorageInterface::get(client->getServiceContext()) + ->getEarliestOplogTimestamp(opCtx.get()); + + // Iterate over all expired pre-images and remove them. + size_t numberOfRemovals = 0; + ChangeStreamExpiredPreImageIterator expiredPreImages( + opCtx.get(), &preImagesColl, currentEarliestOplogEntryTs); + // TODO SERVER-58693: consider adopting a recordID range-based deletion policy instead of + // iterating. + for (auto it = expiredPreImages.begin(); it != expiredPreImages.end(); ++it) { + it.saveState(); + + writeConflictRetry( + opCtx.get(), + "ChangeStreamExpiredPreImagesRemover", + NamespaceString::kChangeStreamPreImagesNamespace.ns(), + [&] { + boost::optional<repl::UnreplicatedWritesBlock> unReplBlock; + // TODO SERVER-60238: write the tests for non-replicating deletes, when pre-image + // replication to secondaries is implemented. + if (!shouldReplicateDeletes) { + unReplBlock.emplace(opCtx.get()); + } + + WriteUnitOfWork wuow(opCtx.get()); + const auto recordId = + record_id_helpers::keyForElem(it->getField(ChangeStreamPreImage::kIdFieldName)); + preImagesColl->deleteDocument( + opCtx.get(), kUninitializedStmtId, recordId, &CurOp::get(*opCtx)->debug()); + wuow.commit(); + numberOfRemovals++; + }); + + it.restoreState(); + } + + LOGV2(5869104, + "Periodic expired pre-images removal job finished executing", + "numberOfRemovals"_attr = numberOfRemovals, + "jobDuration"_attr = (Date_t::now() - startTime).toString()); +} +} // namespace + +PeriodicChangeStreamExpiredPreImagesRemover& PeriodicChangeStreamExpiredPreImagesRemover::get( + ServiceContext* serviceContext) { + auto& jobContainer = _serviceDecoration(serviceContext); + jobContainer._init(serviceContext); + return jobContainer; +} + +PeriodicJobAnchor& PeriodicChangeStreamExpiredPreImagesRemover::operator*() const noexcept { + stdx::lock_guard lk(_mutex); + return *_anchor; +} + +PeriodicJobAnchor* PeriodicChangeStreamExpiredPreImagesRemover::operator->() const noexcept { + stdx::lock_guard lk(_mutex); + return _anchor.get(); +} + +void PeriodicChangeStreamExpiredPreImagesRemover::_init(ServiceContext* serviceContext) { + stdx::lock_guard lk(_mutex); + if (_anchor) { + return; + } + + auto periodicRunner = serviceContext->getPeriodicRunner(); + invariant(periodicRunner); + + PeriodicRunner::PeriodicJob job( + "ChangeStreamExpiredPreImagesRemover", + [](Client* client) { + try { + deleteExpiredChangeStreamPreImages(client); + } catch (const ExceptionForCat<ErrorCategory::Interruption>&) { + LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted"); + } catch (const DBException& exception) { + LOGV2_ERROR(5869106, + "Periodic expired pre-images removal job failed", + "reason"_attr = exception.reason()); + } + }, + Seconds(gExpiredChangeStreamPreImageRemovalJobSleepSecs.load())); + + _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job))); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h new file mode 100644 index 00000000000..b711b7b21c5 --- /dev/null +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/service_context.h" +#include "mongo/platform/mutex.h" +#include "mongo/util/hierarchical_acquisition.h" +#include "mongo/util/periodic_runner.h" + +namespace mongo { + +class ServiceContext; + +/** + * A periodic background job to remove expired documents from 'system.preimages' collection. A + * document in this collection is considered expired if its corresponding oplog entry falls off the + * oplog. + */ +class PeriodicChangeStreamExpiredPreImagesRemover final { +public: + static PeriodicChangeStreamExpiredPreImagesRemover& get(ServiceContext* serviceContext); + + PeriodicJobAnchor& operator*() const noexcept; + PeriodicJobAnchor* operator->() const noexcept; + +private: + void _init(ServiceContext* serviceContext); + + inline static const auto _serviceDecoration = + ServiceContext::declareDecoration<PeriodicChangeStreamExpiredPreImagesRemover>(); + + mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), + "PeriodicChangeStreamExpiredPreImagesRemover::_mutex"); + std::shared_ptr<PeriodicJobAnchor> _anchor; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_preimage.idl b/src/mongo/db/pipeline/change_stream_preimage.idl index 2a34bcc0512..5128a530c5c 100644 --- a/src/mongo/db/pipeline/change_stream_preimage.idl +++ b/src/mongo/db/pipeline/change_stream_preimage.idl @@ -34,6 +34,25 @@ global: imports: - "mongo/idl/basic_types.idl" +server_parameters: + expiredChangeStreamPreImageRemovalJobSleepSecs: + description: >- + Specifies the period within which expired pre-image removal job is running. + set_at: [ startup ] + cpp_vartype: AtomicWord<int> + cpp_varname: gExpiredChangeStreamPreImageRemovalJobSleepSecs + validator: + gte: 1 + default: 10 + + isChangeStreamExpiredPreImageRemovalJobReplicating: + description: >- + Specifies if the expired pre-images removal job is replicating the delete operations. + set_at: [ startup ] + cpp_vartype: AtomicWord<bool> + cpp_varname: gIsChangeStreamExpiredPreImageRemovalJobReplicating + default: true + structs: ChangeStreamPreImageId: description: Uniquely identifies a pre-image for a given node or replica set. diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 027b14af6df..a2aaff9153b 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -357,6 +357,11 @@ public: OperationContext* opCtx, const CollectionPtr& oplog, const Timestamp& timestamp) = 0; /** + * Fetches the oldest oplog entry's timestamp. + */ + virtual Timestamp getEarliestOplogTimestamp(OperationContext* opCtx) = 0; + + /** * Fetches the latest oplog entry's timestamp. Bypasses the oplog visibility rules. */ virtual Timestamp getLatestOplogTimestamp(OperationContext* opCtx) = 0; diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 0a49be47d85..d21812000d7 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -1222,6 +1222,39 @@ boost::optional<BSONObj> StorageInterfaceImpl::findOplogEntryLessThanOrEqualToTi } } +Timestamp StorageInterfaceImpl::getEarliestOplogTimestamp(OperationContext* opCtx) { + auto statusWithTimestamp = [&]() { + AutoGetOplog oplogRead(opCtx, OplogAccessMode::kRead); + return oplogRead.getCollection()->getRecordStore()->getEarliestOplogTimestamp(opCtx); + }(); + + // If the storage engine does not support getEarliestOplogTimestamp(), then fall back to higher + // level (above the storage engine) logic to fetch the earliest oplog entry timestamp. + if (statusWithTimestamp.getStatus() == ErrorCodes::OplogOperationUnsupported) { + // Reset the snapshot so that it is ensured to see the latest oplog entries. + opCtx->recoveryUnit()->abandonSnapshot(); + + BSONObj oplogEntryBSON; + tassert(5869100, + "Failed reading the earliest oplog entry", + Helpers::getSingleton( + opCtx, NamespaceString::kRsOplogNamespace.ns().c_str(), oplogEntryBSON)); + + auto optime = OpTime::parseFromOplogEntry(oplogEntryBSON); + tassert(5869101, + str::stream() << "Found an invalid oplog entry: " << oplogEntryBSON + << ", error: " << optime.getStatus(), + optime.isOK()); + return optime.getValue().getTimestamp(); + } + + tassert(5869102, + str::stream() << "Expected oplog entries to exist: " << statusWithTimestamp.getStatus(), + statusWithTimestamp.isOK()); + + return statusWithTimestamp.getValue(); +} + Timestamp StorageInterfaceImpl::getLatestOplogTimestamp(OperationContext* opCtx) { auto statusWithTimestamp = [&]() { AutoGetOplog oplogRead(opCtx, OplogAccessMode::kRead); diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index f99b3a0c094..3dce1e53c61 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -156,6 +156,7 @@ public: boost::optional<BSONObj> findOplogEntryLessThanOrEqualToTimestampRetryOnWCE( OperationContext* opCtx, const CollectionPtr& oplog, const Timestamp& timestamp) override; + Timestamp getEarliestOplogTimestamp(OperationContext* opCtx) override; Timestamp getLatestOplogTimestamp(OperationContext* opCtx) override; StatusWith<StorageInterface::CollectionSize> getCollectionSize( diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 97b0a312584..747f82776c5 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -287,6 +287,10 @@ public: return boost::none; } + Timestamp getEarliestOplogTimestamp(OperationContext* opCtx) override { + return Timestamp(); + } + Timestamp getLatestOplogTimestamp(OperationContext* opCtx) override { return Timestamp(); } |