diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-03-23 15:37:57 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-23 16:42:28 +0000 |
commit | 7c4fa21deaee0f1e0280a1dc129c7e5da82df99b (patch) | |
tree | 04d30e8536ad1e19d20c676fee1b750b18ab24c8 /src/mongo | |
parent | 67971abe5270e4a7f04cb3ca7decbcdc5dc8af39 (diff) | |
download | mongo-7c4fa21deaee0f1e0280a1dc129c7e5da82df99b.tar.gz |
SERVER-63880 Make resharding handle applyOps oplog entries with WouldChangeOwningShard sentinel noop entry
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/exec/update_stage.cpp | 10 |
-rw-r--r-- | src/mongo/db/exec/upsert_stage.cpp | 3 |
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability.h | 3 |
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp | 12 |
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp | 64 |
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 20 |
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 17 |
-rw-r--r-- | src/mongo/s/would_change_owning_shard_exception.cpp | 1 |
-rw-r--r-- | src/mongo/s/would_change_owning_shard_exception.h | 14 |
9 files changed, 115 insertions, 29 deletions
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index 2d4a5217308..7c793764ba3 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -672,7 +672,8 @@ bool UpdateStage::wasReshardingKeyUpdated(const ShardingWriteRouter& shardingWri auto newRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(newObj); uassert( - WouldChangeOwningShardInfo(oldObj.value(), newObj, false /* upsert */, collection()->ns()), + WouldChangeOwningShardInfo( + oldObj.value(), newObj, false /* upsert */, collection()->ns(), collection()->uuid()), "This update would cause the doc to change owning shards under the new shard key", oldRecipShard == newRecipShard); @@ -751,8 +752,11 @@ bool UpdateStage::wasExistingShardKeyUpdated(const ShardingWriteRouter& sharding hangBeforeThrowWouldChangeOwningShard.pauseWhileSet(opCtx()); } - uasserted(WouldChangeOwningShardInfo( - oldObj.value(), newObj, false /* upsert */, collection()->ns()), + uasserted(WouldChangeOwningShardInfo(oldObj.value(), + newObj, + false /* upsert */, + collection()->ns(), + collection()->uuid()), "This update would cause the doc to change owning shards"); } diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp index beab4e6a9d7..66a4f823190 100644 --- a/src/mongo/db/exec/upsert_stage.cpp +++ b/src/mongo/db/exec/upsert_stage.cpp @@ -147,7 +147,8 @@ void UpsertStage::_performInsert(BSONObj newDocument) { uasserted(WouldChangeOwningShardInfo(_params.request->getQuery(), newDocument, true /* upsert */, - collection()->ns()), + collection()->ns(), + collection()->uuid()), "The document we are inserting belongs on a different shard"); } } diff --git a/src/mongo/db/ops/write_ops_retryability.h b/src/mongo/db/ops/write_ops_retryability.h index ad3be539fd7..525847f89f4 100644 --- a/src/mongo/db/ops/write_ops_retryability.h +++ b/src/mongo/db/ops/write_ops_retryability.h @@ -43,7 +43,8 @@ const BSONObj 
kWouldChangeOwningShardSentinel(BSON("$wouldChangeOwningShard" << template <typename OplogEntryType> bool isWouldChangeOwningShardSentinelOplogEntry(const OplogEntryType& oplogEntry) { return (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) && - (oplogEntry.getObject().woCompare(kWouldChangeOwningShardSentinel) == 0); + (oplogEntry.getObject().woCompare(kWouldChangeOwningShardSentinel) == 0) && + oplogEntry.getObject2() && oplogEntry.getObject2()->isEmpty(); } /** diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp index bde8901926a..95240e60907 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp @@ -35,6 +35,7 @@ #include "mongo/bson/bsonelement_comparator.h" #include "mongo/db/logical_session_id.h" +#include "mongo/db/ops/write_ops_retryability.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/repl/apply_ops.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" @@ -128,8 +129,6 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors( } auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject()); - // TODO (SERVER-63880): Make resharding handle applyOps oplog entries with - // WouldChangeOwningShard sentinel noop entry. uassert( ErrorCodes::OplogOperationUnsupported, str::stream() << "Commands within applyOps are not supported during resharding: " @@ -143,6 +142,10 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors( unrolledOp.setDurableReplOperation(repl::DurableReplOperation::parse( {"ReshardingOplogBatchPreparer::makeCrudOpWriterVectors innerOp"}, innerOp)); + if (isWouldChangeOwningShardSentinelOplogEntry(unrolledOp)) { + continue; + } + // There isn't a direct way to convert from a MutableOplogEntry to a // DurableOplogEntry or OplogEntry. 
We serialize the unrolledOp to have it get // re-parsed into an OplogEntry. @@ -214,8 +217,6 @@ WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors( // transaction applyOps oplog entry. auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject()); - // TODO (SERVER-63880): Make resharding handle applyOps oplog entries with - // WouldChangeOwningShard sentinel noop entry. uassert(ErrorCodes::OplogOperationUnsupported, str::stream() << "Commands within applyOps are not supported during resharding: " @@ -241,7 +242,8 @@ WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors( // DurableOplogEntry or OplogEntry. We serialize the unrolledOp to have it get // re-parsed into an OplogEntry. auto& derivedOp = derivedOps.emplace_back(unrolledOp.toBSON()); - invariant(derivedOp.isCrudOpType()); + invariant(derivedOp.isCrudOpType() || + isWouldChangeOwningShardSentinelOplogEntry(unrolledOp)); // `&derivedOp` is guaranteed to remain stable while we append more derived // oplog entries because `derivedOps` is a std::list. 
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp index c98104fcf3a..31d073a4790 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp @@ -32,6 +32,7 @@ #include <boost/optional/optional_io.hpp> #include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/ops/write_ops_retryability.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/s/resharding/resharding_oplog_batch_preparer.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" @@ -77,19 +78,31 @@ protected: boost::optional<TxnNumber> txnNumber = boost::none, boost::optional<bool> isPrepare = boost::none, boost::optional<bool> isPartial = boost::none) { - BSONObjBuilder applyOpsBuilder; - - BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps"); + std::vector<repl::DurableReplOperation> ops; for (const auto& document : documents) { auto insertOp = repl::DurableReplOperation(repl::OpTypeEnum::kInsert, {}, document); if (lsid && isInternalSessionForRetryableWrite(*lsid)) { - if (!document.hasField("_id")) { - continue; + if (document.hasField("_id")) { + auto id = document.getIntField("_id"); + insertOp.setStatementIds({{id}}); } - auto id = document.getIntField("_id"); - insertOp.setStatementIds({{id}}); } - opsArrayBuilder.append(insertOp.toBSON()); + ops.emplace_back(insertOp); + } + + return makeApplyOpsOplogEntry(ops, lsid, txnNumber, isPrepare, isPartial); + } + + repl::OplogEntry makeApplyOpsOplogEntry(std::vector<repl::DurableReplOperation> ops, + boost::optional<LogicalSessionId> lsid = boost::none, + boost::optional<TxnNumber> txnNumber = boost::none, + boost::optional<bool> isPrepare = boost::none, + boost::optional<bool> isPartial = boost::none) { + BSONObjBuilder applyOpsBuilder; + + BSONArrayBuilder opsArrayBuilder = 
applyOpsBuilder.subarrayStart("applyOps"); + for (const auto& op : ops) { + opsArrayBuilder.append(op.toBSON()); } opsArrayBuilder.done(); @@ -414,6 +427,41 @@ TEST_F(ReshardingOplogBatchPreparerTest, DiscardsNoops) { runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest(), txnNumber); } +TEST_F(ReshardingOplogBatchPreparerTest, + SessionWriterDoesNotDiscardWouldChangeOwningShardNoopForRetryableInternalTransaction) { + + const auto lsid = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); + const TxnNumber txnNumber{1}; + + OplogBatch batch; + + auto op = + repl::DurableReplOperation(repl::OpTypeEnum::kNoop, {}, kWouldChangeOwningShardSentinel); + op.setObject2(BSONObj()); + op.setStatementIds({{0}}); + batch.emplace_back(makeApplyOpsOplogEntry( + {op}, lsid, txnNumber, false /* isPrepare */, false /* isPartial */)); + + std::list<repl::OplogEntry> derivedOpsForCrudWriters; + auto crudWriterVectors = + _batchPreparer.makeCrudOpWriterVectors(batch, derivedOpsForCrudWriters); + ASSERT_EQ(crudWriterVectors.size(), kNumWriterVectors); + ASSERT_EQ(derivedOpsForCrudWriters.size(), 0U); + ASSERT_EQ(crudWriterVectors[0].size(), 0U); + ASSERT_EQ(crudWriterVectors[1].size(), 0U); + + std::list<repl::OplogEntry> derivedOpsForSessionWriters; + auto sessionWriterVectors = + _batchPreparer.makeSessionOpWriterVectors(batch, derivedOpsForSessionWriters); + ASSERT_EQ(sessionWriterVectors.size(), kNumWriterVectors); + auto writer = getNonEmptyWriterVector(sessionWriterVectors); + ASSERT_EQ(writer.size(), 1U); + ASSERT_EQ(derivedOpsForSessionWriters.size(), 1U); + ASSERT_EQ(writer[0]->getSessionId(), *getParentSessionId(lsid)); + ASSERT_EQ(*writer[0]->getTxnNumber(), *lsid.getTxnNumber()); + ASSERT(isWouldChangeOwningShardSentinelOplogEntry(*writer[0])); +} + TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForApplyOpsWithoutTxnNumber) { OplogBatch batch; batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0)})); diff --git 
a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 0114da44b14..f396e2d496f 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -798,6 +798,17 @@ Future<void> CheckoutSessionAndInvokeCommand::run() { uassertStatusOK(tenant_migration_access_blocker::handleTenantMigrationConflict( _ecd->getExecutionContext()->getOpCtx(), std::move(status))); }) + .onError<ErrorCodes::WouldChangeOwningShard>([this](Status status) -> Future<void> { + auto wouldChangeOwningShardInfo = status.extraInfo<WouldChangeOwningShardInfo>(); + invariant(wouldChangeOwningShardInfo); + _txnParticipant->handleWouldChangeOwningShardError( + _ecd->getExecutionContext()->getOpCtx(), wouldChangeOwningShardInfo); + _stashTransaction(); + + auto txnResponseMetadata = _txnParticipant->getResponseMetadata(); + txnResponseMetadata.serialize(_ecd->getExtraFieldsBuilder()); + return status; + }) .tapError([this](Status status) { _tapError(status); }) .then([this] { return _commitInvocation(); }); } @@ -967,7 +978,6 @@ void CheckoutSessionAndInvokeCommand::_checkOutSession() { } void CheckoutSessionAndInvokeCommand::_tapError(Status status) { - auto opCtx = _ecd->getExecutionContext()->getOpCtx(); const OperationSessionInfoFromClient& sessionOptions = _ecd->getSessionOptions(); if (status.code() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { // Exceptions are used to resolve views in a sharded cluster, so they should be handled @@ -988,14 +998,6 @@ void CheckoutSessionAndInvokeCommand::_tapError(Status status) { // If this shard has completed an earlier statement for this transaction, it must already be // in the transaction's participant list, so it is guaranteed to learn its outcome. 
_stashTransaction(); - } else if (status.code() == ErrorCodes::WouldChangeOwningShard) { - auto wouldChangeOwningShardInfo = status.extraInfo<WouldChangeOwningShardInfo>(); - invariant(wouldChangeOwningShardInfo); - _txnParticipant->handleWouldChangeOwningShardError(opCtx, wouldChangeOwningShardInfo); - _stashTransaction(); - - auto txnResponseMetadata = _txnParticipant->getResponseMetadata(); - txnResponseMetadata.serialize(_ecd->getExtraFieldsBuilder()); } } diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 78864240462..b22770a4416 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -62,6 +62,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/retryable_writes_stats.h" +#include "mongo/db/s/sharding_write_router.h" #include "mongo/db/server_recovery.h" #include "mongo/db/server_transactions_metrics.h" #include "mongo/db/stats/fill_locker_info.h" @@ -71,6 +72,7 @@ #include "mongo/db/txn_retry_counter_too_old_info.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/logv2/log.h" +#include "mongo/s/grid.h" #include "mongo/s/would_change_owning_shard_exception.h" #include "mongo/util/fail_point.h" #include "mongo/util/log_with_sampling.h" @@ -3121,6 +3123,21 @@ void TransactionParticipant::Participant::handleWouldChangeOwningShardError( repl::ReplOperation operation; operation.setOpType(repl::OpTypeEnum::kNoop); operation.setObject(kWouldChangeOwningShardSentinel); + // Set the "o2" field to differentiate between a WouldChangeOwningShard noop oplog entry + // written while handling a WouldChangeOwningShard error and a noop oplog entry with + // {"o": {$wouldChangeOwningShard: 1}} written by an external client through the + // appendOplogNote command. + operation.setObject2(BSONObj()); + + // Required by chunk migration and resharding. 
+ invariant(wouldChangeOwningShardInfo->getNs()); + invariant(wouldChangeOwningShardInfo->getUuid()); + operation.setNss(*wouldChangeOwningShardInfo->getNs()); + operation.setUuid(*wouldChangeOwningShardInfo->getUuid()); + ShardingWriteRouter shardingWriteRouter( + opCtx, *wouldChangeOwningShardInfo->getNs(), Grid::get(opCtx)->catalogCache()); + operation.setDestinedRecipient(shardingWriteRouter.getReshardingDestinedRecipient( + wouldChangeOwningShardInfo->getPreImage())); // Required by chunk migration. invariant(wouldChangeOwningShardInfo->getNs()); diff --git a/src/mongo/s/would_change_owning_shard_exception.cpp b/src/mongo/s/would_change_owning_shard_exception.cpp index ac131fb9440..eec555187ad 100644 --- a/src/mongo/s/would_change_owning_shard_exception.cpp +++ b/src/mongo/s/would_change_owning_shard_exception.cpp @@ -59,6 +59,7 @@ WouldChangeOwningShardInfo WouldChangeOwningShardInfo::parseFromCommandError(con return WouldChangeOwningShardInfo(obj[kPreImage].Obj().getOwned(), obj[kPostImage].Obj().getOwned(), obj[kShouldUpsert].Bool(), + boost::none, boost::none); } diff --git a/src/mongo/s/would_change_owning_shard_exception.h b/src/mongo/s/would_change_owning_shard_exception.h index 3468e062586..1d6333b285d 100644 --- a/src/mongo/s/would_change_owning_shard_exception.h +++ b/src/mongo/s/would_change_owning_shard_exception.h @@ -50,11 +50,13 @@ public: explicit WouldChangeOwningShardInfo(const BSONObj& preImage, const BSONObj& postImage, const bool shouldUpsert, - boost::optional<NamespaceString> ns) + boost::optional<NamespaceString> ns, + boost::optional<UUID> uuid) : _preImage(preImage.getOwned()), _postImage(postImage.getOwned()), _shouldUpsert(shouldUpsert), - _ns(ns) {} + _ns(ns), + _uuid(uuid) {} const auto& getPreImage() const { return _preImage; @@ -72,6 +74,10 @@ public: return _ns; } + const auto& getUuid() const { + return _uuid; + } + BSONObj toBSON() const { BSONObjBuilder bob; serialize(&bob); @@ -95,6 +101,10 @@ private: // The namespace of 
the collection containing the document. Does not get serialized into the // BSONObj for this error. boost::optional<NamespaceString> _ns; + + // The uuid of collection containing the document. Does not get serialized into the BSONObj for + // this error. + boost::optional<UUID> _uuid; }; using WouldChangeOwningShardException = ExceptionFor<ErrorCodes::WouldChangeOwningShard>; |