summary refs log tree commit diff
path: root/src/mongo/db
diff options
context:
space:
mode:
author Daniel Gottlieb <daniel.gottlieb@mongodb.com> 2020-11-16 11:40:59 -0500
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-11-16 18:24:56 +0000
commit e1e8ea257b835c4d8e130205a2fd8297bcccbd47 (patch)
tree f6c9a71edae85041e872a6fda83bd2a74a8e47e1 /src/mongo/db
parent 051793654f08e39b6ed3b7d4a6ca78553f537192 (diff)
download mongo-e1e8ea257b835c4d8e130205a2fd8297bcccbd47.tar.gz
SERVER-52667: Correct performInserts batching.
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- src/mongo/db/ops/SConscript 2
-rw-r--r-- src/mongo/db/ops/write_ops_exec.cpp 35
-rw-r--r-- src/mongo/db/ops/write_ops_retryability_test.cpp 112
-rw-r--r-- src/mongo/db/transaction_participant.cpp 8
-rw-r--r-- src/mongo/db/transaction_participant.h 2
5 files changed, 144 insertions, 15 deletions
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript
index f1ab9615526..3b7ad76017e 100644
--- a/src/mongo/db/ops/SConscript
+++ b/src/mongo/db/ops/SConscript
@@ -82,7 +82,9 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
'$BUILD_DIR/mongo/db/repl/oplog_entry',
+ '$BUILD_DIR/mongo/db/transaction',
'$BUILD_DIR/mongo/db/write_ops',
+ 'write_ops_exec',
'write_ops_parsers',
'write_ops_parsers_test_helpers',
],
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 21960215359..c6185c78325 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -568,25 +568,23 @@ WriteResult performInserts(OperationContext* opCtx,
for (auto&& doc : wholeOp.getDocuments()) {
const bool isLastDoc = (&doc == &wholeOp.getDocuments().back());
auto fixedDoc = fixDocumentForInsert(opCtx, doc);
+ const StmtId stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
+ const bool wasAlreadyExecuted = opCtx->getTxnNumber() &&
+ !opCtx->inMultiDocumentTransaction() &&
+ txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId);
+
if (!fixedDoc.isOK()) {
// Handled after we insert anything in the batch to be sure we report errors in the
// correct order. In an ordered insert, if one of the docs ahead of us fails, we should
// behave as-if we never got to this document.
+ } else if (wasAlreadyExecuted) {
+ // Similarly, if the insert was already executed as part of a retryable write, flush the
+ // current batch to preserve the error results order.
} else {
- const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
- if (opCtx->getTxnNumber()) {
- if (!opCtx->inMultiDocumentTransaction() &&
- txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId)) {
- containsRetry = true;
- RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
- out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry());
- continue;
- }
- }
-
BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue());
batch.emplace_back(stmtId, toInsert);
bytesInBatch += batch.back().doc.objsize();
+
if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < maxBatchBytes)
continue; // Add more to batch before inserting.
}
@@ -596,7 +594,12 @@ WriteResult performInserts(OperationContext* opCtx,
batch.clear(); // We won't need the current batch any more.
bytesInBatch = 0;
- if (canContinue && !fixedDoc.isOK()) {
+ if (!canContinue)
+ break;
+
+ // Revisit any conditions that may have caused the batch to be flushed. In those cases,
+ // append the appropriate result to the output.
+ if (!fixedDoc.isOK()) {
globalOpCounters.gotInsert();
ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForInsert(
opCtx->getWriteConcern());
@@ -607,11 +610,13 @@ WriteResult performInserts(OperationContext* opCtx,
canContinue = handleError(
opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), &out);
}
+ } else if (wasAlreadyExecuted) {
+ containsRetry = true;
+ RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
+ out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry());
}
-
- if (!canContinue)
- break;
}
+ invariant(batch.empty());
return out;
}
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
index a6912da2099..07cd5600dd8 100644
--- a/src/mongo/db/ops/write_ops_retryability_test.cpp
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -30,14 +30,22 @@
#include "mongo/platform/basic.h"
#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/ops/write_ops_retryability.h"
#include "mongo/db/query/find_and_modify_request.h"
#include "mongo/db/repl/mock_repl_coord_server_fixture.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -79,6 +87,18 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
boost::none); // _id
}
+void setUpReplication(ServiceContext* svcCtx) {
+ auto replMock = std::make_unique<repl::ReplicationCoordinatorMock>(svcCtx);
+ replMock->alwaysAllowWrites(true);
+ repl::ReplicationCoordinator::set(svcCtx, std::move(replMock));
+}
+
+void setUpTxnParticipant(OperationContext* opCtx, std::vector<int> executedStmtIds) {
+ opCtx->setTxnNumber(1);
+ auto txnPart = TransactionParticipant::get(opCtx);
+ txnPart.setCommittedStmtIdsForTest(std::move(executedStmtIds));
+}
+
TEST_F(WriteOpsRetryability, ParseOplogEntryForUpdate) {
const auto entry = assertGet(repl::OplogEntry::parse(
BSON("ts" << Timestamp(50, 10) << "t" << 1LL << "op"
@@ -156,6 +176,98 @@ TEST_F(WriteOpsRetryability, ShouldFailIfParsingDeleteOplogForUpdate) {
ASSERT_THROWS(parseOplogEntryForUpdate(deleteOplog), AssertionException);
}
+TEST_F(WriteOpsRetryability, PerformInsertsSuccess) {
+ auto opCtxRaii = makeOperationContext();
+ // Use an unreplicated write block to avoid setting up more structures.
+ repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get());
+ setUpReplication(getServiceContext());
+
+ write_ops::Insert insertOp(NamespaceString("foo.bar"));
+ insertOp.getWriteCommandBase().setOrdered(true);
+ insertOp.setDocuments({BSON("_id" << 0), BSON("_id" << 1)});
+ write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp);
+
+ ASSERT_EQ(2, result.results.size());
+ ASSERT_TRUE(result.results[0].isOK());
+ ASSERT_TRUE(result.results[1].isOK());
+}
+
+TEST_F(WriteOpsRetryability, PerformRetryableInsertsSuccess) {
+ auto opCtxRaii = makeOperationContext();
+ opCtxRaii->setLogicalSessionId({UUID::gen(), {}});
+ OperationContextSession session(opCtxRaii.get());
+ // Use an unreplicated write block to avoid setting up more structures.
+ repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get());
+ setUpReplication(getServiceContext());
+
+ // Set up a retryable write where statements 1 and 2 have already executed.
+ setUpTxnParticipant(opCtxRaii.get(), {1, 2});
+
+ write_ops::Insert insertOp(NamespaceString("foo.bar"));
+ insertOp.getWriteCommandBase().setOrdered(true);
+ // Setup documents that cannot be successfully inserted to show that the retryable logic was
+ // exercised.
+ insertOp.setDocuments({BSON("_id" << 0), BSON("_id" << 0)});
+ insertOp.getWriteCommandBase().setStmtIds({{1, 2}});
+ write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp);
+
+ // Assert that both writes "succeeded". While there should have been a duplicate key error, the
+ // `performInserts` obeyed the contract of not re-inserting a document that was declared to have
+ // been inserted.
+ ASSERT_EQ(2, result.results.size());
+ ASSERT_TRUE(result.results[0].isOK());
+ ASSERT_TRUE(result.results[1].isOK());
+}
+
+TEST_F(WriteOpsRetryability, PerformRetryableInsertsWithBatchedFailure) {
+ auto opCtxRaii = makeOperationContext();
+ opCtxRaii->setLogicalSessionId({UUID::gen(), {}});
+ OperationContextSession session(opCtxRaii.get());
+ // Use an unreplicated write block to avoid setting up more structures.
+ repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get());
+ setUpReplication(getServiceContext());
+
+ // Set up a retryable write where statement 3 has already executed.
+ setUpTxnParticipant(opCtxRaii.get(), {3});
+
+ write_ops::Insert insertOp(NamespaceString("foo.bar"));
+ insertOp.getWriteCommandBase().setOrdered(false);
+ // Setup documents such that the second will fail insertion.
+ insertOp.setDocuments({BSON("_id" << 0), BSON("_id" << 0), BSON("_id" << 1)});
+ insertOp.getWriteCommandBase().setStmtIds({{1, 2, 3}});
+ write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp);
+
+ // Assert that the third (already executed) write succeeds, despite the second write failing
+ // because this is an unordered insert.
+ ASSERT_EQ(3, result.results.size());
+ ASSERT_TRUE(result.results[0].isOK());
+ ASSERT_FALSE(result.results[1].isOK());
+ ASSERT_EQ(ErrorCodes::DuplicateKey, result.results[1].getStatus());
+ ASSERT_TRUE(result.results[2].isOK());
+}
+
+TEST_F(WriteOpsRetryability, PerformOrderedInsertsStopsAtError) {
+ auto opCtxRaii = makeOperationContext();
+ opCtxRaii->setLogicalSessionId({UUID::gen(), {}});
+ OperationContextSession session(opCtxRaii.get());
+ // Use an unreplicated write block to avoid setting up more structures.
+ repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get());
+ setUpReplication(getServiceContext());
+
+ write_ops::Insert insertOp(NamespaceString("foo.bar"));
+ insertOp.getWriteCommandBase().setOrdered(true);
+ // Setup documents such that the second cannot be successfully inserted.
+ insertOp.setDocuments({BSON("_id" << 0), BSON("_id" << 0), BSON("_id" << 1)});
+ write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp);
+
+ // Assert that the third write is not attempted because this is an ordered insert.
+ ASSERT_EQ(2, result.results.size());
+ ASSERT_TRUE(result.results[0].isOK());
+ ASSERT_FALSE(result.results[1].isOK());
+ ASSERT_EQ(ErrorCodes::DuplicateKey, result.results[1].getStatus());
+}
+
+
class FindAndModifyRetryability : public MockReplCoordServerFixture {
public:
FindAndModifyRetryability() = default;
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index e0d63e8ebac..f7b2423c48a 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -2375,6 +2375,14 @@ UpdateRequest TransactionParticipant::Participant::_makeUpdateRequest(
return updateRequest;
}
+void TransactionParticipant::Participant::setCommittedStmtIdsForTest(
+ std::vector<int> stmtIdsCommitted) {
+ p().isValid = true;
+ for (auto stmtId : stmtIdsCommitted) {
+ p().activeTxnCommittedStatements.emplace(stmtId, repl::OpTime());
+ }
+}
+
void TransactionParticipant::Participant::_registerUpdateCacheOnCommit(
OperationContext* opCtx,
std::vector<StmtId> stmtIdsWritten,
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 0bda26495db..26579b7bf99 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -671,6 +671,8 @@ public:
o(lk).txnState.transitionTo(TransactionState::kAbortedWithPrepare);
}
+ void setCommittedStmtIdsForTest(std::vector<int> stmtIdsCommitted);
+
private:
boost::optional<repl::OpTime> _checkStatementExecuted(StmtId stmtId) const;