Diffstat (limited to 'src')
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec_test.cpp  117
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp          24
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.h            11
3 files changed, 126 insertions, 26 deletions
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 09f7f002b99..eb32b7c8204 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -95,6 +95,11 @@ public:
}
void expectInsertsReturnSuccess(const std::vector<BSONObj>& expected) {
+ expectInsertsReturnSuccess(expected.begin(), expected.end());
+ }
+
+ void expectInsertsReturnSuccess(std::vector<BSONObj>::const_iterator expectedFrom,
+ std::vector<BSONObj>::const_iterator expectedTo) {
onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
ASSERT_EQUALS(nss.db(), request.dbname);
@@ -103,10 +108,11 @@ public:
ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().ns());
const auto& inserted = actualBatchedInsert.getInsertRequest().getDocuments();
- ASSERT_EQUALS(expected.size(), inserted.size());
+ const size_t expectedSize = std::distance(expectedFrom, expectedTo);
+ ASSERT_EQUALS(expectedSize, inserted.size());
auto itInserted = inserted.begin();
- auto itExpected = expected.begin();
+ auto itExpected = expectedFrom;
for (; itInserted != inserted.end(); itInserted++, itExpected++) {
ASSERT_BSONOBJ_EQ(*itExpected, *itInserted);
@@ -114,6 +120,7 @@ public:
BatchedCommandResponse response;
response.setOk(true);
+ response.setN(inserted.size());
return response.toBSON();
});
@@ -209,7 +216,8 @@ TEST_F(BatchWriteExecTest, SingleOp) {
BatchWriteExecStats stats;
BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
ASSERT(response.getOk());
- ASSERT_EQUALS(stats.numRounds, 1);
+ ASSERT_EQ(1LL, response.getN());
+ ASSERT_EQ(1, stats.numRounds);
});
expectInsertsReturnSuccess(std::vector<BSONObj>{BSON("x" << 1)});
@@ -217,6 +225,44 @@ TEST_F(BatchWriteExecTest, SingleOp) {
future.timed_get(kFutureTimeout);
}
+TEST_F(BatchWriteExecTest, MultiOpLarge) {
+ const int kNumDocsToInsert = 100'000;
+ const std::string kDocValue(200, 'x');
+
+ std::vector<BSONObj> docsToInsert;
+ docsToInsert.reserve(kNumDocsToInsert);
+ for (int i = 0; i < kNumDocsToInsert; i++) {
+ docsToInsert.push_back(BSON("_id" << i << "someLargeKeyToWasteSpace" << kDocValue));
+ }
+
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(true);
+ return writeCommandBase;
+ }());
+ insertOp.setDocuments(docsToInsert);
+ return insertOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+
+ ASSERT(response.getOk());
+ ASSERT_EQUALS(response.getN(), kNumDocsToInsert);
+ ASSERT_EQUALS(stats.numRounds, 2);
+ });
+
+ expectInsertsReturnSuccess(docsToInsert.begin(), docsToInsert.begin() + 66576);
+ expectInsertsReturnSuccess(docsToInsert.begin() + 66576, docsToInsert.end());
+
+ future.timed_get(kFutureTimeout);
+}
+
TEST_F(BatchWriteExecTest, SingleOpError) {
BatchedCommandResponse errResponse;
errResponse.setOk(false);
@@ -240,13 +286,13 @@ TEST_F(BatchWriteExecTest, SingleOpError) {
BatchWriteExecStats stats;
BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
ASSERT(response.getOk());
- ASSERT_EQUALS(response.getN(), 0);
+ ASSERT_EQ(0, response.getN());
ASSERT(response.isErrDetailsSet());
- ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), errResponse.getErrCode());
+ ASSERT_EQ(errResponse.getErrCode(), response.getErrDetailsAt(0)->getErrCode());
ASSERT(response.getErrDetailsAt(0)->getErrMessage().find(errResponse.getErrMessage()) !=
string::npos);
- ASSERT_EQUALS(stats.numRounds, 1);
+ ASSERT_EQ(1, stats.numRounds);
});
expectInsertsReturnError({BSON("x" << 1)}, errResponse);
@@ -278,7 +324,7 @@ TEST_F(BatchWriteExecTest, StaleOp) {
BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
ASSERT(response.getOk());
- ASSERT_EQUALS(stats.numStaleBatches, 1);
+ ASSERT_EQUALS(1, stats.numStaleBatches);
});
const std::vector<BSONObj> expected{BSON("x" << 1)};
@@ -308,7 +354,7 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) {
BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
ASSERT(response.getOk());
- ASSERT_EQUALS(stats.numStaleBatches, 3);
+ ASSERT_EQUALS(3, stats.numStaleBatches);
});
const std::vector<BSONObj> expected{BSON("x" << 1)};
@@ -344,7 +390,7 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) {
BatchWriteExecStats stats;
BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
ASSERT(response.getOk());
- ASSERT_EQUALS(response.getN(), 0);
+ ASSERT_EQ(0, response.getN());
ASSERT(response.isErrDetailsSet());
ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), ErrorCodes::NoProgressMade);
ASSERT_EQUALS(response.getErrDetailsAt(1)->getErrCode(), ErrorCodes::NoProgressMade);
@@ -360,6 +406,49 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) {
future.timed_get(kFutureTimeout);
}
+TEST_F(BatchWriteExecTest, RetryableWritesLargeBatch) {
+ // A retryable error without a txnNumber is not retried.
+
+ const int kNumDocsToInsert = 100'000;
+ const std::string kDocValue(200, 'x');
+
+ std::vector<BSONObj> docsToInsert;
+ docsToInsert.reserve(kNumDocsToInsert);
+ for (int i = 0; i < kNumDocsToInsert; i++) {
+ docsToInsert.push_back(BSON("_id" << i << "someLargeKeyToWasteSpace" << kDocValue));
+ }
+
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(true);
+ return writeCommandBase;
+ }());
+ insertOp.setDocuments(docsToInsert);
+ return insertOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest());
+ operationContext()->setTxnNumber(5);
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+
+ ASSERT(response.getOk());
+ ASSERT_EQUALS(response.getN(), kNumDocsToInsert);
+ ASSERT_EQUALS(stats.numRounds, 2);
+ });
+
+ expectInsertsReturnSuccess(docsToInsert.begin(), docsToInsert.begin() + 63791);
+ expectInsertsReturnSuccess(docsToInsert.begin() + 63791, docsToInsert.end());
+
+ future.timed_get(kFutureTimeout);
+}
+
TEST_F(BatchWriteExecTest, RetryableErrorNoTxnNumber) {
// A retryable error without a txnNumber is not retried.
@@ -386,12 +475,12 @@ TEST_F(BatchWriteExecTest, RetryableErrorNoTxnNumber) {
BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
ASSERT(response.getOk());
- ASSERT_EQUALS(response.getN(), 0);
+ ASSERT_EQ(0, response.getN());
ASSERT(response.isErrDetailsSet());
ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), retryableErrResponse.getErrCode());
ASSERT(response.getErrDetailsAt(0)->getErrMessage().find(
retryableErrResponse.getErrMessage()) != string::npos);
- ASSERT_EQUALS(stats.numRounds, 1);
+ ASSERT_EQ(1, stats.numRounds);
});
expectInsertsReturnError({BSON("x" << 1), BSON("x" << 2)}, retryableErrResponse);
@@ -429,7 +518,7 @@ TEST_F(BatchWriteExecTest, RetryableErrorTxnNumber) {
ASSERT(response.getOk());
ASSERT(!response.isErrDetailsSet());
- ASSERT_EQUALS(stats.numRounds, 1);
+ ASSERT_EQ(1, stats.numRounds);
});
expectInsertsReturnError({BSON("x" << 1), BSON("x" << 2)}, retryableErrResponse);
@@ -467,13 +556,13 @@ TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) {
BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
ASSERT(response.getOk());
- ASSERT_EQUALS(response.getN(), 0);
+ ASSERT_EQ(0, response.getN());
ASSERT(response.isErrDetailsSet());
ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(),
nonRetryableErrResponse.getErrCode());
ASSERT(response.getErrDetailsAt(0)->getErrMessage().find(
nonRetryableErrResponse.getErrMessage()) != string::npos);
- ASSERT_EQUALS(stats.numRounds, 1);
+ ASSERT_EQ(1, stats.numRounds);
});
expectInsertsReturnError({BSON("x" << 1), BSON("x" << 2)}, nonRetryableErrResponse);
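Note on the child-batch sizes asserted in MultiOpLarge and RetryableWritesLargeBatch: the split
points 66576 and 63791 line up with a back-of-the-envelope size calculation. The sketch below is
not part of the patch; it assumes the batch payload is capped at 16 MB (BSONObjMaxUserSize) and
that each test document (an int32 _id plus a 200-character string under the key
"someLargeKeyToWasteSpace") serializes to 245 bytes. Under those assumptions a plain insert costs
245 + 7 bytes in the documents array, and a retryable insert costs an additional 7 + 4 bytes for
its int32 entry in the stmtIds array.

    // Back-of-the-envelope check (assumptions noted above, not patch code).
    #include <cstdio>

    int main() {
        const int kAssumedMaxBatchSizeBytes = 16 * 1024 * 1024;  // assumed 16 MB cap

        // Assumed size of BSON("_id" << i << "someLargeKeyToWasteSpace" << kDocValue):
        //   4 (object length) + 1 (object terminator)
        // + 1 (type) + 4 ("_id" + NUL) + 4 (int32 value)
        // + 1 (type) + 25 (key + NUL) + 4 (string length) + 200 (chars) + 1 (NUL) = 245
        const int kAssumedDocSizeBytes = 245;

        const int kPerElementOverheadBytes = 7;  // documents array index key
        const int kStmtIdOverheadBytes = 7 + 4;  // stmtIds array index key + int32 value

        printf("plain insert batch:     %d docs\n",
               kAssumedMaxBatchSizeBytes / (kAssumedDocSizeBytes + kPerElementOverheadBytes));
        printf("retryable insert batch: %d docs\n",
               kAssumedMaxBatchSizeBytes /
                   (kAssumedDocSizeBytes + kPerElementOverheadBytes + kStmtIdOverheadBytes));
        return 0;
    }

This prints 66576 and 63791, matching the iterator ranges passed to expectInsertsReturnSuccess()
in the two tests above.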
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 802f1ebbda0..4dad23a96a9 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -46,6 +46,11 @@ using std::vector;
namespace {
+// Conservative overhead per element contained in the write batch. This value was calculated as 1
+// byte (element type) + 5 bytes (max string encoding of the array index, since the largest
+// possible key is "99999") + 1 byte (zero terminator) = 7 bytes
+const int kBSONArrayPerElementOverheadBytes = 7;
+
struct BatchSize {
int numOps{0};
int sizeBytes{0};
@@ -214,7 +219,7 @@ void trackErrors(const ShardEndpoint& endpoint,
} // namespace
BatchWriteOp::BatchWriteOp(OperationContext* opCtx, const BatchedCommandRequest& clientRequest)
- : _opCtx(opCtx), _clientRequest(clientRequest) {
+ : _opCtx(opCtx), _clientRequest(clientRequest), _batchTxnNum(_opCtx->getTxnNumber()) {
_writeOps.reserve(_clientRequest.sizeWriteOps());
for (size_t i = 0; i < _clientRequest.sizeWriteOps(); ++i) {
@@ -330,11 +335,12 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
}
}
- //
- // If this write will push us over some sort of size limit, stop targeting
- //
+ // Account for the array overhead once for the actual write ops array and once for the statement
+ // ids array, if retryable writes are used
+ const int writeSizeBytes = getWriteSizeBytes(writeOp) + kBSONArrayPerElementOverheadBytes +
+ (_batchTxnNum ? kBSONArrayPerElementOverheadBytes + 4 : 0);
- int writeSizeBytes = getWriteSizeBytes(writeOp);
+ // If this write will push us over some sort of size limit, stop targeting
if (wouldMakeBatchesTooBig(writes, writeSizeBytes, batchSizes)) {
invariant(!batchMap.empty());
writeOp.cancelWrites(NULL);
@@ -362,6 +368,9 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
BatchSize& batchSize = batchSizeIt->second;
++batchSize.numOps;
+
+ // If the request contains a transaction number, the end result will also contain a statement
+ // ids array, so we need to account for that overhead.
batchSize.sizeBytes += writeSizeBytes;
batch->addWrite(write);
}
@@ -402,10 +411,9 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
BatchedCommandRequest BatchWriteOp::buildBatchRequest(
const TargetedWriteBatch& targetedBatch) const {
const auto batchType = _clientRequest.getBatchType();
- const auto batchTxnNum = _opCtx->getTxnNumber();
boost::optional<std::vector<int32_t>> stmtIdsForOp;
- if (batchTxnNum) {
+ if (_batchTxnNum) {
stmtIdsForOp.emplace();
}
@@ -476,7 +484,7 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(
_clientRequest.getWriteCommandBase().getBypassDocumentValidation());
wcb.setOrdered(_clientRequest.getWriteCommandBase().getOrdered());
- if (batchTxnNum) {
+ if (_batchTxnNum) {
wcb.setStmtIds(std::move(stmtIdsForOp));
}
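The accounting change above can be read as the following standalone sketch. The helper name and
parameters are illustrative only and do not exist in the patch: each targeted write contributes
its own serialized size plus the 7-byte array-key overhead, and, when the operation context
carries a transaction number, an additional 7 + 4 bytes for the corresponding int32 entry in the
stmtIds array.

    // Hypothetical helper mirroring the per-write size accounting in
    // BatchWriteOp::targetBatch(); not part of the patch.
    int estimateWriteSizeBytes(int writeSizeBytes, bool hasTxnNumber) {
        // 1 byte element type + up to 5 bytes for the decimal array index ("99999") + 1 byte NUL.
        const int kArrayElementOverheadBytes = 7;
        const int kStmtIdValueBytes = 4;  // statement ids are int32

        int total = writeSizeBytes + kArrayElementOverheadBytes;  // entry in the write ops array
        if (hasTxnNumber) {
            // Retryable writes also emit one statement id per write into the stmtIds array.
            total += kArrayElementOverheadBytes + kStmtIdValueBytes;
        }
        return total;
    }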
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index ccda68f5ada..22d18b579ff 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -202,6 +202,9 @@ private:
// The incoming client request
const BatchedCommandRequest& _clientRequest;
+ // Cached transaction number (if one is present on the operation context)
+ boost::optional<TxnNumber> _batchTxnNum;
+
// Array of ops being processed from the client request
std::vector<WriteOp> _writeOps;
@@ -240,6 +243,10 @@ public:
return _endpoint;
}
+ const std::vector<TargetedWrite*>& getWrites() const {
+ return _writes.vector();
+ }
+
/**
* TargetedWrite is owned here once given to the TargetedWriteBatch
*/
@@ -247,10 +254,6 @@ public:
_writes.mutableVector().push_back(targetedWrite);
}
- const std::vector<TargetedWrite*>& getWrites() const {
- return _writes.vector();
- }
-
private:
// Where to send the batch
const ShardEndpoint _endpoint;