20 files changed, 624 insertions, 158 deletions
diff --git a/jstests/noPassthrough/hybrid_unique_index_with_updates.js b/jstests/noPassthrough/hybrid_unique_index_with_updates.js index b3dc94ca4fd..7aa7bbe21f2 100644 --- a/jstests/noPassthrough/hybrid_unique_index_with_updates.js +++ b/jstests/noPassthrough/hybrid_unique_index_with_updates.js @@ -3,14 +3,18 @@ * of hybrid unique index builds. This test inserts a duplicate document at different phases of an * index build to confirm that the resulting behavior is failure. * - * @tags: [requires_document_locking] + * @tags: [requires_document_locking, requires_replication] */ (function() { "use strict"; load("jstests/libs/check_log.js"); - let conn = MongoRunner.runMongod(); + let replSetTest = new ReplSetTest({name: "hybrid_updates", nodes: 2}); + replSetTest.startSet(); + replSetTest.initiate(); + + let conn = replSetTest.getPrimary(); let testDB = conn.getDB('test'); // Run 'func' while failpoint is enabled. @@ -53,7 +57,7 @@ /** * Run a background index build on a unique index under different configurations. Introduce * duplicate keys on the index that may cause it to fail or succeed, depending on the following - * optional parmeters: + * optional parameters: * { * // Which operation used to introduce a duplicate key. * operation {string}: "insert", "update" @@ -153,5 +157,5 @@ runTest({operation: "update", resolve: false, phase: i}); } - MongoRunner.stopMongod(conn); + replSetTest.stopSet(); })(); diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript index 91499778d77..6b99eceab39 100644 --- a/src/mongo/db/catalog/SConscript +++ b/src/mongo/db/catalog/SConscript @@ -208,6 +208,22 @@ env.Library( ) env.Library( + target='index_timestamp_helper', + source=[ + "index_timestamp_helper.cpp", + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception', + '$BUILD_DIR/mongo/db/logical_clock', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', + '$BUILD_DIR/mongo/db/server_options_core', + ] +) + +env.Library( target='collection', source=[ "collection.cpp", @@ -257,6 +273,7 @@ env.Library( ], LIBDEPS=[ 'collection', + 'index_timestamp_helper', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/audit', '$BUILD_DIR/mongo/db/background', diff --git a/src/mongo/db/catalog/index_catalog.h b/src/mongo/db/catalog/index_catalog.h index d79e429cf6c..4a1a28f4b51 100644 --- a/src/mongo/db/catalog/index_catalog.h +++ b/src/mongo/db/catalog/index_catalog.h @@ -476,5 +476,4 @@ public: virtual void indexBuildSuccess(OperationContext* opCtx, IndexCatalogEntry* index) = 0; }; - } // namespace mongo diff --git a/src/mongo/db/catalog/index_catalog_entry.h b/src/mongo/db/catalog/index_catalog_entry.h index ef6628cfffc..8f9e6ab382e 100644 --- a/src/mongo/db/catalog/index_catalog_entry.h +++ b/src/mongo/db/catalog/index_catalog_entry.h @@ -79,6 +79,8 @@ public: virtual IndexBuildInterceptor* indexBuildInterceptor() = 0; + virtual const IndexBuildInterceptor* indexBuildInterceptor() const = 0; + virtual void setIndexBuildInterceptor(IndexBuildInterceptor* interceptor) = 0; virtual const Ordering& ordering() const = 0; diff --git a/src/mongo/db/catalog/index_catalog_entry_impl.h b/src/mongo/db/catalog/index_catalog_entry_impl.h index 7460b765b34..dd1443aebcc 100644 --- a/src/mongo/db/catalog/index_catalog_entry_impl.h +++ b/src/mongo/db/catalog/index_catalog_entry_impl.h @@ -98,6 +98,10 @@ public: return _indexBuildInterceptor; } + const IndexBuildInterceptor* indexBuildInterceptor() const final { + return 
_indexBuildInterceptor; + } + + void setIndexBuildInterceptor(IndexBuildInterceptor* interceptor) final { _indexBuildInterceptor = interceptor; } diff --git a/src/mongo/db/catalog/index_timestamp_helper.cpp b/src/mongo/db/catalog/index_timestamp_helper.cpp new file mode 100644 index 00000000000..98ee8f8f8be --- /dev/null +++ b/src/mongo/db/catalog/index_timestamp_helper.cpp @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kIndex + +#include "mongo/db/catalog/index_timestamp_helper.h" + +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/logical_clock.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/util/log.h" + +namespace mongo { + +namespace { +bool requiresGhostCommitTimestampForWrite(OperationContext* opCtx, const NamespaceString& nss) { + if (!nss.isReplicated()) { + return false; + } + + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->isReplEnabled()) { + return false; + } + + // Only storage engines that support recover-to-stable need ghost commit timestamps. + if (!opCtx->getServiceContext()->getStorageEngine()->supportsRecoveryTimestamp()) { + return false; + } + + return true; +} +} // namespace + +void IndexTimestampHelper::setGhostCommitTimestampForWrite(OperationContext* opCtx, + const NamespaceString& nss) { + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + + if (!requiresGhostCommitTimestampForWrite(opCtx, nss)) { + return; + } + + // The lastApplied timestamp is the last OpTime that a node has applied. We choose this + // timestamp on primaries because it is the most recent point-in-time a reader would be able + // to read at, despite it lagging slightly behind recently committed writes. Because of this lag, + // both on primaries and secondaries, the lastApplied time may be older than any newly committed + // writes. It is therefore required that all callers holding intent locks and wishing to apply + // ghost timestamps also establish a storage engine snapshot for reading that is less than or + // equal to the lastApplied timestamp. + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); + const auto commitTimestamp = replCoord->getMyLastAppliedOpTime().getTimestamp(); + + const auto mySnapshot = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(); + // If a lock that blocks writes is held, there can be no uncommitted writes, so there is no + // need to check snapshot visibility, especially if a caller is not reading with a timestamp. + invariant(mySnapshot || opCtx->lockState()->isCollectionLockedForMode(nss.ns(), MODE_S), + "a write-blocking lock is required when applying a ghost timestamp without a read " + "timestamp"); + invariant(!mySnapshot || *mySnapshot <= commitTimestamp, + str::stream() << "commit timestamp " << commitTimestamp.toString() + << " cannot be older than current read timestamp " + << mySnapshot->toString()); + + auto status = opCtx->recoveryUnit()->setTimestamp(commitTimestamp); + if (status.code() == ErrorCodes::BadValue) { + log() << "Temporarily could not apply ghost commit timestamp. " << status.reason(); + throw WriteConflictException(); + } + LOG(1) << "assigning ghost commit timestamp: " << commitTimestamp.toString(); + + fassert(51053, status); +}
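For reference, the retry contract above can be made concrete. A minimal sketch of the expected calling pattern, using only APIs that appear in this patch (writeConflictRetry, WriteUnitOfWork, and the new helper); the wrapper name applyGhostTimestampedWrite is hypothetical:

    // Illustrative sketch only: how a caller is expected to use the helper.
    // setGhostCommitTimestampForWrite() throws WriteConflictException when the
    // chosen timestamp is rejected as too old, so it belongs inside a
    // writeConflictRetry() loop, within an open WriteUnitOfWork, before commit.
    Status applyGhostTimestampedWrite(OperationContext* opCtx, const NamespaceString& nss) {
        return writeConflictRetry(opCtx, "ghost timestamped write", nss.ns(), [&] {
            WriteUnitOfWork wuow(opCtx);
            // ... apply writes that have no oplog entry of their own ...
            IndexTimestampHelper::setGhostCommitTimestampForWrite(opCtx, nss);
            wuow.commit();
            return Status::OK();
        });
    }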
+ +/** + * Returns true if writes to the catalog entry for the input namespace require being + * timestamped. A ghost write is when the operation is not committed with an oplog entry and + * implies the caller will look at the logical clock to choose a time to use. + */ +namespace { +bool requiresGhostCommitTimestampForCatalogWrite(OperationContext* opCtx, NamespaceString nss) { + if (!nss.isReplicated() || nss.coll().startsWith("tmp.mr.")) { + return false; + } + + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->isReplEnabled()) { + return false; + } + + // If there is a commit timestamp already assigned, there's no need to explicitly assign a + // timestamp. This case covers foreground index builds. + if (!opCtx->recoveryUnit()->getCommitTimestamp().isNull()) { + return false; + } + + // Only oplog entries (including a user's `applyOps` command) construct indexes via + // `IndexBuilder`. Nodes in `startup` may not yet have initialized the `LogicalClock`, however + // index builds during startup replication recovery must be timestamped. These index builds + // are foregrounded and timestamp their catalog writes with a "commit timestamp". Nodes in the + // oplog application phase of initial sync (`startup2`) must not timestamp index builds before + // the `initialDataTimestamp`. + const auto memberState = replCoord->getMemberState(); + if (memberState.startup() || memberState.startup2()) { + return false; + } + + // When in rollback via refetch, it's valid for all writes to be untimestamped. Additionally, + // it's illegal to timestamp a write later than the timestamp associated with the node exiting + // the rollback state. This condition opts for being conservative.
+ if (!serverGlobalParams.enableMajorityReadConcern && memberState.rollback()) { + return false; + } + + return true; +} +} // namespace + +void IndexTimestampHelper::setGhostCommitTimestampForCatalogWrite(OperationContext* opCtx, + const NamespaceString& nss) { + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + if (!requiresGhostCommitTimestampForCatalogWrite(opCtx, nss)) { + return; + } + auto status = opCtx->recoveryUnit()->setTimestamp( + LogicalClock::get(opCtx)->getClusterTime().asTimestamp()); + if (status.code() == ErrorCodes::BadValue) { + log() << "Temporarily could not timestamp the index build commit, retrying. " + << status.reason(); + throw WriteConflictException(); + } + fassert(50701, status); +} +} // namespace mongo diff --git a/src/mongo/db/catalog/index_timestamp_helper.h b/src/mongo/db/catalog/index_timestamp_helper.h new file mode 100644 index 00000000000..af4a746a869 --- /dev/null +++ b/src/mongo/db/catalog/index_timestamp_helper.h @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" + +namespace mongo { + +namespace IndexTimestampHelper { + +/** + * If required, sets a timestamp on an active WriteUnitOfWork. A ghost write is when the + * operation is not committed with an oplog entry, which may be necessary for certain index + * build operations not associated with a unique optime. This implementation uses the + * lastApplied OpTime. + * + * May throw a WriteConflictException if the timestamp chosen is too old. + */ +void setGhostCommitTimestampForWrite(OperationContext* opCtx, const NamespaceString& nss); + +/** + * If required, sets a timestamp on an active WriteUnitOfWork for a catalog write. A ghost write + * is when the operation is not committed with an oplog entry, which may be necessary for + * certain index catalog operations not associated with a unique optime. This implementation + * uses the LogicalClock to timestamp operations. 
*/ +void setGhostCommitTimestampForCatalogWrite(OperationContext* opCtx, const NamespaceString& nss); +}; + +} // namespace mongo diff --git a/src/mongo/db/catalog/multi_index_block.cpp b/src/mongo/db/catalog/multi_index_block.cpp index fbddcd68dd0..1b4d277ebd9 100644 --- a/src/mongo/db/catalog/multi_index_block.cpp +++ b/src/mongo/db/catalog/multi_index_block.cpp @@ -573,7 +573,7 @@ Status MultiIndexBlock::dumpInsertsFromBulk(std::set<RecordId>* dupRecords) { return Status::OK(); } -Status MultiIndexBlock::drainBackgroundWrites() { +Status MultiIndexBlock::drainBackgroundWrites(RecoveryUnit::ReadSource readSource) { if (State::kAborted == _getState()) { return {ErrorCodes::IndexBuildAborted, str::stream() << "Index build aborted: " << _abortReason @@ -595,7 +595,7 @@ if (!interceptor) continue; - auto status = interceptor->drainWritesIntoIndex(_opCtx, _indexes[i].options); + auto status = interceptor->drainWritesIntoIndex(_opCtx, _indexes[i].options, readSource); if (!status.isOK()) { return status; } diff --git a/src/mongo/db/catalog/multi_index_block.h b/src/mongo/db/catalog/multi_index_block.h index 525a193bfd5..227ad8cc722 100644 --- a/src/mongo/db/catalog/multi_index_block.h +++ b/src/mongo/db/catalog/multi_index_block.h @@ -141,9 +141,13 @@ public: * before calling commit(), stop writes on the collection by holding a S or X while calling this * method. * + * When 'readSource' is not kUnset, perform the drain by reading at the timestamp described by + * the ReadSource. + * * Must not be in a WriteUnitOfWork. */ - Status drainBackgroundWrites(); + Status drainBackgroundWrites( + RecoveryUnit::ReadSource readSource = RecoveryUnit::ReadSource::kUnset); /** * Check any constraints that may have been temporarily violated during the index build for diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index f0d2d3d36d8..754e8cce6be 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -357,7 +357,9 @@ bool runCreateIndexes(OperationContext* opCtx, opCtx->recoveryUnit()->abandonSnapshot(); Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_IS); - uassertStatusOK(indexer.drainBackgroundWrites()); + // Read at a point in time so that the drain, which will timestamp writes at lastApplied, + // can never commit writes earlier than its read timestamp.
+ uassertStatusOK(indexer.drainBackgroundWrites(RecoveryUnit::ReadSource::kNoOverlap)); } if (MONGO_FAIL_POINT(hangAfterIndexBuildFirstDrain)) { diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript index 21490053480..50b53e80630 100644 --- a/src/mongo/db/index/SConscript +++ b/src/mongo/db/index/SConscript @@ -146,6 +146,7 @@ env.Library( 'duplicate_key_tracker', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/index_timestamp_helper', '$BUILD_DIR/mongo/db/multi_key_path_tracker', 'index_access_methods', ], diff --git a/src/mongo/db/index/index_build_interceptor.cpp b/src/mongo/db/index/index_build_interceptor.cpp index 41fea74abba..715c4c60fb1 100644 --- a/src/mongo/db/index/index_build_interceptor.cpp +++ b/src/mongo/db/index/index_build_interceptor.cpp @@ -34,7 +34,9 @@ #include "mongo/db/index/index_build_interceptor.h" #include "mongo/bson/bsonobj.h" +#include "mongo/db/catalog/index_timestamp_helper.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/index/index_access_method.h" @@ -90,9 +92,26 @@ const std::string& IndexBuildInterceptor::getConstraintViolationsTableIdent() co Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx, - const InsertDeleteOptions& options) { + const InsertDeleteOptions& options, + RecoveryUnit::ReadSource readSource) { invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + // Callers may request to read at a specific timestamp so that no drained writes are timestamped + // earlier than their original write timestamp. Also ensure that leaving this function resets + // the ReadSource to its original value. + auto resetReadSourceGuard = + makeGuard([ opCtx, prevReadSource = opCtx->recoveryUnit()->getTimestampReadSource() ] { + opCtx->recoveryUnit()->abandonSnapshot(); + opCtx->recoveryUnit()->setTimestampReadSource(prevReadSource); + }); + + if (readSource != RecoveryUnit::ReadSource::kUnset) { + opCtx->recoveryUnit()->abandonSnapshot(); + opCtx->recoveryUnit()->setTimestampReadSource(readSource); + } else { + resetReadSourceGuard.dismiss(); + } + // These are used for logging only. int64_t totalDeleted = 0; int64_t totalInserted = 0; @@ -174,23 +193,36 @@ Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx, invariant(!batch.empty()); + cursor->save(); + // If we are here, either we have reached the end of the table or the batch is full, so // insert everything in one WriteUnitOfWork, and delete each inserted document from the side // writes table. - WriteUnitOfWork wuow(opCtx); - for (auto& operation : batch) { - auto status = - _applyWrite(opCtx, operation.second, options, &totalInserted, &totalDeleted); - if (!status.isOK()) { - return status; + auto status = writeConflictRetry(opCtx, "index build drain", _indexCatalogEntry->ns(), [&] { + WriteUnitOfWork wuow(opCtx); + for (auto& operation : batch) { + auto status = + _applyWrite(opCtx, operation.second, options, &totalInserted, &totalDeleted); + if (!status.isOK()) { + return status; + } + + // Delete the document from the table as soon as it has been inserted into the + // index. This ensures that no key is ever inserted twice and no keys are skipped. + _sideWritesTable->rs()->deleteRecord(opCtx, operation.first); } - // Delete the document from the table as soon as it has been inserted into the index. - // This ensures that no key is ever inserted twice and no keys are skipped. 
- _sideWritesTable->rs()->deleteRecord(opCtx, operation.first); + // For rollback to work correctly, these writes need to be timestamped. The actual time + // is not important, as long as it is not older than the most recent visible side write. + IndexTimestampHelper::setGhostCommitTimestampForWrite( + opCtx, NamespaceString(_indexCatalogEntry->ns())); + + wuow.commit(); + return Status::OK(); + }); + if (!status.isOK()) { + return status; } - cursor->save(); - wuow.commit(); progress->hit(batch.size()); @@ -250,7 +282,10 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx, } } - *keysInserted += result.numInserted; + int64_t numInserted = result.numInserted; + *keysInserted += numInserted; + opCtx->recoveryUnit()->onRollback( + [keysInserted, numInserted] { *keysInserted -= numInserted; }); } else { invariant(opType == Op::kDelete); DEV invariant(strcmp(operation.getStringField("op"), "d") == 0); @@ -262,6 +297,8 @@ } *keysDeleted += numDeleted; + opCtx->recoveryUnit()->onRollback( + [keysDeleted, numDeleted] { *keysDeleted -= numDeleted; }); } return Status::OK(); } diff --git a/src/mongo/db/index/index_build_interceptor.h b/src/mongo/db/index/index_build_interceptor.h index 39db95418f1..e4f2f2b3027 100644 --- a/src/mongo/db/index/index_build_interceptor.h +++ b/src/mongo/db/index/index_build_interceptor.h @@ -88,9 +88,16 @@ public: * * This is resumable, so subsequent calls will start the scan at the record immediately * following the last inserted record from a previous call to drainWritesIntoIndex. + * + * When 'readSource' is not kUnset, perform the drain by reading at the timestamp described by + * the ReadSource. This will always reset the ReadSource to its original value before returning. + * The drain otherwise reads at the pre-existing ReadSource on the RecoveryUnit. This may be + * necessary for callers that can only guarantee consistency of data up to a certain point in + * time. */ - Status drainWritesIntoIndex(OperationContext* opCtx, const InsertDeleteOptions& options); - + Status drainWritesIntoIndex(OperationContext* opCtx, + const InsertDeleteOptions& options, + RecoveryUnit::ReadSource readSource); /** * Returns 'true' if there are no visible records remaining to be applied from the side writes
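From the caller's side, the 'readSource' parameter documented above enables the two-phase drain protocol that the index_builder.cpp and create_indexes.cpp hunks adopt. A condensed sketch, assuming the MultiIndexBlock 'indexer', namespace 'ns', and lock types used elsewhere in this patch:

    // Sketch of the two-phase drain this patch establishes. Early drains hold
    // only an intent lock, so they read at kNoOverlap to guarantee that writes
    // timestamped at lastApplied can never land before the drain's read point.
    {
        Lock::CollectionLock collLock(opCtx->lockState(), ns.ns(), MODE_IX);
        uassertStatusOK(indexer.drainBackgroundWrites(RecoveryUnit::ReadSource::kNoOverlap));
    }
    // The final drain blocks writers with MODE_S, so the default kUnset read
    // source (no read timestamp) is safe.
    {
        Lock::CollectionLock collLock(opCtx->lockState(), ns.ns(), MODE_S);
        uassertStatusOK(indexer.drainBackgroundWrites());
    }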
diff --git a/src/mongo/db/index_builder.cpp b/src/mongo/db/index_builder.cpp index fe252bc09ba..754848ae5cd 100644 --- a/src/mongo/db/index_builder.cpp +++ b/src/mongo/db/index_builder.cpp @@ -38,6 +38,7 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" +#include "mongo/db/catalog/index_timestamp_helper.h" #include "mongo/db/catalog/multi_index_block.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -62,48 +63,6 @@ namespace { const StringData kIndexesFieldName = "indexes"_sd; const StringData kCommandName = "createIndexes"_sd; -/** - * Returns true if writes to the catalog entry for the input namespace require being - * timestamped. A ghost write is when the operation is not committed with an oplog entry and - * implies the caller will look at the logical clock to choose a time to use. - */ -bool requiresGhostCommitTimestamp(OperationContext* opCtx, NamespaceString nss) { - if (!nss.isReplicated() || nss.coll().startsWith("tmp.mr.")) { - return false; - } - - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - if (!replCoord->getSettings().usingReplSets()) { - return false; - } - - // If there is a commit timestamp already assigned, there's no need to explicitly assign a - // timestamp. This case covers foreground index builds. - if (!opCtx->recoveryUnit()->getCommitTimestamp().isNull()) { - return false; - } - - // Only oplog entries (including a user's `applyOps` command) construct indexes via - // `IndexBuilder`. Nodes in `startup` may not yet have initialized the `LogicalClock`, however - // index builds during startup replication recovery must be timestamped. These index builds - // are foregrounded and timestamp their catalog writes with a "commit timestamp". Nodes in the - // oplog application phase of initial sync (`startup2`) must not timestamp index builds before - // the `initialDataTimestamp`. - const auto memberState = replCoord->getMemberState(); - if (memberState.startup() || memberState.startup2()) { - return false; - } - - // When in rollback via refetch, it's valid for all writes to be untimestamped. Additionally, - // it's illegal to timestamp a write later than the timestamp associated with the node exiting - // the rollback state. This condition opts for being conservative. - if (!serverGlobalParams.enableMajorityReadConcern && memberState.rollback()) { - return false; - } - - return true; -} - // Synchronization tools when replication spawns a background index in a new thread. // The bool is 'true' when a new background index has started in a new thread but the // parent thread has not yet synchronized with it. @@ -305,44 +264,47 @@ Status IndexBuilder::_build(OperationContext* opCtx, return status; } - { - // Perform the first drain while holding an intent lock. - Lock::CollectionLock collLock(opCtx->lockState(), ns.ns(), MODE_IX); - status = indexer.drainBackgroundWrites(); - } - if (!status.isOK()) { - return status; - } + if (buildInBackground) { + { + // Perform the first drain while holding an intent lock. + Lock::CollectionLock collLock(opCtx->lockState(), ns.ns(), MODE_IX); - // Perform the second drain while stopping inserts into the collection. - { - Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_S); - status = indexer.drainBackgroundWrites(); - } - if (!status.isOK()) { - return status; - } + // Read at a point in time so that the drain, which will timestamp writes at + // lastApplied, can never commit writes earlier than its read timestamp. + status = indexer.drainBackgroundWrites(RecoveryUnit::ReadSource::kNoOverlap); + } + if (!status.isOK()) { + return status; + } + + // Perform the second drain while stopping inserts into the collection. + { + Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_S); + status = indexer.drainBackgroundWrites(); + } + if (!status.isOK()) { + return status; + } - if (buildInBackground) { opCtx->recoveryUnit()->abandonSnapshot(); UninterruptibleLockGuard noInterrupt(opCtx->lockState()); dbLock->relockWithMode(MODE_X); - } - - // Perform the third and final drain after releasing a shared lock and reacquiring an - // exclusive lock on the database. - status = indexer.drainBackgroundWrites(); - if (!status.isOK()) { - return status; - } - // Only perform constraint checking when enforced (on primaries).
- if (_indexConstraints == IndexConstraints::kEnforce) { - status = indexer.checkConstraints(); + // Perform the third and final drain after releasing a shared lock and reacquiring an + // exclusive lock on the database. + status = indexer.drainBackgroundWrites(); if (!status.isOK()) { return status; } + + // Only perform constraint checking when enforced (on primaries). + if (_indexConstraints == IndexConstraints::kEnforce) { + status = indexer.checkConstraints(); + if (!status.isOK()) { + return status; + } + } } status = writeConflictRetry(opCtx, "Commit index build", ns.ns(), [opCtx, coll, &indexer, &ns] { @@ -355,16 +317,7 @@ Status IndexBuilder::_build(OperationContext* opCtx, return status; } - if (requiresGhostCommitTimestamp(opCtx, ns)) { - auto status = opCtx->recoveryUnit()->setTimestamp( - LogicalClock::get(opCtx)->getClusterTime().asTimestamp()); - if (status.code() == ErrorCodes::BadValue) { - log() << "Temporarily could not timestamp the index build commit, retrying. " - << status.reason(); - throw WriteConflictException(); - } - fassert(50701, status); - } + IndexTimestampHelper::setGhostCommitTimestampForCatalogWrite(opCtx, ns); wunit.commit(); return Status::OK(); }); diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index bddf1243ec4..f29692a1f0a 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -189,36 +189,16 @@ Status CollectionBulkLoaderImpl::commit() { } if (_idIndexBlock) { - // Delete dups. + // Gather RecordIds for uninserted duplicate keys to delete. std::set<RecordId> dups; // Do not do inside a WriteUnitOfWork (required by dumpInsertsFromBulk). auto status = _idIndexBlock->dumpInsertsFromBulk(&dups); if (!status.isOK()) { return status; } - for (auto&& it : dups) { - writeConflictRetry( - _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this, &it] { - WriteUnitOfWork wunit(_opCtx.get()); - _autoColl->getCollection()->deleteDocument(_opCtx.get(), - kUninitializedStmtId, - it, - nullptr /** OpDebug **/, - false /* fromMigrate */, - true /* noWarn */); - wunit.commit(); - }); - } - status = _idIndexBlock->drainBackgroundWrites(); - if (!status.isOK()) { - return status; - } - status = _idIndexBlock->checkConstraints(); - if (!status.isOK()) { - return status; - } - // Commit _id index, without dups. + // Commit _id index without duplicate keys even though there may still be documents + // with duplicate _ids. These duplicates will be deleted in the following step. status = writeConflictRetry( _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] { WriteUnitOfWork wunit(_opCtx.get()); @@ -232,6 +212,22 @@ Status CollectionBulkLoaderImpl::commit() { if (!status.isOK()) { return status; } + + // Delete duplicate records after committing the index so these writes are not + // intercepted by the still in-progress index builder. 
+ for (auto&& it : dups) { + writeConflictRetry( + _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this, &it] { + WriteUnitOfWork wunit(_opCtx.get()); + _autoColl->getCollection()->deleteDocument(_opCtx.get(), + kUninitializedStmtId, + it, + nullptr /** OpDebug **/, + false /* fromMigrate */, + true /* noWarn */); + wunit.commit(); + }); + } } _stats.endBuildingIndexes = Date_t::now(); LOG(2) << "Done creating indexes for ns: " << _nss.ns() << ", stats: " << _stats.toString(); diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index a87f14a7091..5d5864ccc8c 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -190,6 +190,7 @@ public: * Returns the Timestamp being used by this recovery unit or boost::none if not reading from * a point in time. Any point in time returned will reflect one of the following: * - when using ReadSource::kProvided, the timestamp provided. + * - when using ReadSource::kNoOverlap, the timestamp chosen by the storage engine. * - when using ReadSource::kLastAppliedSnapshot, the timestamp chosen using the storage * engine's last applied timestamp. * - when using ReadSource::kAllCommittedSnapshot, the timestamp chosen using the storage @@ -294,7 +295,7 @@ public: } /** - * The ReadSource indicates which exteral or provided timestamp to read from for future + * The ReadSource indicates which external or provided timestamp to read from for future * transactions. */ enum ReadSource { @@ -311,6 +312,10 @@ public: */ kMajorityCommitted, /** + * Read from the latest timestamp where no future transactions will commit at or before. + */ + kNoOverlap, + /** * Read from the last applied timestamp. New transactions start at the most up-to-date * timestamp. */ diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index ab5dcb6ff26..ea28f53be95 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -388,6 +388,10 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) { int wtRet; if (commit) { if (!_commitTimestamp.isNull()) { + // There is currently no scenario where it is intentional to commit before the current + // read timestamp. + invariant(_readAtTimestamp.isNull() || _commitTimestamp >= _readAtTimestamp); + const std::string conf = "commit_timestamp=" + integerToHex(_commitTimestamp.asULL()); invariantWTOK(s->timestamp_transaction(s, conf.c_str())); _isTimestamped = true; @@ -456,40 +460,54 @@ boost::optional<Timestamp> WiredTigerRecoveryUnit::getPointInTimeReadTimestamp() // this timestamp to inform visibility of operations, it is therefore necessary to open a // transaction to establish a read timestamp, but only for ReadSources that are expected to have // read timestamps. - if (_timestampReadSource == ReadSource::kUnset || - _timestampReadSource == ReadSource::kNoTimestamp) { - return boost::none; - } - - // This ReadSource depends on a previous call to obtainMajorityCommittedSnapshot() and does not - // require an open transaction to return a valid timestamp. - if (_timestampReadSource == ReadSource::kMajorityCommitted) { - invariant(!_majorityCommittedSnapshot.isNull()); - return _majorityCommittedSnapshot; - } - - // The read timestamp is set by the user and does not require a transaction to be open.
- if (_timestampReadSource == ReadSource::kProvided) { - invariant(!_readAtTimestamp.isNull()); - return _readAtTimestamp; - } + switch (_timestampReadSource) { + case ReadSource::kUnset: + case ReadSource::kNoTimestamp: + return boost::none; + case ReadSource::kMajorityCommitted: + // This ReadSource depends on a previous call to obtainMajorityCommittedSnapshot() and + // does not require an open transaction to return a valid timestamp. + invariant(!_majorityCommittedSnapshot.isNull()); + return _majorityCommittedSnapshot; + case ReadSource::kProvided: + // The read timestamp is set by the user and does not require a transaction to be open. + invariant(!_readAtTimestamp.isNull()); + return _readAtTimestamp; + + // The following ReadSources can only establish a read timestamp when a transaction is + // opened. + case ReadSource::kNoOverlap: + case ReadSource::kLastApplied: + case ReadSource::kLastAppliedSnapshot: + case ReadSource::kAllCommittedSnapshot: + break; } - // The following ReadSources can only establish a read timestamp when a transaction is opened. + // Ensure a transaction is opened. getSession(); - if (_timestampReadSource == ReadSource::kLastAppliedSnapshot || - _timestampReadSource == ReadSource::kAllCommittedSnapshot) { - invariant(!_readAtTimestamp.isNull()); - return _readAtTimestamp; - } - - // The lastApplied timestamp is not always available, so it is not possible to invariant that - // it exists as other ReadSources do. - if (_timestampReadSource == ReadSource::kLastApplied && !_readAtTimestamp.isNull()) { - return _readAtTimestamp; + switch (_timestampReadSource) { + case ReadSource::kLastApplied: + // The lastApplied timestamp is not always available, so it is not possible to invariant + // that it exists as other ReadSources do. + if (!_readAtTimestamp.isNull()) { + return _readAtTimestamp; + } + return boost::none; + case ReadSource::kNoOverlap: + case ReadSource::kLastAppliedSnapshot: + case ReadSource::kAllCommittedSnapshot: + invariant(!_readAtTimestamp.isNull()); + return _readAtTimestamp; + + // The following ReadSources returned values in the first switch block. + case ReadSource::kUnset: + case ReadSource::kNoTimestamp: + case ReadSource::kMajorityCommitted: + case ReadSource::kProvided: + MONGO_UNREACHABLE; } - - return boost::none; + MONGO_UNREACHABLE; } void WiredTigerRecoveryUnit::_txnOpen() { @@ -531,6 +549,10 @@ } break; } + case ReadSource::kNoOverlap: { + _readAtTimestamp = _beginTransactionAtNoOverlapTimestamp(session); + break; + } case ReadSource::kAllCommittedSnapshot: { if (_readAtTimestamp.isNull()) { _readAtTimestamp = _beginTransactionAtAllCommittedTimestamp(session); @@ -576,12 +598,60 @@ Timestamp WiredTigerRecoveryUnit::_beginTransactionAtAllCommittedTimestamp(WT_SE // Since this is not in a critical section, we might have rounded to oldest between // calling getAllCommitted and setTimestamp. We need to get the actual read timestamp we // used. + auto readTimestamp = _getTransactionReadTimestamp(session); + txnOpen.done(); + return readTimestamp; +} + +Timestamp WiredTigerRecoveryUnit::_beginTransactionAtNoOverlapTimestamp(WT_SESSION* session) { + + auto lastApplied = _sessionCache->snapshotManager().getLocalSnapshot(); + Timestamp allCommitted = Timestamp(_oplogManager->fetchAllCommittedValue(session->connection)); + + // When using timestamps for reads and writes, it's important that readers and writers don't + // overlap with the timestamps they use.
In other words, at any point in the system there should + // be a timestamp T such that writers only commit at times greater than T and readers only read + // at, or earlier than, T. This time T is called the no-overlap point. Using the `kNoOverlap` + // ReadSource will compute the most recent known time that is safe to read at. + + // The no-overlap point is computed as the minimum of the storage engine's all-committed time + // and replication's last applied time. On primaries, the last applied time is updated as + // transactions commit, which is not necessarily in the order they appear in the oplog. Thus + // the all-committed time is an appropriate value to read at. + + // On secondaries, however, the all-committed time, as computed by the storage engine, can + // advance before oplog application completes a batch. This is because the all-committed time + // is only computed correctly if the storage engine is informed of commit timestamps in + // increasing order. Because oplog application processes a batch of oplog entries out of order, + // the timestamping requirement is not satisfied. Secondaries, however, only update the last + // applied time after a batch completes. Thus last applied is a valid no-overlap point on + // secondaries. + + // By taking the minimum of the two values, storage can compute a legal time to read at without + // knowledge of the replication state. The no-overlap point is the minimum of the all-committed + // time, which represents the point where no transactions will commit any earlier, and + // lastApplied, which represents the highest optime a node has applied, a point beyond which no + // reader should read. + Timestamp readTimestamp = (lastApplied) ? std::min(*lastApplied, allCommitted) : allCommitted; + + WiredTigerBeginTxnBlock txnOpen(session, _ignorePrepared); + auto status = + txnOpen.setTimestamp(readTimestamp, WiredTigerBeginTxnBlock::RoundToOldest::kRound); + fassert(51061, status); + + // We might have rounded to oldest between calling getAllCommitted and setTimestamp. We need to + // get the actual read timestamp we used. + readTimestamp = _getTransactionReadTimestamp(session); + txnOpen.done(); + return readTimestamp; +} + +Timestamp WiredTigerRecoveryUnit::_getTransactionReadTimestamp(WT_SESSION* session) { char buf[(2 * 8 /*bytes in hex*/) + 1 /*nul terminator*/]; auto wtstatus = session->query_timestamp(session, buf, "get=read"); invariantWTOK(wtstatus); uint64_t read_timestamp; fassert(50949, parseNumberFromStringWithBase(buf, 16, &read_timestamp)); - txnOpen.done(); return Timestamp(read_timestamp); } @@ -595,6 +665,10 @@ Status WiredTigerRecoveryUnit::setTimestamp(Timestamp timestamp) { str::stream() << "Commit timestamp set to " << _commitTimestamp.toString() << " and trying to set WUOW timestamp to " << timestamp.toString()); + invariant(_readAtTimestamp.isNull() || timestamp >= _readAtTimestamp, + str::stream() << "future commit timestamp " << timestamp.toString() + << " cannot be older than read timestamp " + << _readAtTimestamp.toString()); _lastTimestampSet = timestamp;
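A worked example of the min(lastApplied, all-committed) computation above, with hypothetical timestamp values:

    // Hypothetical values on a secondary mid-batch: storage has already been
    // told about a commit at (10, 5), but replication's lastApplied is still
    // (10, 3) because the batch has not finished. A transaction at (10, 4) may
    // still commit behind the all-committed point, so kNoOverlap reads at the
    // minimum of the two values, (10, 3).
    boost::optional<Timestamp> lastApplied = Timestamp(10, 3);
    Timestamp allCommitted = Timestamp(10, 5);
    Timestamp readTimestamp = lastApplied ? std::min(*lastApplied, allCommitted) : allCommitted;
    invariant(readTimestamp == Timestamp(10, 3));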
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h index e83643a77b5..72e912233ac 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h @@ -257,6 +257,17 @@ private: Timestamp _beginTransactionAtAllCommittedTimestamp(WT_SESSION* session); /** + * Starts a transaction at the no-overlap timestamp. Returns the timestamp the transaction + * was started at. + */ + Timestamp _beginTransactionAtNoOverlapTimestamp(WT_SESSION* session); + + /** + * Returns the timestamp at which the current transaction is reading. + */ + Timestamp _getTransactionReadTimestamp(WT_SESSION* session); + + /** * Transitions to new state. */ void _setState(State newState); diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript index af471589749..927842efd57 100644 --- a/src/mongo/dbtests/SConscript +++ b/src/mongo/dbtests/SConscript @@ -125,6 +125,7 @@ dbtest = env.Program( "$BUILD_DIR/mongo/db/commands/test_commands_enabled", "$BUILD_DIR/mongo/db/concurrency/deferred_writer", "$BUILD_DIR/mongo/db/index/index_access_methods", + "$BUILD_DIR/mongo/db/index/index_build_interceptor", "$BUILD_DIR/mongo/db/logical_clock", "$BUILD_DIR/mongo/db/logical_time_metadata_hook", "$BUILD_DIR/mongo/db/pipeline/document_value_test_util", diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 5787ef486ff..6b31f842b37 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -47,6 +47,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/global_settings.h" +#include "mongo/db/index/index_build_interceptor.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/logical_clock.h" #include "mongo/db/multi_key_path_tracker.h" @@ -1876,6 +1877,135 @@ public: } }; +template <bool SimulatePrimary> +class TimestampIndexBuildDrain : public StorageTimestampTest { +public: + void run() { + const bool SimulateSecondary = !SimulatePrimary; + if (SimulateSecondary) { + // The MemberState is inspected during index builds to use a "ghost" write to timestamp + // index completion. + ASSERT_OK(_coordinatorMock->setFollowerMode({repl::MemberState::MS::RS_SECONDARY})); + } + + NamespaceString nss("unittests.timestampIndexBuildDrain"); + reset(nss); + + AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_X); + + // Build an index on `{a: 1}`. + MultiIndexBlock indexer(_opCtx, autoColl.getCollection()); + const LogicalTime beforeIndexBuild = _clock->reserveTicks(2); + BSONObj indexInfoObj; + { + // Primaries do not have a wrapping `TimestampBlock`; secondaries do. + const Timestamp commitTimestamp = + SimulatePrimary ? Timestamp::min() : beforeIndexBuild.addTicks(1).asTimestamp(); + TimestampBlock tsBlock(_opCtx, commitTimestamp); + + // Secondaries will also be in an `UnreplicatedWritesBlock` that prevents the `logOp` + // from creating an entry. + boost::optional<repl::UnreplicatedWritesBlock> unreplicated; + if (SimulateSecondary) { + unreplicated.emplace(_opCtx); + } + + auto swIndexInfoObj = indexer.init({BSON("v" << 2 << "unique" << true << "name" + << "a_1" + << "ns" + << nss.ns() + << "key" + << BSON("a" << 1))}); + ASSERT_OK(swIndexInfoObj.getStatus()); + indexInfoObj = std::move(swIndexInfoObj.getValue()[0]); + } + + const LogicalTime afterIndexInit = _clock->reserveTicks(1); + + // Insert a document that will be intercepted and need to be drained. This timestamp will + // become the lastApplied time.
+ const LogicalTime firstInsert = _clock->reserveTicks(1); + { + WriteUnitOfWork wuow(_opCtx); + insertDocument(autoColl.getCollection(), + InsertStatement(BSON("_id" << 0 << "a" << 1), + firstInsert.asTimestamp(), + presentTerm)); + wuow.commit(); + ASSERT_EQ(1, itCount(autoColl.getCollection())); + } + + // Index build drain will timestamp writes from the side table into the index with the + // lastApplied timestamp. This is because these writes are not associated with any specific + // oplog entry. + ASSERT_EQ(repl::ReplicationCoordinator::get(getGlobalServiceContext()) + ->getMyLastAppliedOpTime() + .getTimestamp(), + firstInsert.asTimestamp()); + + ASSERT_OK(indexer.drainBackgroundWrites()); + + auto indexCatalog = autoColl.getCollection()->getIndexCatalog(); + const IndexCatalogEntry* buildingIndex = indexCatalog->getEntry( + indexCatalog->findIndexByName(_opCtx, "a_1", /* includeUnfinished */ true)); + ASSERT(buildingIndex); + + { + // Before the drain, there are no writes to apply. + OneOffRead oor(_opCtx, afterIndexInit.asTimestamp()); + ASSERT_TRUE(buildingIndex->indexBuildInterceptor()->areAllWritesApplied(_opCtx)); + } + + // Note: In this case, we can't observe a state where all writes are not applied, because + // the index build drain effectively rewrites history by retroactively committing the drain + // at the same time as the first insert, meaning there is no point-in-time with undrained + // writes. This is fine, as long as the drain does not commit at a time before this insert. + + { + // At time of the first insert, all writes are applied. + OneOffRead oor(_opCtx, firstInsert.asTimestamp()); + ASSERT_TRUE(buildingIndex->indexBuildInterceptor()->areAllWritesApplied(_opCtx)); + } + + // Insert a second document that will be intercepted and need to be drained. + const LogicalTime secondInsert = _clock->reserveTicks(1); + { + WriteUnitOfWork wuow(_opCtx); + insertDocument(autoColl.getCollection(), + InsertStatement(BSON("_id" << 1 << "a" << 2), + secondInsert.asTimestamp(), + presentTerm)); + wuow.commit(); + ASSERT_EQ(2, itCount(autoColl.getCollection())); + } + + // Advance the lastApplied optime to observe a point before the drain where there are + // un-drained writes. + const LogicalTime afterSecondInsert = _clock->reserveTicks(1); + setReplCoordAppliedOpTime(repl::OpTime(afterSecondInsert.asTimestamp(), presentTerm)); + + ASSERT_OK(indexer.drainBackgroundWrites()); + + { + // At time of the second insert, there are un-drained writes. + OneOffRead oor(_opCtx, secondInsert.asTimestamp()); + ASSERT_FALSE(buildingIndex->indexBuildInterceptor()->areAllWritesApplied(_opCtx)); + } + + { + // After the second insert, which is also the lastApplied time, all writes are applied. + OneOffRead oor(_opCtx, afterSecondInsert.asTimestamp()); + ASSERT_TRUE(buildingIndex->indexBuildInterceptor()->areAllWritesApplied(_opCtx)); + } + + { + WriteUnitOfWork wuow(_opCtx); + ASSERT_OK(indexer.commit()); + wuow.commit(); + } + } +}; + class TimestampMultiIndexBuilds : public StorageTimestampTest { public: void run() { @@ -2689,6 +2819,8 @@ public: // TimestampIndexBuilds<SimulatePrimary> add<TimestampIndexBuilds<false>>(); add<TimestampIndexBuilds<true>>(); + add<TimestampIndexBuildDrain<false>>(); + add<TimestampIndexBuildDrain<true>>(); add<TimestampMultiIndexBuilds>(); add<TimestampMultiIndexBuildsDuringRename>(); add<TimestampIndexDrops>();
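The onRollback() adjustments added to _applyWrite in this patch follow a general compensation pattern for keeping in-memory counters consistent with the storage transaction. A minimal sketch; the helper name recordInserted is hypothetical:

    // Rollback-compensation pattern used by _applyWrite: mutate an in-memory
    // counter inside the storage transaction, and register an onRollback
    // handler so the counter is restored if the WriteUnitOfWork aborts (for
    // example, on a WriteConflictException retry of a drain batch).
    void recordInserted(OperationContext* opCtx, int64_t* keysInserted, int64_t numInserted) {
        *keysInserted += numInserted;
        opCtx->recoveryUnit()->onRollback(
            [keysInserted, numInserted] { *keysInserted -= numInserted; });
    }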