summaryrefslogtreecommitdiff
path: root/src/mongo/db/index
diff options
context:
space:
mode:
authorLouis Williams <louis.williams@mongodb.com>2019-01-22 17:29:55 -0500
committerLouis Williams <louis.williams@mongodb.com>2019-02-06 13:04:21 -0500
commitbb69551adf200b1b050476ad01b765018d5bac80 (patch)
tree929f508105697e1d90ae84b93bbc549c0b853975 /src/mongo/db/index
parent33b153224e08a213f505c4f9e85d087d713cae5a (diff)
downloadmongo-bb69551adf200b1b050476ad01b765018d5bac80.tar.gz
SERVER-38986 Timestamp writes while draining
Diffstat (limited to 'src/mongo/db/index')
-rw-r--r--src/mongo/db/index/SConscript1
-rw-r--r--src/mongo/db/index/index_build_interceptor.cpp63
-rw-r--r--src/mongo/db/index/index_build_interceptor.h11
3 files changed, 60 insertions, 15 deletions
diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript
index 21490053480..50b53e80630 100644
--- a/src/mongo/db/index/SConscript
+++ b/src/mongo/db/index/SConscript
@@ -146,6 +146,7 @@ env.Library(
'duplicate_key_tracker',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/index_timestamp_helper',
'$BUILD_DIR/mongo/db/multi_key_path_tracker',
'index_access_methods',
],
diff --git a/src/mongo/db/index/index_build_interceptor.cpp b/src/mongo/db/index/index_build_interceptor.cpp
index 41fea74abba..715c4c60fb1 100644
--- a/src/mongo/db/index/index_build_interceptor.cpp
+++ b/src/mongo/db/index/index_build_interceptor.cpp
@@ -34,7 +34,9 @@
#include "mongo/db/index/index_build_interceptor.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/catalog/index_timestamp_helper.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/index/index_access_method.h"
@@ -90,9 +92,26 @@ const std::string& IndexBuildInterceptor::getConstraintViolationsTableIdent() co
Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx,
- const InsertDeleteOptions& options) {
+ const InsertDeleteOptions& options,
+ RecoveryUnit::ReadSource readSource) {
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+ // Callers may request to read at a specific timestamp so that no drained writes are timestamped
+ // earlier than their original write timestamp. Also ensure that leaving this function resets
+ // the ReadSource to its original value.
+ auto resetReadSourceGuard =
+ makeGuard([ opCtx, prevReadSource = opCtx->recoveryUnit()->getTimestampReadSource() ] {
+ opCtx->recoveryUnit()->abandonSnapshot();
+ opCtx->recoveryUnit()->setTimestampReadSource(prevReadSource);
+ });
+
+ if (readSource != RecoveryUnit::ReadSource::kUnset) {
+ opCtx->recoveryUnit()->abandonSnapshot();
+ opCtx->recoveryUnit()->setTimestampReadSource(readSource);
+ } else {
+ resetReadSourceGuard.dismiss();
+ }
+
// These are used for logging only.
int64_t totalDeleted = 0;
int64_t totalInserted = 0;
@@ -174,23 +193,36 @@ Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx,
invariant(!batch.empty());
+ cursor->save();
+
// If we are here, either we have reached the end of the table or the batch is full, so
// insert everything in one WriteUnitOfWork, and delete each inserted document from the side
// writes table.
- WriteUnitOfWork wuow(opCtx);
- for (auto& operation : batch) {
- auto status =
- _applyWrite(opCtx, operation.second, options, &totalInserted, &totalDeleted);
- if (!status.isOK()) {
- return status;
+ auto status = writeConflictRetry(opCtx, "index build drain", _indexCatalogEntry->ns(), [&] {
+ WriteUnitOfWork wuow(opCtx);
+ for (auto& operation : batch) {
+ auto status =
+ _applyWrite(opCtx, operation.second, options, &totalInserted, &totalDeleted);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ // Delete the document from the table as soon as it has been inserted into the
+ // index. This ensures that no key is ever inserted twice and no keys are skipped.
+ _sideWritesTable->rs()->deleteRecord(opCtx, operation.first);
}
- // Delete the document from the table as soon as it has been inserted into the index.
- // This ensures that no key is ever inserted twice and no keys are skipped.
- _sideWritesTable->rs()->deleteRecord(opCtx, operation.first);
+ // For rollback to work correctly, these writes need to be timestamped. The actual time
+ // is not important, as long as it not older than the most recent visible side write.
+ IndexTimestampHelper::setGhostCommitTimestampForWrite(
+ opCtx, NamespaceString(_indexCatalogEntry->ns()));
+
+ wuow.commit();
+ return Status::OK();
+ });
+ if (!status.isOK()) {
+ return status;
}
- cursor->save();
- wuow.commit();
progress->hit(batch.size());
@@ -250,7 +282,10 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx,
}
}
- *keysInserted += result.numInserted;
+ int64_t numInserted = result.numInserted;
+ *keysInserted += numInserted;
+ opCtx->recoveryUnit()->onRollback(
+ [keysInserted, numInserted] { *keysInserted -= numInserted; });
} else {
invariant(opType == Op::kDelete);
DEV invariant(strcmp(operation.getStringField("op"), "d") == 0);
@@ -262,6 +297,8 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx,
}
*keysDeleted += numDeleted;
+ opCtx->recoveryUnit()->onRollback(
+ [keysDeleted, numDeleted] { *keysDeleted -= numDeleted; });
}
return Status::OK();
}
diff --git a/src/mongo/db/index/index_build_interceptor.h b/src/mongo/db/index/index_build_interceptor.h
index 39db95418f1..e4f2f2b3027 100644
--- a/src/mongo/db/index/index_build_interceptor.h
+++ b/src/mongo/db/index/index_build_interceptor.h
@@ -88,9 +88,16 @@ public:
*
* This is resumable, so subsequent calls will start the scan at the record immediately
* following the last inserted record from a previous call to drainWritesIntoIndex.
+ *
+ * When 'readSource' is not kUnset, perform the drain by reading at the timestamp described by
+ * the ReadSource. This will always reset the ReadSource to its original value before returning.
+ * The drain otherwise reads at the pre-existing ReadSource on the RecoveryUnit. This may be
+ * necessary by callers that can only guarantee consistency of data up to a certain point in
+ * time.
*/
- Status drainWritesIntoIndex(OperationContext* opCtx, const InsertDeleteOptions& options);
-
+ Status drainWritesIntoIndex(OperationContext* opCtx,
+ const InsertDeleteOptions& options,
+ RecoveryUnit::ReadSource readSource);
/**
* Returns 'true' if there are no visible records remaining to be applied from the side writes