summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/noPassthrough/hybrid_unique_index_with_updates.js12
-rw-r--r--src/mongo/db/catalog/SConscript17
-rw-r--r--src/mongo/db/catalog/index_catalog.h1
-rw-r--r--src/mongo/db/catalog/index_catalog_entry.h2
-rw-r--r--src/mongo/db/catalog/index_catalog_entry_impl.h4
-rw-r--r--src/mongo/db/catalog/index_timestamp_helper.cpp159
-rw-r--r--src/mongo/db/catalog/index_timestamp_helper.h58
-rw-r--r--src/mongo/db/catalog/multi_index_block.cpp4
-rw-r--r--src/mongo/db/catalog/multi_index_block.h6
-rw-r--r--src/mongo/db/commands/create_indexes.cpp4
-rw-r--r--src/mongo/db/index/SConscript1
-rw-r--r--src/mongo/db/index/index_build_interceptor.cpp63
-rw-r--r--src/mongo/db/index/index_build_interceptor.h11
-rw-r--r--src/mongo/db/index_builder.cpp113
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.cpp42
-rw-r--r--src/mongo/db/storage/recovery_unit.h7
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp134
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h11
-rw-r--r--src/mongo/dbtests/SConscript1
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp132
20 files changed, 624 insertions, 158 deletions
diff --git a/jstests/noPassthrough/hybrid_unique_index_with_updates.js b/jstests/noPassthrough/hybrid_unique_index_with_updates.js
index b3dc94ca4fd..7aa7bbe21f2 100644
--- a/jstests/noPassthrough/hybrid_unique_index_with_updates.js
+++ b/jstests/noPassthrough/hybrid_unique_index_with_updates.js
@@ -3,14 +3,18 @@
* of hybrid unique index builds. This test inserts a duplicate document at different phases of an
* index build to confirm that the resulting behavior is failure.
*
- * @tags: [requires_document_locking]
+ * @tags: [requires_document_locking, requires_replication]
*/
(function() {
"use strict";
load("jstests/libs/check_log.js");
- let conn = MongoRunner.runMongod();
+ let replSetTest = new ReplSetTest({name: "hybrid_updates", nodes: 2});
+ replSetTest.startSet();
+ replSetTest.initiate();
+
+ let conn = replSetTest.getPrimary();
let testDB = conn.getDB('test');
// Run 'func' while failpoint is enabled.
@@ -53,7 +57,7 @@
/**
* Run a background index build on a unique index under different configurations. Introduce
* duplicate keys on the index that may cause it to fail or succeed, depending on the following
- * optional parmeters:
+ * optional parameters:
* {
* // Which operation used to introduce a duplicate key.
* operation {string}: "insert", "update"
@@ -153,5 +157,5 @@
runTest({operation: "update", resolve: false, phase: i});
}
- MongoRunner.stopMongod(conn);
+ replSetTest.stopSet();
})();
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 91499778d77..6b99eceab39 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -208,6 +208,22 @@ env.Library(
)
env.Library(
+ target='index_timestamp_helper',
+ source=[
+ "index_timestamp_helper.cpp",
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
+ '$BUILD_DIR/mongo/db/logical_clock',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
+ '$BUILD_DIR/mongo/db/server_options_core',
+ ]
+)
+
+env.Library(
target='collection',
source=[
"collection.cpp",
@@ -257,6 +273,7 @@ env.Library(
],
LIBDEPS=[
'collection',
+ 'index_timestamp_helper',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/audit',
'$BUILD_DIR/mongo/db/background',
diff --git a/src/mongo/db/catalog/index_catalog.h b/src/mongo/db/catalog/index_catalog.h
index d79e429cf6c..4a1a28f4b51 100644
--- a/src/mongo/db/catalog/index_catalog.h
+++ b/src/mongo/db/catalog/index_catalog.h
@@ -476,5 +476,4 @@ public:
virtual void indexBuildSuccess(OperationContext* opCtx, IndexCatalogEntry* index) = 0;
};
-
} // namespace mongo
diff --git a/src/mongo/db/catalog/index_catalog_entry.h b/src/mongo/db/catalog/index_catalog_entry.h
index ef6628cfffc..8f9e6ab382e 100644
--- a/src/mongo/db/catalog/index_catalog_entry.h
+++ b/src/mongo/db/catalog/index_catalog_entry.h
@@ -79,6 +79,8 @@ public:
virtual IndexBuildInterceptor* indexBuildInterceptor() = 0;
+ virtual const IndexBuildInterceptor* indexBuildInterceptor() const = 0;
+
virtual void setIndexBuildInterceptor(IndexBuildInterceptor* interceptor) = 0;
virtual const Ordering& ordering() const = 0;
diff --git a/src/mongo/db/catalog/index_catalog_entry_impl.h b/src/mongo/db/catalog/index_catalog_entry_impl.h
index 7460b765b34..dd1443aebcc 100644
--- a/src/mongo/db/catalog/index_catalog_entry_impl.h
+++ b/src/mongo/db/catalog/index_catalog_entry_impl.h
@@ -98,6 +98,10 @@ public:
return _indexBuildInterceptor;
}
+ const IndexBuildInterceptor* indexBuildInterceptor() const final {
+ return _indexBuildInterceptor;
+ }
+
void setIndexBuildInterceptor(IndexBuildInterceptor* interceptor) final {
_indexBuildInterceptor = interceptor;
}
diff --git a/src/mongo/db/catalog/index_timestamp_helper.cpp b/src/mongo/db/catalog/index_timestamp_helper.cpp
new file mode 100644
index 00000000000..98ee8f8f8be
--- /dev/null
+++ b/src/mongo/db/catalog/index_timestamp_helper.cpp
@@ -0,0 +1,159 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kIndex
+
+#include "mongo/db/catalog/index_timestamp_helper.h"
+
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/logical_clock.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+namespace {
+bool requiresGhostCommitTimestampForWrite(OperationContext* opCtx, const NamespaceString& nss) {
+ if (!nss.isReplicated()) {
+ return false;
+ }
+
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->isReplEnabled()) {
+ return false;
+ }
+
+ // Only storage engines that support recover-to-stable need ghost commit timestamps.
+ if (!opCtx->getServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) {
+ return false;
+ }
+
+ return true;
+}
+} // namespace
+
+void IndexTimestampHelper::setGhostCommitTimestampForWrite(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+ if (!requiresGhostCommitTimestampForWrite(opCtx, nss)) {
+ return;
+ }
+
+ // The lastApplied timestamp is the last OpTime that a node has applied. We choose this
+ // timestamp on primaries because it is the most recent point-in-time a reader would be able to
+ // read at, despite it lagging slightly behind recently committed writes. Because of this lag,
+ // both on primaries and secondaries, the lastApplied time may be older than any newly committed
+ // writes. It is therefore required that all callers holding intent locks and wishing to apply
+ // ghost timestamps also establish a storage engine snapshot for reading that is less than or
+ // equal to the lastApplied timestamp.
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ const auto commitTimestamp = replCoord->getMyLastAppliedOpTime().getTimestamp();
+
+ const auto mySnapshot = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
+ // If a lock that blocks writes is held, there can be no uncommitted writes, so there is no
+ // need to check snapshot visibility, especially if a caller is not reading with a timestamp.
+ invariant(mySnapshot || opCtx->lockState()->isCollectionLockedForMode(nss.ns(), MODE_S),
+ "a write-blocking lock is required when applying a ghost timestamp without a read "
+ "timestamp");
+ invariant(!mySnapshot || *mySnapshot <= commitTimestamp,
+ str::stream() << "commit timestamp " << commitTimestamp.toString()
+ << " cannot be older than current read timestamp "
+ << mySnapshot->toString());
+
+ auto status = opCtx->recoveryUnit()->setTimestamp(commitTimestamp);
+ if (status.code() == ErrorCodes::BadValue) {
+ log() << "Temporarily could not apply ghost commit timestamp. " << status.reason();
+ throw WriteConflictException();
+ }
+ LOG(1) << "assigning ghost commit timestamp: " << commitTimestamp.toString();
+
+ fassert(51053, status);
+}
+
+/**
+ * Returns true if writes to the catalog entry for the input namespace require being
+ * timestamped. A ghost write is when the operation is not committed with an oplog entry and
+ * implies the caller will look at the logical clock to choose a time to use.
+ */
+namespace {
+bool requiresGhostCommitTimestampForCatalogWrite(OperationContext* opCtx, NamespaceString nss) {
+ if (!nss.isReplicated() || nss.coll().startsWith("tmp.mr.")) {
+ return false;
+ }
+
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->isReplEnabled()) {
+ return false;
+ }
+
+ // If there is a commit timestamp already assigned, there's no need to explicitly assign a
+ // timestamp. This case covers foreground index builds.
+ if (!opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
+ return false;
+ }
+
+ // Only oplog entries (including a user's `applyOps` command) construct indexes via
+ // `IndexBuilder`. Nodes in `startup` may not yet have initialized the `LogicalClock`, however
+ // index builds during startup replication recovery must be timestamped. These index builds
+ // are foregrounded and timestamp their catalog writes with a "commit timestamp". Nodes in the
+ // oplog application phase of initial sync (`startup2`) must not timestamp index builds before
+ // the `initialDataTimestamp`.
+ const auto memberState = replCoord->getMemberState();
+ if (memberState.startup() || memberState.startup2()) {
+ return false;
+ }
+
+ // When in rollback via refetch, it's valid for all writes to be untimestamped. Additionally,
+ // it's illegal to timestamp a write later than the timestamp associated with the node exiting
+ // the rollback state. This condition opts for being conservative.
+ if (!serverGlobalParams.enableMajorityReadConcern && memberState.rollback()) {
+ return false;
+ }
+
+ return true;
+}
+} // namespace
+
+void IndexTimestampHelper::setGhostCommitTimestampForCatalogWrite(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+ if (!requiresGhostCommitTimestampForCatalogWrite(opCtx, nss)) {
+ return;
+ }
+ auto status = opCtx->recoveryUnit()->setTimestamp(
+ LogicalClock::get(opCtx)->getClusterTime().asTimestamp());
+ if (status.code() == ErrorCodes::BadValue) {
+ log() << "Temporarily could not timestamp the index build commit, retrying. "
+ << status.reason();
+ throw WriteConflictException();
+ }
+ fassert(50701, status);
+}
+} // namespace mongo
diff --git a/src/mongo/db/catalog/index_timestamp_helper.h b/src/mongo/db/catalog/index_timestamp_helper.h
new file mode 100644
index 00000000000..af4a746a869
--- /dev/null
+++ b/src/mongo/db/catalog/index_timestamp_helper.h
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+
+namespace mongo {
+
+namespace IndexTimestampHelper {
+
+/**
+ * If required, sets a timestamp on an active WriteUnitOfWork. A ghost write is when the
+ * operation is not committed with an oplog entry, which may be necessary for certain index
+ * build operations not associated with a unique optime. This implementation uses the
+ * lastApplied OpTime.
+ *
+ * May throw a WriteConflictException if the timestamp chosen is too old.
+ */
+void setGhostCommitTimestampForWrite(OperationContext* opCtx, const NamespaceString& nss);
+
+/**
+ * If required, sets a timestamp on an active WriteUnitOfWork for a catalog write. A ghost write
+ * is when the operation is not committed with an oplog entry, which may be necessary for
+ * certain index catalog operations not associated with a unique optime. This implementation
+ * uses the LogicalClock to timestamp operations.
+ */
+void setGhostCommitTimestampForCatalogWrite(OperationContext* opCtx, const NamespaceString& nss);
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/catalog/multi_index_block.cpp b/src/mongo/db/catalog/multi_index_block.cpp
index fbddcd68dd0..1b4d277ebd9 100644
--- a/src/mongo/db/catalog/multi_index_block.cpp
+++ b/src/mongo/db/catalog/multi_index_block.cpp
@@ -573,7 +573,7 @@ Status MultiIndexBlock::dumpInsertsFromBulk(std::set<RecordId>* dupRecords) {
return Status::OK();
}
-Status MultiIndexBlock::drainBackgroundWrites() {
+Status MultiIndexBlock::drainBackgroundWrites(RecoveryUnit::ReadSource readSource) {
if (State::kAborted == _getState()) {
return {ErrorCodes::IndexBuildAborted,
str::stream() << "Index build aborted: " << _abortReason
@@ -595,7 +595,7 @@ Status MultiIndexBlock::drainBackgroundWrites() {
if (!interceptor)
continue;
- auto status = interceptor->drainWritesIntoIndex(_opCtx, _indexes[i].options);
+ auto status = interceptor->drainWritesIntoIndex(_opCtx, _indexes[i].options, readSource);
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/db/catalog/multi_index_block.h b/src/mongo/db/catalog/multi_index_block.h
index 525a193bfd5..227ad8cc722 100644
--- a/src/mongo/db/catalog/multi_index_block.h
+++ b/src/mongo/db/catalog/multi_index_block.h
@@ -141,9 +141,13 @@ public:
* before calling commit(), stop writes on the collection by holding a S or X while calling this
* method.
*
+ * When 'readSource' is not kUnset, perform the drain by reading at the timestamp described by
+ * the ReadSource.
+ *
* Must not be in a WriteUnitOfWork.
*/
- Status drainBackgroundWrites();
+ Status drainBackgroundWrites(
+ RecoveryUnit::ReadSource readSource = RecoveryUnit::ReadSource::kUnset);
/**
* Check any constraits that may have been temporarily violated during the index build for
diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp
index f0d2d3d36d8..754e8cce6be 100644
--- a/src/mongo/db/commands/create_indexes.cpp
+++ b/src/mongo/db/commands/create_indexes.cpp
@@ -357,7 +357,9 @@ bool runCreateIndexes(OperationContext* opCtx,
opCtx->recoveryUnit()->abandonSnapshot();
Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_IS);
- uassertStatusOK(indexer.drainBackgroundWrites());
+ // Read at a point in time so that the drain, which will timestamp writes at lastApplied,
+ // can never commit writes earlier than its read timestamp.
+ uassertStatusOK(indexer.drainBackgroundWrites(RecoveryUnit::ReadSource::kNoOverlap));
}
if (MONGO_FAIL_POINT(hangAfterIndexBuildFirstDrain)) {
diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript
index 21490053480..50b53e80630 100644
--- a/src/mongo/db/index/SConscript
+++ b/src/mongo/db/index/SConscript
@@ -146,6 +146,7 @@ env.Library(
'duplicate_key_tracker',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/index_timestamp_helper',
'$BUILD_DIR/mongo/db/multi_key_path_tracker',
'index_access_methods',
],
diff --git a/src/mongo/db/index/index_build_interceptor.cpp b/src/mongo/db/index/index_build_interceptor.cpp
index 41fea74abba..715c4c60fb1 100644
--- a/src/mongo/db/index/index_build_interceptor.cpp
+++ b/src/mongo/db/index/index_build_interceptor.cpp
@@ -34,7 +34,9 @@
#include "mongo/db/index/index_build_interceptor.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/catalog/index_timestamp_helper.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/index/index_access_method.h"
@@ -90,9 +92,26 @@ const std::string& IndexBuildInterceptor::getConstraintViolationsTableIdent() co
Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx,
- const InsertDeleteOptions& options) {
+ const InsertDeleteOptions& options,
+ RecoveryUnit::ReadSource readSource) {
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+ // Callers may request to read at a specific timestamp so that no drained writes are timestamped
+ // earlier than their original write timestamp. Also ensure that leaving this function resets
+ // the ReadSource to its original value.
+ auto resetReadSourceGuard =
+ makeGuard([ opCtx, prevReadSource = opCtx->recoveryUnit()->getTimestampReadSource() ] {
+ opCtx->recoveryUnit()->abandonSnapshot();
+ opCtx->recoveryUnit()->setTimestampReadSource(prevReadSource);
+ });
+
+ if (readSource != RecoveryUnit::ReadSource::kUnset) {
+ opCtx->recoveryUnit()->abandonSnapshot();
+ opCtx->recoveryUnit()->setTimestampReadSource(readSource);
+ } else {
+ resetReadSourceGuard.dismiss();
+ }
+
// These are used for logging only.
int64_t totalDeleted = 0;
int64_t totalInserted = 0;
@@ -174,23 +193,36 @@ Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx,
invariant(!batch.empty());
+ cursor->save();
+
// If we are here, either we have reached the end of the table or the batch is full, so
// insert everything in one WriteUnitOfWork, and delete each inserted document from the side
// writes table.
- WriteUnitOfWork wuow(opCtx);
- for (auto& operation : batch) {
- auto status =
- _applyWrite(opCtx, operation.second, options, &totalInserted, &totalDeleted);
- if (!status.isOK()) {
- return status;
+ auto status = writeConflictRetry(opCtx, "index build drain", _indexCatalogEntry->ns(), [&] {
+ WriteUnitOfWork wuow(opCtx);
+ for (auto& operation : batch) {
+ auto status =
+ _applyWrite(opCtx, operation.second, options, &totalInserted, &totalDeleted);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ // Delete the document from the table as soon as it has been inserted into the
+ // index. This ensures that no key is ever inserted twice and no keys are skipped.
+ _sideWritesTable->rs()->deleteRecord(opCtx, operation.first);
}
- // Delete the document from the table as soon as it has been inserted into the index.
- // This ensures that no key is ever inserted twice and no keys are skipped.
- _sideWritesTable->rs()->deleteRecord(opCtx, operation.first);
+ // For rollback to work correctly, these writes need to be timestamped. The actual time
+ // is not important, as long as it is not older than the most recent visible side write.
+ IndexTimestampHelper::setGhostCommitTimestampForWrite(
+ opCtx, NamespaceString(_indexCatalogEntry->ns()));
+
+ wuow.commit();
+ return Status::OK();
+ });
+ if (!status.isOK()) {
+ return status;
}
- cursor->save();
- wuow.commit();
progress->hit(batch.size());
@@ -250,7 +282,10 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx,
}
}
- *keysInserted += result.numInserted;
+ int64_t numInserted = result.numInserted;
+ *keysInserted += numInserted;
+ opCtx->recoveryUnit()->onRollback(
+ [keysInserted, numInserted] { *keysInserted -= numInserted; });
} else {
invariant(opType == Op::kDelete);
DEV invariant(strcmp(operation.getStringField("op"), "d") == 0);
@@ -262,6 +297,8 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx,
}
*keysDeleted += numDeleted;
+ opCtx->recoveryUnit()->onRollback(
+ [keysDeleted, numDeleted] { *keysDeleted -= numDeleted; });
}
return Status::OK();
}
diff --git a/src/mongo/db/index/index_build_interceptor.h b/src/mongo/db/index/index_build_interceptor.h
index 39db95418f1..e4f2f2b3027 100644
--- a/src/mongo/db/index/index_build_interceptor.h
+++ b/src/mongo/db/index/index_build_interceptor.h
@@ -88,9 +88,16 @@ public:
*
* This is resumable, so subsequent calls will start the scan at the record immediately
* following the last inserted record from a previous call to drainWritesIntoIndex.
+ *
+ * When 'readSource' is not kUnset, perform the drain by reading at the timestamp described by
+ * the ReadSource. This will always reset the ReadSource to its original value before returning.
+ * The drain otherwise reads at the pre-existing ReadSource on the RecoveryUnit. This may be
+ * necessary by callers that can only guarantee consistency of data up to a certain point in
+ * time.
*/
- Status drainWritesIntoIndex(OperationContext* opCtx, const InsertDeleteOptions& options);
-
+ Status drainWritesIntoIndex(OperationContext* opCtx,
+ const InsertDeleteOptions& options,
+ RecoveryUnit::ReadSource readSource);
/**
* Returns 'true' if there are no visible records remaining to be applied from the side writes
diff --git a/src/mongo/db/index_builder.cpp b/src/mongo/db/index_builder.cpp
index fe252bc09ba..754848ae5cd 100644
--- a/src/mongo/db/index_builder.cpp
+++ b/src/mongo/db/index_builder.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/catalog/index_timestamp_helper.h"
#include "mongo/db/catalog/multi_index_block.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
@@ -62,48 +63,6 @@ namespace {
const StringData kIndexesFieldName = "indexes"_sd;
const StringData kCommandName = "createIndexes"_sd;
-/**
- * Returns true if writes to the catalog entry for the input namespace require being
- * timestamped. A ghost write is when the operation is not committed with an oplog entry and
- * implies the caller will look at the logical clock to choose a time to use.
- */
-bool requiresGhostCommitTimestamp(OperationContext* opCtx, NamespaceString nss) {
- if (!nss.isReplicated() || nss.coll().startsWith("tmp.mr.")) {
- return false;
- }
-
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (!replCoord->getSettings().usingReplSets()) {
- return false;
- }
-
- // If there is a commit timestamp already assigned, there's no need to explicitly assign a
- // timestamp. This case covers foreground index builds.
- if (!opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
- return false;
- }
-
- // Only oplog entries (including a user's `applyOps` command) construct indexes via
- // `IndexBuilder`. Nodes in `startup` may not yet have initialized the `LogicalClock`, however
- // index builds during startup replication recovery must be timestamped. These index builds
- // are foregrounded and timestamp their catalog writes with a "commit timestamp". Nodes in the
- // oplog application phase of initial sync (`startup2`) must not timestamp index builds before
- // the `initialDataTimestamp`.
- const auto memberState = replCoord->getMemberState();
- if (memberState.startup() || memberState.startup2()) {
- return false;
- }
-
- // When in rollback via refetch, it's valid for all writes to be untimestamped. Additionally,
- // it's illegal to timestamp a write later than the timestamp associated with the node exiting
- // the rollback state. This condition opts for being conservative.
- if (!serverGlobalParams.enableMajorityReadConcern && memberState.rollback()) {
- return false;
- }
-
- return true;
-}
-
// Synchronization tools when replication spawns a background index in a new thread.
// The bool is 'true' when a new background index has started in a new thread but the
// parent thread has not yet synchronized with it.
@@ -305,44 +264,47 @@ Status IndexBuilder::_build(OperationContext* opCtx,
return status;
}
- {
- // Perform the first drain while holding an intent lock.
- Lock::CollectionLock collLock(opCtx->lockState(), ns.ns(), MODE_IX);
- status = indexer.drainBackgroundWrites();
- }
- if (!status.isOK()) {
- return status;
- }
+ if (buildInBackground) {
+ {
+ // Perform the first drain while holding an intent lock.
+ Lock::CollectionLock collLock(opCtx->lockState(), ns.ns(), MODE_IX);
- // Perform the second drain while stopping inserts into the collection.
- {
- Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_S);
- status = indexer.drainBackgroundWrites();
- }
- if (!status.isOK()) {
- return status;
- }
+ // Read at a point in time so that the drain, which will timestamp writes at
+ // lastApplied, can never commit writes earlier than its read timestamp.
+ status = indexer.drainBackgroundWrites(RecoveryUnit::ReadSource::kNoOverlap);
+ }
+ if (!status.isOK()) {
+ return status;
+ }
+
+ // Perform the second drain while stopping inserts into the collection.
+ {
+ Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_S);
+ status = indexer.drainBackgroundWrites();
+ }
+ if (!status.isOK()) {
+ return status;
+ }
- if (buildInBackground) {
opCtx->recoveryUnit()->abandonSnapshot();
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
dbLock->relockWithMode(MODE_X);
- }
-
- // Perform the third and final drain after releasing a shared lock and reacquiring an
- // exclusive lock on the database.
- status = indexer.drainBackgroundWrites();
- if (!status.isOK()) {
- return status;
- }
- // Only perform constraint checking when enforced (on primaries).
- if (_indexConstraints == IndexConstraints::kEnforce) {
- status = indexer.checkConstraints();
+ // Perform the third and final drain after releasing a shared lock and reacquiring an
+ // exclusive lock on the database.
+ status = indexer.drainBackgroundWrites();
if (!status.isOK()) {
return status;
}
+
+ // Only perform constraint checking when enforced (on primaries).
+ if (_indexConstraints == IndexConstraints::kEnforce) {
+ status = indexer.checkConstraints();
+ if (!status.isOK()) {
+ return status;
+ }
+ }
}
status = writeConflictRetry(opCtx, "Commit index build", ns.ns(), [opCtx, coll, &indexer, &ns] {
@@ -355,16 +317,7 @@ Status IndexBuilder::_build(OperationContext* opCtx,
return status;
}
- if (requiresGhostCommitTimestamp(opCtx, ns)) {
- auto status = opCtx->recoveryUnit()->setTimestamp(
- LogicalClock::get(opCtx)->getClusterTime().asTimestamp());
- if (status.code() == ErrorCodes::BadValue) {
- log() << "Temporarily could not timestamp the index build commit, retrying. "
- << status.reason();
- throw WriteConflictException();
- }
- fassert(50701, status);
- }
+ IndexTimestampHelper::setGhostCommitTimestampForCatalogWrite(opCtx, ns);
wunit.commit();
return Status::OK();
});
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index bddf1243ec4..f29692a1f0a 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -189,36 +189,16 @@ Status CollectionBulkLoaderImpl::commit() {
}
if (_idIndexBlock) {
- // Delete dups.
+ // Gather RecordIds for uninserted duplicate keys to delete.
std::set<RecordId> dups;
// Do not do inside a WriteUnitOfWork (required by dumpInsertsFromBulk).
auto status = _idIndexBlock->dumpInsertsFromBulk(&dups);
if (!status.isOK()) {
return status;
}
- for (auto&& it : dups) {
- writeConflictRetry(
- _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this, &it] {
- WriteUnitOfWork wunit(_opCtx.get());
- _autoColl->getCollection()->deleteDocument(_opCtx.get(),
- kUninitializedStmtId,
- it,
- nullptr /** OpDebug **/,
- false /* fromMigrate */,
- true /* noWarn */);
- wunit.commit();
- });
- }
- status = _idIndexBlock->drainBackgroundWrites();
- if (!status.isOK()) {
- return status;
- }
- status = _idIndexBlock->checkConstraints();
- if (!status.isOK()) {
- return status;
- }
- // Commit _id index, without dups.
+ // Commit _id index without duplicate keys even though there may still be documents
+ // with duplicate _ids. These duplicates will be deleted in the following step.
status = writeConflictRetry(
_opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] {
WriteUnitOfWork wunit(_opCtx.get());
@@ -232,6 +212,22 @@ Status CollectionBulkLoaderImpl::commit() {
if (!status.isOK()) {
return status;
}
+
+ // Delete duplicate records after committing the index so these writes are not
+ // intercepted by the still in-progress index builder.
+ for (auto&& it : dups) {
+ writeConflictRetry(
+ _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this, &it] {
+ WriteUnitOfWork wunit(_opCtx.get());
+ _autoColl->getCollection()->deleteDocument(_opCtx.get(),
+ kUninitializedStmtId,
+ it,
+ nullptr /** OpDebug **/,
+ false /* fromMigrate */,
+ true /* noWarn */);
+ wunit.commit();
+ });
+ }
}
_stats.endBuildingIndexes = Date_t::now();
LOG(2) << "Done creating indexes for ns: " << _nss.ns() << ", stats: " << _stats.toString();
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index a87f14a7091..5d5864ccc8c 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -190,6 +190,7 @@ public:
* Returns the Timestamp being used by this recovery unit or boost::none if not reading from
* a point in time. Any point in time returned will reflect one of the following:
* - when using ReadSource::kProvided, the timestamp provided.
+ * - when using ReadSource::kNoOverlap, the timestamp chosen by the storage engine.
* - when using ReadSource::kLastAppliedSnapshot, the timestamp chosen using the storage
* engine's last applied timestamp.
* - when using ReadSource::kAllCommittedSnapshot, the timestamp chosen using the storage
@@ -294,7 +295,7 @@ public:
}
/**
- * The ReadSource indicates which exteral or provided timestamp to read from for future
+ * The ReadSource indicates which external or provided timestamp to read from for future
* transactions.
*/
enum ReadSource {
@@ -311,6 +312,10 @@ public:
*/
kMajorityCommitted,
/**
+ * Read from the latest timestamp at or before which no future transactions will commit.
+ */
+ kNoOverlap,
+ /**
* Read from the last applied timestamp. New transactions start at the most up-to-date
* timestamp.
*/
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index ab5dcb6ff26..ea28f53be95 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -388,6 +388,10 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) {
int wtRet;
if (commit) {
if (!_commitTimestamp.isNull()) {
+ // There is currently no scenario where it is intentional to commit before the current
+ // read timestamp.
+ invariant(_readAtTimestamp.isNull() || _commitTimestamp >= _readAtTimestamp);
+
const std::string conf = "commit_timestamp=" + integerToHex(_commitTimestamp.asULL());
invariantWTOK(s->timestamp_transaction(s, conf.c_str()));
_isTimestamped = true;
@@ -456,40 +460,54 @@ boost::optional<Timestamp> WiredTigerRecoveryUnit::getPointInTimeReadTimestamp()
// this timestamp to inform visiblity of operations, it is therefore necessary to open a
// transaction to establish a read timestamp, but only for ReadSources that are expected to have
// read timestamps.
- if (_timestampReadSource == ReadSource::kUnset ||
- _timestampReadSource == ReadSource::kNoTimestamp) {
- return boost::none;
- }
-
- // This ReadSource depends on a previous call to obtainMajorityCommittedSnapshot() and does not
- // require an open transaction to return a valid timestamp.
- if (_timestampReadSource == ReadSource::kMajorityCommitted) {
- invariant(!_majorityCommittedSnapshot.isNull());
- return _majorityCommittedSnapshot;
- }
-
- // The read timestamp is set by the user and does not require a transaction to be open.
- if (_timestampReadSource == ReadSource::kProvided) {
- invariant(!_readAtTimestamp.isNull());
- return _readAtTimestamp;
+ switch (_timestampReadSource) {
+ case ReadSource::kUnset:
+ case ReadSource::kNoTimestamp:
+ return boost::none;
+ case ReadSource::kMajorityCommitted:
+ // This ReadSource depends on a previous call to obtainMajorityCommittedSnapshot() and
+ // does not require an open transaction to return a valid timestamp.
+ invariant(!_majorityCommittedSnapshot.isNull());
+ return _majorityCommittedSnapshot;
+ case ReadSource::kProvided:
+ // The read timestamp is set by the user and does not require a transaction to be open.
+ invariant(!_readAtTimestamp.isNull());
+ return _readAtTimestamp;
+
+ // The following ReadSources can only establish a read timestamp when a transaction is
+ // opened.
+ case ReadSource::kNoOverlap:
+ case ReadSource::kLastApplied:
+ case ReadSource::kLastAppliedSnapshot:
+ case ReadSource::kAllCommittedSnapshot:
+ break;
}
- // The following ReadSources can only establish a read timestamp when a transaction is opened.
+ // Ensure a transaction is opened.
getSession();
- if (_timestampReadSource == ReadSource::kLastAppliedSnapshot ||
- _timestampReadSource == ReadSource::kAllCommittedSnapshot) {
- invariant(!_readAtTimestamp.isNull());
- return _readAtTimestamp;
- }
-
- // The lastApplied timestamp is not always available, so it is not possible to invariant that
- // it exists as other ReadSources do.
- if (_timestampReadSource == ReadSource::kLastApplied && !_readAtTimestamp.isNull()) {
- return _readAtTimestamp;
+ switch (_timestampReadSource) {
+ case ReadSource::kLastApplied:
+ // The lastApplied timestamp is not always available, so it is not possible to invariant
+ // that it exists as other ReadSources do.
+ if (!_readAtTimestamp.isNull()) {
+ return _readAtTimestamp;
+ }
+ return boost::none;
+ case ReadSource::kNoOverlap:
+ case ReadSource::kLastAppliedSnapshot:
+ case ReadSource::kAllCommittedSnapshot:
+ invariant(!_readAtTimestamp.isNull());
+ return _readAtTimestamp;
+
+ // The following ReadSources returned values in the first switch block.
+ case ReadSource::kUnset:
+ case ReadSource::kNoTimestamp:
+ case ReadSource::kMajorityCommitted:
+ case ReadSource::kProvided:
+ MONGO_UNREACHABLE;
}
-
- return boost::none;
+ MONGO_UNREACHABLE;
}
void WiredTigerRecoveryUnit::_txnOpen() {
@@ -531,6 +549,10 @@ void WiredTigerRecoveryUnit::_txnOpen() {
}
break;
}
+ case ReadSource::kNoOverlap: {
+ _readAtTimestamp = _beginTransactionAtNoOverlapTimestamp(session);
+ break;
+ }
case ReadSource::kAllCommittedSnapshot: {
if (_readAtTimestamp.isNull()) {
_readAtTimestamp = _beginTransactionAtAllCommittedTimestamp(session);
@@ -576,12 +598,60 @@ Timestamp WiredTigerRecoveryUnit::_beginTransactionAtAllCommittedTimestamp(WT_SE
// Since this is not in a critical section, we might have rounded to oldest between
// calling getAllCommitted and setTimestamp. We need to get the actual read timestamp we
// used.
+ auto readTimestamp = _getTransactionReadTimestamp(session);
+ txnOpen.done();
+ return readTimestamp;
+}
+
+Timestamp WiredTigerRecoveryUnit::_beginTransactionAtNoOverlapTimestamp(WT_SESSION* session) {
+ // Begins a transaction reading at the no-overlap point and returns the read timestamp used.
+ auto lastApplied = _sessionCache->snapshotManager().getLocalSnapshot();
+ Timestamp allCommitted = Timestamp(_oplogManager->fetchAllCommittedValue(session->connection));
+
+ // When using timestamps for reads and writes, it's important that readers and writers don't
+ // overlap with the timestamps they use. In other words, at any point in the system there should
+ // be a timestamp T such that writers only commit at times greater than T and readers only read
+ // at, or earlier than T. This time T is called the no-overlap point. Using the `kNoOverlap`
+ // ReadSource will compute the most recent known time that is safe to read at.
+
+ // The no-overlap point is computed as the minimum of the storage engine's all-committed time
+ // and replication's last applied time. On primaries, the last applied time is updated as
+ // transactions commit, which is not necessarily in the order they appear in the oplog. Thus
+ // the all-committed time is an appropriate value to read at.
+
+ // On secondaries, however, the all-committed time, as computed by the storage engine, can
+ // advance before oplog application completes a batch. This is because the all-committed time
+ // is only computed correctly if the storage engine is informed of commit timestamps in
+ // increasing order. Because oplog application processes a batch of oplog entries out of order,
+ // the timestamping requirement is not satisfied. Secondaries, however, only update the last
+ // applied time after a batch completes. Thus last applied is a valid no-overlap point on
+ // secondaries.
+
+ // By taking the minimum of the two values, storage can compute a legal time to read at without
+ // knowledge of the replication state. The no-overlap point is the minimum of the all-committed
+ // time, which represents the point where no transactions will commit any earlier, and
+ // lastApplied, which represents the highest optime a node has applied, a point after which no
+ // readers should read. If no lastApplied value is available, the all-committed time is used.
+ Timestamp readTimestamp = (lastApplied) ? std::min(*lastApplied, allCommitted) : allCommitted;
+
+ WiredTigerBeginTxnBlock txnOpen(session, _ignorePrepared);
+ auto status =
+ txnOpen.setTimestamp(readTimestamp, WiredTigerBeginTxnBlock::RoundToOldest::kRound);
+ fassert(51061, status);
+
+ // The timestamp might have been rounded to oldest between calling fetchAllCommittedValue and
+ // setTimestamp, so ask the session for the read timestamp that was actually used.
+ readTimestamp = _getTransactionReadTimestamp(session);
+ txnOpen.done();
+ return readTimestamp;
+}
+
+Timestamp WiredTigerRecoveryUnit::_getTransactionReadTimestamp(WT_SESSION* session) {
char buf[(2 * 8 /*bytes in hex*/) + 1 /*nul terminator*/];
auto wtstatus = session->query_timestamp(session, buf, "get=read");
invariantWTOK(wtstatus);
uint64_t read_timestamp;
fassert(50949, parseNumberFromStringWithBase(buf, 16, &read_timestamp));
- txnOpen.done();
return Timestamp(read_timestamp);
}
@@ -595,6 +665,10 @@ Status WiredTigerRecoveryUnit::setTimestamp(Timestamp timestamp) {
str::stream() << "Commit timestamp set to " << _commitTimestamp.toString()
<< " and trying to set WUOW timestamp to "
<< timestamp.toString());
+ invariant(_readAtTimestamp.isNull() || timestamp >= _readAtTimestamp,
+ str::stream() << "future commit timestamp " << timestamp.toString()
+ << " cannot be older than read timestamp "
+ << _readAtTimestamp.toString());
_lastTimestampSet = timestamp;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index e83643a77b5..72e912233ac 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -257,6 +257,17 @@ private:
Timestamp _beginTransactionAtAllCommittedTimestamp(WT_SESSION* session);
/**
+ * Starts a transaction at the no-overlap timestamp. Returns the timestamp the transaction
+ * was started at.
+ */
+ Timestamp _beginTransactionAtNoOverlapTimestamp(WT_SESSION* session);
+
+ /**
+ * Returns the timestamp at which the current transaction is reading.
+ */
+ Timestamp _getTransactionReadTimestamp(WT_SESSION* session);
+
+ /**
* Transitions to new state.
*/
void _setState(State newState);
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index af471589749..927842efd57 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -125,6 +125,7 @@ dbtest = env.Program(
"$BUILD_DIR/mongo/db/commands/test_commands_enabled",
"$BUILD_DIR/mongo/db/concurrency/deferred_writer",
"$BUILD_DIR/mongo/db/index/index_access_methods",
+ "$BUILD_DIR/mongo/db/index/index_build_interceptor",
"$BUILD_DIR/mongo/db/logical_clock",
"$BUILD_DIR/mongo/db/logical_time_metadata_hook",
"$BUILD_DIR/mongo/db/pipeline/document_value_test_util",
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 5787ef486ff..6b31f842b37 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/global_settings.h"
+#include "mongo/db/index/index_build_interceptor.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/multi_key_path_tracker.h"
@@ -1876,6 +1877,135 @@ public:
}
};
+template <bool SimulatePrimary>
+class TimestampIndexBuildDrain : public StorageTimestampTest {
+public:
+ void run() {
+ const bool SimulateSecondary = !SimulatePrimary;
+ if (SimulateSecondary) {
+ // The MemberState is inspected during index builds to use a "ghost" write to timestamp
+ // index completion.
+ ASSERT_OK(_coordinatorMock->setFollowerMode({repl::MemberState::MS::RS_SECONDARY}));
+ }
+ // Scenario: intercept inserts during a unique index build on `{a: 1}`, then check each drain.
+ NamespaceString nss("unittests.timestampIndexBuildDrain");
+ reset(nss);
+
+ AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_X);
+
+ // Build an index on `{a: 1}`.
+ MultiIndexBlock indexer(_opCtx, autoColl.getCollection());
+ const LogicalTime beforeIndexBuild = _clock->reserveTicks(2);
+ BSONObj indexInfoObj;
+ {
+ // Primaries do not have a wrapping `TimestampBlock`; secondaries do.
+ const Timestamp commitTimestamp =
+ SimulatePrimary ? Timestamp::min() : beforeIndexBuild.addTicks(1).asTimestamp();
+ TimestampBlock tsBlock(_opCtx, commitTimestamp);
+
+ // Secondaries will also be in an `UnreplicatedWritesBlock` that prevents the `logOp`
+ // from creating an entry.
+ boost::optional<repl::UnreplicatedWritesBlock> unreplicated;
+ if (SimulateSecondary) {
+ unreplicated.emplace(_opCtx);
+ }
+
+ auto swIndexInfoObj = indexer.init({BSON("v" << 2 << "unique" << true << "name"
+ << "a_1"
+ << "ns"
+ << nss.ns()
+ << "key"
+ << BSON("a" << 1))});
+ ASSERT_OK(swIndexInfoObj.getStatus());
+ indexInfoObj = std::move(swIndexInfoObj.getValue()[0]);
+ }
+
+ const LogicalTime afterIndexInit = _clock->reserveTicks(1);
+
+ // Insert a document that will be intercepted and need to be drained. This timestamp will
+ // become the lastApplied time.
+ const LogicalTime firstInsert = _clock->reserveTicks(1);
+ {
+ WriteUnitOfWork wuow(_opCtx);
+ insertDocument(autoColl.getCollection(),
+ InsertStatement(BSON("_id" << 0 << "a" << 1),
+ firstInsert.asTimestamp(),
+ presentTerm));
+ wuow.commit();
+ ASSERT_EQ(1, itCount(autoColl.getCollection()));
+ }
+
+ // Index build drain will timestamp writes from the side table into the index with the
+ // lastApplied timestamp. This is because these writes are not associated with any specific
+ // oplog entry.
+ ASSERT_EQ(repl::ReplicationCoordinator::get(getGlobalServiceContext())
+ ->getMyLastAppliedOpTime()
+ .getTimestamp(),
+ firstInsert.asTimestamp());
+ // First drain: applies the intercepted write from the first insert.
+ ASSERT_OK(indexer.drainBackgroundWrites());
+
+ auto indexCatalog = autoColl.getCollection()->getIndexCatalog();
+ const IndexCatalogEntry* buildingIndex = indexCatalog->getEntry(
+ indexCatalog->findIndexByName(_opCtx, "a_1", /* includeUnfinished */ true));
+ ASSERT(buildingIndex);
+
+ {
+ // Reading after index init but before the first insert, there are no writes to apply.
+ OneOffRead oor(_opCtx, afterIndexInit.asTimestamp());
+ ASSERT_TRUE(buildingIndex->indexBuildInterceptor()->areAllWritesApplied(_opCtx));
+ }
+
+ // Note: In this case, we can't observe a state where all writes are not applied, because
+ // the index build drain effectively rewrites history by retroactively committing the drain
+ // at the same time as the first insert, meaning there is no point-in-time with undrained
+ // writes. This is fine, as long as the drain does not commit at a time before this insert.
+
+ {
+ // At time of the first insert, all writes are applied.
+ OneOffRead oor(_opCtx, firstInsert.asTimestamp());
+ ASSERT_TRUE(buildingIndex->indexBuildInterceptor()->areAllWritesApplied(_opCtx));
+ }
+
+ // Insert a second document that will be intercepted and need to be drained.
+ const LogicalTime secondInsert = _clock->reserveTicks(1);
+ {
+ WriteUnitOfWork wuow(_opCtx);
+ insertDocument(autoColl.getCollection(),
+ InsertStatement(BSON("_id" << 1 << "a" << 2),
+ secondInsert.asTimestamp(),
+ presentTerm));
+ wuow.commit();
+ ASSERT_EQ(2, itCount(autoColl.getCollection()));
+ }
+
+ // Advance the lastApplied optime to observe a point before the drain where there are
+ // un-drained writes.
+ const LogicalTime afterSecondInsert = _clock->reserveTicks(1);
+ setReplCoordAppliedOpTime(repl::OpTime(afterSecondInsert.asTimestamp(), presentTerm));
+ // Second drain: the checks below expect it to be visible only from the new lastApplied time.
+ ASSERT_OK(indexer.drainBackgroundWrites());
+
+ {
+ // At time of the second insert, there are un-drained writes.
+ OneOffRead oor(_opCtx, secondInsert.asTimestamp());
+ ASSERT_FALSE(buildingIndex->indexBuildInterceptor()->areAllWritesApplied(_opCtx));
+ }
+
+ {
+ // At the lastApplied time, which is after the second insert, all writes are applied.
+ OneOffRead oor(_opCtx, afterSecondInsert.asTimestamp());
+ ASSERT_TRUE(buildingIndex->indexBuildInterceptor()->areAllWritesApplied(_opCtx));
+ }
+
+ {
+ WriteUnitOfWork wuow(_opCtx);
+ ASSERT_OK(indexer.commit());
+ wuow.commit();
+ }
+ }
+};
+
class TimestampMultiIndexBuilds : public StorageTimestampTest {
public:
void run() {
@@ -2689,6 +2819,8 @@ public:
// TimestampIndexBuilds<SimulatePrimary>
add<TimestampIndexBuilds<false>>();
add<TimestampIndexBuilds<true>>();
+ add<TimestampIndexBuildDrain<false>>();
+ add<TimestampIndexBuildDrain<true>>();
add<TimestampMultiIndexBuilds>();
add<TimestampMultiIndexBuildsDuringRename>();
add<TimestampIndexDrops>();