summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2019-05-28 19:20:56 -0400
committerBlake Oler <blake.oler@mongodb.com>2019-06-12 12:53:04 -0400
commitdfa8658c18142c560447c7bf6f34a6f788593d28 (patch)
tree587b77eacb8769f92e6976f0d875c79dc3179607 /src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
parent4304ce376401b5213b63611d94f9fb1d938d9e39 (diff)
downloadmongo-dfa8658c18142c560447c7bf6f34a6f788593d28.tar.gz
SERVER-40791 Track multi-statement transaction operations for migrations at commit time
Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp')
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp121
1 files changed, 85 insertions, 36 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index a78553c804e..2c3f23ff2d4 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -43,6 +43,8 @@
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/sharding_statistics.h"
#include "mongo/db/s/start_chunk_clone_request.h"
#include "mongo/db/service_context.h"
@@ -87,6 +89,34 @@ BSONObj createRequestWithSessionId(StringData commandName,
return builder.obj();
}
+const BSONObj& getDocumentKeyFromReplOperation(repl::ReplOperation replOperation,
+ repl::OpTypeEnum opType) {
+ switch (opType) {
+ case repl::OpTypeEnum::kInsert:
+ case repl::OpTypeEnum::kDelete:
+ return replOperation.getObject();
+ case repl::OpTypeEnum::kUpdate:
+ return *replOperation.getObject2();
+ default:
+ MONGO_UNREACHABLE;
+ }
+ MONGO_UNREACHABLE;
+}
+
+const char getOpCharForCrudOpType(repl::OpTypeEnum opType) {
+ switch (opType) {
+ case repl::OpTypeEnum::kInsert:
+ return 'i';
+ case repl::OpTypeEnum::kUpdate:
+ return 'u';
+ case repl::OpTypeEnum::kDelete:
+ return 'd';
+ default:
+ MONGO_UNREACHABLE;
+ }
+ MONGO_UNREACHABLE;
+}
+
} // namespace
/**
@@ -110,8 +140,7 @@ public:
_prePostImageOpTime(prePostImageOpTime) {}
void commit(boost::optional<Timestamp>) override {
- _cloner->_consumeOperationTrackRequestAndAddToTransferModsQueue(
- _idObj, _op, _opTime, _prePostImageOpTime);
+ _cloner->_addToTransferModsQueue(_idObj, _op, _opTime, _prePostImageOpTime);
_cloner->_decrementOutstandingOperationTrackRequests();
}
@@ -127,30 +156,62 @@ private:
const repl::OpTime _prePostImageOpTime;
};
-/**
- * Used to keep track of new transactions that involve documents in any chunk
- * with an ongoing migration.
- */
-class LogPrepareOrCommitOpForShardingHandler final : public RecoveryUnit::Change {
-public:
- LogPrepareOrCommitOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner,
- const repl::OpTime& opTime)
- : _cloner(cloner), _opTime(opTime) {}
+void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) {
+ std::set<NamespaceString> namespacesTouchedByTransaction;
- void commit(boost::optional<Timestamp>) override {
- _cloner->_addToSessionMigrationOptimeQueue(
- _opTime, SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
- _cloner->_decrementOutstandingOperationTrackRequests();
- }
+ for (const auto& stmt : _stmts) {
+ const auto& nss = stmt.getNss();
- void rollback() override {
- _cloner->_decrementOutstandingOperationTrackRequests();
- }
+ auto csr = CollectionShardingRuntime::get_UNSAFE(_svcCtx, nss);
+ auto msm = MigrationSourceManager::get_UNSAFE(csr);
+ if (!msm) {
+ continue;
+ }
+ auto cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(msm->getCloner().get());
-private:
- MigrationChunkClonerSourceLegacy* const _cloner;
- const repl::OpTime _opTime;
-};
+ auto opType = stmt.getOpType();
+ auto documentKey = getDocumentKeyFromReplOperation(stmt, opType);
+
+ auto idElement = documentKey["_id"];
+ if (idElement.eoo()) {
+ warning() << "Received a document with no id, ignoring: " << redact(documentKey);
+ continue;
+ }
+
+ auto const& minKey = cloner->_args.getMinKey();
+ auto const& maxKey = cloner->_args.getMaxKey();
+ auto const& shardKeyPattern = cloner->_shardKeyPattern;
+
+ if (!isInRange(documentKey, minKey, maxKey, shardKeyPattern)) {
+ // If the preImageDoc is not in range but the postImageDoc was, we know that the
+ // document has changed shard keys and no longer belongs in the chunk being cloned.
+ // We will model the deletion of the preImage document so that the destination chunk
+ // does not receive an outdated version of this document.
+ if (opType == repl::OpTypeEnum::kUpdate &&
+ isInRange(stmt.getPreImageDocumentKey(), minKey, maxKey, shardKeyPattern) &&
+ !stmt.getPreImageDocumentKey()["_id"].eoo()) {
+ opType = repl::OpTypeEnum::kDelete;
+ idElement = stmt.getPreImageDocumentKey()["id"];
+ } else {
+ continue;
+ }
+ }
+
+ // Inform the session migration subsystem that a transaction has committed for all involved
+ // namespaces.
+ if (namespacesTouchedByTransaction.find(nss) == namespacesTouchedByTransaction.end()) {
+ cloner->_addToSessionMigrationOptimeQueue(
+ _prepareOrCommitOpTime,
+ SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
+
+ namespacesTouchedByTransaction.emplace(nss);
+ }
+
+ // Pass an empty prePostOpTime to the queue because retryable write history doesn't care
+ // about writes in transactions.
+ cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}, {});
+ }
+}
MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request,
const BSONObj& shardKeyPattern,
@@ -456,18 +517,6 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
}
}
-void MigrationChunkClonerSourceLegacy::onTransactionPrepareOrUnpreparedCommit(
- OperationContext* opCtx, const repl::OpTime& opTime) {
-
- invariant(opCtx->getTxnNumber());
-
- if (!_addedOperationToOutstandingOperationTrackRequests()) {
- return;
- }
-
- opCtx->recoveryUnit()->registerChange(new LogPrepareOrCommitOpForShardingHandler(this, opTime));
-}
-
void MigrationChunkClonerSourceLegacy::_addToSessionMigrationOptimeQueue(
const repl::OpTime& opTime,
SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType) {
@@ -478,7 +527,7 @@ void MigrationChunkClonerSourceLegacy::_addToSessionMigrationOptimeQueue(
}
}
-void MigrationChunkClonerSourceLegacy::_consumeOperationTrackRequestAndAddToTransferModsQueue(
+void MigrationChunkClonerSourceLegacy::_addToTransferModsQueue(
const BSONObj& idObj,
const char op,
const repl::OpTime& opTime,