summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDenis Grebennicov <denis.grebennicov@mongodb.com>2021-11-10 10:37:00 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-11-10 11:05:06 +0000
commit7b080dde19c982aee81f2a338295119e2e751a96 (patch)
treee0a48482517257ff0e386657fb45135afbc95feb /src
parentbdc2e9b2ed299f4dfbf6183eed94707afbde8478 (diff)
downloadmongo-7b080dde19c982aee81f2a338295119e2e751a96.tar.gz
SERVER-58691 Implement deletion of pre-images which corresponding oplog entry fell off the oplog
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/mongod_main.cpp28
-rw-r--r--src/mongo/db/pipeline/SConscript17
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp317
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h63
-rw-r--r--src/mongo/db/pipeline/change_stream_preimage.idl19
-rw-r--r--src/mongo/db/repl/storage_interface.h5
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp33
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h1
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h4
10 files changed, 488 insertions, 0 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 50079a32709..8c15cc0f22e 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2229,6 +2229,7 @@ env.Library(
# please add that library as a private libdep of
# mongod_initializers.
'$BUILD_DIR/mongo/client/clientdriver_minimal',
+ '$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover',
'$BUILD_DIR/mongo/s/grid',
'$BUILD_DIR/mongo/s/sessions_collection_sharded',
'$BUILD_DIR/mongo/scripting/scripting',
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index d8944c895f2..0fa7f615786 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -107,6 +107,7 @@
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/periodic_runner_job_abort_expired_transactions.h"
+#include "mongo/db/pipeline/change_stream_expired_pre_image_remover.h"
#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/read_write_concern_defaults_cache_lookup_mongod.h"
@@ -764,6 +765,23 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
}
}
+ // Start a background task to periodically remove expired pre-images from the 'system.preimages'
+ // collection if not in standalone mode.
+ const auto isStandalone =
+ repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() ==
+ repl::ReplicationCoordinator::modeNone;
+ if (!isStandalone) {
+ try {
+ PeriodicChangeStreamExpiredPreImagesRemover::get(serviceContext)->start();
+ } catch (ExceptionFor<ErrorCodes::PeriodicJobIsStopped>&) {
+ LOGV2_WARNING(5869107, "Not starting periodic jobs as shutdown is in progress");
+ // Shutdown has already started before initialization is complete. Wait for the
+ // shutdown task to complete and return.
+ MONGO_IDLE_THREAD_BLOCK;
+ return waitForShutdown();
+ }
+ }
+
// Set up the logical session cache
LogicalSessionCacheServer kind = LogicalSessionCacheServer::kStandalone;
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -1221,6 +1239,16 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
exec->join();
}
+ const auto isStandalone =
+ repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() ==
+ repl::ReplicationCoordinator::modeNone;
+ if (!isStandalone) {
+ LOGV2_OPTIONS(5869108,
+ {LogComponent::kQuery},
+ "Shutting down the ChangeStreamExpiredPreImagesRemover");
+ PeriodicChangeStreamExpiredPreImagesRemover::get(serviceContext)->stop();
+ }
+
if (auto storageEngine = serviceContext->getStorageEngine()) {
if (storageEngine->supportsReadConcernSnapshot()) {
LOGV2(4784908, "Shutting down the PeriodicThreadToAbortExpiredTransactions");
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index b5c7309ea06..84bb5c0512f 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -33,6 +33,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/idl/idl_parser',
+ '$BUILD_DIR/mongo/idl/server_parameter',
],
)
@@ -443,6 +444,22 @@ env.Library(
],
)
+env.Library(
+ target='change_stream_expired_pre_image_remover',
+ source=[
+ 'change_stream_expired_pre_image_remover.cpp'
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/db_raii',
+ '$BUILD_DIR/mongo/db/query_exec',
+ '$BUILD_DIR/mongo/db/record_id_helpers',
+ '$BUILD_DIR/mongo/db/repl/oplog_entry',
+ '$BUILD_DIR/mongo/db/repl/storage_interface',
+ '$BUILD_DIR/mongo/util/periodic_runner',
+ 'change_stream_preimage',
+ ]
+)
+
env.CppUnitTest(
target='db_pipeline_test',
source=[
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
new file mode 100644
index 00000000000..10ae7280df4
--- /dev/null
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
@@ -0,0 +1,317 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "change_stream_expired_pre_image_remover.h"
+
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/pipeline/change_stream_preimage_gen.h"
+#include "mongo/db/query/internal_plans.h"
+#include "mongo/db/record_id_helpers.h"
+#include "mongo/db/repl/oplog_entry_gen.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/service_context.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+
+namespace {
+/**
+ * Scans the 'config.system.preimages' collection and deletes the expired pre-images from it.
+ *
+ * Pre-images are ordered by collection UUID, ie. if UUID of collection A is ordered before UUID of
+ * collection B, then pre-images of collection A will be stored before pre-images of collection B.
+ *
+ * While scanning the collection for expired pre-images, each pre-image timestamp is compared
+ * against the 'earliestOplogEntryTimestamp' value. Any pre-image that has a timestamp greater than
+ * the 'earliestOplogEntryTimestamp' value is not considered for deletion and the cursor seeks to
+ * the next UUID in the collection.
+ *
+ * Seek to the next UUID is done by setting the values of 'Timestamp' and 'ApplyOpsIndex' fields to
+ * max, ie. (currentPreImage.nsUUID, Timestamp::max(), ApplyOpsIndex::max()).
+ *
+ * +-------------------------+
+ * | config.system.preimages |
+ * +------------+------------+
+ * |
+ * +--------------------+---------+---------+-----------------------+
+ * | | | |
+ * +-----------+-------+ +----------+--------+ +--------+----------+ +----------+--------+
+ * | collA.preImageA | | collA.preImageB | | collB.preImageC | | collB.preImageD |
+ * +-----------+-------+ +----------+--------+ +---------+---------+ +----------+--------+
+ * | timestamp: 1 | | timestamp: 10 | | timestamp: 5 | | timestamp: 9 |
+ * | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 1 |
+ * +-------------------+ +-------------------+ +-------------------+ +-------------------+
+ */
+class ChangeStreamExpiredPreImageIterator {
+public:
+ // Iterator over the expired pre-images.
+ struct Iterator {
+ using iterator_category = std::forward_iterator_tag;
+ using difference_type = void;
+ using value_type = BSONObj;
+ using pointer = const BSONObj*;
+ using reference = const BSONObj&;
+
+ Iterator(OperationContext* opCtx,
+ const CollectionPtr* preImagesCollPtr,
+ Timestamp earliestOplogEntryTimestamp,
+ boost::optional<ChangeStreamPreImageId> minPreImageId = boost::none)
+ : _opCtx(opCtx),
+ _preImagesCollPtr(preImagesCollPtr),
+ _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp) {
+ setupPlanExecutor(minPreImageId);
+ advance();
+ }
+
+ reference operator*() const {
+ return _currentPreImageObj;
+ }
+
+ pointer operator->() const {
+ return &_currentPreImageObj;
+ }
+
+ Iterator& operator++() {
+ advance();
+ return *this;
+ }
+
+ // Both iterators are equal if they are both at the same pre-image.
+ friend bool operator==(const Iterator& a, const Iterator& b) {
+ return a._currentPreImageObj.woCompare(b._currentPreImageObj) == 0;
+ };
+
+ friend bool operator!=(const Iterator& a, const Iterator& b) {
+ return !(a == b);
+ };
+
+ void saveState() {
+ _planExecutor->saveState();
+ }
+
+ void restoreState() {
+ _planExecutor->restoreState(_preImagesCollPtr);
+ }
+
+ private:
+ // Scans the pre-images collection and gets the next expired pre-image or sets
+ // 'currentPreImage' to BSONObj() in case there are no more expired pre-images left.
+ void advance() {
+ const auto getNextPreImage = [&]() -> boost::optional<ChangeStreamPreImage> {
+ if (_planExecutor->getNext(&_currentPreImageObj, nullptr) == PlanExecutor::IS_EOF) {
+ _currentPreImageObj = BSONObj();
+ return boost::none;
+ }
+
+ return {ChangeStreamPreImage::parse(IDLParserErrorContext("pre-image"),
+ _currentPreImageObj)};
+ };
+
+ // If current collection has no more expired pre-images, fetch the first pre-image from
+ // the next collection that has pre-images enabled.
+ boost::optional<ChangeStreamPreImage> preImage;
+ while ((preImage = getNextPreImage()) && !isExpiredPreImage(*preImage)) {
+ // Set the maximum values for timestamp and apply ops fields such that we jump to
+ // the next collection that has the pre-images enabled.
+ preImage->getId().setTs(Timestamp::max());
+ preImage->getId().setApplyOpsIndex(std::numeric_limits<int64_t>::max());
+ setupPlanExecutor(preImage->getId());
+ }
+ }
+
+ // Pre-image is expired if its timestamp is smaller than the 'earliestOplogEntryTimestamp'
+ bool isExpiredPreImage(const ChangeStreamPreImage& preImage) const {
+ return preImage.getId().getTs() < _earliestOplogEntryTimestamp;
+ }
+
+ // Set up the new collection scan to start from the 'minPreImageId'.
+ void setupPlanExecutor(boost::optional<ChangeStreamPreImageId> minPreImageId) {
+ const auto minRecordId =
+ (minPreImageId ? boost::optional<RecordId>(record_id_helpers::keyForElem(
+ BSON("_id" << minPreImageId->toBSON()).firstElement()))
+ : boost::none);
+ _planExecutor =
+ InternalPlanner::collectionScan(_opCtx,
+ _preImagesCollPtr,
+ PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
+ InternalPlanner::Direction::FORWARD,
+ boost::none,
+ minRecordId);
+ }
+
+ OperationContext* _opCtx;
+ const CollectionPtr* _preImagesCollPtr;
+ BSONObj _currentPreImageObj;
+ Timestamp _earliestOplogEntryTimestamp;
+ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _planExecutor;
+ };
+
+ ChangeStreamExpiredPreImageIterator(OperationContext* opCtx,
+ const CollectionPtr* preImagesCollPtr,
+ Timestamp earliestOplogEntryTimestamp)
+ : _opCtx(opCtx),
+ _preImagesCollPtr(preImagesCollPtr),
+ _earliestOplogEntryTimestamp(earliestOplogEntryTimestamp) {}
+
+ Iterator begin() const {
+ return Iterator(_opCtx, _preImagesCollPtr, _earliestOplogEntryTimestamp);
+ }
+
+ Iterator end() const {
+ // For end iterator the collection scan 'minRecordId' has to be maximum pre-image id.
+ const auto maxUUID = UUID::parse("FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF").getValue();
+ ChangeStreamPreImageId maxPreImageId(
+ maxUUID, Timestamp::max(), std::numeric_limits<int64_t>::max());
+ return Iterator(_opCtx, _preImagesCollPtr, _earliestOplogEntryTimestamp, maxPreImageId);
+ }
+
+private:
+ OperationContext* _opCtx;
+ const CollectionPtr* _preImagesCollPtr;
+ Timestamp _earliestOplogEntryTimestamp;
+};
+
+void deleteExpiredChangeStreamPreImages(Client* client) {
+ const auto startTime = Date_t::now();
+ auto opCtx = client->makeOperationContext();
+
+ // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection
+ // doesn't exist.
+ AutoGetCollection autoColl(
+ opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX);
+ const auto& preImagesColl = autoColl.getCollection();
+ if (!preImagesColl) {
+ return;
+ }
+
+ const bool shouldReplicateDeletes = gIsChangeStreamExpiredPreImageRemovalJobReplicating.load();
+ const auto isPrimary = repl::ReplicationCoordinator::get(opCtx.get())
+ ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb);
+
+ // Do not run the job on secondaries if we should explicitly replicate the deletes.
+ if (!isPrimary && shouldReplicateDeletes) {
+ return;
+ }
+
+ // Get the timestamp of the ealiest oplog entry.
+ const auto currentEarliestOplogEntryTs =
+ repl::StorageInterface::get(client->getServiceContext())
+ ->getEarliestOplogTimestamp(opCtx.get());
+
+ // Iterate over all expired pre-images and remove them.
+ size_t numberOfRemovals = 0;
+ ChangeStreamExpiredPreImageIterator expiredPreImages(
+ opCtx.get(), &preImagesColl, currentEarliestOplogEntryTs);
+ // TODO SERVER-58693: consider adopting a recordID range-based deletion policy instead of
+ // iterating.
+ for (auto it = expiredPreImages.begin(); it != expiredPreImages.end(); ++it) {
+ it.saveState();
+
+ writeConflictRetry(
+ opCtx.get(),
+ "ChangeStreamExpiredPreImagesRemover",
+ NamespaceString::kChangeStreamPreImagesNamespace.ns(),
+ [&] {
+ boost::optional<repl::UnreplicatedWritesBlock> unReplBlock;
+ // TODO SERVER-60238: write the tests for non-replicating deletes, when pre-image
+ // replication to secondaries is implemented.
+ if (!shouldReplicateDeletes) {
+ unReplBlock.emplace(opCtx.get());
+ }
+
+ WriteUnitOfWork wuow(opCtx.get());
+ const auto recordId =
+ record_id_helpers::keyForElem(it->getField(ChangeStreamPreImage::kIdFieldName));
+ preImagesColl->deleteDocument(
+ opCtx.get(), kUninitializedStmtId, recordId, &CurOp::get(*opCtx)->debug());
+ wuow.commit();
+ numberOfRemovals++;
+ });
+
+ it.restoreState();
+ }
+
+ LOGV2(5869104,
+ "Periodic expired pre-images removal job finished executing",
+ "numberOfRemovals"_attr = numberOfRemovals,
+ "jobDuration"_attr = (Date_t::now() - startTime).toString());
+}
+} // namespace
+
+PeriodicChangeStreamExpiredPreImagesRemover& PeriodicChangeStreamExpiredPreImagesRemover::get(
+ ServiceContext* serviceContext) {
+ auto& jobContainer = _serviceDecoration(serviceContext);
+ jobContainer._init(serviceContext);
+ return jobContainer;
+}
+
+PeriodicJobAnchor& PeriodicChangeStreamExpiredPreImagesRemover::operator*() const noexcept {
+ stdx::lock_guard lk(_mutex);
+ return *_anchor;
+}
+
+PeriodicJobAnchor* PeriodicChangeStreamExpiredPreImagesRemover::operator->() const noexcept {
+ stdx::lock_guard lk(_mutex);
+ return _anchor.get();
+}
+
+void PeriodicChangeStreamExpiredPreImagesRemover::_init(ServiceContext* serviceContext) {
+ stdx::lock_guard lk(_mutex);
+ if (_anchor) {
+ return;
+ }
+
+ auto periodicRunner = serviceContext->getPeriodicRunner();
+ invariant(periodicRunner);
+
+ PeriodicRunner::PeriodicJob job(
+ "ChangeStreamExpiredPreImagesRemover",
+ [](Client* client) {
+ try {
+ deleteExpiredChangeStreamPreImages(client);
+ } catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
+ LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted");
+ } catch (const DBException& exception) {
+ LOGV2_ERROR(5869106,
+ "Periodic expired pre-images removal job failed",
+ "reason"_attr = exception.reason());
+ }
+ },
+ Seconds(gExpiredChangeStreamPreImageRemovalJobSleepSecs.load()));
+
+ _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job)));
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
new file mode 100644
index 00000000000..b711b7b21c5
--- /dev/null
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/service_context.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/util/hierarchical_acquisition.h"
+#include "mongo/util/periodic_runner.h"
+
+namespace mongo {
+
+class ServiceContext;
+
+/**
+ * A periodic background job to remove expired documents from 'system.preimages' collection. A
+ * document in this collection is considered expired if its corresponding oplog entry falls off the
+ * oplog.
+ */
+class PeriodicChangeStreamExpiredPreImagesRemover final {
+public:
+ static PeriodicChangeStreamExpiredPreImagesRemover& get(ServiceContext* serviceContext);
+
+ PeriodicJobAnchor& operator*() const noexcept;
+ PeriodicJobAnchor* operator->() const noexcept;
+
+private:
+ void _init(ServiceContext* serviceContext);
+
+ inline static const auto _serviceDecoration =
+ ServiceContext::declareDecoration<PeriodicChangeStreamExpiredPreImagesRemover>();
+
+ mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1),
+ "PeriodicChangeStreamExpiredPreImagesRemover::_mutex");
+ std::shared_ptr<PeriodicJobAnchor> _anchor;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_preimage.idl b/src/mongo/db/pipeline/change_stream_preimage.idl
index 2a34bcc0512..5128a530c5c 100644
--- a/src/mongo/db/pipeline/change_stream_preimage.idl
+++ b/src/mongo/db/pipeline/change_stream_preimage.idl
@@ -34,6 +34,25 @@ global:
imports:
- "mongo/idl/basic_types.idl"
+server_parameters:
+ expiredChangeStreamPreImageRemovalJobSleepSecs:
+ description: >-
+ Specifies the period within which expired pre-image removal job is running.
+ set_at: [ startup ]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: gExpiredChangeStreamPreImageRemovalJobSleepSecs
+ validator:
+ gte: 1
+ default: 10
+
+ isChangeStreamExpiredPreImageRemovalJobReplicating:
+ description: >-
+ Specifies if the expired pre-images removal job is replicating the delete operations.
+ set_at: [ startup ]
+ cpp_vartype: AtomicWord<bool>
+ cpp_varname: gIsChangeStreamExpiredPreImageRemovalJobReplicating
+ default: true
+
structs:
ChangeStreamPreImageId:
description: Uniquely identifies a pre-image for a given node or replica set.
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 027b14af6df..a2aaff9153b 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -357,6 +357,11 @@ public:
OperationContext* opCtx, const CollectionPtr& oplog, const Timestamp& timestamp) = 0;
/**
+ * Fetches the oldest oplog entry's timestamp.
+ */
+ virtual Timestamp getEarliestOplogTimestamp(OperationContext* opCtx) = 0;
+
+ /**
* Fetches the latest oplog entry's timestamp. Bypasses the oplog visibility rules.
*/
virtual Timestamp getLatestOplogTimestamp(OperationContext* opCtx) = 0;
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 0a49be47d85..d21812000d7 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -1222,6 +1222,39 @@ boost::optional<BSONObj> StorageInterfaceImpl::findOplogEntryLessThanOrEqualToTi
}
}
+Timestamp StorageInterfaceImpl::getEarliestOplogTimestamp(OperationContext* opCtx) {
+ auto statusWithTimestamp = [&]() {
+ AutoGetOplog oplogRead(opCtx, OplogAccessMode::kRead);
+ return oplogRead.getCollection()->getRecordStore()->getEarliestOplogTimestamp(opCtx);
+ }();
+
+ // If the storage engine does not support getEarliestOplogTimestamp(), then fall back to higher
+ // level (above the storage engine) logic to fetch the earliest oplog entry timestamp.
+ if (statusWithTimestamp.getStatus() == ErrorCodes::OplogOperationUnsupported) {
+ // Reset the snapshot so that it is ensured to see the latest oplog entries.
+ opCtx->recoveryUnit()->abandonSnapshot();
+
+ BSONObj oplogEntryBSON;
+ tassert(5869100,
+ "Failed reading the earliest oplog entry",
+ Helpers::getSingleton(
+ opCtx, NamespaceString::kRsOplogNamespace.ns().c_str(), oplogEntryBSON));
+
+ auto optime = OpTime::parseFromOplogEntry(oplogEntryBSON);
+ tassert(5869101,
+ str::stream() << "Found an invalid oplog entry: " << oplogEntryBSON
+ << ", error: " << optime.getStatus(),
+ optime.isOK());
+ return optime.getValue().getTimestamp();
+ }
+
+ tassert(5869102,
+ str::stream() << "Expected oplog entries to exist: " << statusWithTimestamp.getStatus(),
+ statusWithTimestamp.isOK());
+
+ return statusWithTimestamp.getValue();
+}
+
Timestamp StorageInterfaceImpl::getLatestOplogTimestamp(OperationContext* opCtx) {
auto statusWithTimestamp = [&]() {
AutoGetOplog oplogRead(opCtx, OplogAccessMode::kRead);
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index f99b3a0c094..3dce1e53c61 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -156,6 +156,7 @@ public:
boost::optional<BSONObj> findOplogEntryLessThanOrEqualToTimestampRetryOnWCE(
OperationContext* opCtx, const CollectionPtr& oplog, const Timestamp& timestamp) override;
+ Timestamp getEarliestOplogTimestamp(OperationContext* opCtx) override;
Timestamp getLatestOplogTimestamp(OperationContext* opCtx) override;
StatusWith<StorageInterface::CollectionSize> getCollectionSize(
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 97b0a312584..747f82776c5 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -287,6 +287,10 @@ public:
return boost::none;
}
+ Timestamp getEarliestOplogTimestamp(OperationContext* opCtx) override {
+ return Timestamp();
+ }
+
Timestamp getLatestOplogTimestamp(OperationContext* opCtx) override {
return Timestamp();
}