summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Gottlieb <daniel.gottlieb@mongodb.com>2020-11-24 20:56:33 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-11-25 02:47:05 +0000
commit8e7ad370f7d35501a9fdd563456b361cbb133d86 (patch)
tree914b7f742215aac4fb7fc73119303e807c1ced51
parentcda3a52701fe4143b06bd981b98514e69d0a93eb (diff)
downloadmongo-8e7ad370f7d35501a9fdd563456b361cbb133d86.tar.gz
SERVER-52978: Correct ordered batch inserts code path for document validation errors.
-rw-r--r--src/mongo/db/commands/write_commands/write_commands.cpp6
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp6
-rw-r--r--src/mongo/db/ops/write_ops_retryability_test.cpp65
3 files changed, 74 insertions, 3 deletions
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index f6ded3a257f..8b53fe99b84 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -233,7 +233,7 @@ void serializeReply(OperationContext* opCtx,
}
}
- long long n = 0;
+ long long nVal = 0;
long long nModified = 0;
std::vector<BSONObj> upsertInfo;
std::vector<BSONObj> errors;
@@ -255,7 +255,7 @@ void serializeReply(OperationContext* opCtx,
for (size_t i = 0; i < result.results.size(); i++) {
if (result.results[i].isOK()) {
const auto& opResult = result.results[i].getValue();
- n += opResult.getN(); // Always there.
+ nVal += opResult.getN(); // Always there.
if (replyStyle == ReplyStyle::kUpdate) {
nModified += opResult.getNModified();
if (auto idElement = opResult.getUpsertedId().firstElement()) {
@@ -325,7 +325,7 @@ void serializeReply(OperationContext* opCtx,
errors.push_back(error.obj());
}
- out->appendNumber("n", n);
+ out->appendNumber("n", nVal);
if (replyStyle == ReplyStyle::kUpdate) {
out->appendNumber("nModified", nModified);
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index c6185c78325..7e9856776cb 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -594,6 +594,8 @@ WriteResult performInserts(OperationContext* opCtx,
batch.clear(); // We won't need the current batch any more.
bytesInBatch = 0;
+ // If the batch had an error and decides to not continue, do not process a current doc that
+ // was unsuccessfully "fixed" or an already executed retryable write.
if (!canContinue)
break;
@@ -610,6 +612,10 @@ WriteResult performInserts(OperationContext* opCtx,
canContinue = handleError(
opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), &out);
}
+
+ if (!canContinue) {
+ break;
+ }
} else if (wasAlreadyExecuted) {
containsRetry = true;
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
index 07cd5600dd8..fcdef0c6845 100644
--- a/src/mongo/db/ops/write_ops_retryability_test.cpp
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -267,6 +267,71 @@ TEST_F(WriteOpsRetryability, PerformOrderedInsertsStopsAtError) {
ASSERT_EQ(ErrorCodes::DuplicateKey, result.results[1].getStatus());
}
+TEST_F(WriteOpsRetryability, PerformOrderedInsertsStopsAtBadDoc) {
+ auto opCtxRaii = makeOperationContext();
+ opCtxRaii->setLogicalSessionId({UUID::gen(), {}});
+ OperationContextSession session(opCtxRaii.get());
+ // Use an unreplicated write block to avoid setting up more structures.
+ repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get());
+ setUpReplication(getServiceContext());
+
+ write_ops::Insert insertOp(NamespaceString("foo.bar"));
+ insertOp.getWriteCommandBase().setOrdered(true);
+
+ // Setup documents such that the second cannot be successfully inserted.
+ auto largeBuffer = [](std::int32_t size) {
+ std::vector<char> buffer(size);
+ DataRange bufferRange(&buffer.front(), &buffer.back());
+ ASSERT_OK(bufferRange.writeNoThrow(LittleEndian<int32_t>(size)));
+
+ return buffer;
+ }(17 * 1024 * 1024);
+
+ insertOp.setDocuments({BSON("_id" << 0),
+ BSONObj(largeBuffer.data(), BSONObj::LargeSizeTrait{}),
+ BSON("_id" << 2)});
+ write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp);
+
+ // Assert that the third write is not attempted because this is an ordered insert.
+ ASSERT_EQ(2, result.results.size());
+ ASSERT_TRUE(result.results[0].isOK());
+ ASSERT_FALSE(result.results[1].isOK());
+ ASSERT_EQ(ErrorCodes::BadValue, result.results[1].getStatus());
+}
+
+TEST_F(WriteOpsRetryability, PerformUnorderedInsertsContinuesAtBadDoc) {
+ auto opCtxRaii = makeOperationContext();
+ opCtxRaii->setLogicalSessionId({UUID::gen(), {}});
+ OperationContextSession session(opCtxRaii.get());
+ // Use an unreplicated write block to avoid setting up more structures.
+ repl::UnreplicatedWritesBlock unreplicated(opCtxRaii.get());
+ setUpReplication(getServiceContext());
+
+ write_ops::Insert insertOp(NamespaceString("foo.bar"));
+ insertOp.getWriteCommandBase().setOrdered(false);
+
+ // Setup documents such that the second cannot be successfully inserted.
+ auto largeBuffer = [](std::int32_t size) {
+ std::vector<char> buffer(size);
+ DataRange bufferRange(&buffer.front(), &buffer.back());
+ ASSERT_OK(bufferRange.writeNoThrow(LittleEndian<int32_t>(size)));
+
+ return buffer;
+ }(17 * 1024 * 1024);
+
+ insertOp.setDocuments({BSON("_id" << 0),
+ BSONObj(largeBuffer.data(), BSONObj::LargeSizeTrait{}),
+ BSON("_id" << 2)});
+ write_ops_exec::WriteResult result = write_ops_exec::performInserts(opCtxRaii.get(), insertOp);
+
+ // Assert that the third write is attempted because this is an unordered insert.
+ ASSERT_EQ(3, result.results.size());
+ ASSERT_TRUE(result.results[0].isOK());
+ ASSERT_FALSE(result.results[1].isOK());
+ ASSERT_TRUE(result.results[2].isOK());
+ ASSERT_EQ(ErrorCodes::BadValue, result.results[1].getStatus());
+}
+
class FindAndModifyRetryability : public MockReplCoordServerFixture {
public: