diff options
author | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2020-11-16 11:40:59 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-16 18:24:56 +0000 |
commit | e1e8ea257b835c4d8e130205a2fd8297bcccbd47 (patch) | |
tree | f6c9a71edae85041e872a6fda83bd2a74a8e47e1 /src/mongo/db/ops | |
parent | 051793654f08e39b6ed3b7d4a6ca78553f537192 (diff) | |
download | mongo-e1e8ea257b835c4d8e130205a2fd8297bcccbd47.tar.gz |
SERVER-52667: Correct performInserts batching.
Diffstat (limited to 'src/mongo/db/ops')
-rw-r--r-- | src/mongo/db/ops/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability_test.cpp | 112 |
3 files changed, 134 insertions, 15 deletions
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript index f1ab9615526..3b7ad76017e 100644 --- a/src/mongo/db/ops/SConscript +++ b/src/mongo/db/ops/SConscript @@ -82,7 +82,9 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', '$BUILD_DIR/mongo/db/repl/oplog_entry', + '$BUILD_DIR/mongo/db/transaction', '$BUILD_DIR/mongo/db/write_ops', + 'write_ops_exec', 'write_ops_parsers', 'write_ops_parsers_test_helpers', ], diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 21960215359..c6185c78325 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -568,25 +568,23 @@ WriteResult performInserts(OperationContext* opCtx, for (auto&& doc : wholeOp.getDocuments()) { const bool isLastDoc = (&doc == &wholeOp.getDocuments().back()); auto fixedDoc = fixDocumentForInsert(opCtx, doc); + const StmtId stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); + const bool wasAlreadyExecuted = opCtx->getTxnNumber() && + !opCtx->inMultiDocumentTransaction() && + txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId); + if (!fixedDoc.isOK()) { // Handled after we insert anything in the batch to be sure we report errors in the // correct order. In an ordered insert, if one of the docs ahead of us fails, we should // behave as-if we never got to this document. + } else if (wasAlreadyExecuted) { + // Similarly, if the insert was already executed as part of a retryable write, flush the + // current batch to preserve the error results order. } else { - const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); - if (opCtx->getTxnNumber()) { - if (!opCtx->inMultiDocumentTransaction() && - txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId)) { - containsRetry = true; - RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); - out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry()); - continue; - } - } - BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue()); batch.emplace_back(stmtId, toInsert); bytesInBatch += batch.back().doc.objsize(); + if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < maxBatchBytes) continue; // Add more to batch before inserting. } @@ -596,7 +594,12 @@ WriteResult performInserts(OperationContext* opCtx, batch.clear(); // We won't need the current batch any more. bytesInBatch = 0; - if (canContinue && !fixedDoc.isOK()) { + if (!canContinue) + break; + + // Revisit any conditions that may have caused the batch to be flushed. In those cases, + // append the appropriate result to the output. + if (!fixedDoc.isOK()) { globalOpCounters.gotInsert(); ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForInsert( opCtx->getWriteConcern()); @@ -607,11 +610,13 @@ WriteResult performInserts(OperationContext* opCtx, canContinue = handleError( opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), &out); } + } else if (wasAlreadyExecuted) { + containsRetry = true; + RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); + out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry()); } - - if (!canContinue) - break; } + invariant(batch.empty()); return out; } diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp index a6912da2099..07cd5600dd8 100644 --- a/src/mongo/db/ops/write_ops_retryability_test.cpp +++ b/src/mongo/db/ops/write_ops_retryability_test.cpp @@ -30,14 +30,22 @@ #include "mongo/platform/basic.h" #include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/ops/write_ops_retryability.h" #include "mongo/db/query/find_and_modify_request.h" #include "mongo/db/repl/mock_repl_coord_server_fixture.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/transaction_participant.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -79,6 +87,18 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::none); // _id } +void setUpReplication(ServiceContext* svcCtx) { + auto replMock = std::make_unique<repl::ReplicationCoordinatorMock>(svcCtx); + replMock->alwaysAllowWrites(true); + repl::ReplicationCoordinator::set(svcCtx, std::move(replMock)); +} + +void setUpTxnParticipant(OperationContext* opCtx, std::vector<int> executedStmtIds) { + opCtx->setTxnNumber(1); + auto txnPart = TransactionParticipant::get(opCtx); + txnPart.setCommittedStmtIdsForTest(std::move(executedStmtIds)); +} + TEST_F(WriteOpsRetryability, ParseOplogEntryForUpdate) { const auto entry = assertGet(repl::OplogEntry::parse( BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "op" @@ -156,6 +176,98 @@ TEST_F(WriteOpsRetryability, ShouldFailIfParsingDeleteOplogForUpdate) { ASSERT_THROWS(parseOplogEntryForUpdate(deleteOplog), AssertionException); } +TEST_F(WriteOpsRetryability, PerformInsertsSuccess) { + auto opCtxRaii = makeOperationContext(); + // Use an unreplicated write block to avoid setting up more structures. + repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get()); + setUpReplication(getServiceContext()); + + write_ops::Insert insertOp(NamespaceString("foo.bar")); + insertOp.getWriteCommandBase().setOrdered(true); + insertOp.setDocuments({BSON("_id" << 0), BSON("_id" << 1)}); + write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp); + + ASSERT_EQ(2, result.results.size()); + ASSERT_TRUE(result.results[0].isOK()); + ASSERT_TRUE(result.results[1].isOK()); +} + +TEST_F(WriteOpsRetryability, PerformRetryableInsertsSuccess) { + auto opCtxRaii = makeOperationContext(); + opCtxRaii->setLogicalSessionId({UUID::gen(), {}}); + OperationContextSession session(opCtxRaii.get()); + // Use an unreplicated write block to avoid setting up more structures. + repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get()); + setUpReplication(getServiceContext()); + + // Set up a retryable write where statements 1 and 2 have already executed. + setUpTxnParticipant(opCtxRaii.get(), {1, 2}); + + write_ops::Insert insertOp(NamespaceString("foo.bar")); + insertOp.getWriteCommandBase().setOrdered(true); + // Setup documents that cannot be successfully inserted to show that the retryable logic was + // exercised. + insertOp.setDocuments({BSON("_id" << 0), BSON("_id" << 0)}); + insertOp.getWriteCommandBase().setStmtIds({{1, 2}}); + write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp); + + // Assert that both writes "succeeded". While there should have been a duplicate key error, the + // `performInserts` obeyed the contract of not re-inserting a document that was declared to have + // been inserted. + ASSERT_EQ(2, result.results.size()); + ASSERT_TRUE(result.results[0].isOK()); + ASSERT_TRUE(result.results[1].isOK()); +} + +TEST_F(WriteOpsRetryability, PerformRetryableInsertsWithBatchedFailure) { + auto opCtxRaii = makeOperationContext(); + opCtxRaii->setLogicalSessionId({UUID::gen(), {}}); + OperationContextSession session(opCtxRaii.get()); + // Use an unreplicated write block to avoid setting up more structures. + repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get()); + setUpReplication(getServiceContext()); + + // Set up a retryable write where statement 3 has already executed. + setUpTxnParticipant(opCtxRaii.get(), {3}); + + write_ops::Insert insertOp(NamespaceString("foo.bar")); + insertOp.getWriteCommandBase().setOrdered(false); + // Setup documents such that the second will fail insertion. + insertOp.setDocuments({BSON("_id" << 0), BSON("_id" << 0), BSON("_id" << 1)}); + insertOp.getWriteCommandBase().setStmtIds({{1, 2, 3}}); + write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp); + + // Assert that the third (already executed) write succeeds, despite the second write failing + // because this is an unordered insert. + ASSERT_EQ(3, result.results.size()); + ASSERT_TRUE(result.results[0].isOK()); + ASSERT_FALSE(result.results[1].isOK()); + ASSERT_EQ(ErrorCodes::DuplicateKey, result.results[1].getStatus()); + ASSERT_TRUE(result.results[2].isOK()); +} + +TEST_F(WriteOpsRetryability, PerformOrderedInsertsStopsAtError) { + auto opCtxRaii = makeOperationContext(); + opCtxRaii->setLogicalSessionId({UUID::gen(), {}}); + OperationContextSession session(opCtxRaii.get()); + // Use an unreplicated write block to avoid setting up more structures. + repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get()); + setUpReplication(getServiceContext()); + + write_ops::Insert insertOp(NamespaceString("foo.bar")); + insertOp.getWriteCommandBase().setOrdered(true); + // Setup documents such that the second cannot be successfully inserted. + insertOp.setDocuments({BSON("_id" << 0), BSON("_id" << 0), BSON("_id" << 1)}); + write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp); + + // Assert that the third write is not attempted because this is an ordered insert. + ASSERT_EQ(2, result.results.size()); + ASSERT_TRUE(result.results[0].isOK()); + ASSERT_FALSE(result.results[1].isOK()); + ASSERT_EQ(ErrorCodes::DuplicateKey, result.results[1].getStatus()); +} + + class FindAndModifyRetryability : public MockReplCoordServerFixture { public: FindAndModifyRetryability() = default; |