diff options
author | Louis Williams <louis.williams@mongodb.com> | 2019-01-22 17:29:55 -0500 |
---|---|---|
committer | Louis Williams <louis.williams@mongodb.com> | 2019-02-06 13:04:21 -0500 |
commit | bb69551adf200b1b050476ad01b765018d5bac80 (patch) | |
tree | 929f508105697e1d90ae84b93bbc549c0b853975 /src/mongo/db/index | |
parent | 33b153224e08a213f505c4f9e85d087d713cae5a (diff) | |
download | mongo-bb69551adf200b1b050476ad01b765018d5bac80.tar.gz |
SERVER-38986 Timestamp writes while draining
Diffstat (limited to 'src/mongo/db/index')
-rw-r--r-- | src/mongo/db/index/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/index/index_build_interceptor.cpp | 63 | ||||
-rw-r--r-- | src/mongo/db/index/index_build_interceptor.h | 11 |
3 files changed, 60 insertions, 15 deletions
diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript index 21490053480..50b53e80630 100644 --- a/src/mongo/db/index/SConscript +++ b/src/mongo/db/index/SConscript @@ -146,6 +146,7 @@ env.Library( 'duplicate_key_tracker', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/index_timestamp_helper', '$BUILD_DIR/mongo/db/multi_key_path_tracker', 'index_access_methods', ], diff --git a/src/mongo/db/index/index_build_interceptor.cpp b/src/mongo/db/index/index_build_interceptor.cpp index 41fea74abba..715c4c60fb1 100644 --- a/src/mongo/db/index/index_build_interceptor.cpp +++ b/src/mongo/db/index/index_build_interceptor.cpp @@ -34,7 +34,9 @@ #include "mongo/db/index/index_build_interceptor.h" #include "mongo/bson/bsonobj.h" +#include "mongo/db/catalog/index_timestamp_helper.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/index/index_access_method.h" @@ -90,9 +92,26 @@ const std::string& IndexBuildInterceptor::getConstraintViolationsTableIdent() co Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx, - const InsertDeleteOptions& options) { + const InsertDeleteOptions& options, + RecoveryUnit::ReadSource readSource) { invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + // Callers may request to read at a specific timestamp so that no drained writes are timestamped + // earlier than their original write timestamp. Also ensure that leaving this function resets + // the ReadSource to its original value. + auto resetReadSourceGuard = + makeGuard([ opCtx, prevReadSource = opCtx->recoveryUnit()->getTimestampReadSource() ] { + opCtx->recoveryUnit()->abandonSnapshot(); + opCtx->recoveryUnit()->setTimestampReadSource(prevReadSource); + }); + + if (readSource != RecoveryUnit::ReadSource::kUnset) { + opCtx->recoveryUnit()->abandonSnapshot(); + opCtx->recoveryUnit()->setTimestampReadSource(readSource); + } else { + resetReadSourceGuard.dismiss(); + } + // These are used for logging only. int64_t totalDeleted = 0; int64_t totalInserted = 0; @@ -174,23 +193,36 @@ Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx, invariant(!batch.empty()); + cursor->save(); + // If we are here, either we have reached the end of the table or the batch is full, so // insert everything in one WriteUnitOfWork, and delete each inserted document from the side // writes table. - WriteUnitOfWork wuow(opCtx); - for (auto& operation : batch) { - auto status = - _applyWrite(opCtx, operation.second, options, &totalInserted, &totalDeleted); - if (!status.isOK()) { - return status; + auto status = writeConflictRetry(opCtx, "index build drain", _indexCatalogEntry->ns(), [&] { + WriteUnitOfWork wuow(opCtx); + for (auto& operation : batch) { + auto status = + _applyWrite(opCtx, operation.second, options, &totalInserted, &totalDeleted); + if (!status.isOK()) { + return status; + } + + // Delete the document from the table as soon as it has been inserted into the + // index. This ensures that no key is ever inserted twice and no keys are skipped. + _sideWritesTable->rs()->deleteRecord(opCtx, operation.first); } - // Delete the document from the table as soon as it has been inserted into the index. - // This ensures that no key is ever inserted twice and no keys are skipped. - _sideWritesTable->rs()->deleteRecord(opCtx, operation.first); + // For rollback to work correctly, these writes need to be timestamped. The actual time + // is not important, as long as it not older than the most recent visible side write. + IndexTimestampHelper::setGhostCommitTimestampForWrite( + opCtx, NamespaceString(_indexCatalogEntry->ns())); + + wuow.commit(); + return Status::OK(); + }); + if (!status.isOK()) { + return status; } - cursor->save(); - wuow.commit(); progress->hit(batch.size()); @@ -250,7 +282,10 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx, } } - *keysInserted += result.numInserted; + int64_t numInserted = result.numInserted; + *keysInserted += numInserted; + opCtx->recoveryUnit()->onRollback( + [keysInserted, numInserted] { *keysInserted -= numInserted; }); } else { invariant(opType == Op::kDelete); DEV invariant(strcmp(operation.getStringField("op"), "d") == 0); @@ -262,6 +297,8 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx, } *keysDeleted += numDeleted; + opCtx->recoveryUnit()->onRollback( + [keysDeleted, numDeleted] { *keysDeleted -= numDeleted; }); } return Status::OK(); } diff --git a/src/mongo/db/index/index_build_interceptor.h b/src/mongo/db/index/index_build_interceptor.h index 39db95418f1..e4f2f2b3027 100644 --- a/src/mongo/db/index/index_build_interceptor.h +++ b/src/mongo/db/index/index_build_interceptor.h @@ -88,9 +88,16 @@ public: * * This is resumable, so subsequent calls will start the scan at the record immediately * following the last inserted record from a previous call to drainWritesIntoIndex. + * + * When 'readSource' is not kUnset, perform the drain by reading at the timestamp described by + * the ReadSource. This will always reset the ReadSource to its original value before returning. + * The drain otherwise reads at the pre-existing ReadSource on the RecoveryUnit. This may be + * necessary by callers that can only guarantee consistency of data up to a certain point in + * time. */ - Status drainWritesIntoIndex(OperationContext* opCtx, const InsertDeleteOptions& options); - + Status drainWritesIntoIndex(OperationContext* opCtx, + const InsertDeleteOptions& options, + RecoveryUnit::ReadSource readSource); /** * Returns 'true' if there are no visible records remaining to be applied from the side writes |