author    Randolph Tan <randolph@10gen.com>  2022-12-01 20:24:22 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2023-02-08 20:21:33 +0000
commit    58d5abbf8e6e809e7d3c126c74655d2553ceabc1 (patch)
tree      a160c9e473c5f200e94efb128c52d0b77bf4f0a8
parent    9ed6f0873d305f4ab19d8b45a9217784a1145ad2 (diff)
download  mongo-58d5abbf8e6e809e7d3c126c74655d2553ceabc1.tar.gz
SERVER-71219 Migration can miss writes from prepared transactions
This commit does 2 things:
1. Make sure that we register the migration source op observer hook in all paths where a transaction transitions into prepare.
2. If we don't have the post image doc, fetch the latest doc from storage.
(cherry picked from commit a20d97c8e30b805410d86ccdbeac2d3f05c407ba)
-rw-r--r--etc/backports_required_for_multiversion_tests.yml2
-rw-r--r--jstests/sharding/prepare_transaction_then_migrate.js38
-rw-r--r--src/mongo/db/auth/auth_op_observer.h4
-rw-r--r--src/mongo/db/free_mon/free_mon_op_observer.h4
-rw-r--r--src/mongo/db/op_observer.h9
-rw-r--r--src/mongo/db/op_observer_impl.cpp6
-rw-r--r--src/mongo/db/op_observer_impl.h8
-rw-r--r--src/mongo/db/op_observer_noop.h3
-rw-r--r--src/mongo/db/op_observer_registry.h9
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.cpp6
-rw-r--r--src/mongo/db/s/collection_metadata.cpp14
-rw-r--r--src/mongo/db/s/collection_metadata.h6
-rw-r--r--src/mongo/db/s/config_server_op_observer.h4
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp192
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h38
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.cpp10
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.h4
-rw-r--r--src/mongo/db/s/shard_server_op_observer.h4
18 files changed, 289 insertions, 72 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 4f0e8a5ebf3..daaf058ef99 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -76,7 +76,7 @@ all:
- test_file: jstests/replsets/disconnect_on_legacy_write_to_secondary.js
ticket: SERVER-50416
- test_file: jstests/sharding/prepare_transaction_then_migrate.js
- ticket: SERVER-52906
+ ticket: SERVER-71219
- test_file: jstests/sharding/migration_waits_for_majority_commit.js
ticket: SERVER-52906
- test_file: jstests/sharding/migration_ignore_interrupts_1.js
diff --git a/jstests/sharding/prepare_transaction_then_migrate.js b/jstests/sharding/prepare_transaction_then_migrate.js
index df6db403393..3d530cca4c5 100644
--- a/jstests/sharding/prepare_transaction_then_migrate.js
+++ b/jstests/sharding/prepare_transaction_then_migrate.js
@@ -3,7 +3,7 @@
* 1. Ignore multi-statement transaction prepare conflicts in the clone phase, and
* 2. Pick up the changes for prepared transactions in the transfer mods phase.
*
- * @tags: [uses_transactions, uses_prepare_transaction]
+ * @tags: [uses_transactions, uses_prepare_transaction, requires_persistence]
*/
(function() {
@@ -17,8 +17,17 @@ const collName = "user";
const staticMongod = MongoRunner.runMongod({}); // For startParallelOps.
-let runTest = function(withStepUp) {
- const st = new ShardingTest({shards: {rs0: {nodes: withStepUp ? 2 : 1}, rs1: {nodes: 1}}});
+const TestMode = {
+ kBasic: 'basic',
+ kWithStepUp: 'with stepUp',
+ kWithRestart: 'with restart',
+};
+
+let runTest = function(testMode) {
+ jsTest.log(`Running test in mode ${testMode}`);
+
+ const st = new ShardingTest(
+ {shards: {rs0: {nodes: testMode == TestMode.kWithStepUp ? 2 : 1}, rs1: {nodes: 1}}});
const collection = st.s.getDB(dbName).getCollection(collName);
CreateShardedCollectionUtil.shardCollectionWithChunks(collection, {x: 1}, [
@@ -81,8 +90,23 @@ let runTest = function(withStepUp) {
let prepareTimestamp = res.prepareTimestamp;
- if (withStepUp) {
+ if (testMode == TestMode.kWithStepUp) {
st.rs0.stepUp(st.rs0.getSecondary());
+ } else if (testMode == TestMode.kWithRestart) {
+ TestData.skipCollectionAndIndexValidation = true;
+ st.rs0.restart(st.rs0.getPrimary());
+ st.rs0.waitForMaster();
+ TestData.skipCollectionAndIndexValidation = false;
+
+ assert.soon(() => {
+ try {
+ st.shard0.getDB(dbName).getCollection("dummy").findOne();
+ return true;
+ } catch (ex) {
+ print("Caught expected once exception due to restart: " + tojson(ex));
+ return false;
+ }
+ });
}
const joinMoveChunk =
@@ -145,9 +169,9 @@ let runTest = function(withStepUp) {
st.stop();
};
-runTest(false);
-// TODO: SERVER-71219 Enable test after fixing.
-// runTest(true);
+runTest(TestMode.kBasic);
+runTest(TestMode.kWithStepUp);
+runTest(TestMode.kWithRestart);
MongoRunner.stopMongod(staticMongod);
})();
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index 234b08b3eb4..4d1741ff467 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -174,6 +174,10 @@ public:
std::vector<repl::ReplOperation>* statements,
size_t numberOfPreImagesToWrite) final {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index 043465e8722..db9ef173a48 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -174,6 +174,10 @@ public:
std::vector<repl::ReplOperation>* statements,
size_t numberOfPreImagesToWrite) final {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 362b454c0ce..cfb1e55e623 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -349,6 +349,15 @@ public:
size_t numberOfPreImagesToWrite) = 0;
/**
+ * This is called when a transaction transitions into prepare on a node that is not primary.
+ * Examples include secondary oplog application and a restarted node recovering prepared
+ * transactions from the oplog.
+ */
+ virtual void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) = 0;
+
+ /**
* The onTransactionAbort method is called when an atomic transaction aborts, before the
* RecoveryUnit onRollback() is called. It must not be called when the transaction to abort is
* active.
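Every OpObserver implementation must now provide this hook; observers with no sharding concerns stub it out, as the empty overrides elsewhere in this patch do. As a hedged sketch of an implementation that reacts to the event (the class name and log id below are invented for illustration, and the include set is abridged):

    class PrepareLoggingObserver final : public OpObserverNoop {
    public:
        void onTransactionPrepareNonPrimary(OperationContext* opCtx,
                                            const std::vector<repl::OplogEntry>& statements,
                                            const repl::OpTime& prepareOpTime) override {
            // Fires during secondary oplog application and during startup
            // recovery of prepared transactions, never on the primary path.
            LOGV2(9990001,
                  "Observed transaction prepare on non-primary node",
                  "prepareOpTime"_attr = prepareOpTime,
                  "numStatements"_attr = statements.size());
        }
    };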
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 2f2fbf942f5..118a0cd44b8 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1330,6 +1330,12 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, prepareOpTime);
}
+void OpObserverImpl::onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) {
+ shardObserveNonPrimaryTransactionPrepare(opCtx, statements, prepareOpTime);
+}
+
void OpObserverImpl::onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) {
invariant(opCtx->getTxnNumber());
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index c902180c98b..09ee1dd9b92 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -151,6 +151,10 @@ public:
const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>* statements,
size_t numberOfPreImagesToWrite) final;
+
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final;
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final;
void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final;
@@ -187,6 +191,10 @@ private:
OperationContext* opCtx,
const std::vector<repl::ReplOperation>& stmts,
const repl::OpTime& prepareOrCommitOptime) {}
+ virtual void shardObserveNonPrimaryTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& stmts,
+ const repl::OpTime& prepareOrCommitOptime) {}
};
} // namespace mongo
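The protected shardObserveNonPrimaryTransactionPrepare virtual follows the same layering as the existing shardObserveTransactionPrepareOrUnpreparedCommit hook: the public entry point in OpObserverImpl stays sharding-agnostic, while an empty-bodied protected virtual lets the sharding-aware subclass opt in. The pattern in miniature (illustrative names, not from this patch):

    class Base {
    public:
        void onEvent() {
            // ...generic bookkeeping shared by all server variants...
            shardObserveEvent();  // no-op unless a subclass overrides it
        }

    protected:
        virtual void shardObserveEvent() {}
    };

    class ShardingAware final : public Base {
    protected:
        void shardObserveEvent() override {
            // e.g. register the migration logging handler
        }
    };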
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index 7a73c792304..c2a5fd1c1ad 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -151,6 +151,9 @@ public:
const std::vector<OplogSlot>& reservedSlots,
std::vector<repl::ReplOperation>* statements,
size_t numberOfPreImagesToWrite) override{};
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override{};
void onReplicationRollback(OperationContext* opCtx,
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index ac7db0c5ee0..a18b2394c6f 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -297,6 +297,15 @@ public:
}
}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {
+ ReservedTimes times{opCtx};
+ for (auto& observer : _observers) {
+ observer->onTransactionPrepareNonPrimary(opCtx, statements, prepareOpTime);
+ }
+ }
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {
ReservedTimes times{opCtx};
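The registry forwards the new hook to every registered observer, identical in shape to the neighboring methods, so one notification reaches the sharding observer along with everything else. A hedged sketch of how this composes at startup (registration details vary by binary; addObserver is the existing registry API):

    auto registry = std::make_unique<OpObserverRegistry>();
    registry->addObserver(std::make_unique<OpObserverShardingImpl>());
    registry->addObserver(std::make_unique<AuthOpObserver>());

    // A single call fans out to every observer; all but the sharding
    // implementation treat it as a no-op.
    registry->onTransactionPrepareNonPrimary(opCtx, statements, prepareOpTime);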
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index 8090164d85e..46e09191e74 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index_builds_coordinator.h"
+#include "mongo/db/op_observer.h"
#include "mongo/db/repl/apply_ops.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/timestamp_block.h"
@@ -471,6 +472,11 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
}
txnParticipant.prepareTransaction(opCtx, entry.getOpTime());
+
+ auto opObserver = opCtx->getServiceContext()->getOpObserver();
+ invariant(opObserver);
+ opObserver->onTransactionPrepareNonPrimary(opCtx, ops, entry.getOpTime());
+
// Prepare transaction success.
abortOnError.dismiss();
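The placement is deliberate: the observer is notified only after prepareTransaction succeeds, but before abortOnError is dismissed, so a failure while notifying still unwinds the prepare. The idiom in reduced form (a sketch assuming the makeGuard helper from mongo/util/scopeguard.h; the guard's construction sits outside this hunk):

    auto abortOnError = makeGuard([&] {
        // abort the partially applied prepared transaction
    });

    txnParticipant.prepareTransaction(opCtx, entry.getOpTime());

    // Still under the guard's protection: if this throws, the guard
    // aborts the prepare instead of leaving it half-registered.
    auto opObserver = opCtx->getServiceContext()->getOpObserver();
    invariant(opObserver);
    opObserver->onTransactionPrepareNonPrimary(opCtx, ops, entry.getOpTime());

    abortOnError.dismiss();  // prepare transaction success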
diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp
index f21793c709e..19c2259a415 100644
--- a/src/mongo/db/s/collection_metadata.cpp
+++ b/src/mongo/db/s/collection_metadata.cpp
@@ -46,13 +46,13 @@ namespace mongo {
CollectionMetadata::CollectionMetadata(std::shared_ptr<ChunkManager> cm, const ShardId& thisShardId)
: _cm(std::move(cm)), _thisShardId(thisShardId) {}
-BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const {
+BSONObj CollectionMetadata::extractDocumentKey(const ShardKeyPattern* shardKeyPattern,
+ const BSONObj& doc) {
BSONObj key;
- if (isSharded()) {
- auto const& pattern = _cm->getShardKeyPattern();
- key = dotted_path_support::extractElementsBasedOnTemplate(doc, pattern.toBSON());
- if (pattern.hasId()) {
+ if (shardKeyPattern) {
+ key = dotted_path_support::extractElementsBasedOnTemplate(doc, shardKeyPattern->toBSON());
+ if (shardKeyPattern->hasId()) {
return key;
}
// else, try to append an _id field from the document.
@@ -66,6 +66,10 @@ BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const {
return doc;
}
+BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const {
+ return extractDocumentKey(isSharded() ? &_cm->getShardKeyPattern() : nullptr, doc);
+}
+
void CollectionMetadata::toBSONBasic(BSONObjBuilder& bb) const {
if (isSharded()) {
_cm->getVersion().appendLegacyWithField(&bb, "collVersion");
diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h
index 128df5272f0..a663e67b73a 100644
--- a/src/mongo/db/s/collection_metadata.h
+++ b/src/mongo/db/s/collection_metadata.h
@@ -149,6 +149,12 @@ public:
BSONObj extractDocumentKey(const BSONObj& doc) const;
/**
+ * Static version of the function above. Only use this for internal sharding operations where
+ * the shard key pattern is fixed and cannot change.
+ */
+ static BSONObj extractDocumentKey(const ShardKeyPattern* shardKeyPattern, const BSONObj& doc);
+
+ /**
* BSON output of the basic metadata information (chunk and shard version).
*/
void toBSONBasic(BSONObjBuilder& bb) const;
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index 8a14905345a..bddd79350eb 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -175,6 +175,10 @@ public:
std::vector<repl::ReplOperation>* statements,
size_t numberOfPreImagesToWrite) override {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 657a9dc7306..408af48e6f7 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -33,10 +33,14 @@
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
+#include <fmt/format.h>
+
#include "mongo/base/status.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/index_access_method.h"
@@ -45,6 +49,7 @@
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/migration_source_manager.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/s/sharding_statistics.h"
#include "mongo/db/s/start_chunk_clone_request.h"
@@ -66,6 +71,8 @@
namespace mongo {
namespace {
+using namespace fmt::literals;
+
const char kRecvChunkStatus[] = "_recvChunkStatus";
const char kRecvChunkCommit[] = "_recvChunkCommit";
const char kRecvChunkAbort[] = "_recvChunkAbort";
@@ -104,9 +111,8 @@ BSONObj createRequestWithSessionId(StringData commandName,
return builder.obj();
}
-BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation,
- repl::OpTypeEnum opType) {
- switch (opType) {
+BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation) {
+ switch (replOperation.getOpType()) {
case repl::OpTypeEnum::kInsert:
case repl::OpTypeEnum::kDelete:
return replOperation.getObject();
@@ -171,6 +177,28 @@ private:
const repl::OpTime _prePostImageOpTime;
};
+LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler(
+ ServiceContext* service,
+ const std::vector<repl::OplogEntry>& stmts,
+ repl::OpTime prepareOrCommitOpTime)
+ : _svcCtx(service), _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {
+ _stmts.reserve(stmts.size());
+ _ownedReplBSONObj.reserve(stmts.size());
+
+ for (const auto& op : stmts) {
+ auto ownedBSON = op.getDurableReplOperation().toBSON().getOwned();
+ _ownedReplBSONObj.push_back(ownedBSON);
+ _stmts.push_back(
+ repl::ReplOperation::parse({"MigrationChunkClonerSource_toReplOperation"}, ownedBSON));
+ }
+}
+
+LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler(
+ ServiceContext* service,
+ const std::vector<repl::ReplOperation>& stmts,
+ repl::OpTime prepareOrCommitOpTime)
+ : _svcCtx(service), _stmts(stmts), _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {}
+
void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) {
std::set<NamespaceString> namespacesTouchedByTransaction;
@@ -190,7 +218,7 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
auto cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(msm->getCloner().get());
auto opType = stmt.getOpType();
- auto preImageDocKey = getDocumentKeyFromReplOperation(stmt, opType);
+ auto preImageDocKey = getDocumentKeyFromReplOperation(stmt);
auto idElement = preImageDocKey["_id"];
if (idElement.eoo()) {
@@ -200,47 +228,33 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
continue;
}
- auto const& minKey = cloner->_args.getMinKey();
- auto const& maxKey = cloner->_args.getMaxKey();
- auto const& shardKeyPattern = cloner->_shardKeyPattern;
-
- // Note: This assumes that prepared transactions will always have post document key
- // set. There is a small window where create collection coordinator releases the critical
- // section and before it writes down the chunks for non-empty collections. So in theory,
- // it is possible to have a prepared transaction while collection is unsharded
- // and becomes sharded midway. This doesn't happen in practice because the only way to
- // have a prepared transactions without being sharded is by directly connecting to the
- // shards and manually preparing the transaction. Another exception is when transaction
- // is prepared on an older version that doesn't set the post image document key.
- auto postImageDocKey = stmt.getPostImageDocumentKey();
- if (postImageDocKey.isEmpty()) {
- LOGV2_WARNING(
- 6836102,
- "Migration encountered a transaction operation without a post image document key",
- "preImageDocKey"_attr = preImageDocKey);
- } else {
- auto postShardKeyValues =
- shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey);
- fassert(6836100, !postShardKeyValues.isEmpty());
-
- if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) {
- // If the preImageDoc is not in range but the postImageDoc was, we know that the
- // document has changed shard keys and no longer belongs in the chunk being cloned.
- // We will model the deletion of the preImage document so that the destination chunk
- // does not receive an outdated version of this document.
-
- auto preImageShardKeyValues =
- shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
- fassert(6836101, !preImageShardKeyValues.isEmpty());
-
- if (opType == repl::OpTypeEnum::kUpdate &&
- isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) {
- opType = repl::OpTypeEnum::kDelete;
- idElement = postImageDocKey["_id"];
- } else {
+ if (opType == repl::OpTypeEnum::kUpdate) {
+ auto const& shardKeyPattern = cloner->_shardKeyPattern;
+ auto preImageShardKeyValues =
+ shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
+
+ // If prepare was performed from another term, we will not have the post image doc key
+ // since it is not persisted in the oplog.
+ auto postImageDocKey = stmt.getPostImageDocumentKey();
+ if (!postImageDocKey.isEmpty()) {
+ if (!cloner->_processUpdateForXferMod(preImageDocKey, postImageDocKey)) {
+ // We don't need to add this op to session migration if neither the post nor the
+ // pre image doc falls within the chunk range.
continue;
}
+ } else {
+ // We can't perform reads here using the same recovery unit because the transaction
+ // is already committed. We instead defer performing the reads until the xferMods
+ // command is called. Also allow this op to be added to session migration since we
+ // can't tell whether the post image doc will fall within the chunk range. If it
+ // turns out that neither the preImage nor the postImage doc falls into the chunk
+ // range, it is not wrong for this op to be added to session migration, but it will
+ // result in wasted work and unnecessary extra oplog storage on the destination.
+ cloner->_deferProcessingForXferMod(preImageDocKey);
}
+ } else {
+ cloner->_addToTransferModsQueue(
+ idElement.wrap(), getOpCharForCrudOpType(opType), {}, {});
}
// Inform the session migration subsystem that a transaction has committed for all involved
@@ -842,11 +856,79 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
return Status::OK();
}
+bool MigrationChunkClonerSourceLegacy::_processUpdateForXferMod(const BSONObj& preImageDocKey,
+ const BSONObj& postImageDocKey) {
+ auto const& minKey = _args.getMinKey();
+ auto const& maxKey = _args.getMaxKey();
+
+ auto postShardKeyValues = _shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey);
+ fassert(6836100, !postShardKeyValues.isEmpty());
+
+ auto opType = repl::OpTypeEnum::kUpdate;
+ auto idElement = preImageDocKey["_id"];
+
+ if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) {
+ // If the postImageDoc is not in range but the preImageDoc is, we know that the
+ // document has changed shard keys and no longer belongs in the chunk being cloned.
+ // We will model the deletion of the preImage document so that the destination chunk
+ // does not receive an outdated version of this document.
+
+ auto preImageShardKeyValues =
+ _shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
+ fassert(6836101, !preImageShardKeyValues.isEmpty());
+
+ if (!isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) {
+ return false;
+ }
+
+ opType = repl::OpTypeEnum::kDelete;
+ idElement = postImageDocKey["_id"];
+ }
+
+ _addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}, {});
+
+ return true;
+}
+
+void MigrationChunkClonerSourceLegacy::_deferProcessingForXferMod(const BSONObj& preImageDocKey) {
+ stdx::lock_guard<Latch> sl(_mutex);
+ _deferredReloadOrDeletePreImageDocKeys.push_back(preImageDocKey.getOwned());
+ _deferredUntransferredOpsCounter++;
+}
+
+void MigrationChunkClonerSourceLegacy::_processDeferredXferMods(OperationContext* opCtx,
+ Database* db) {
+ std::vector<BSONObj> deferredReloadOrDeletePreImageDocKeys;
+
+ {
+ stdx::unique_lock lk(_mutex);
+ deferredReloadOrDeletePreImageDocKeys.swap(_deferredReloadOrDeletePreImageDocKeys);
+ }
+
+ for (const auto& preImageDocKey : deferredReloadOrDeletePreImageDocKeys) {
+ auto idElement = preImageDocKey["_id"];
+ BSONObj newerVersionDoc;
+ if (!Helpers::findById(
+ opCtx, db, _args.getNss().ns(), BSON("_id" << idElement), newerVersionDoc)) {
+ // If the document can no longer be found, a later op must have deleted it. That
+ // delete would have been captured by the xferMods, so there is nothing else to
+ // do here.
+ continue;
+ }
+
+ auto postImageDocKey =
+ CollectionMetadata::extractDocumentKey(&_shardKeyPattern, newerVersionDoc);
+ static_cast<void>(_processUpdateForXferMod(preImageDocKey, postImageDocKey));
+ }
+}
+
Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
Database* db,
BSONObjBuilder* builder) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IS));
+ _processDeferredXferMods(opCtx, db);
+
std::list<BSONObj> deleteList;
std::list<BSONObj> updateList;
@@ -876,6 +958,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
_untransferredDeletesCounter = _deleted.size();
_reload.splice(_reload.cbegin(), updateList);
_untransferredUpsertsCounter = _reload.size();
+ _deferredUntransferredOpsCounter = _deferredReloadOrDeletePreImageDocKeys.size();
return Status::OK();
}
@@ -890,6 +973,8 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) {
_untransferredUpsertsCounter = 0;
_deleted.clear();
_untransferredDeletesCounter = 0;
+ _deferredReloadOrDeletePreImageDocKeys.clear();
+ _deferredUntransferredOpsCounter = 0;
}
StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(OperationContext* opCtx,
@@ -1219,17 +1304,18 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
estimateUntransferredSessionsSize < maxUntransferredSessionsSize) {
// The recipient is sufficiently caught-up with the writes on the donor.
// Block writes, so that it can drain everything.
- LOGV2_DEBUG(5630700,
- 1,
- "moveChunk data transfer within threshold to allow write blocking",
- "_untransferredUpsertsCounter"_attr = _untransferredUpsertsCounter,
- "_untransferredDeletesCounter"_attr = _untransferredDeletesCounter,
- "_averageObjectSizeForCloneLocs"_attr = _averageObjectSizeForCloneLocs,
- "_averageObjectIdSize"_attr = _averageObjectIdSize,
- "untransferredSessionDataInBytes"_attr =
- estimateUntransferredSessionsSize,
- "maxChunksSizeBytes"_attr = _args.getMaxChunkSizeBytes(),
- "_sessionId"_attr = _sessionId.toString());
+ LOGV2_DEBUG(
+ 5630700,
+ 1,
+ "moveChunk data transfer within threshold to allow write blocking",
+ "_untransferredUpsertsCounter"_attr = _untransferredUpsertsCounter,
+ "_untransferredDeletesCounter"_attr = _untransferredDeletesCounter,
+ "_deferredUntransferredOpsCounter"_attr = _deferredUntransferredOpsCounter,
+ "_averageObjectSizeForCloneLocs"_attr = _averageObjectSizeForCloneLocs,
+ "_averageObjectIdSize"_attr = _averageObjectIdSize,
+ "untransferredSessionDataInBytes"_attr = estimateUntransferredSessionsSize,
+ "maxChunksSizeBytes"_attr = _args.getMaxChunkSizeBytes(),
+ "_sessionId"_attr = _sessionId.toString());
return Status::OK();
}
}
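Taken together, updates now follow two paths: when the post image document key is available, _processUpdateForXferMod classifies the op immediately; when it is missing (prepare from another term, or a restart), the pre image doc key is stashed and the document is re-read by _id once nextModsBatch runs. The classification rule itself is easiest to see with plain values; a self-contained sketch with an integer shard key standing in for BSON (an illustration, not the MongoDB code):

    #include <cstdio>

    enum class XferMod { kUpdate, kDelete, kNone };

    // Chunk ranges are min-inclusive, max-exclusive, as in isShardKeyValueInRange.
    XferMod classifyUpdate(int preKey, int postKey, int min, int max) {
        if (postKey >= min && postKey < max)
            return XferMod::kUpdate;  // doc still belongs to the migrating chunk
        if (preKey >= min && preKey < max)
            return XferMod::kDelete;  // doc changed shard keys and left the chunk,
                                      // so model a delete on the recipient
        return XferMod::kNone;        // the chunk never owned this doc; skip it
    }

    int main() {
        // Chunk being migrated owns x in [0, 10).
        std::printf("%d\n", static_cast<int>(classifyUpdate(5, 7, 0, 10)));    // 0: update
        std::printf("%d\n", static_cast<int>(classifyUpdate(5, 42, 0, 10)));   // 1: delete
        std::printf("%d\n", static_cast<int>(classifyUpdate(42, 43, 0, 10)));  // 2: none
        return 0;
    }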
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 402b31e6529..7fc24ef4672 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -61,13 +61,13 @@ class RecordId;
*/
class LogTransactionOperationsForShardingHandler final : public RecoveryUnit::Change {
public:
- /**
- * Invariant: idObj should belong to a document that is part of the active chunk being migrated
- */
+ LogTransactionOperationsForShardingHandler(ServiceContext* svcCtx,
+ const std::vector<repl::OplogEntry>& stmts,
+ repl::OpTime prepareOrCommitOpTime);
+
LogTransactionOperationsForShardingHandler(ServiceContext* svcCtx,
const std::vector<repl::ReplOperation>& stmts,
- const repl::OpTime& prepareOrCommitOpTime)
- : _svcCtx(svcCtx), _stmts(stmts), _prepareOrCommitOpTime(prepareOrCommitOpTime) {}
+ repl::OpTime prepareOrCommitOpTime);
void commit(boost::optional<Timestamp>) override;
@@ -75,6 +75,8 @@ public:
private:
ServiceContext* _svcCtx;
+ // Used to keep the backing BSON objects alive for the lifetime of this object.
+ std::vector<BSONObj> _ownedReplBSONObj;
std::vector<repl::ReplOperation> _stmts;
const repl::OpTime _prepareOrCommitOpTime;
};
@@ -329,7 +331,24 @@ private:
*/
Status _checkRecipientCloningStatus(OperationContext* opCtx, Milliseconds maxTimeToWait);
- // The original move chunk request
+ /**
+ * Inspects the pre and post image document keys and determines which xferMods bucket a
+ * new entry should be added to. Returns false if neither the pre nor the post image
+ * document key falls into the chunk boundaries being migrated.
+ */
+ bool _processUpdateForXferMod(const BSONObj& preImageDocKey, const BSONObj& postImageDocKey);
+
+ /**
+ * Defers processing of update ops into xferMods entries until nextModsBatch is called.
+ */
+ void _deferProcessingForXferMod(const BSONObj& preImageDocKey);
+
+ /**
+ * Converts all deferred update ops captured by the op observer into xferMods entries.
+ */
+ void _processDeferredXferMods(OperationContext* opCtx, Database* database);
+
+ // The original move range request
const MoveChunkRequest _args;
// The shard key associated with the namespace
@@ -385,6 +404,13 @@ private:
// Amount of delete xfer mods that have not yet reached the recipient.
size_t _untransferredDeletesCounter{0};
+ // Number of ops that have yet to be converted to update/delete xferMods.
+ size_t _deferredUntransferredOpsCounter{0};
+
+ // Stores the document keys of documents that need to be examined later to decide
+ // whether they should be added to the xferMods list.
+ std::vector<BSONObj> _deferredReloadOrDeletePreImageDocKeys;
+
// Total bytes in _reload + _deleted (xfer mods)
uint64_t _memoryUsed{0};
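The _ownedReplBSONObj vector exists because IDL-parsed types such as repl::ReplOperation hold unowned views into the buffer they were parsed from, so the OplogEntry constructor copies each operation to owned BSON first and keeps that storage alive for the handler's lifetime. The underlying pitfall in miniature (illustrative only):

    BSONObj sub;
    {
        BSONObj owned = BSON("o" << BSON("_id" << 1));
        sub = owned["o"].Obj();  // unowned view into owned's buffer
        // Safe alternative: sub = owned["o"].Obj().getOwned();
    }
    // owned's buffer has been freed; reading sub here is a use-after-free.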
diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp
index 89818297838..ffbb83e4254 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.cpp
+++ b/src/mongo/db/s/op_observer_sharding_impl.cpp
@@ -219,4 +219,14 @@ void OpObserverShardingImpl::shardObserveTransactionPrepareOrUnpreparedCommit(
opCtx->getServiceContext(), stmts, prepareOrCommitOptime));
}
+void OpObserverShardingImpl::shardObserveNonPrimaryTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& stmts,
+ const repl::OpTime& prepareOrCommitOptime) {
+
+ opCtx->recoveryUnit()->registerChange(
+ std::make_unique<LogTransactionOperationsForShardingHandler>(
+ opCtx->getServiceContext(), stmts, prepareOrCommitOptime));
+}
+
} // namespace mongo
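registerChange ties the migration bookkeeping to the storage transaction: the handler's commit() runs only once the prepare actually commits in the storage engine, and the handler is dropped if it rolls back. The Change contract in miniature (a hedged sketch; the class name is invented and recovery_unit.h is authoritative):

    class QueueXferModsChange final : public RecoveryUnit::Change {
    public:
        void commit(boost::optional<Timestamp> commitTime) override {
            // The storage transaction is committed; it is now safe to
            // publish side effects such as queueing xferMods entries.
        }
        void rollback() override {
            // The storage transaction aborted; publish nothing.
        }
    };

    opCtx->recoveryUnit()->registerChange(std::make_unique<QueueXferModsChange>());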
diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h
index 31bf4d2aa50..c821737af7a 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.h
+++ b/src/mongo/db/s/op_observer_sharding_impl.h
@@ -69,6 +69,10 @@ protected:
OperationContext* opCtx,
const std::vector<repl::ReplOperation>& stmts,
const repl::OpTime& prepareOrCommitOptime) override;
+ void shardObserveNonPrimaryTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& stmts,
+ const repl::OpTime& prepareOrCommitOptime) override;
};
} // namespace mongo
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 42caa0ca85b..87696550b93 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -175,6 +175,10 @@ public:
std::vector<repl::ReplOperation>* statements,
size_t numberOfPreImagesToWrite) override {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}