summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLouis Williams <louis.williams@mongodb.com>2022-12-29 20:41:13 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-12-29 21:13:44 +0000
commit96573561ca19babfc0fa5f535c6cec095e703ebc (patch)
treeb82a8c064b3584154efd3122f5f36fb0c928d814
parente836af153c045bee380646c9c8f3715cabfe73ed (diff)
downloadmongo-96573561ca19babfc0fa5f535c6cec095e703ebc.tar.gz
SERVER-66283 Correct tailable cursor support on unreplicated capped collections
This corrects previously incorrect behavior where tailable cursors on unreplicated capped collections, like the profile collection, could miss writes. This pushes non-oplog capped collection visibility logic outside of the storage engine layer.
-rw-r--r--jstests/core/local_tail_capped.js148
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/catalog/SConscript18
-rw-r--r--src/mongo/db/catalog/capped_collection_maintenance.cpp15
-rw-r--r--src/mongo/db/catalog/capped_collection_test.cpp514
-rw-r--r--src/mongo/db/catalog/capped_visibility.cpp179
-rw-r--r--src/mongo/db/catalog/capped_visibility.h249
-rw-r--r--src/mongo/db/catalog/capped_visibility_test.cpp380
-rw-r--r--src/mongo/db/catalog/collection.h28
-rw-r--r--src/mongo/db/catalog/collection_catalog.cpp18
-rw-r--r--src/mongo/db/catalog/collection_catalog.h1
-rw-r--r--src/mongo/db/catalog/collection_impl.cpp82
-rw-r--r--src/mongo/db/catalog/collection_impl.h17
-rw-r--r--src/mongo/db/catalog/collection_mock.h22
-rw-r--r--src/mongo/db/catalog/collection_write_path.cpp14
-rw-r--r--src/mongo/db/catalog/virtual_collection_impl.h25
-rw-r--r--src/mongo/db/db_raii.cpp20
-rw-r--r--src/mongo/db/repl/oplog.h4
-rw-r--r--src/mongo/db/storage/SConscript12
-rw-r--r--src/mongo/db/storage/capped_snapshots.cpp69
-rw-r--r--src/mongo/db/storage/capped_snapshots.h75
-rw-r--r--src/mongo/db/storage/devnull/devnull_kv_engine.cpp8
-rw-r--r--src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h4
-rw-r--r--src/mongo/db/storage/external_record_store.h6
-rw-r--r--src/mongo/db/storage/record_store.h8
-rw-r--r--src/mongo/db/storage/recovery_unit.h1
-rw-r--r--src/mongo/db/storage/wiredtiger/SConscript1
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp67
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h8
29 files changed, 1962 insertions, 32 deletions
diff --git a/jstests/core/local_tail_capped.js b/jstests/core/local_tail_capped.js
new file mode 100644
index 00000000000..613eb58e669
--- /dev/null
+++ b/jstests/core/local_tail_capped.js
@@ -0,0 +1,148 @@
+/**
+ * This test verifies concurrent read and write behavior for tailable cursors on unreplicated
+ * capped collections. These collections accept concurrent writes and thus must ensure no
+ * documents are skipped by forward cursors.
+ *
+ * This test sets up a single capped collection with many concurrent writers. Concurrent readers
+ * open tailable cursors and clone the contents into their own collection copies. The readers then
+ * assert that the contents match the source.
+ *
+ * @tags: [
+ * assumes_against_mongod_not_mongos,
+ * does_not_support_retryable_writes,
+ * requires_capped,
+ * requires_non_retryable_writes,
+ * # Tailable cursors do not work correctly on previous versions.
+ * requires_fcv_63,
+ * ]
+ */
+
+load("jstests/libs/parallelTester.js"); // For Thread
+
+(function() {
+'use strict';
+
+function insertWorker(host, collName, tid, nInserts) {
+ const conn = new Mongo(host);
+ const db = conn.getDB('local');
+
+ for (let i = 0; i < nInserts;) {
+ const bulk = db[collName].initializeUnorderedBulkOp();
+ for (let j = 0; j < 10; j++) {
+ bulk.insert({t: tid, i: i++});
+ }
+ assert.commandWorked(bulk.execute());
+ }
+ print(tid + ": done");
+}
+
+function tailWorker(host, collName, tid, expectedDocs) {
+ // Rewrite the connection string as a mongo URI so that we can add an 'appName' to make
+ // debugging easier. When run against a standalone, 'host' is in the form '<host>:<port>'. When
+ // run against a replica set, 'host' is in the form '<rs name>/<host1>:<port1>,...'
+ const iSlash = host.indexOf('/');
+ let connString = 'mongodb://';
+ if (iSlash > 0) {
+ connString += host.substr(iSlash + 1) + '/?appName=tid' + tid +
+ '&replicaSet=' + host.substr(0, iSlash);
+ } else {
+ connString += host + '/?appName=tid' + tid;
+ }
+ const conn = new Mongo(connString);
+ const db = conn.getDB('local');
+ const cloneColl = db[collName + "_clone_" + tid];
+ cloneColl.drop();
+
+ let res = db.runCommand({find: collName, batchSize: 0, awaitData: true, tailable: true});
+ assert.commandWorked(res);
+ assert.gt(res.cursor.id, NumberLong(0));
+ assert.eq(res.cursor.firstBatch.length, 0);
+
+ const curId = res.cursor.id;
+
+ let myCount = 0;
+ let emptyGetMores = 0;
+ let nonEmptyGetMores = 0;
+ assert.soon(() => {
+ res = db.runCommand({getMore: curId, collection: collName, maxTimeMS: 1000});
+ assert.commandWorked(res);
+
+ const batchLen = res.cursor.nextBatch.length;
+ if (batchLen > 0) {
+ nonEmptyGetMores++;
+ } else {
+ emptyGetMores++;
+ }
+
+ print(tid + ': got batch of size ' + batchLen +
+ '. first doc: ' + tojson(res.cursor.nextBatch[0]) +
+ '. last doc: ' + tojson(res.cursor.nextBatch[batchLen - 1]) +
+ '. empty getMores so far: ' + emptyGetMores +
+ '. non-empty getMores so far: ' + nonEmptyGetMores);
+ myCount += batchLen;
+
+ const bulk = cloneColl.initializeUnorderedBulkOp();
+ for (let i = 0; i < batchLen; i++) {
+ bulk.insert(res.cursor.nextBatch[i]);
+ }
+ assert.commandWorked(bulk.execute());
+
+ // The writers are done, so we are draining until we see as many docs as we
+ // expect.
+ if (myCount == expectedDocs) {
+ return true;
+ } else {
+ print(tid + ": waiting. my count: " + myCount + " expected: " + expectedDocs);
+ }
+ return false;
+ }, "failed to return all documents within timeout");
+
+ print(tid + ": validating");
+ const expected = db[collName].find().sort({_id: 1}).toArray();
+ const actual = cloneColl.find().sort({_id: 1}).toArray();
+ assert.eq(expected.length, actual.length, function() {
+ return "number of documents do not match. expected: " + tojson(expected) +
+ " actual: " + tojson(actual);
+ });
+ for (let i = 0; i < actual.length; i++) {
+ assert.docEq(actual[i], expected[i], function() {
+ return "mismatched documents. expected: " + tojson(expected) +
+ " actual: " + tojson(actual);
+ });
+ }
+ print(tid + ": done");
+}
+
+const collName = 'capped';
+const localDb = db.getSiblingDB('local');
+localDb[collName].drop();
+
+assert.commandWorked(localDb.runCommand({create: collName, capped: true, size: 10 * 1024 * 1024}));
+assert.commandWorked(localDb[collName].insert({firstDoc: 1, i: -1}));
+
+const nWriters = 5;
+const nReaders = 5;
+
+const insertsPerThread = 1000;
+const expectedDocs = nWriters * insertsPerThread + 1;
+
+let threads = [];
+
+for (let i = 0; i < nReaders; i++) {
+ const thread =
+ new Thread(tailWorker, db.getMongo().host, collName, threads.length, expectedDocs);
+ thread.start();
+ threads.push(thread);
+}
+
+for (let i = 0; i < nWriters; i++) {
+ const thread =
+ new Thread(insertWorker, db.getMongo().host, collName, threads.length, insertsPerThread);
+ thread.start();
+ threads.push(thread);
+}
+
+for (let i = 0; i < threads.length; i++) {
+ threads[i].join();
+}
+})();
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 71d3d5ad99a..29cb1ff18f7 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -877,6 +877,7 @@ env.Library(
'curop',
'multitenancy',
'server_base',
+ 'storage/capped_snapshots',
'storage/snapshot_helper',
],
)
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 366924b6e56..f392123499e 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -224,6 +224,7 @@ env.Library(
'$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/db/record_id_helpers',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
+ '$BUILD_DIR/mongo/db/storage/capped_snapshots',
'$BUILD_DIR/mongo/db/storage/record_store_base',
'$BUILD_DIR/mongo/db/storage/storage_options',
'$BUILD_DIR/mongo/db/storage/write_unit_of_work',
@@ -307,6 +308,7 @@ env.Library(
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/storage/bson_collection_catalog_entry',
+ '$BUILD_DIR/mongo/db/storage/capped_snapshots',
'$BUILD_DIR/mongo/db/storage/snapshot_helper',
'$BUILD_DIR/mongo/db/storage/storage_options',
'$BUILD_DIR/mongo/db/views/util',
@@ -401,6 +403,7 @@ env.Library(
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/shard_role',
+ '$BUILD_DIR/mongo/db/storage/capped_snapshots',
'$BUILD_DIR/mongo/db/storage/durable_catalog_impl',
'$BUILD_DIR/mongo/db/storage/execution_context',
'$BUILD_DIR/mongo/db/storage/historical_ident_tracker',
@@ -415,6 +418,7 @@ env.Library(
'$BUILD_DIR/mongo/db/ttl_collection_cache',
'$BUILD_DIR/mongo/db/vector_clock',
'$BUILD_DIR/mongo/db/views/view_catalog_helpers',
+ 'capped_visibility',
'catalog_helpers',
'catalog_stats',
'clustered_collection_options',
@@ -492,6 +496,17 @@ env.Library(
)
env.Library(
+ target='capped_visibility',
+ source=[
+ 'capped_visibility.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/storage/recovery_unit_base',
+ ],
+)
+
+env.Library(
target='local_oplog_info',
source=[
'local_oplog_info.cpp',
@@ -638,6 +653,8 @@ if wiredtiger:
env.CppUnitTest(
target='db_catalog_test',
source=[
+ 'capped_collection_test.cpp',
+ 'capped_visibility_test.cpp',
'capped_utils_test.cpp',
'catalog_control_test.cpp',
'collection_catalog_test.cpp',
@@ -689,6 +706,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/util/fail_point',
'$BUILD_DIR/mongo/util/pcre_wrapper',
+ 'capped_visibility',
'catalog_control',
'catalog_helpers',
'catalog_impl',
diff --git a/src/mongo/db/catalog/capped_collection_maintenance.cpp b/src/mongo/db/catalog/capped_collection_maintenance.cpp
index 1c79fad50ef..2c7e36160cc 100644
--- a/src/mongo/db/catalog/capped_collection_maintenance.cpp
+++ b/src/mongo/db/catalog/capped_collection_maintenance.cpp
@@ -30,20 +30,29 @@
#include "mongo/db/catalog/capped_collection_maintenance.h"
#include "mongo/db/op_observer/op_observer.h"
-
+#include "mongo/db/storage/capped_snapshots.h"
namespace mongo {
namespace collection_internal {
namespace {
class CappedDeleteSideTxn {
public:
- CappedDeleteSideTxn(OperationContext* opCtx) : _opCtx(opCtx) {
+ CappedDeleteSideTxn(OperationContext* opCtx, const CollectionPtr& collection) : _opCtx(opCtx) {
_originalRecoveryUnit = _opCtx->releaseRecoveryUnit().release();
invariant(_originalRecoveryUnit);
_originalRecoveryUnitState = _opCtx->setRecoveryUnit(
std::unique_ptr<RecoveryUnit>(
_opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit()),
WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
+
+ if (collection->usesCappedSnapshots()) {
+ // As is required by the API, we need to establish the capped visibility snapshot for
+ // this cursor on the new RecoveryUnit. This ensures we don't delete any records that
+ // come sequentially after any uncommitted records, which could mean we aren't actually
+ // deleting the oldest entry as we should. This is mostly a technicality and would only
+ // be an observable problem on capped collections with small limits.
+ CappedSnapshots::get(opCtx->recoveryUnit()).establish(opCtx, collection);
+ }
}
~CappedDeleteSideTxn() {
@@ -104,7 +113,7 @@ void cappedDeleteUntilBelowConfiguredMaximum(OperationContext* opCtx,
if (!collection->needsCappedLock()) {
// Any capped deletes not performed under the capped lock need to commit the innermost
// WriteUnitOfWork while 'cappedFirstRecordMutex' is locked.
- cappedDeleteSideTxn.emplace(opCtx);
+ cappedDeleteSideTxn.emplace(opCtx, collection);
}
const long long currentDataSize = collection->dataSize(opCtx);
diff --git a/src/mongo/db/catalog/capped_collection_test.cpp b/src/mongo/db/catalog/capped_collection_test.cpp
new file mode 100644
index 00000000000..420a48d70c9
--- /dev/null
+++ b/src/mongo/db/catalog/capped_collection_test.cpp
@@ -0,0 +1,514 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog/capped_visibility.h"
+#include "mongo/db/catalog/catalog_test_fixture.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/collection_write_path.h"
+#include "mongo/db/concurrency/lock_state.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/record_id_helpers.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class CappedCollectionTest : public ServiceContextMongoDTest {
+public:
+ CappedCollectionTest() : ServiceContextMongoDTest(Options().engine("wiredTiger")) {}
+
+protected:
+ void setUp() override {
+ // Set up mongod.
+ ServiceContextMongoDTest::setUp();
+
+ auto service = getServiceContext();
+ _storage = std::make_unique<repl::StorageInterfaceImpl>();
+
+ // Set up ReplicationCoordinator and ensure that we are primary.
+ auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(service);
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ repl::ReplicationCoordinator::set(service, std::move(replCoord));
+
+ // Set up oplog collection. If the WT storage engine is used, the oplog collection is
+ // expected to exist when fetching the next opTime (LocalOplogInfo::getNextOpTimes) to use
+ // for a write.
+ {
+ auto opCtx = newOperationContext();
+ repl::createOplog(opCtx.get());
+ }
+ }
+
+
+ void makeCapped(NamespaceString nss, long long cappedSize = 8192) {
+ CollectionOptions options;
+ options.capped = true;
+ options.cappedSize = cappedSize; // Maximum size of capped collection in bytes.
+ bool createIdIndex = false;
+ auto opCtx = newOperationContext();
+ ASSERT_OK(storageInterface()->createCollection(opCtx.get(), nss, options, createIdIndex));
+ }
+
+ ServiceContext::UniqueOperationContext newOperationContext() {
+ return Client::getCurrent()->makeOperationContext();
+ }
+
+ typedef std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext>
+ ClientAndCtx;
+
+ ClientAndCtx makeClientAndCtx(const std::string& clientName) {
+ auto client = getServiceContext()->makeClient(clientName);
+ auto opCtx = client->makeOperationContext();
+ client->swapLockState(std::make_unique<LockerImpl>(getServiceContext()));
+ return std::make_pair(std::move(client), std::move(opCtx));
+ }
+
+ repl::StorageInterface* storageInterface() {
+ return _storage.get();
+ }
+
+ std::unique_ptr<repl::StorageInterface> _storage;
+};
+
+template <typename T>
+void assertSwError(StatusWith<T> sw, ErrorCodes::Error code) {
+ ASSERT_EQ(sw.getStatus().code(), code);
+}
+
+Status insertBSON(OperationContext* opCtx, const NamespaceString& nss, RecordId id) {
+ AutoGetCollection ac(opCtx, nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ BSONObj obj = BSON("a" << 1);
+ WriteUnitOfWork wuow(opCtx);
+
+ auto cappedObserver = coll->getCappedVisibilityObserver();
+ cappedObserver->registerWriter(opCtx->recoveryUnit());
+
+ coll->registerCappedInsert(opCtx, id);
+ auto status =
+ collection_internal::insertDocument(opCtx, coll, InsertStatement(obj, id), nullptr);
+ if (!status.isOK()) {
+ return status;
+ }
+ wuow.commit();
+ return Status::OK();
+}
+
+Status _insertBSON(OperationContext* opCtx, const CollectionPtr& coll, RecordId id) {
+ BSONObj obj = BSON("a" << 1);
+ auto cappedObserver = coll->getCappedVisibilityObserver();
+ cappedObserver->registerWriter(opCtx->recoveryUnit());
+ coll->registerCappedInsert(opCtx, id);
+ return collection_internal::insertDocument(opCtx, coll, InsertStatement(obj), nullptr);
+}
+
+TEST_F(CappedCollectionTest, SeekNear) {
+ NamespaceString nss("local.non.oplog");
+ makeCapped(nss);
+
+ {
+ auto opCtx = newOperationContext();
+ ASSERT_OK(insertBSON(opCtx.get(), nss, RecordId(1)));
+ ASSERT_OK(insertBSON(opCtx.get(), nss, RecordId(2)));
+ ASSERT_OK(insertBSON(opCtx.get(), nss, RecordId(3)));
+ ASSERT_OK(insertBSON(opCtx.get(), nss, RecordId(4)));
+ }
+
+ {
+ // Delete the first and third so that we have some gaps to use for inexact seeks.
+ auto opCtx(newOperationContext());
+ AutoGetCollection ac(opCtx.get(), nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ WriteUnitOfWork wuow(opCtx.get());
+ collection_internal::deleteDocument(
+ opCtx.get(), coll, kUninitializedStmtId, RecordId(1), nullptr);
+ collection_internal::deleteDocument(
+ opCtx.get(), coll, kUninitializedStmtId, RecordId(3), nullptr);
+ wuow.commit();
+ }
+
+ // Forward cursor seeks
+ {
+ // Seek to a non-existent record and expect to land on the first record because no previous
+ // record exists.
+ auto opCtx(newOperationContext());
+ AutoGetCollection ac(opCtx.get(), nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = coll->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(1));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(2));
+ }
+
+ {
+ // Seek to a non-existent record and expect to land on the logically previous record.
+ auto opCtx(newOperationContext());
+ AutoGetCollection ac(opCtx.get(), nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = coll->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(3));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(2));
+ }
+
+ {
+ // Seek exactly.
+ auto opCtx(newOperationContext());
+ AutoGetCollection ac(opCtx.get(), nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = coll->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(4));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(4));
+ }
+
+ {
+ // Seek to a non-existent record and expect to land on the logically-previous record, which
+ // is the last record.
+ auto opCtx(newOperationContext());
+ AutoGetCollection ac(opCtx.get(), nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = coll->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(5));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(4));
+ }
+
+ // Reverse cursor seeks
+ {
+ // Seek to a non-existent record and expect to land on the logically-previous record, which
+ // is the first record.
+ auto opCtx(newOperationContext());
+ AutoGetCollection ac(opCtx.get(), nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = coll->getCursor(opCtx.get(), false /* forward */);
+ auto rec = cur->seekNear(RecordId(1));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(2));
+ }
+
+ {
+ // Seek exactly.
+ auto opCtx(newOperationContext());
+ AutoGetCollection ac(opCtx.get(), nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = coll->getCursor(opCtx.get(), false /* forward */);
+ auto rec = cur->seekNear(RecordId(2));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(2));
+ }
+
+ {
+ // Seek to a non-existent record and expect to land on the logically previous record.
+ auto opCtx(newOperationContext());
+ AutoGetCollection ac(opCtx.get(), nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = coll->getCursor(opCtx.get(), false /* forward */);
+ auto rec = cur->seekNear(RecordId(3));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(4));
+ }
+
+ {
+ auto opCtx(newOperationContext());
+ AutoGetCollection ac(opCtx.get(), nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ WriteUnitOfWork wuow(opCtx.get());
+ auto cur = coll->getCursor(opCtx.get(), false /* forward */);
+ auto rec = cur->seekNear(RecordId(5));
+ ASSERT(rec);
+ ASSERT_EQ(rec->id, RecordId(4));
+ }
+
+
+ {
+ // Delete the remaining records.
+ auto opCtx(newOperationContext());
+ AutoGetCollection ac(opCtx.get(), nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ WriteUnitOfWork wuow(opCtx.get());
+ collection_internal::deleteDocument(
+ opCtx.get(), coll, kUninitializedStmtId, RecordId(2), nullptr);
+ collection_internal::deleteDocument(
+ opCtx.get(), coll, kUninitializedStmtId, RecordId(4), nullptr);
+ wuow.commit();
+ }
+
+ {
+ auto opCtx(newOperationContext());
+ AutoGetCollection ac(opCtx.get(), nss, MODE_IX);
+ const CollectionPtr& coll = ac.getCollection();
+ auto cur = coll->getCursor(opCtx.get());
+ auto rec = cur->seekNear(RecordId(2));
+ ASSERT_FALSE(rec);
+ rec = cur->seekNear(RecordId(4));
+ ASSERT_FALSE(rec);
+ }
+}
+
+TEST_F(CappedCollectionTest, InsertOutOfOrder) {
+ NamespaceString nss("local.non.oplog");
+ makeCapped(nss);
+ {
+ // RecordId's are inserted out-of-order.
+ auto opCtx = newOperationContext();
+ ASSERT_OK(insertBSON(opCtx.get(), nss, RecordId(1)));
+ ASSERT_OK(insertBSON(opCtx.get(), nss, RecordId(3)));
+ ASSERT_OK(insertBSON(opCtx.get(), nss, RecordId(2)));
+ }
+
+ {
+ auto opCtx = newOperationContext();
+ AutoGetCollectionForRead acfr(opCtx.get(), nss);
+ const CollectionPtr& coll = acfr.getCollection();
+ auto cursor = coll->getCursor(opCtx.get());
+ ASSERT_EQ(cursor->next()->id, RecordId(1));
+ ASSERT_EQ(cursor->next()->id, RecordId(2));
+ ASSERT_EQ(cursor->next()->id, RecordId(3));
+ ASSERT(!cursor->next());
+ }
+}
+
+TEST_F(CappedCollectionTest, OplogOrder) {
+ NamespaceString nss("local.non.oplog");
+ makeCapped(nss);
+
+ auto id1 = RecordId(1);
+
+ { // first insert a document
+ ServiceContext::UniqueOperationContext opCtx(newOperationContext());
+ ASSERT_OK(insertBSON(opCtx.get(), nss, id1));
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(newOperationContext());
+ AutoGetCollectionForRead ac(opCtx.get(), nss);
+ const CollectionPtr& coll = ac.getCollection();
+ auto cursor = coll->getCursor(opCtx.get());
+ auto record = cursor->seekExact(id1);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(newOperationContext());
+ AutoGetCollectionForRead ac(opCtx.get(), nss);
+ const CollectionPtr& coll = ac.getCollection();
+ auto cursor = coll->getCursor(opCtx.get());
+ auto record = cursor->seekNear(RecordId(id1.getLong() + 1));
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
+ // now we insert 2 docs, but commit the 2nd one first.
+ // we make sure we can't find the 2nd until the first is committed.
+ auto [earlyClient, earlyCtx] = makeClientAndCtx("earlyReader");
+ AutoGetCollectionForRead ac(earlyCtx.get(), nss);
+ const CollectionPtr& coll = ac.getCollection();
+
+ auto earlyCursor = coll->getCursor(earlyCtx.get());
+ ASSERT_EQ(earlyCursor->seekExact(id1)->id, id1);
+ coll.yield();
+ earlyCursor->save();
+ earlyCtx->recoveryUnit()->abandonSnapshot();
+
+ auto [c1, t1] = makeClientAndCtx("t1");
+ AutoGetCollection ac1(t1.get(), nss, MODE_IX);
+ WriteUnitOfWork w1(t1.get());
+ auto id2 = RecordId(20);
+ ASSERT_OK(_insertBSON(t1.get(), ac1.getCollection(), id2));
+ // do not commit yet
+
+ auto id3 = RecordId(30);
+ { // create 2nd doc
+ auto t2 = newOperationContext();
+ AutoGetCollection ac2(t2.get(), nss, MODE_IX);
+ {
+ WriteUnitOfWork w2(t2.get());
+ ASSERT_OK(_insertBSON(t2.get(), ac2.getCollection(), id3));
+ w2.commit();
+ }
+ }
+
+ { // Other operations should not be able to see 2nd doc until w1 commits.
+ coll.restore();
+ earlyCursor->restore();
+ ASSERT(!earlyCursor->next());
+ }
+
+ {
+ auto [c2, t2] = makeClientAndCtx("t2");
+ AutoGetCollectionForRead ac2(t2.get(), nss);
+ auto cursor = ac2.getCollection()->getCursor(t2.get());
+ auto record = cursor->seekExact(id1);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
+ auto [c2, t2] = makeClientAndCtx("t2");
+ AutoGetCollectionForRead ac2(t2.get(), nss);
+ auto cursor = coll->getCursor(t2.get());
+ auto record = cursor->seekNear(id2);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
+ auto [c2, t2] = makeClientAndCtx("t2");
+ AutoGetCollectionForRead ac2(t2.get(), nss);
+ auto cursor = coll->getCursor(t2.get());
+ auto record = cursor->seekNear(id3);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ w1.commit();
+ }
+
+ { // now all 3 docs should be visible
+ auto opCtx = newOperationContext();
+ AutoGetCollectionForRead ac(opCtx.get(), nss);
+ const CollectionPtr& coll = ac.getCollection();
+ auto cursor = coll->getCursor(opCtx.get());
+ auto record = cursor->seekExact(id1);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(cursor->next());
+ ASSERT(cursor->next());
+ ASSERT(!cursor->next());
+ }
+
+ // Rollback the last two writes entries, then insert entries with older RecordIds to ensure that
+ // the visibility rules aren't violated.
+ {
+ auto opCtx = newOperationContext();
+ AutoGetCollectionForRead ac(opCtx.get(), nss);
+ const CollectionPtr& coll = ac.getCollection();
+ coll->getRecordStore()->cappedTruncateAfter(
+ opCtx.get(), id1, /*inclusive*/ false, [](auto _1, auto _2, auto _3) {});
+ }
+
+ {
+ // Now we insert 2 docs with timestamps earlier than before, but commit the 2nd one first.
+ // We make sure we can't find the 2nd until the first is committed.
+ auto [earlyClient, earlyCtx] = makeClientAndCtx("earlyReader");
+ AutoGetCollectionForRead ac(earlyCtx.get(), nss);
+ const CollectionPtr& coll = ac.getCollection();
+ auto earlyCursor = coll->getCursor(earlyCtx.get());
+ ASSERT_EQ(earlyCursor->seekExact(id1)->id, id1);
+ coll.yield();
+ earlyCursor->save();
+ earlyCtx->recoveryUnit()->abandonSnapshot();
+
+ auto [c1, t1] = makeClientAndCtx("t1");
+ AutoGetCollection ac1(t1.get(), nss, MODE_IX);
+ WriteUnitOfWork w1(t1.get());
+ auto id2 = RecordId(2);
+ ASSERT_OK(_insertBSON(t1.get(), ac1.getCollection(), id2));
+
+ // do not commit yet
+
+ auto id3 = RecordId(3);
+ { // create 2nd doc
+ auto t2 = newOperationContext();
+ AutoGetCollection ac2(t2.get(), nss, MODE_IX);
+ {
+ WriteUnitOfWork w2(t2.get());
+ ASSERT_OK(_insertBSON(t2.get(), ac2.getCollection(), id3));
+ w2.commit();
+ }
+ }
+
+ // Other operations should not be able to see 2nd doc until w1 commits.
+ coll.restore();
+ ASSERT(earlyCursor->restore());
+ ASSERT(!earlyCursor->next());
+ {
+ auto [c2, t2] = makeClientAndCtx("t2");
+ AutoGetCollectionForRead ac2(t2.get(), nss);
+ auto cursor = coll->getCursor(t2.get());
+ auto record = cursor->seekExact(id1);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
+ auto [c2, t2] = makeClientAndCtx("t2");
+ AutoGetCollectionForRead ac2(t2.get(), nss);
+ auto cursor = coll->getCursor(t2.get());
+ auto record = cursor->seekNear(id2);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
+ auto [c2, t2] = makeClientAndCtx("t2");
+ AutoGetCollectionForRead ac2(t2.get(), nss);
+ auto cursor = coll->getCursor(t2.get());
+ auto record = cursor->seekNear(id3);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ w1.commit();
+ }
+
+ { // now all 3 docs should be visible
+ ServiceContext::UniqueOperationContext opCtx(newOperationContext());
+ AutoGetCollectionForRead ac(opCtx.get(), nss);
+ const CollectionPtr& coll = ac.getCollection();
+ auto cursor = coll->getCursor(opCtx.get());
+ auto record = cursor->seekExact(id1);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(cursor->next());
+ ASSERT(cursor->next());
+ ASSERT(!cursor->next());
+ }
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/catalog/capped_visibility.cpp b/src/mongo/db/catalog/capped_visibility.cpp
new file mode 100644
index 00000000000..fd2e8f10f73
--- /dev/null
+++ b/src/mongo/db/catalog/capped_visibility.cpp
@@ -0,0 +1,179 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
+
+#include "mongo/db/catalog/capped_visibility.h"
+
+#include "mongo/db/concurrency/exception_util.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+
+namespace {
+const RecoveryUnit::Snapshot::Decoration<CappedWriter> getCappedWriters =
+ RecoveryUnit::Snapshot::declareDecoration<CappedWriter>();
+}
+
+CappedWriter& CappedWriter::get(RecoveryUnit* ru) {
+ return getCappedWriters(ru->getSnapshot());
+}
+
+CappedWriter& CappedWriter::get(OperationContext* opCtx) {
+ return getCappedWriters(opCtx->recoveryUnit()->getSnapshot());
+}
+
+CappedWriter::~CappedWriter() {
+ try {
+ // Signal that any uncommitted writes are either committed or aborted. Destruction of this
+ // CappedWriter is a direct result of the snapshot of the RecoveryUnit being destructed.
+ for (auto&& [_, uncommitted] : _identToUncommittedRecords) {
+ uncommitted->committedOrAborted();
+ }
+ } catch (...) {
+ LOGV2_FATAL(6628300,
+ "Caught exception destructing CappedWriter",
+ "exception"_attr = exceptionToStatus());
+ }
+}
+
+UncommittedRecords* CappedWriter::getUncommitedRecordsFor(const std::string& ident) {
+ auto& uncommitted = _identToUncommittedRecords[ident];
+ if (!uncommitted) {
+ uncommitted = std::make_unique<UncommittedRecords>();
+ }
+ return uncommitted.get();
+}
+
+void UncommittedRecords::registerRecordIds(const RecordId& min, const RecordId& max) {
+ // The only circumstances where we can correctly handle capped visibility are with integer
+ // RecordIds, since with clustered collections (string RecordIds) we don't control RecordId
+ // assignment.
+ invariant(min.isLong());
+ invariant(max.isLong());
+ invariant(max >= min);
+    // We can register multiple RecordIds on the same writer, but we update the min and max.
+ auto myMin = _min.load();
+ if (!myMin || min.getLong() < myMin) {
+ _min.store(min.getLong());
+ }
+ if (!_max || max.getLong() > _max) {
+ _max = max.getLong();
+ }
+}
+
+UncommittedRecords* CappedVisibilityObserver::registerWriter(
+ RecoveryUnit* recoveryUnit, UncommittedRecords::OnCommitOrAbortFn&& onCommitOrAbort) {
+ auto& writer = CappedWriter::get(recoveryUnit);
+ auto uncommitted = writer.getUncommitedRecordsFor(_ident);
+
+ // We have already been registered, so we should not insert a new entry.
+ if (uncommitted->getIterator().has_value()) {
+ return uncommitted;
+ }
+
+ uncommitted->onCommitOrAbort(std::move(onCommitOrAbort));
+
+ // Register ourselves as a writer with uncommitted records. We allocate a new single-element
+ // list outside the mutex, obtain an iterator, and then splice into the existing list under the
+ // mutex. The iterator remains valid even in the new list, and we can erase in constant time
+ // when the writer commits.
+ std::list<UncommittedRecords*> tmp;
+ auto it = tmp.insert(tmp.begin(), uncommitted);
+ uncommitted->setIterator(std::move(it));
+ {
+ stdx::unique_lock<Mutex> lk(_mutex);
+ _uncommittedRecords.splice(_uncommittedRecords.end(), tmp);
+ }
+
+ // Because CappedWriters is decorated on the RecoveryUnit and tied to its lifetime, we can
+ // capture the writer without risk of it dangling.
+ class CappedVisibilityChange : public RecoveryUnit::Change {
+ public:
+ CappedVisibilityChange(CappedVisibilityObserver* observer, CappedWriter* writer)
+ : _observer(observer), _writer(writer) {}
+ void commit(OperationContext* opCtx, boost::optional<Timestamp> commitTime) final {
+ _observer->_onWriterCommittedOrAborted(_writer, true /* commit */);
+ }
+ void rollback(OperationContext* opCtx) final {
+ _observer->_onWriterCommittedOrAborted(_writer, false /* commit */);
+ }
+
+ private:
+ CappedVisibilityObserver* _observer;
+ CappedWriter* _writer;
+ };
+
+ recoveryUnit->registerChange(std::make_unique<CappedVisibilityChange>(this, &writer));
+ return uncommitted;
+}
+
+void CappedVisibilityObserver::_onWriterCommittedOrAborted(CappedWriter* writer, bool committed) {
+ auto uncommitted = writer->getUncommitedRecordsFor(_ident);
+ invariant(uncommitted->getIterator());
+ auto min = uncommitted->getMinRecord();
+ auto max = uncommitted->getMaxRecord();
+
+ // Create a temporary list that we use to splice out the removed element and can deallocate
+ // outside of the mutex.
+ std::list<UncommittedRecords*> tmp;
+ {
+ stdx::unique_lock<Mutex> lk(_mutex);
+ // We only want to advance the highest record when a transaction commits.
+ if (committed) {
+ if (max > _highestRecord) {
+ _highestRecord = max;
+ }
+ }
+ tmp.splice(tmp.end(), _uncommittedRecords, *uncommitted->getIterator());
+ }
+}
+
+void CappedVisibilityObserver::setRecordImmediatelyVisible(const RecordId& rid) {
+ stdx::unique_lock<Mutex> lk(_mutex);
+ _highestRecord = rid;
+}
+
+CappedVisibilitySnapshot CappedVisibilityObserver::makeSnapshot() const {
+ stdx::unique_lock<Mutex> lk(_mutex);
+ return _makeSnapshot(lk);
+}
+
+CappedVisibilitySnapshot CappedVisibilityObserver::_makeSnapshot(WithLock wl) const {
+ RecordId lowestUncommitted;
+ for (auto&& uncommitted : _uncommittedRecords) {
+ auto min = uncommitted->getMinRecord();
+ if (lowestUncommitted.isNull() || (!min.isNull() && min < lowestUncommitted)) {
+ lowestUncommitted = min;
+ }
+ }
+ return {_highestRecord, lowestUncommitted};
+}
+} // namespace mongo
diff --git a/src/mongo/db/catalog/capped_visibility.h b/src/mongo/db/catalog/capped_visibility.h
new file mode 100644
index 00000000000..c8da8ee02c8
--- /dev/null
+++ b/src/mongo/db/catalog/capped_visibility.h
@@ -0,0 +1,249 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/record_id.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/util/decorable.h"
+
+namespace mongo {
+class OperationContext;
+class RecoveryUnit;
+class ServiceContext;
+
+/**
+ * A CappedVisibilitySnapshot represents a snapshot of the Records that should and should not be
+ * visible for a capped collection.
+ */
+class CappedVisibilitySnapshot {
+public:
+ CappedVisibilitySnapshot() = default;
+
+ CappedVisibilitySnapshot(const RecordId& highestRecord, const RecordId& lowestUncommitted)
+ : _highestRecord(highestRecord), _lowestUncommittedRecord(lowestUncommitted) {
+ invariant(_lowestUncommittedRecord.isNull() || _lowestUncommittedRecord.isLong());
+ invariant(_highestRecord.isNull() || _highestRecord.isLong());
+ }
+
+ /**
+ * Returns true if this RecordId is safely visible in our snapshot.
+ */
+ bool isRecordVisible(const RecordId& rid) const {
+ if (_lowestUncommittedRecord.isNull()) {
+ if (_highestRecord.isNull()) {
+ return true;
+ } else {
+ return rid <= _highestRecord;
+ }
+ }
+ return rid < _lowestUncommittedRecord;
+ }
+
+ /**
+ * Returns the highest RecordId that should be visible in our snapshot. May not represent a
+ * RecordId that exists.
+ */
+ RecordId getHighestVisible() const {
+ if (_lowestUncommittedRecord.isNull()) {
+ return _highestRecord;
+ }
+ return RecordId(_lowestUncommittedRecord.getLong() - 1);
+ }
+
+ RecordId _highestRecord;
+ RecordId _lowestUncommittedRecord;
+};
+
+/**
+ * UncommittedRecords hold RecordIds for uncommitted inserts into a capped collection by a single
+ * operation. Only valid for the duration of a storage snapshot on a single collection.
+ */
+class UncommittedRecords {
+public:
+ UncommittedRecords() = default;
+
+ /**
+ * Register a range of RecordIds as allocated and may be committed by this writer in the future.
+ * RecordIds must be of the long type. When registering a single RecordId, min and max must be
+ * the same.
+ */
+ void registerRecordIds(const RecordId& min, const RecordId& max);
+ void registerRecordId(const RecordId& id) {
+ registerRecordIds(id, id);
+ }
+
+ /**
+ * Returns the lowest uncommitted RecordId of this writer. This is thread safe.
+ */
+ RecordId getMinRecord() const {
+ return RecordId(_min.load());
+ }
+
+ /**
+ * Returns the highest uncommitted RecordId of this writer. This is not thread safe.
+ */
+ RecordId getMaxRecord() const {
+ return RecordId(_max);
+ }
+
+ using Iterator = boost::optional<std::list<UncommittedRecords*>::iterator>;
+
+ /**
+ * Returns an iterator to this writer's position in a list owned by the
+ * CappedVisibilityObserver.
+ */
+ Iterator& getIterator() {
+ return _it;
+ }
+
+ /**
+ * Set the iterator to this writer's position in a list.
+ */
+ void setIterator(Iterator&& it) {
+ _it = std::move(it);
+ }
+
+ /**
+ * Sets an optional function to be called when the uncommitted writes are either committed or
+ * aborted. The callback function must not throw.
+ */
+ using OnCommitOrAbortFn = std::function<void()>;
+ void onCommitOrAbort(OnCommitOrAbortFn&& fn) {
+ _onCommitOrAbort = std::move(fn);
+ }
+
+ void committedOrAborted() noexcept {
+ if (_onCommitOrAbort) {
+ _onCommitOrAbort();
+ }
+ }
+
+private:
+ /**
+ * This iterator is not thread safe and may only be modified by the writer itself. Points to
+ * this writer's position in the CappedVisibilityObserver's list of active writers.
+ */
+ boost::optional<std::list<UncommittedRecords*>::iterator> _it;
+
+ // Since a CappedVisibilitySnapshot is only concerned with the minimum uncommitted RecordId for
+ // a given writer, we use an atomic on the minimum. We can use a non-atomic for the maximum,
+ // which is never observed by another thread.
+ AtomicWord<std::int64_t> _min{0};
+
+ // For consistency with _min, we'll use an int64 type as well.
+ std::int64_t _max{0};
+
+ // An optional notification function that should be called when these uncommitted records are
+ // either committed or aborted.
+ OnCommitOrAbortFn _onCommitOrAbort;
+};
+
+/**
+ * Container that holds UncommittedRecords for different capped collections. This allows an
+ * operation to write to multiple capped collections at once, if necessary. A CappedWriter is only
+ * valid for the lifetime of a RecoveryUnit Snapshot, and may only be accessed by a single thread.
+ */
+class CappedWriter {
+public:
+ ~CappedWriter();
+ static CappedWriter& get(RecoveryUnit*);
+ static CappedWriter& get(OperationContext*);
+
+ /**
+ * Returns a pointer to the uncommitted writes for the given ident. The pointer is only valid
+ * for the duration of this storage snapshot.
+ */
+ UncommittedRecords* getUncommitedRecordsFor(const std::string& ident);
+
+private:
+ // This maps ident names to the uncommitted records for that collection.
+ StringMap<std::unique_ptr<UncommittedRecords>> _identToUncommittedRecords;
+};
+
+/**
+ * A CappedVisibilityObserver tracks the "visibility point" of a capped collection. For capped
+ * collections that accept concurrent writes which may not commit in RecordId order, the visibility
+ * point is the highest RecordId that is safe to read for a forward scanning cursor to guarantee
+ * that it doesn't miss "holes" for uncommitted records.
+ */
+class CappedVisibilityObserver {
+public:
+ CappedVisibilityObserver(StringData ident) : _ident(ident) {}
+
+ /**
+ * Register a writer for an uncommitted insert operation. The writer must follow-up by
+ * registering its allocated RecordIds with registerRecordIds() on the UncommittedRecords.
+ */
+ UncommittedRecords* registerWriter(
+ RecoveryUnit* recoveryUnit,
+ UncommittedRecords::OnCommitOrAbortFn&& onCommitOrAbort = nullptr);
+
+ /**
+ * Set a RecordId as committed and should be visible immediately. This bypasses any visibility
+ * tracking for uncommitted records so should only be used in cases where concurrent writes are
+ * not possible.
+ */
+ void setRecordImmediatelyVisible(const RecordId& rid);
+
+ /**
+ * Obtain a consistent view of the capped visibility point. This can be used by callers to
+ * determine whether records should be visible or not.
+ *
+ * It is critical that callers create a capped visibility snapshot before opening a storage
+ * engine snapshot unless the caller can guarantee there are no concurrent writes.
+ */
+ CappedVisibilitySnapshot makeSnapshot() const;
+
+private:
+ /**
+     * Notify that a previously-allocated RecordId for an uncommitted insert operation has either
+     * been committed or rolled back.
+ */
+ void _onWriterCommittedOrAborted(CappedWriter* writer, bool committed);
+
+ CappedVisibilitySnapshot _makeSnapshot(WithLock) const;
+
+ const std::string _ident;
+
+ // This mutex protects all variables below.
+ mutable Mutex _mutex =
+ MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "CappedVisibilityObserver::_mutex");
+
+ // The set of uncommitted writes to this capped collection. We use a std::list so that we can
+ // use splice() for constant time insertion and deletion. This relies on the ability to maintain
+ // an iterator that is valid even after modifications to the container.
+ std::list<UncommittedRecords*> _uncommittedRecords;
+
+ // This is the highest RecordId ever committed to this collection.
+ RecordId _highestRecord;
+};
+} // namespace mongo
diff --git a/src/mongo/db/catalog/capped_visibility_test.cpp b/src/mongo/db/catalog/capped_visibility_test.cpp
new file mode 100644
index 00000000000..d235c4cb8fb
--- /dev/null
+++ b/src/mongo/db/catalog/capped_visibility_test.cpp
@@ -0,0 +1,380 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog/capped_visibility.h"
+#include "mongo/db/operation_context_noop.h"
+#include "mongo/db/storage/recovery_unit_noop.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using OpCtxAndRecoveryUnit =
+ std::pair<std::unique_ptr<OperationContext>, std::unique_ptr<RecoveryUnit>>;
+
+OpCtxAndRecoveryUnit makeOpCtxAndRecoveryUnit() {
+ auto opCtx = std::make_unique<OperationContextNoop>();
+ auto ru = std::make_unique<RecoveryUnitNoop>();
+ ru->setOperationContext(opCtx.get());
+ return {std::move(opCtx), std::move(ru)};
+}
+
+// Basic RecordId hole
+TEST(CappedVisibilityTest, BasicHole) {
+ CappedVisibilityObserver observer("test");
+ observer.setRecordImmediatelyVisible(RecordId(1));
+
+
+ auto [op1, ru1] = makeOpCtxAndRecoveryUnit();
+ auto [op2, ru2] = makeOpCtxAndRecoveryUnit();
+ auto writer1 = observer.registerWriter(ru1.get());
+ auto writer2 = observer.registerWriter(ru2.get());
+
+ writer1->registerRecordId(RecordId(2));
+ writer2->registerRecordId(RecordId(3));
+ ru2->commitUnitOfWork();
+
+ // Only RecordId 1 should be visible.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(3)));
+ }
+
+ ru1->commitUnitOfWork();
+
+ // All RecordIds should be visible now.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT(snapshot.isRecordVisible(RecordId(3)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(4)));
+ }
+}
+
+TEST(CappedVisibilityTest, RollBack) {
+ CappedVisibilityObserver observer("test");
+ observer.setRecordImmediatelyVisible(RecordId(1));
+
+ auto [op1, ru1] = makeOpCtxAndRecoveryUnit();
+ auto writer1 = observer.registerWriter(ru1.get());
+ writer1->registerRecordId(RecordId(2));
+
+ // Only RecordId 1 should be visible.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ }
+
+ ru1->abortUnitOfWork();
+
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ // RecordId 2 was not committed, so it should not be considered visible.
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(3)));
+ }
+}
+
+TEST(CappedVisibilityTest, RollBackHole) {
+ CappedVisibilityObserver observer("test");
+ observer.setRecordImmediatelyVisible(RecordId(1));
+
+ auto [op1, ru1] = makeOpCtxAndRecoveryUnit();
+ auto [op2, ru2] = makeOpCtxAndRecoveryUnit();
+ auto writer1 = observer.registerWriter(ru1.get());
+ auto writer2 = observer.registerWriter(ru2.get());
+
+ writer1->registerRecordId(RecordId(2));
+ writer2->registerRecordId(RecordId(3));
+ ru2->commitUnitOfWork();
+
+ // Only RecordId 1 should be visible.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(3)));
+ }
+
+ ru1->abortUnitOfWork();
+
+ // All committed RecordIds should be visible now.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ // Even though RecordId 2 was not committed, it should be considered visible.
+ ASSERT(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT(snapshot.isRecordVisible(RecordId(3)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(4)));
+ }
+}
+
+// Hole with multiple uncommitted writers where one writer hasn't registered any records yet.
+TEST(CappedVisibilityTest, UnregisteredRecords) {
+ CappedVisibilityObserver observer("test");
+ observer.setRecordImmediatelyVisible(RecordId(1));
+
+ auto [op1, ru1] = makeOpCtxAndRecoveryUnit();
+ auto [op2, ru2] = makeOpCtxAndRecoveryUnit();
+ auto writer1 = observer.registerWriter(ru1.get());
+ auto writer2 = observer.registerWriter(ru2.get());
+
+ writer1->registerRecordId(RecordId(2));
+
+ // The highest visible record should be 1
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(3)));
+ }
+
+ writer2->registerRecordId(RecordId(3));
+
+ // The highest visible record should still be 1
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(3)));
+ }
+
+ ru1->commitUnitOfWork();
+
+ // RecordIds except for 3 should be visible.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(3)));
+ }
+
+ ru2->commitUnitOfWork();
+
+ // All RecordIds should be visible now.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT(snapshot.isRecordVisible(RecordId(3)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(4)));
+ }
+}
+
+TEST(CappedVisibilityTest, RegisterRange) {
+ CappedVisibilityObserver observer("test");
+ observer.setRecordImmediatelyVisible(RecordId(1));
+
+ auto [op1, ru1] = makeOpCtxAndRecoveryUnit();
+ auto [op2, ru2] = makeOpCtxAndRecoveryUnit();
+ auto writer1 = observer.registerWriter(ru1.get());
+ auto writer2 = observer.registerWriter(ru2.get());
+
+ writer1->registerRecordIds(RecordId(2), RecordId(5));
+
+ writer2->registerRecordIds(RecordId(6), RecordId(10));
+
+ // The highest visible record should be 1.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(6)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(10)));
+ }
+
+ ru2->commitUnitOfWork();
+
+ // The highest visible record should be 1.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(6)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(10)));
+ }
+
+ ru1->commitUnitOfWork();
+ // All records should be visible.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT(snapshot.isRecordVisible(RecordId(6)));
+ ASSERT(snapshot.isRecordVisible(RecordId(10)));
+ }
+}
+
+TEST(CappedVisibilityTest, MultiRegistration) {
+ CappedVisibilityObserver observer("test");
+ observer.setRecordImmediatelyVisible(RecordId(1));
+
+ auto [op1, ru1] = makeOpCtxAndRecoveryUnit();
+ auto [op2, ru2] = makeOpCtxAndRecoveryUnit();
+ auto writer1 = observer.registerWriter(ru1.get());
+ auto writer2 = observer.registerWriter(ru2.get());
+
+ writer1->registerRecordId(RecordId(2));
+ writer2->registerRecordId(RecordId(3));
+ writer1->registerRecordId(RecordId(4));
+ writer2->registerRecordId(RecordId(5));
+
+ // The highest visible record should be 1.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(3)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(4)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(5)));
+ }
+
+ ru2->commitUnitOfWork();
+
+ // The highest visible record should still be 1.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(3)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(4)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(5)));
+ }
+
+ ru1->commitUnitOfWork();
+
+ // All records should be visible.
+ {
+ auto snapshot = observer.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT(snapshot.isRecordVisible(RecordId(3)));
+ ASSERT(snapshot.isRecordVisible(RecordId(4)));
+ ASSERT(snapshot.isRecordVisible(RecordId(5)));
+ }
+}
+
+class CappedCollection {
+public:
+ CappedCollection(StringData ident) : _observer(ident) {}
+
+ void insertRecordImmediately(RecordId id) {
+ _observer.setRecordImmediatelyVisible(id);
+ }
+
+ void insertRecord(RecoveryUnit* ru, RecordId id) {
+ auto writer = _observer.registerWriter(ru);
+ writer->registerRecordId(id);
+ }
+
+ CappedVisibilitySnapshot makeSnapshot() {
+ return _observer.makeSnapshot();
+ }
+
+private:
+ CappedVisibilityObserver _observer;
+};
+
+// Tests writes to multiple capped collections at once
+TEST(CappedVisibilityTest, MultiCollection) {
+ CappedCollection coll1("coll1");
+ CappedCollection coll2("coll2");
+
+ coll1.insertRecordImmediately(RecordId(1));
+ coll2.insertRecordImmediately(RecordId(11));
+
+ auto [op1, ru1] = makeOpCtxAndRecoveryUnit();
+ auto [op2, ru2] = makeOpCtxAndRecoveryUnit();
+
+ coll1.insertRecord(ru1.get(), RecordId(2));
+ coll1.insertRecord(ru2.get(), RecordId(3));
+
+ coll2.insertRecord(ru1.get(), RecordId(12));
+ coll2.insertRecord(ru2.get(), RecordId(13));
+
+ // Only the first record should be visible to both collections.
+ {
+ auto snapshot = coll1.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(3)));
+ }
+
+ {
+ auto snapshot = coll2.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(11)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(12)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(13)));
+ }
+
+ ru2->commitUnitOfWork();
+
+ // Nothing should become newly visible
+ {
+ auto snapshot = coll1.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(3)));
+ }
+
+ {
+ auto snapshot = coll2.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(11)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(12)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(13)));
+ }
+
+ ru1->commitUnitOfWork();
+
+ // All RecordIds should be visible now.
+ {
+ auto snapshot = coll1.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(1)));
+ ASSERT(snapshot.isRecordVisible(RecordId(2)));
+ ASSERT(snapshot.isRecordVisible(RecordId(3)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(4)));
+ }
+
+ {
+ auto snapshot = coll2.makeSnapshot();
+ ASSERT(snapshot.isRecordVisible(RecordId(11)));
+ ASSERT(snapshot.isRecordVisible(RecordId(12)));
+ ASSERT(snapshot.isRecordVisible(RecordId(13)));
+ ASSERT_FALSE(snapshot.isRecordVisible(RecordId(14)));
+ }
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index 75595c5fdaa..cc5f76aafd5 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -38,6 +38,7 @@
#include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/timestamp.h"
+#include "mongo/db/catalog/capped_visibility.h"
#include "mongo/db/catalog/collection_operation_source.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/concurrency/d_concurrency.h"
@@ -581,6 +582,33 @@ public:
*/
virtual bool isCappedAndNeedsDelete(OperationContext* opCtx) const = 0;
+ /**
+ * When true, this collection uses the CappedSnapshots API to track concurrent writes and safely
+ * handle visibility for readers.
+ */
+ virtual bool usesCappedSnapshots() const = 0;
+
+ virtual std::vector<RecordId> reserveCappedRecordIds(OperationContext* opCtx,
+ size_t nIds) const = 0;
+
+ /**
+ * When we write to a capped collection, we call this so that the storage engine can manage
+ * the visibility of documents to ensure they are ordered by RecordId.
+ *
+ * Since this is called inside of a WriteUnitOfWork while holding a std::mutex, it is
+ * illegal to acquire any LockManager locks inside of this function.
+ */
+ virtual void registerCappedInserts(OperationContext* opCtx,
+ const RecordId& minRecord,
+ const RecordId& maxRecord) const = 0;
+ void registerCappedInsert(OperationContext* opCtx, const RecordId& recordId) const {
+ registerCappedInserts(opCtx, recordId, recordId);
+ }
+
+ virtual CappedVisibilityObserver* getCappedVisibilityObserver() const = 0;
+ virtual CappedVisibilitySnapshot takeCappedVisibilitySnapshot() const = 0;
+
+
//
// Stats
//
diff --git a/src/mongo/db/catalog/collection_catalog.cpp b/src/mongo/db/catalog/collection_catalog.cpp
index 05c98e94a72..0254c50320b 100644
--- a/src/mongo/db/catalog/collection_catalog.cpp
+++ b/src/mongo/db/catalog/collection_catalog.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/multitenancy_gen.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/storage/capped_snapshots.h"
#include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/db/storage/snapshot_helper.h"
@@ -2210,11 +2211,15 @@ void CollectionCatalog::cleanupForCatalogReopen(Timestamp stable) {
void CollectionCatalog::invariantHasExclusiveAccessToCollection(OperationContext* opCtx,
const NamespaceString& nss) {
+ invariant(hasExclusiveAccessToCollection(opCtx, nss), nss.toString());
+}
+
+bool CollectionCatalog::hasExclusiveAccessToCollection(OperationContext* opCtx,
+ const NamespaceString& nss) {
auto& uncommittedCatalogUpdates = UncommittedCatalogUpdates::get(opCtx);
- invariant(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_X) ||
- (uncommittedCatalogUpdates.isCreatedCollection(opCtx, nss) &&
- opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX)),
- nss.toString());
+ return opCtx->lockState()->isCollectionLockedForMode(nss, MODE_X) ||
+ (uncommittedCatalogUpdates.isCreatedCollection(opCtx, nss) &&
+ opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX));
}
CollectionPtr CollectionCatalog::_lookupSystemViews(OperationContext* opCtx,
@@ -2314,6 +2319,11 @@ const Collection* LookupCollectionForYieldRestore::operator()(OperationContext*
return nullptr;
}
+ // Non-lock-free readers use this path and need to re-establish their capped snapshot.
+ if (collection->usesCappedSnapshots()) {
+ CappedSnapshots::get(opCtx).establish(opCtx, collection.get());
+ }
+
// After yielding and reacquiring locks, the preconditions that were used to select our
// ReadSource initially need to be checked again. We select a ReadSource based on replication
// state. After a query yields its locks, the replication state may have changed, invalidating
diff --git a/src/mongo/db/catalog/collection_catalog.h b/src/mongo/db/catalog/collection_catalog.h
index 0d60c929409..32acf2bb959 100644
--- a/src/mongo/db/catalog/collection_catalog.h
+++ b/src/mongo/db/catalog/collection_catalog.h
@@ -644,6 +644,7 @@ public:
*/
static void invariantHasExclusiveAccessToCollection(OperationContext* opCtx,
const NamespaceString& nss);
+ static bool hasExclusiveAccessToCollection(OperationContext* opCtx, const NamespaceString& nss);
private:
friend class CollectionCatalog::iterator;
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index ba12f55b898..c8b76fbba9d 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/capped_snapshots.h"
#include "mongo/db/storage/durable_catalog.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_extended_range.h"
@@ -291,7 +292,10 @@ CollectionImpl::SharedState::SharedState(CollectionImpl* collection,
// clustered capped collections because they only guarantee insertion order when cluster keys
// are inserted in monotonically-increasing order.
_isCapped(options.capped),
- _needCappedLock(_isCapped && collection->ns().isReplicated() && !options.clusteredIndex) {}
+ _needCappedLock(_isCapped && collection->ns().isReplicated() && !options.clusteredIndex),
+ // The record store will be null when the collection is instantiated as part of the repair
+ // path.
+ _cappedObserver(_recordStore ? _recordStore->getIdent() : "") {}
CollectionImpl::SharedState::~SharedState() {
// The record store will be null when the collection is instantiated as part of the repair path.
@@ -524,6 +528,21 @@ bool CollectionImpl::requiresIdIndex() const {
std::unique_ptr<SeekableRecordCursor> CollectionImpl::getCursor(OperationContext* opCtx,
bool forward) const {
+ if (usesCappedSnapshots() && forward) {
+ if (opCtx->recoveryUnit()->isActive()) {
+ auto snapshot =
+ CappedSnapshots::get(opCtx).getSnapshot(_shared->_recordStore->getIdent());
+ invariant(
+ CollectionCatalog::hasExclusiveAccessToCollection(opCtx, ns()) || snapshot,
+ fmt::format("Capped visibility snapshot was not initialized before reading from "
+ "collection non-exclusively: {}",
+ _ns.ns()));
+ } else {
+ // We can lazily initialize the capped snapshot because no storage snapshot has been
+ // opened yet.
+ CappedSnapshots::get(opCtx).establish(opCtx, this);
+ }
+ }
return _shared->_recordStore->getCursor(opCtx, forward);
}
@@ -911,6 +930,67 @@ long long CollectionImpl::getCappedMaxSize() const {
return _metadata->options.cappedSize;
}
+bool CollectionImpl::usesCappedSnapshots() const {
+ // Only use the behavior for non-replicated capped collections (which can accept concurrent
+ // writes). This behavior relies on RecordIds being allocated in increasing order. For clustered
+ // collections, users define their RecordIds and are not constrained to creating them in
+ // increasing order.
+ // The oplog tracks its visibility through support from the storage engine.
+ return isCapped() && !ns().isReplicated() && !ns().isOplog() && !isClustered();
+}
+
+CappedVisibilityObserver* CollectionImpl::getCappedVisibilityObserver() const {
+ invariant(usesCappedSnapshots());
+ return &_shared->_cappedObserver;
+}
+
+std::vector<RecordId> CollectionImpl::reserveCappedRecordIds(OperationContext* opCtx,
+ size_t count) const {
+ invariant(usesCappedSnapshots());
+
+ // By registering ourselves as a writer, we inform the capped visibility system that we may be
+ // in the process of committing uncommitted records.
+ auto cappedObserver = getCappedVisibilityObserver();
+ cappedObserver->registerWriter(
+ opCtx->recoveryUnit(), [this]() { _shared->_recordStore->notifyCappedWaitersIfNeeded(); });
+
+ std::vector<RecordId> ids;
+ ids.reserve(count);
+ {
+ // We must atomically allocate and register any RecordIds so that we can correctly keep
+ // track of visibility. This ensures capped readers do not skip past any in-progress writes.
+ stdx::lock_guard<Latch> lk(_shared->_registerCappedIdsMutex);
+ _shared->_recordStore->reserveRecordIds(opCtx, &ids, count);
+
+ // We are guaranteed to have a contiguous range so we only register the min and max.
+ registerCappedInserts(opCtx, ids.front(), ids.back());
+ }
+
+ return ids;
+}
+
+void CollectionImpl::registerCappedInserts(OperationContext* opCtx,
+ const RecordId& minRecord,
+ const RecordId& maxRecord) const {
+ invariant(usesCappedSnapshots());
+ // Callers should be updating visibility as part of a write operation. We want to ensure that
+ // we never get here while holding an uninterruptible, read-ticketed lock. That would indicate
+ // that we are operating with the wrong global lock semantics, and either hold too weak a lock
+ // (e.g. IS) or that we upgraded in a way we shouldn't (e.g. IS -> IX).
+ invariant(opCtx->lockState()->isNoop() || !opCtx->lockState()->hasReadTicket() ||
+ !opCtx->lockState()->uninterruptibleLocksRequested());
+
+ auto* uncommitted =
+ CappedWriter::get(opCtx).getUncommitedRecordsFor(_shared->_recordStore->getIdent());
+ uncommitted->registerRecordIds(minRecord, maxRecord);
+ return;
+}
+
+CappedVisibilitySnapshot CollectionImpl::takeCappedVisibilitySnapshot() const {
+ invariant(usesCappedSnapshots());
+ return _shared->_cappedObserver.makeSnapshot();
+}
+
long long CollectionImpl::numRecords(OperationContext* opCtx) const {
return _shared->_recordStore->numRecords(opCtx);
}
diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h
index fd1c854bbce..0f2f6fd69ee 100644
--- a/src/mongo/db/catalog/collection_impl.h
+++ b/src/mongo/db/catalog/collection_impl.h
@@ -237,6 +237,14 @@ public:
long long getCappedMaxDocs() const final;
long long getCappedMaxSize() const final;
+ bool usesCappedSnapshots() const final;
+ std::vector<RecordId> reserveCappedRecordIds(OperationContext* opCtx, size_t nIds) const final;
+ void registerCappedInserts(OperationContext* opCtx,
+ const RecordId& minRecord,
+ const RecordId& maxRecord) const final;
+ CappedVisibilityObserver* getCappedVisibilityObserver() const final;
+ CappedVisibilitySnapshot takeCappedVisibilitySnapshot() const final;
+
long long numRecords(OperationContext* opCtx) const final;
long long dataSize(OperationContext* opCtx) const final;
@@ -416,6 +424,15 @@ private:
AtomicWord<bool> _committed{true};
+ // Tracks in-progress capped inserts to inform visibility for forward scans so that no
+ // uncommitted records are skipped.
+ CappedVisibilityObserver _cappedObserver;
+
+ // This mutex synchronizes allocating and registering RecordIds for uncommitted writes on
+ // capped collections that accept concurrent writes (i.e. usesCappedSnapshots()).
+ mutable Mutex _registerCappedIdsMutex =
+ MONGO_MAKE_LATCH("CollectionImpl::_registerCappedIdsMutex");
+
// Time-series collections are allowed to contain measurements with arbitrary dates;
// however, many of our query optimizations only work properly with dates that can be stored
// as an offset in seconds from the Unix epoch within 31 bits (roughly 1970-2038). When this
diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h
index 4cc17cf983a..a7d1a7b4545 100644
--- a/src/mongo/db/catalog/collection_mock.h
+++ b/src/mongo/db/catalog/collection_mock.h
@@ -250,6 +250,28 @@ public:
MONGO_UNREACHABLE;
}
+ bool usesCappedSnapshots() const {
+ return false;
+ }
+
+ std::vector<RecordId> reserveCappedRecordIds(OperationContext* opCtx, size_t nIds) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ void registerCappedInserts(OperationContext* opCtx,
+ const RecordId& minRecord,
+ const RecordId& maxRecord) const {
+ std::abort();
+ }
+
+ CappedVisibilityObserver* getCappedVisibilityObserver() const {
+ std::abort();
+ }
+
+ CappedVisibilitySnapshot takeCappedVisibilitySnapshot() const {
+ std::abort();
+ }
+
long long numRecords(OperationContext* opCtx) const {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/catalog/collection_write_path.cpp b/src/mongo/db/catalog/collection_write_path.cpp
index a3fab3026e3..70d8e559e7e 100644
--- a/src/mongo/db/catalog/collection_write_path.cpp
+++ b/src/mongo/db/catalog/collection_write_path.cpp
@@ -137,7 +137,13 @@ Status insertDocumentsImpl(OperationContext* opCtx,
std::vector<Timestamp> timestamps;
timestamps.reserve(count);
- for (auto it = begin; it != end; it++) {
+ std::vector<RecordId> cappedRecordIds;
+ if (collection->usesCappedSnapshots()) {
+ cappedRecordIds = collection->reserveCappedRecordIds(opCtx, count);
+ }
+
+ size_t i = 0;
+ for (auto it = begin; it != end; it++, i++) {
const auto& doc = it->doc;
RecordId recordId;
@@ -147,6 +153,12 @@ Status insertDocumentsImpl(OperationContext* opCtx,
record_id_helpers::keyForDoc(doc,
collection->getClusteredInfo()->getIndexSpec(),
collection->getDefaultCollator()));
 } else if (!it->recordId.isNull()) {
+ // This case should normally only be hit in testing, to avoid automatically
+ // generating record ids for capped collections.
+ recordId = it->recordId;
+ } else if (cappedRecordIds.size()) {
+ recordId = std::move(cappedRecordIds[i]);
}
if (MONGO_unlikely(corruptDocumentOnInsert.shouldFail())) {
diff --git a/src/mongo/db/catalog/virtual_collection_impl.h b/src/mongo/db/catalog/virtual_collection_impl.h
index d45b46bd380..1aaa288e834 100644
--- a/src/mongo/db/catalog/virtual_collection_impl.h
+++ b/src/mongo/db/catalog/virtual_collection_impl.h
@@ -389,6 +389,31 @@ public:
return false;
}
+ bool usesCappedSnapshots() const final {
+ return false;
+ }
+
+ std::vector<RecordId> reserveCappedRecordIds(OperationContext* opCtx, size_t nIds) const final {
+ unimplementedTasserted();
+ return {};
+ }
+
+ void registerCappedInserts(OperationContext* opCtx,
+ const RecordId& minRecord,
+ const RecordId& maxRecord) const final {
+ unimplementedTasserted();
+ }
+
+ CappedVisibilityObserver* getCappedVisibilityObserver() const final {
+ unimplementedTasserted();
+ return nullptr;
+ }
+
+ CappedVisibilitySnapshot takeCappedVisibilitySnapshot() const final {
+ unimplementedTasserted();
+ return {};
+ }
+
bool isCapped() const final {
return false;
}
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index 5da919faf79..83b26701c88 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/storage/capped_snapshots.h"
#include "mongo/db/storage/snapshot_helper.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/logv2/log.h"
@@ -500,6 +501,10 @@ AutoGetCollectionForReadBase<AutoGetCollectionType, EmplaceAutoCollFunc>::
assertReadConcernSupported(
coll, readConcernArgs, opCtx->recoveryUnit()->getTimestampReadSource());
+ if (coll->usesCappedSnapshots()) {
+ CappedSnapshots::get(opCtx).establish(opCtx, coll);
+ }
+
// We make a copy of the namespace so we can use the variable after locks are released,
// since releasing locks will allow the value of coll->ns() to change.
const NamespaceString nss = coll->ns();
@@ -680,12 +685,17 @@ void AutoGetCollectionForReadLockFreeLegacy::EmplaceHelper::emplace(
auto coll = catalog.lookupCollectionByUUIDForRead(opCtx, uuid);
- // After yielding and reacquiring locks, the preconditions that were used to
- // select our ReadSource initially need to be checked again. We select a
- // ReadSource based on replication state. After a query yields its locks, the
- // replication state may have changed, invalidating our current choice of
- // ReadSource. Using the same preconditions, change our ReadSource if necessary.
if (coll) {
+ if (coll->usesCappedSnapshots()) {
+ CappedSnapshots::get(opCtx).establish(opCtx, coll.get());
+ }
+
+ // After yielding and reacquiring locks, the preconditions that were used to
+ // select our ReadSource initially need to be checked again. We select a
+ // ReadSource based on replication state. After a query yields its locks,
+ // the replication state may have changed, invalidating our current choice
+ // of ReadSource. Using the same preconditions, change our ReadSource if
+ // necessary.
SnapshotHelper::changeReadSourceIfNeeded(opCtx, coll->ns());
}
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 9a4708082a0..abec01de8e1 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -72,8 +72,12 @@ public:
InsertStatement(BSONObj toInsert, Timestamp ts, long long term)
: oplogSlot(repl::OpTime(ts, term)), doc(std::move(toInsert)) {}
+ InsertStatement(BSONObj toInsert, RecordId rid)
+ : recordId(std::move(rid)), doc(std::move(toInsert)) {}
+
std::vector<StmtId> stmtIds = {kUninitializedStmtId};
OplogSlot oplogSlot;
+ RecordId recordId;
BSONObj doc;
};
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 3d55f8208f9..f2e1725333f 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -116,6 +116,18 @@ env.Library(
)
env.Library(
+ target='capped_snapshots',
+ source=[
+ 'capped_snapshots.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/catalog/collection',
+ 'recovery_unit_base',
+ ],
+)
+
+env.Library(
target='bson_collection_catalog_entry',
source=[
'bson_collection_catalog_entry.cpp',
diff --git a/src/mongo/db/storage/capped_snapshots.cpp b/src/mongo/db/storage/capped_snapshots.cpp
new file mode 100644
index 00000000000..dd7f974b911
--- /dev/null
+++ b/src/mongo/db/storage/capped_snapshots.cpp
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/storage/capped_snapshots.h"
+namespace mongo {
+
+auto getCappedSnapshots = RecoveryUnit::Snapshot::declareDecoration<CappedSnapshots>();
+
+CappedSnapshots& CappedSnapshots::get(RecoveryUnit* ru) {
+ return getCappedSnapshots(ru->getSnapshot());
+}
+
+CappedSnapshots& CappedSnapshots::get(OperationContext* opCtx) {
+ return getCappedSnapshots(opCtx->recoveryUnit()->getSnapshot());
+}
+
+
+void CappedSnapshots::establish(OperationContext* opCtx, const Collection* coll) {
+ invariant(!opCtx->recoveryUnit()->isActive() ||
+ opCtx->lockState()->isCollectionLockedForMode(coll->ns(), MODE_X));
+
+
+ auto snapshot = coll->takeCappedVisibilitySnapshot();
+ _setSnapshot(coll->getRecordStore()->getIdent(), std::move(snapshot));
+}
+
+void CappedSnapshots::establish(OperationContext* opCtx, const CollectionPtr& coll) {
+ establish(opCtx, coll.get());
+}
+
+boost::optional<CappedVisibilitySnapshot> CappedSnapshots::getSnapshot(StringData ident) const {
+ auto it = _snapshots.find(ident);
+ if (it == _snapshots.end()) {
+ return boost::none;
+ }
+ return it->second;
+}
+
+void CappedSnapshots::_setSnapshot(StringData ident, CappedVisibilitySnapshot snapshot) {
+ _snapshots[ident] = std::move(snapshot);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/capped_snapshots.h b/src/mongo/db/storage/capped_snapshots.h
new file mode 100644
index 00000000000..141fee746aa
--- /dev/null
+++ b/src/mongo/db/storage/capped_snapshots.h
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog/capped_visibility.h"
+#include "mongo/db/catalog/collection.h"
+
+namespace mongo {
+class OperationContext;
+class RecoveryUnit;
+
+/**
+ * CappedSnapshots is a container for managing the capped visibility snapshots for collections for a
+ * specific RecoveryUnit. Callers must establish a capped visibility snapshot before opening a
+ * storage snapshot. Once the storage snapshot is closed, due to a committed unit of work or end of
+ * a read transaction, the snapshot is invalidated, and another call is required to establish the
+ * snapshot.
+ */
+class CappedSnapshots {
+public:
+ static CappedSnapshots& get(RecoveryUnit* ru);
+ static CappedSnapshots& get(OperationContext* opCtx);
+
+ /**
+ * Must be called before opening a forward cursor on this capped collection. Establishes a
+ * consistent view of the capped visibility for this collection. The snapshot is invalidated for
+ * this collection when the storage engine snapshot is closed.
+ */
+ void establish(OperationContext* opCtx, const CollectionPtr& coll);
+ void establish(OperationContext* opCtx, const Collection* coll);
+
+ /**
+ * Retrieve a previously established visibility snapshot. If no prior call to establish() has
+ * been made or if the storage snapshot has been closed since the last call to establish(), this
+ * will return boost::none, indicating that the caller may be attempting to do something unsafe
+ * that would return records past the capped visibility point.
+ */
+ boost::optional<CappedVisibilitySnapshot> getSnapshot(StringData ident) const;
+
+private:
+ void _setSnapshot(StringData ident, CappedVisibilitySnapshot snapshot);
+
+private:
+ StringMap<CappedVisibilitySnapshot> _snapshots;
+};
+} // namespace mongo
diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
index 14e37b6c25d..7a506914d1d 100644
--- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
+++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
@@ -159,6 +159,14 @@ public:
long long numRecords,
long long dataSize) {}
+ virtual void reserveRecordIds(OperationContext* opCtx,
+ std::vector<RecordId>* out,
+ size_t nRecords) final {
+ for (size_t i = 0; i < nRecords; i++) {
+ out->push_back(RecordId(i));
+ }
+ };
+
protected:
void waitForAllEarlierOplogWritesToBeVisibleImpl(OperationContext* opCtx) const override {}
diff --git a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h
index 26d9337599e..14b832dc069 100644
--- a/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h
+++ b/src/mongo/db/storage/devnull/ephemeral_catalog_record_store.h
@@ -115,6 +115,10 @@ public:
_data->dataSize = dataSize;
}
+ virtual void reserveRecordIds(OperationContext* opCtx,
+ std::vector<RecordId>* out,
+ size_t nRecords) final{};
+
protected:
struct EphemeralForTestRecord {
EphemeralForTestRecord() : size(0) {}
diff --git a/src/mongo/db/storage/external_record_store.h b/src/mongo/db/storage/external_record_store.h
index ac0c61392c6..58a29d2a66c 100644
--- a/src/mongo/db/storage/external_record_store.h
+++ b/src/mongo/db/storage/external_record_store.h
@@ -138,6 +138,12 @@ protected:
unimplementedTasserted();
}
+ void reserveRecordIds(OperationContext* opCtx,
+ std::vector<RecordId>* out,
+ size_t nRecords) final {
+ unimplementedTasserted();
+ }
+
private:
void unimplementedTasserted() const {
MONGO_UNIMPLEMENTED_TASSERT(6968600);
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index 69e845b4809..f530187e7b9 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -629,6 +629,14 @@ public:
void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const;
/**
+ * Reserve a range of contiguous RecordIds and append them to 'out' in increasing order. Must
+ * only be called on a RecordStore with KeyFormat::Long.
+ */
+ virtual void reserveRecordIds(OperationContext* opCtx,
+ std::vector<RecordId>* out,
+ size_t nRecords) = 0;
+
+ /**
* Called after a repair operation is run with the recomputed numRecords and dataSize.
*/
virtual void updateStatsAfterRepair(OperationContext* opCtx,
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 2c43a5c203b..d4a755292c7 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -126,6 +126,7 @@ public:
kCommit
};
+
void commitRegisteredChanges(boost::optional<Timestamp> commitTimestamp);
void abortRegisteredChanges();
virtual ~RecoveryUnit() {}
diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript
index 7f6cee79dc4..5b17fafe450 100644
--- a/src/mongo/db/storage/wiredtiger/SConscript
+++ b/src/mongo/db/storage/wiredtiger/SConscript
@@ -85,6 +85,7 @@ wtEnv.Library(
'$BUILD_DIR/mongo/db/shard_role',
'$BUILD_DIR/mongo/db/snapshot_window_options',
'$BUILD_DIR/mongo/db/storage/backup_block',
+ '$BUILD_DIR/mongo/db/storage/capped_snapshots',
'$BUILD_DIR/mongo/db/storage/storage_engine_parameters',
'$BUILD_DIR/mongo/db/storage/storage_repair_observer',
'$BUILD_DIR/mongo/util/log_and_backoff',
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index ac28e20db74..364a8f3014a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/server_recovery.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
+#include "mongo/db/storage/capped_snapshots.h"
#include "mongo/db/storage/wiredtiger/oplog_stone_parameters_gen.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_cursor_helpers.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_customization_hooks.h"
@@ -1954,6 +1955,15 @@ void WiredTigerRecordStore::_initNextIdIfNeeded(OperationContext* opCtx) {
_nextIdNum.store(nextId);
}
+void WiredTigerRecordStore::reserveRecordIds(OperationContext* opCtx,
+ std::vector<RecordId>* out,
+ size_t nRecords) {
+ auto nextId = _reserveIdBlock(opCtx, nRecords);
+ for (size_t i = 0; i < nRecords; i++) {
+ out->push_back(RecordId(nextId++));
+ }
+}
+
long long WiredTigerRecordStore::_reserveIdBlock(OperationContext* opCtx, size_t nRecords) {
// Clustered record stores do not automatically generate int64 RecordIds. RecordIds are instead
// constructed as binary strings, KeyFormat::String, from the user-defined cluster key.
@@ -2134,8 +2144,8 @@ WiredTigerRecordStoreCursorBase::WiredTigerRecordStoreCursorBase(OperationContex
const WiredTigerRecordStore& rs,
bool forward)
: _rs(rs), _opCtx(opCtx), _forward(forward) {
- if (_rs._isOplog) {
- initOplogVisibility(_opCtx);
+ if (_rs._isCapped) {
+ initCappedVisibility(_opCtx);
}
_cursor.emplace(rs.getURI(), rs.tableId(), true, opCtx);
}
@@ -2193,7 +2203,7 @@ boost::optional<Record> WiredTigerRecordStoreCursorBase::next() {
return {};
}
- if (_forward && _oplogVisibleTs && id.getLong() > *_oplogVisibleTs) {
+ if (_forward && !isVisible(id)) {
_eof = true;
return {};
}
@@ -2226,7 +2236,7 @@ boost::optional<Record> WiredTigerRecordStoreCursorBase::seekExact(const RecordI
return {};
}
- if (_forward && _oplogVisibleTs && id.getLong() > *_oplogVisibleTs) {
+ if (_forward && !isVisible(id)) {
_eof = true;
return {};
}
@@ -2274,6 +2284,8 @@ boost::optional<Record> WiredTigerRecordStoreCursorBase::seekNear(const RecordId
// Additionally, forward scanning oplog cursors must not see past holes.
if (_forward && _oplogVisibleTs && start.getLong() > *_oplogVisibleTs) {
start = RecordId(*_oplogVisibleTs);
+ } else if (_forward && _cappedSnapshot && !_cappedSnapshot->isRecordVisible(start)) {
+ start = _cappedSnapshot->getHighestVisible();
}
_skipNextAdvance = false;
@@ -2325,7 +2337,7 @@ boost::optional<Record> WiredTigerRecordStoreCursorBase::seekNear(const RecordId
return boost::none;
}
- if (_forward && _oplogVisibleTs && curId.getLong() > *_oplogVisibleTs) {
+ if (_forward && !isVisible(curId)) {
_eof = true;
return boost::none;
}
@@ -2347,6 +2359,7 @@ void WiredTigerRecordStoreCursorBase::save() {
if (_cursor)
_cursor->reset();
_oplogVisibleTs = boost::none;
+ _cappedSnapshot = boost::none;
_readTimestampForOplog = boost::none;
_hasRestored = false;
} catch (const WriteConflictException&) {
@@ -2355,17 +2368,37 @@ void WiredTigerRecordStoreCursorBase::save() {
}
}
-void WiredTigerRecordStoreCursorBase::initOplogVisibility(OperationContext* opCtx) {
- auto wtRu = WiredTigerRecoveryUnit::get(opCtx);
- wtRu->setIsOplogReader();
- if (_forward) {
- _oplogVisibleTs = wtRu->getOplogVisibilityTs();
+bool WiredTigerRecordStoreCursorBase::isVisible(const RecordId& id) {
+ // The oplog does not use the capped snapshot mechanism, so it should be impossible for both to
+ // exist at once.
+ invariant(!(_oplogVisibleTs && _cappedSnapshot));
+ if (_oplogVisibleTs && id.getLong() > *_oplogVisibleTs) {
+ return false;
+ }
+ if (_cappedSnapshot && !_cappedSnapshot->isRecordVisible(id)) {
+ return false;
}
- boost::optional<Timestamp> readTs = wtRu->getPointInTimeReadTimestamp(opCtx);
- if (readTs && readTs->asLL() != 0) {
- // One cannot pass a read_timestamp of 0 to WT, but a "0" is commonly understand as every
- // time is visible.
- _readTimestampForOplog = readTs->asInt64();
+ return true;
+}
+
+void WiredTigerRecordStoreCursorBase::initCappedVisibility(OperationContext* opCtx) {
+ if (_rs._isOplog) {
+ auto wtRu = WiredTigerRecoveryUnit::get(opCtx);
+ wtRu->setIsOplogReader();
+ if (_forward) {
+ _oplogVisibleTs = wtRu->getOplogVisibilityTs();
+ }
+ boost::optional<Timestamp> readTs = wtRu->getPointInTimeReadTimestamp(opCtx);
+ if (readTs && readTs->asLL() != 0) {
+ // One cannot pass a read_timestamp of 0 to WT, but a "0" is commonly understood as
+ // every time is visible.
+ _readTimestampForOplog = readTs->asInt64();
+ }
+ } else if (_forward) {
+ // We can't enforce that the caller has initialized the capped snapshot before entering
+ // this function because that would require knowing, for example, which locks are held.
+ // So we expect higher layers to do so.
+ _cappedSnapshot = CappedSnapshots::get(_opCtx).getSnapshot(_rs._ident->getIdent());
}
}
@@ -2375,8 +2408,8 @@ void WiredTigerRecordStoreCursorBase::saveUnpositioned() {
}
bool WiredTigerRecordStoreCursorBase::restore(bool tolerateCappedRepositioning) {
- if (_rs._isOplog) {
- initOplogVisibility(_opCtx);
+ if (_rs._isCapped) {
+ initCappedVisibility(_opCtx);
}
if (!_cursor)
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index c8203bb6c80..44826a6afd5 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -34,6 +34,7 @@
#include <string>
#include <wiredtiger.h>
+#include "mongo/db/catalog/capped_visibility.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_cursor.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
@@ -264,6 +265,10 @@ public:
typedef stdx::variant<int64_t, WiredTigerItem> CursorKey;
+ void reserveRecordIds(OperationContext* opCtx,
+ std::vector<RecordId>* out,
+ size_t nRecords) final;
+
protected:
virtual RecordId getKey(WT_CURSOR* cursor) const = 0;
@@ -421,7 +426,7 @@ protected:
private:
bool isVisible(const RecordId& id);
- void initOplogVisibility(OperationContext* opCtx);
+ void initCappedVisibility(OperationContext* opCtx);
/**
* This value is used for visibility calculations on what oplog entries can be returned to a
@@ -429,6 +434,7 @@ private:
* established.
*/
boost::optional<std::int64_t> _oplogVisibleTs = boost::none;
+ boost::optional<CappedVisibilitySnapshot> _cappedSnapshot;
/**
* With WT-8601, WiredTiger no longer maintains commit_timestamp information on writes to logged