From 8e7ad370f7d35501a9fdd563456b361cbb133d86 Mon Sep 17 00:00:00 2001 From: Daniel Gottlieb Date: Tue, 24 Nov 2020 20:56:33 -0500 Subject: SERVER-52978: Correct ordered batch inserts code path for document validation errors. --- .../db/commands/write_commands/write_commands.cpp | 6 +- src/mongo/db/ops/write_ops_exec.cpp | 6 ++ src/mongo/db/ops/write_ops_retryability_test.cpp | 65 ++++++++++++++++++++++ 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index f6ded3a257f..8b53fe99b84 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -233,7 +233,7 @@ void serializeReply(OperationContext* opCtx, } } - long long n = 0; + long long nVal = 0; long long nModified = 0; std::vector upsertInfo; std::vector errors; @@ -255,7 +255,7 @@ void serializeReply(OperationContext* opCtx, for (size_t i = 0; i < result.results.size(); i++) { if (result.results[i].isOK()) { const auto& opResult = result.results[i].getValue(); - n += opResult.getN(); // Always there. + nVal += opResult.getN(); // Always there. if (replyStyle == ReplyStyle::kUpdate) { nModified += opResult.getNModified(); if (auto idElement = opResult.getUpsertedId().firstElement()) { @@ -325,7 +325,7 @@ void serializeReply(OperationContext* opCtx, errors.push_back(error.obj()); } - out->appendNumber("n", n); + out->appendNumber("n", nVal); if (replyStyle == ReplyStyle::kUpdate) { out->appendNumber("nModified", nModified); diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index c6185c78325..7e9856776cb 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -594,6 +594,8 @@ WriteResult performInserts(OperationContext* opCtx, batch.clear(); // We won't need the current batch any more. bytesInBatch = 0; + // If the batch had an error and decides to not continue, do not process a current doc that + // was unsuccessfully "fixed" or an already executed retryable write. if (!canContinue) break; @@ -610,6 +612,10 @@ WriteResult performInserts(OperationContext* opCtx, canContinue = handleError( opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), &out); } + + if (!canContinue) { + break; + } } else if (wasAlreadyExecuted) { containsRetry = true; RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp index 07cd5600dd8..fcdef0c6845 100644 --- a/src/mongo/db/ops/write_ops_retryability_test.cpp +++ b/src/mongo/db/ops/write_ops_retryability_test.cpp @@ -267,6 +267,71 @@ TEST_F(WriteOpsRetryability, PerformOrderedInsertsStopsAtError) { ASSERT_EQ(ErrorCodes::DuplicateKey, result.results[1].getStatus()); } +TEST_F(WriteOpsRetryability, PerformOrderedInsertsStopsAtBadDoc) { + auto opCtxRaii = makeOperationContext(); + opCtxRaii->setLogicalSessionId({UUID::gen(), {}}); + OperationContextSession session(opCtxRaii.get()); + // Use an unreplicated write block to avoid setting up more structures. + repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get()); + setUpReplication(getServiceContext()); + + write_ops::Insert insertOp(NamespaceString("foo.bar")); + insertOp.getWriteCommandBase().setOrdered(true); + + // Setup documents such that the second cannot be successfully inserted. + auto largeBuffer = [](std::int32_t size) { + std::vector buffer(size); + DataRange bufferRange(&buffer.front(), &buffer.back()); + ASSERT_OK(bufferRange.writeNoThrow(LittleEndian(size))); + + return buffer; + }(17 * 1024 * 1024); + + insertOp.setDocuments({BSON("_id" << 0), + BSONObj(largeBuffer.data(), BSONObj::LargeSizeTrait{}), + BSON("_id" << 2)}); + write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp); + + // Assert that the third write is not attempted because this is an ordered insert. + ASSERT_EQ(2, result.results.size()); + ASSERT_TRUE(result.results[0].isOK()); + ASSERT_FALSE(result.results[1].isOK()); + ASSERT_EQ(ErrorCodes::BadValue, result.results[1].getStatus()); +} + +TEST_F(WriteOpsRetryability, PerformUnorderedInsertsContinuesAtBadDoc) { + auto opCtxRaii = makeOperationContext(); + opCtxRaii->setLogicalSessionId({UUID::gen(), {}}); + OperationContextSession session(opCtxRaii.get()); + // Use an unreplicated write block to avoid setting up more structures. + repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get()); + setUpReplication(getServiceContext()); + + write_ops::Insert insertOp(NamespaceString("foo.bar")); + insertOp.getWriteCommandBase().setOrdered(false); + + // Setup documents such that the second cannot be successfully inserted. + auto largeBuffer = [](std::int32_t size) { + std::vector buffer(size); + DataRange bufferRange(&buffer.front(), &buffer.back()); + ASSERT_OK(bufferRange.writeNoThrow(LittleEndian(size))); + + return buffer; + }(17 * 1024 * 1024); + + insertOp.setDocuments({BSON("_id" << 0), + BSONObj(largeBuffer.data(), BSONObj::LargeSizeTrait{}), + BSON("_id" << 2)}); + write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp); + + // Assert that the third write is attempted because this is an unordered insert. + ASSERT_EQ(3, result.results.size()); + ASSERT_TRUE(result.results[0].isOK()); + ASSERT_FALSE(result.results[1].isOK()); + ASSERT_TRUE(result.results[2].isOK()); + ASSERT_EQ(ErrorCodes::BadValue, result.results[1].getStatus()); +} + class FindAndModifyRetryability : public MockReplCoordServerFixture { public: -- cgit v1.2.1