summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2022-12-01 20:24:22 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-02-03 19:43:42 +0000
commitbd227ce5529b0423ca791aa926db48ec8b10c07d (patch)
treed515feb5d4f487285ac3e8ef170393dd47a44356
parent55e4a05daabba24a6ec976a69335823384ed981b (diff)
downloadmongo-bd227ce5529b0423ca791aa926db48ec8b10c07d.tar.gz
SERVER-71219 Migration can miss writes from prepared transactions
This commit does 2 things: 1. Make sure that we register the migration source op observer hook in all paths where transactions transitions into prepare. 2. If we don't have the post image doc, fetch the latest doc from storage. (cherry picked from commit 2f708612dc39780410bf40d31a404cb121f653a9)
-rw-r--r--etc/backports_required_for_multiversion_tests.yml4
-rw-r--r--jstests/sharding/prepare_transaction_then_migrate.js38
-rw-r--r--src/mongo/db/auth/auth_op_observer.h4
-rw-r--r--src/mongo/db/fcv_op_observer.h4
-rw-r--r--src/mongo/db/free_mon/free_mon_op_observer.h4
-rw-r--r--src/mongo/db/op_observer.h9
-rw-r--r--src/mongo/db/op_observer_impl.cpp6
-rw-r--r--src/mongo/db/op_observer_impl.h9
-rw-r--r--src/mongo/db/op_observer_noop.h3
-rw-r--r--src/mongo/db/op_observer_registry.h9
-rw-r--r--src/mongo/db/repl/primary_only_service_op_observer.h4
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_op_observer.h4
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_op_observer.h4
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.cpp6
-rw-r--r--src/mongo/db/s/collection_metadata.cpp14
-rw-r--r--src/mongo/db/s/collection_metadata.h6
-rw-r--r--src/mongo/db/s/config_server_op_observer.h4
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp176
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h38
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.cpp10
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.h4
-rw-r--r--src/mongo/db/s/resharding/resharding_op_observer.h4
-rw-r--r--src/mongo/db/s/shard_server_op_observer.h4
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer.h4
-rw-r--r--src/mongo/db/user_write_block_mode_op_observer.h4
-rw-r--r--src/mongo/idl/cluster_server_parameter_op_observer.h4
26 files changed, 316 insertions, 64 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 5cc3e03d326..302f42137e2 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -248,6 +248,8 @@ last-continuous:
ticket: SERVER-72620
- test_file: jstests/core/timeseries/bucket_unpacking_with_sort_extended_range.js
ticket: SERVER-73110
+ - test_file: jstests/sharding/prepare_transaction_then_migrate.js
+ ticket: SERVER-71219
suites: null
last-lts:
all:
@@ -316,7 +318,7 @@ last-lts:
- test_file: jstests/core/txns/errors_on_committed_transaction.js
ticket: SERVER-52547
- test_file: jstests/sharding/prepare_transaction_then_migrate.js
- ticket: SERVER-52906
+ ticket: SERVER-71219
- test_file: jstests/sharding/migration_waits_for_majority_commit.js
ticket: SERVER-52906
- test_file: jstests/sharding/migration_ignore_interrupts_1.js
diff --git a/jstests/sharding/prepare_transaction_then_migrate.js b/jstests/sharding/prepare_transaction_then_migrate.js
index 36a9581752d..5a12a83ed4c 100644
--- a/jstests/sharding/prepare_transaction_then_migrate.js
+++ b/jstests/sharding/prepare_transaction_then_migrate.js
@@ -3,7 +3,7 @@
* 1. Ignore multi-statement transaction prepare conflicts in the clone phase, and
* 2. Pick up the changes for prepared transactions in the transfer mods phase.
*
- * @tags: [uses_transactions, uses_prepare_transaction]
+ * @tags: [uses_transactions, uses_prepare_transaction, requires_persistence]
*/
(function() {
@@ -17,8 +17,17 @@ const collName = "user";
const staticMongod = MongoRunner.runMongod({}); // For startParallelOps.
-let runTest = function(withStepUp) {
- const st = new ShardingTest({shards: {rs0: {nodes: withStepUp ? 2 : 1}, rs1: {nodes: 1}}});
+const TestMode = {
+ kBasic: 'basic',
+ kWithStepUp: 'with stepUp',
+ kWithRestart: 'with restart',
+};
+
+let runTest = function(testMode) {
+ jsTest.log(`Running test in mode ${testMode}`);
+
+ const st = new ShardingTest(
+ {shards: {rs0: {nodes: testMode == TestMode.kWithStepUp ? 2 : 1}, rs1: {nodes: 1}}});
const collection = st.s.getDB(dbName).getCollection(collName);
CreateShardedCollectionUtil.shardCollectionWithChunks(collection, {x: 1}, [
@@ -81,8 +90,23 @@ let runTest = function(withStepUp) {
let prepareTimestamp = res.prepareTimestamp;
- if (withStepUp) {
+ if (testMode == TestMode.kWithStepUp) {
st.rs0.stepUp(st.rs0.getSecondary());
+ } else if (testMode == TestMode.kWithRestart) {
+ TestData.skipCollectionAndIndexValidation = true;
+ st.rs0.restart(st.rs0.getPrimary());
+ st.rs0.waitForPrimary();
+ TestData.skipCollectionAndIndexValidation = false;
+
+ assert.soon(() => {
+ try {
+ st.shard0.getDB(dbName).getCollection(collName).findOne();
+ return true;
+ } catch (ex) {
+ print("Caught expected once exception due to restart: " + tojson(ex));
+ return false;
+ }
+ });
}
const joinMoveChunk =
@@ -157,9 +181,9 @@ let runTest = function(withStepUp) {
st.stop();
};
-runTest(false);
-// TODO: SERVER-71219 Enable test after fixing.
-// runTest(true);
+runTest(TestMode.kBasic);
+runTest(TestMode.kWithStepUp);
+runTest(TestMode.kWithRestart);
MongoRunner.stopMongod(staticMongod);
})();
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index c145ebc3371..f0afcd2a860 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -206,6 +206,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) final {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/fcv_op_observer.h b/src/mongo/db/fcv_op_observer.h
index 7380f65358f..8ee4a32dc05 100644
--- a/src/mongo/db/fcv_op_observer.h
+++ b/src/mongo/db/fcv_op_observer.h
@@ -203,6 +203,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) final{};
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final{};
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index 498d728d187..4e1b3d03a3e 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -206,6 +206,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) final {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 163d4738b0e..5332973563d 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -496,6 +496,15 @@ public:
Date_t wallClockTime) = 0;
/**
+ * This is called when a transaction transitions into prepare while it is not primary. Example
+ * case can include secondary oplog application or when node was restared and tries to
+ * recover prepared transactions from the oplog.
+ */
+ virtual void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) = 0;
+
+ /**
* The onTransactionAbort method is called when an atomic transaction aborts, before the
* RecoveryUnit onRollback() is called. It must not be called when the transaction to abort is
* active.
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index f045cff94ff..61a960bea4b 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -2218,6 +2218,12 @@ void OpObserverImpl::onTransactionPrepare(
shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, prepareOpTime);
}
+void OpObserverImpl::onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) {
+ shardObserveNonPrimaryTransactionPrepare(opCtx, statements, prepareOpTime);
+}
+
void OpObserverImpl::onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) {
invariant(opCtx->getTxnNumber());
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 77676e9548e..55c32faf581 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -209,6 +209,11 @@ public:
const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) final;
+
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final;
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final;
void onMajorityCommitPointUpdate(ServiceContext* service,
@@ -244,6 +249,10 @@ private:
OperationContext* opCtx,
const std::vector<repl::ReplOperation>& stmts,
const repl::OpTime& prepareOrCommitOptime) {}
+ virtual void shardObserveNonPrimaryTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& stmts,
+ const repl::OpTime& prepareOrCommitOptime) {}
void _onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final;
};
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index 667164d7564..feaf3a13f96 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -183,6 +183,9 @@ public:
const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) override{};
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override{};
void onMajorityCommitPointUpdate(ServiceContext* service,
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index 4b30bc7c72b..2792854d735 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -425,6 +425,15 @@ public:
}
}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {
+ ReservedTimes times{opCtx};
+ for (auto& observer : _observers) {
+ observer->onTransactionPrepareNonPrimary(opCtx, statements, prepareOpTime);
+ }
+ }
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {
ReservedTimes times{opCtx};
diff --git a/src/mongo/db/repl/primary_only_service_op_observer.h b/src/mongo/db/repl/primary_only_service_op_observer.h
index 5f552149f98..76c225a483c 100644
--- a/src/mongo/db/repl/primary_only_service_op_observer.h
+++ b/src/mongo/db/repl/primary_only_service_op_observer.h
@@ -208,6 +208,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) final {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.h b/src/mongo/db/repl/tenant_migration_donor_op_observer.h
index 43992f5e040..403d2b6b486 100644
--- a/src/mongo/db/repl/tenant_migration_donor_op_observer.h
+++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.h
@@ -188,6 +188,10 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final {}
+
std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
OperationContext* opCtx,
const std::vector<OplogSlot>& reservedSlots,
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
index dd42ff6581f..7f5181a6d4a 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
@@ -207,6 +207,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) final {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index b34403b0de6..94d06c02665 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index_builds_coordinator.h"
+#include "mongo/db/op_observer.h"
#include "mongo/db/repl/apply_ops.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/timestamp_block.h"
@@ -498,6 +499,11 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
}
txnParticipant.prepareTransaction(opCtx, entry.getOpTime());
+
+ auto opObserver = opCtx->getServiceContext()->getOpObserver();
+ invariant(opObserver);
+ opObserver->onTransactionPrepareNonPrimary(opCtx, ops, entry.getOpTime());
+
// Prepare transaction success.
abortOnError.dismiss();
diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp
index ea9f6f0c7c1..6b0fb689356 100644
--- a/src/mongo/db/s/collection_metadata.cpp
+++ b/src/mongo/db/s/collection_metadata.cpp
@@ -107,13 +107,13 @@ void CollectionMetadata::throwIfReshardingInProgress(NamespaceString const& nss)
}
}
-BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const {
+BSONObj CollectionMetadata::extractDocumentKey(const ShardKeyPattern* shardKeyPattern,
+ const BSONObj& doc) {
BSONObj key;
- if (isSharded()) {
- auto const& pattern = _cm->getShardKeyPattern();
- key = dotted_path_support::extractElementsBasedOnTemplate(doc, pattern.toBSON());
- if (pattern.hasId()) {
+ if (shardKeyPattern) {
+ key = dotted_path_support::extractElementsBasedOnTemplate(doc, shardKeyPattern->toBSON());
+ if (shardKeyPattern->hasId()) {
return key;
}
// else, try to append an _id field from the document.
@@ -127,6 +127,10 @@ BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const {
return doc;
}
+BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const {
+ return extractDocumentKey(isSharded() ? &_cm->getShardKeyPattern() : nullptr, doc);
+}
+
std::string CollectionMetadata::toStringBasic() const {
if (isSharded()) {
return str::stream() << "collection version: " << _cm->getVersion().toString()
diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h
index b691f94ebe5..c2b323c0cc7 100644
--- a/src/mongo/db/s/collection_metadata.h
+++ b/src/mongo/db/s/collection_metadata.h
@@ -162,6 +162,12 @@ public:
BSONObj extractDocumentKey(const BSONObj& doc) const;
/**
+ * Static version of the function above. Only use this for internal sharding operations where
+ * shard key pattern is fixed and cannot change.
+ */
+ static BSONObj extractDocumentKey(const ShardKeyPattern* shardKeyPattern, const BSONObj& doc);
+
+ /**
* String output of the collection and shard versions.
*/
std::string toStringBasic() const;
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index 18e83a4b994..2acd0017ab7 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -208,6 +208,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) override {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 0b68b839712..2272599806e 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -33,20 +33,26 @@
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
+#include <fmt/format.h>
+
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/ops/write_ops_retryability.h"
+#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/migration_source_manager.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/shard_key_index_util.h"
#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/s/sharding_statistics.h"
@@ -69,6 +75,8 @@
namespace mongo {
namespace {
+using namespace fmt::literals;
+
const char kRecvChunkStatus[] = "_recvChunkStatus";
const char kRecvChunkCommit[] = "_recvChunkCommit";
const char kRecvChunkAbort[] = "_recvChunkAbort";
@@ -107,9 +115,8 @@ BSONObj createRequestWithSessionId(StringData commandName,
return builder.obj();
}
-BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation,
- repl::OpTypeEnum opType) {
- switch (opType) {
+BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation) {
+ switch (replOperation.getOpType()) {
case repl::OpTypeEnum::kInsert:
case repl::OpTypeEnum::kDelete:
return replOperation.getObject();
@@ -168,6 +175,30 @@ private:
const repl::OpTime _opTime;
};
+LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler(
+ LogicalSessionId lsid,
+ const std::vector<repl::OplogEntry>& stmts,
+ repl::OpTime prepareOrCommitOpTime)
+ : _lsid(std::move(lsid)), _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {
+ _stmts.reserve(stmts.size());
+ _ownedReplBSONObj.reserve(stmts.size());
+
+ for (const auto& op : stmts) {
+ auto ownedBSON = op.getDurableReplOperation().toBSON().getOwned();
+ _ownedReplBSONObj.push_back(ownedBSON);
+ _stmts.push_back(
+ repl::ReplOperation::parse({"MigrationChunkClonerSource_toReplOperation"}, ownedBSON));
+ }
+}
+
+LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler(
+ LogicalSessionId lsid,
+ const std::vector<repl::ReplOperation>& stmts,
+ repl::OpTime prepareOrCommitOpTime)
+ : _lsid(std::move(lsid)),
+ _stmts(stmts),
+ _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {}
+
void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) {
std::set<NamespaceString> namespacesTouchedByTransaction;
@@ -225,7 +256,7 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
continue;
}
- auto preImageDocKey = getDocumentKeyFromReplOperation(stmt, opType);
+ auto preImageDocKey = getDocumentKeyFromReplOperation(stmt);
auto idElement = preImageDocKey["_id"];
if (idElement.eoo()) {
@@ -235,52 +266,35 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
continue;
}
- auto const& minKey = cloner->_args.getMin().get();
- auto const& maxKey = cloner->_args.getMax().get();
- auto const& shardKeyPattern = cloner->_shardKeyPattern;
-
- // Note: This assumes that prepared transactions will always have post document key
- // set. There is a small window where create collection coordinator releases the critical
- // section and before it writes down the chunks for non-empty collections. So in theory,
- // it is possible to have a prepared transaction while collection is unsharded
- // and becomes sharded midway. This doesn't happen in practice because the only way to
- // have a prepared transactions without being sharded is by directly connecting to the
- // shards and manually preparing the transaction. Another exception is when transaction
- // is prepared on an older version that doesn't set the post image document key.
- auto postImageDocKey = stmt.getPostImageDocumentKey();
- if (postImageDocKey.isEmpty()) {
- LOGV2_WARNING(
- 6836102,
- "Migration encountered a transaction operation without a post image document key",
- "preImageDocKey"_attr = preImageDocKey);
- } else {
- auto postShardKeyValues =
- shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey);
- fassert(6836100, !postShardKeyValues.isEmpty());
-
- if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) {
- // If the preImageDoc is not in range but the postImageDoc was, we know that the
- // document has changed shard keys and no longer belongs in the chunk being cloned.
- // We will model the deletion of the preImage document so that the destination chunk
- // does not receive an outdated version of this document.
-
- auto preImageShardKeyValues =
- shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
- fassert(6836101, !preImageShardKeyValues.isEmpty());
-
- if (opType == repl::OpTypeEnum::kUpdate &&
- isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) {
- opType = repl::OpTypeEnum::kDelete;
- idElement = postImageDocKey["_id"];
- } else {
+ if (opType == repl::OpTypeEnum::kUpdate) {
+ auto const& shardKeyPattern = cloner->_shardKeyPattern;
+ auto preImageShardKeyValues =
+ shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
+
+ // If prepare was performed from another term, we will not have the post image doc key
+ // since it is not persisted in the oplog.
+ auto postImageDocKey = stmt.getPostImageDocumentKey();
+ if (!postImageDocKey.isEmpty()) {
+ if (!cloner->_processUpdateForXferMod(preImageDocKey, postImageDocKey)) {
+ // We don't need to add this op to session migration if neither post or pre
+ // image doc falls within the chunk range.
continue;
}
+ } else {
+ // We can't perform reads here using the same recovery unit because the transaction
+ // is already committed. We instead defer performing the reads when xferMods command
+ // is called. Also allow this op to be added to session migration since we can't
+ // tell whether post image doc will fall within the chunk range. If it turns out
+ // both preImage and postImage doc don't fall into the chunk range, it is not wrong
+ // for this op to be added to session migration, but it will result in wasted work
+ // and unneccesary extra oplog storage on the destination.
+ cloner->_deferProcessingForXferMod(preImageDocKey);
}
+ } else {
+ cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {});
}
addToSessionMigrationOptimeQueueIfNeeded(cloner, nss, _prepareOrCommitOpTime);
-
- cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {});
}
}
@@ -800,11 +814,78 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
return Status::OK();
}
+bool MigrationChunkClonerSourceLegacy::_processUpdateForXferMod(const BSONObj& preImageDocKey,
+ const BSONObj& postImageDocKey) {
+ auto const& minKey = _args.getMin().value();
+ auto const& maxKey = _args.getMax().value();
+
+ auto postShardKeyValues = _shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey);
+ fassert(6836100, !postShardKeyValues.isEmpty());
+
+ auto opType = repl::OpTypeEnum::kUpdate;
+ auto idElement = preImageDocKey["_id"];
+
+ if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) {
+ // If the preImageDoc is not in range but the postImageDoc was, we know that the
+ // document has changed shard keys and no longer belongs in the chunk being cloned.
+ // We will model the deletion of the preImage document so that the destination chunk
+ // does not receive an outdated version of this document.
+
+ auto preImageShardKeyValues =
+ _shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
+ fassert(6836101, !preImageShardKeyValues.isEmpty());
+
+ if (!isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) {
+ return false;
+ }
+
+ opType = repl::OpTypeEnum::kDelete;
+ idElement = postImageDocKey["_id"];
+ }
+
+ _addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {});
+
+ return true;
+}
+
+void MigrationChunkClonerSourceLegacy::_deferProcessingForXferMod(const BSONObj& preImageDocKey) {
+ stdx::lock_guard<Latch> sl(_mutex);
+ _deferredReloadOrDeletePreImageDocKeys.push_back(preImageDocKey.getOwned());
+ _deferredUntransferredOpsCounter++;
+}
+
+void MigrationChunkClonerSourceLegacy::_processDeferredXferMods(OperationContext* opCtx,
+ Database* db) {
+ std::vector<BSONObj> deferredReloadOrDeletePreImageDocKeys;
+
+ {
+ stdx::unique_lock lk(_mutex);
+ deferredReloadOrDeletePreImageDocKeys.swap(_deferredReloadOrDeletePreImageDocKeys);
+ }
+
+ for (const auto& preImageDocKey : deferredReloadOrDeletePreImageDocKeys) {
+ auto idElement = preImageDocKey["_id"];
+ BSONObj newerVersionDoc;
+ if (!Helpers::findById(opCtx, db, nss().ns(), BSON("_id" << idElement), newerVersionDoc)) {
+ // If the document can no longer be found, this means that another later op must have
+ // deleted it. That delete would have been captured by the xferMods so nothing else to
+ // do here.
+ continue;
+ }
+
+ auto postImageDocKey =
+ CollectionMetadata::extractDocumentKey(&_shardKeyPattern, newerVersionDoc);
+ static_cast<void>(_processUpdateForXferMod(preImageDocKey, postImageDocKey));
+ }
+}
+
Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
Database* db,
BSONObjBuilder* builder) {
dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IS));
+ _processDeferredXferMods(opCtx, db);
+
std::list<BSONObj> deleteList;
std::list<BSONObj> updateList;
@@ -849,6 +930,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
_untransferredDeletesCounter = _deleted.size();
_reload.splice(_reload.cbegin(), updateList);
_untransferredUpsertsCounter = _reload.size();
+ _deferredUntransferredOpsCounter = _deferredReloadOrDeletePreImageDocKeys.size();
return Status::OK();
}
@@ -863,6 +945,8 @@ void MigrationChunkClonerSourceLegacy::_cleanup() {
_untransferredUpsertsCounter = 0;
_deleted.clear();
_untransferredDeletesCounter = 0;
+ _deferredReloadOrDeletePreImageDocKeys.clear();
+ _deferredUntransferredOpsCounter = 0;
}
StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(OperationContext* opCtx,
@@ -1111,7 +1195,8 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
stdx::lock_guard<Latch> sl(_mutex);
int64_t untransferredModsSizeBytes = _untransferredDeletesCounter * _averageObjectIdSize +
- _untransferredUpsertsCounter * _averageObjectSizeForCloneLocs;
+ (_untransferredUpsertsCounter + _deferredUntransferredOpsCounter) *
+ _averageObjectSizeForCloneLocs;
if (_forceJumbo && _jumboChunkCloneState) {
LOGV2(21992,
@@ -1177,6 +1262,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"moveChunk data transfer within threshold to allow write blocking",
"_untransferredUpsertsCounter"_attr = _untransferredUpsertsCounter,
"_untransferredDeletesCounter"_attr = _untransferredDeletesCounter,
+ "_deferredUntransferredOpsCounter"_attr = _deferredUntransferredOpsCounter,
"_averageObjectSizeForCloneLocs"_attr = _averageObjectSizeForCloneLocs,
"_averageObjectIdSize"_attr = _averageObjectIdSize,
"untransferredModsSizeBytes"_attr = untransferredModsSizeBytes,
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 803c1f512af..4fff7da8d17 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -67,13 +67,13 @@ const long long kFixedCommandOverhead = 32 * 1024;
*/
class LogTransactionOperationsForShardingHandler final : public RecoveryUnit::Change {
public:
- /**
- * Invariant: idObj should belong to a document that is part of the active chunk being migrated
- */
- LogTransactionOperationsForShardingHandler(const LogicalSessionId lsid,
+ LogTransactionOperationsForShardingHandler(LogicalSessionId lsid,
+ const std::vector<repl::OplogEntry>& stmts,
+ repl::OpTime prepareOrCommitOpTime);
+
+ LogTransactionOperationsForShardingHandler(LogicalSessionId lsid,
const std::vector<repl::ReplOperation>& stmts,
- const repl::OpTime& prepareOrCommitOpTime)
- : _lsid(lsid), _stmts(stmts), _prepareOrCommitOpTime(prepareOrCommitOpTime) {}
+ repl::OpTime prepareOrCommitOpTime);
void commit(boost::optional<Timestamp>) override;
@@ -81,6 +81,8 @@ public:
private:
const LogicalSessionId _lsid;
+ // Use to keep BSON obj alive for the lifetime of this object.
+ std::vector<BSONObj> _ownedReplBSONObj;
std::vector<repl::ReplOperation> _stmts;
const repl::OpTime _prepareOrCommitOpTime;
};
@@ -486,6 +488,23 @@ private:
*/
Status _checkRecipientCloningStatus(OperationContext* opCtx, Milliseconds maxTimeToWait);
+ /**
+ * Inspects the pre and post image document keys and determines which xferMods bucket to
+ * add a new entry. Returns false if neither pre or post image document keys fall into
+ * the chunk boundaries being migrated.
+ */
+ bool _processUpdateForXferMod(const BSONObj& preImageDocKey, const BSONObj& postImageDocKey);
+
+ /**
+ * Defer processing of update ops into xferMods entries to when nextModsBatch is called.
+ */
+ void _deferProcessingForXferMod(const BSONObj& preImageDocKey);
+
+ /**
+ * Converts all deferred update ops captured by the op observer into xferMods entries.
+ */
+ void _processDeferredXferMods(OperationContext* opCtx, Database* database);
+
// The original move range request
const ShardsvrMoveRange _args;
@@ -547,6 +566,13 @@ private:
// Amount of delete xfer mods that have not yet reached the recipient.
size_t _untransferredDeletesCounter{0};
+ // Amount of ops that are yet to be converted to update/delete xferMods.
+ size_t _deferredUntransferredOpsCounter{0};
+
+ // Stores document keys of document that needs to be examined if we need to put in to xferMods
+ // list later.
+ std::vector<BSONObj> _deferredReloadOrDeletePreImageDocKeys;
+
// Total bytes in _reload + _deleted (xfer mods)
uint64_t _memoryUsed{0};
diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp
index ab8ce8ca5e8..860fe7ad050 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.cpp
+++ b/src/mongo/db/s/op_observer_sharding_impl.cpp
@@ -241,4 +241,14 @@ void OpObserverShardingImpl::shardObserveTransactionPrepareOrUnpreparedCommit(
*opCtx->getLogicalSessionId(), stmts, prepareOrCommitOptime));
}
+void OpObserverShardingImpl::shardObserveNonPrimaryTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& stmts,
+ const repl::OpTime& prepareOrCommitOptime) {
+
+ opCtx->recoveryUnit()->registerChange(
+ std::make_unique<LogTransactionOperationsForShardingHandler>(
+ *opCtx->getLogicalSessionId(), stmts, prepareOrCommitOptime));
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h
index f9005497c57..d7295482332 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.h
+++ b/src/mongo/db/s/op_observer_sharding_impl.h
@@ -74,6 +74,10 @@ protected:
OperationContext* opCtx,
const std::vector<repl::ReplOperation>& stmts,
const repl::OpTime& prepareOrCommitOptime) override;
+ void shardObserveNonPrimaryTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& stmts,
+ const repl::OpTime& prepareOrCommitOptime) override;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h
index 30d319a041d..e8affe3ef4a 100644
--- a/src/mongo/db/s/resharding/resharding_op_observer.h
+++ b/src/mongo/db/s/resharding/resharding_op_observer.h
@@ -228,6 +228,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) override {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 5a0671254d6..78e84b664cf 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -207,6 +207,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) override {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) override {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.h b/src/mongo/db/serverless/shard_split_donor_op_observer.h
index 55796479ffe..d8b607db6df 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.h
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.h
@@ -204,6 +204,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) final {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/user_write_block_mode_op_observer.h b/src/mongo/db/user_write_block_mode_op_observer.h
index 330779d0cd6..b021431befc 100644
--- a/src/mongo/db/user_write_block_mode_op_observer.h
+++ b/src/mongo/db/user_write_block_mode_op_observer.h
@@ -227,6 +227,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) final {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.h b/src/mongo/idl/cluster_server_parameter_op_observer.h
index 2ef05729e39..f227598fc12 100644
--- a/src/mongo/idl/cluster_server_parameter_op_observer.h
+++ b/src/mongo/idl/cluster_server_parameter_op_observer.h
@@ -212,6 +212,10 @@ public:
size_t numberOfPrePostImagesToWrite,
Date_t wallClockTime) final {}
+ void onTransactionPrepareNonPrimary(OperationContext* opCtx,
+ const std::vector<repl::OplogEntry>& statements,
+ const repl::OpTime& prepareOpTime) final {}
+
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}