diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec_test.cpp | 117 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 24 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.h | 11 |
3 files changed, 126 insertions, 26 deletions
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index 09f7f002b99..eb32b7c8204 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -95,6 +95,11 @@ public: } void expectInsertsReturnSuccess(const std::vector<BSONObj>& expected) { + expectInsertsReturnSuccess(expected.begin(), expected.end()); + } + + void expectInsertsReturnSuccess(std::vector<BSONObj>::const_iterator expectedFrom, + std::vector<BSONObj>::const_iterator expectedTo) { onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { ASSERT_EQUALS(nss.db(), request.dbname); @@ -103,10 +108,11 @@ public: ASSERT_EQUALS(nss.toString(), actualBatchedInsert.getNS().ns()); const auto& inserted = actualBatchedInsert.getInsertRequest().getDocuments(); - ASSERT_EQUALS(expected.size(), inserted.size()); + const size_t expectedSize = std::distance(expectedFrom, expectedTo); + ASSERT_EQUALS(expectedSize, inserted.size()); auto itInserted = inserted.begin(); - auto itExpected = expected.begin(); + auto itExpected = expectedFrom; for (; itInserted != inserted.end(); itInserted++, itExpected++) { ASSERT_BSONOBJ_EQ(*itExpected, *itInserted); @@ -114,6 +120,7 @@ public: BatchedCommandResponse response; response.setOk(true); + response.setN(inserted.size()); return response.toBSON(); }); @@ -209,7 +216,8 @@ TEST_F(BatchWriteExecTest, SingleOp) { BatchWriteExecStats stats; BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); ASSERT(response.getOk()); - ASSERT_EQUALS(stats.numRounds, 1); + ASSERT_EQ(1LL, response.getN()); + ASSERT_EQ(1, stats.numRounds); }); expectInsertsReturnSuccess(std::vector<BSONObj>{BSON("x" << 1)}); @@ -217,6 +225,44 @@ TEST_F(BatchWriteExecTest, SingleOp) { future.timed_get(kFutureTimeout); } +TEST_F(BatchWriteExecTest, MultiOpLarge) { + const int kNumDocsToInsert = 100'000; + const std::string kDocValue(200, 'x'); + + std::vector<BSONObj> docsToInsert; + docsToInsert.reserve(kNumDocsToInsert); + for (int i = 0; i < kNumDocsToInsert; i++) { + docsToInsert.push_back(BSON("_id" << i << "someLargeKeyToWasteSpace" << kDocValue)); + } + + BatchedCommandRequest request([&] { + write_ops::Insert insertOp(nss); + insertOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(true); + return writeCommandBase; + }()); + insertOp.setDocuments(docsToInsert); + return insertOp; + }()); + request.setWriteConcern(BSONObj()); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + + ASSERT(response.getOk()); + ASSERT_EQUALS(response.getN(), kNumDocsToInsert); + ASSERT_EQUALS(stats.numRounds, 2); + }); + + expectInsertsReturnSuccess(docsToInsert.begin(), docsToInsert.begin() + 66576); + expectInsertsReturnSuccess(docsToInsert.begin() + 66576, docsToInsert.end()); + + future.timed_get(kFutureTimeout); +} + TEST_F(BatchWriteExecTest, SingleOpError) { BatchedCommandResponse errResponse; errResponse.setOk(false); @@ -240,13 +286,13 @@ TEST_F(BatchWriteExecTest, SingleOpError) { BatchWriteExecStats stats; BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); ASSERT(response.getOk()); - ASSERT_EQUALS(response.getN(), 0); + ASSERT_EQ(0, response.getN()); ASSERT(response.isErrDetailsSet()); - ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), errResponse.getErrCode()); + ASSERT_EQ(errResponse.getErrCode(), response.getErrDetailsAt(0)->getErrCode()); ASSERT(response.getErrDetailsAt(0)->getErrMessage().find(errResponse.getErrMessage()) != string::npos); - ASSERT_EQUALS(stats.numRounds, 1); + ASSERT_EQ(1, stats.numRounds); }); expectInsertsReturnError({BSON("x" << 1)}, errResponse); @@ -278,7 +324,7 @@ TEST_F(BatchWriteExecTest, StaleOp) { BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); ASSERT(response.getOk()); - ASSERT_EQUALS(stats.numStaleBatches, 1); + ASSERT_EQUALS(1, stats.numStaleBatches); }); const std::vector<BSONObj> expected{BSON("x" << 1)}; @@ -308,7 +354,7 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) { BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); ASSERT(response.getOk()); - ASSERT_EQUALS(stats.numStaleBatches, 3); + ASSERT_EQUALS(3, stats.numStaleBatches); }); const std::vector<BSONObj> expected{BSON("x" << 1)}; @@ -344,7 +390,7 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) { BatchWriteExecStats stats; BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); ASSERT(response.getOk()); - ASSERT_EQUALS(response.getN(), 0); + ASSERT_EQ(0, response.getN()); ASSERT(response.isErrDetailsSet()); ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), ErrorCodes::NoProgressMade); ASSERT_EQUALS(response.getErrDetailsAt(1)->getErrCode(), ErrorCodes::NoProgressMade); @@ -360,6 +406,49 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) { future.timed_get(kFutureTimeout); } +TEST_F(BatchWriteExecTest, RetryableWritesLargeBatch) { + // A retryable error without a txnNumber is not retried. + + const int kNumDocsToInsert = 100'000; + const std::string kDocValue(200, 'x'); + + std::vector<BSONObj> docsToInsert; + docsToInsert.reserve(kNumDocsToInsert); + for (int i = 0; i < kNumDocsToInsert; i++) { + docsToInsert.push_back(BSON("_id" << i << "someLargeKeyToWasteSpace" << kDocValue)); + } + + BatchedCommandRequest request([&] { + write_ops::Insert insertOp(nss); + insertOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(true); + return writeCommandBase; + }()); + insertOp.setDocuments(docsToInsert); + return insertOp; + }()); + request.setWriteConcern(BSONObj()); + + operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest()); + operationContext()->setTxnNumber(5); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + + ASSERT(response.getOk()); + ASSERT_EQUALS(response.getN(), kNumDocsToInsert); + ASSERT_EQUALS(stats.numRounds, 2); + }); + + expectInsertsReturnSuccess(docsToInsert.begin(), docsToInsert.begin() + 63791); + expectInsertsReturnSuccess(docsToInsert.begin() + 63791, docsToInsert.end()); + + future.timed_get(kFutureTimeout); +} + TEST_F(BatchWriteExecTest, RetryableErrorNoTxnNumber) { // A retryable error without a txnNumber is not retried. @@ -386,12 +475,12 @@ TEST_F(BatchWriteExecTest, RetryableErrorNoTxnNumber) { BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); ASSERT(response.getOk()); - ASSERT_EQUALS(response.getN(), 0); + ASSERT_EQ(0, response.getN()); ASSERT(response.isErrDetailsSet()); ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), retryableErrResponse.getErrCode()); ASSERT(response.getErrDetailsAt(0)->getErrMessage().find( retryableErrResponse.getErrMessage()) != string::npos); - ASSERT_EQUALS(stats.numRounds, 1); + ASSERT_EQ(1, stats.numRounds); }); expectInsertsReturnError({BSON("x" << 1), BSON("x" << 2)}, retryableErrResponse); @@ -429,7 +518,7 @@ TEST_F(BatchWriteExecTest, RetryableErrorTxnNumber) { ASSERT(response.getOk()); ASSERT(!response.isErrDetailsSet()); - ASSERT_EQUALS(stats.numRounds, 1); + ASSERT_EQ(1, stats.numRounds); }); expectInsertsReturnError({BSON("x" << 1), BSON("x" << 2)}, retryableErrResponse); @@ -467,13 +556,13 @@ TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) { BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); ASSERT(response.getOk()); - ASSERT_EQUALS(response.getN(), 0); + ASSERT_EQ(0, response.getN()); ASSERT(response.isErrDetailsSet()); ASSERT_EQUALS(response.getErrDetailsAt(0)->getErrCode(), nonRetryableErrResponse.getErrCode()); ASSERT(response.getErrDetailsAt(0)->getErrMessage().find( nonRetryableErrResponse.getErrMessage()) != string::npos); - ASSERT_EQUALS(stats.numRounds, 1); + ASSERT_EQ(1, stats.numRounds); }); expectInsertsReturnError({BSON("x" << 1), BSON("x" << 2)}, nonRetryableErrResponse); diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 802f1ebbda0..4dad23a96a9 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -46,6 +46,11 @@ using std::vector; namespace { +// Conservative overhead per element contained in the write batch. This value was calculated as 1 +// byte (element type) + 5 bytes (max string encoding of the array index encoded as string and the +// maximum key is 99999) + 1 byte (zero terminator) = 7 bytes +const int kBSONArrayPerElementOverheadBytes = 7; + struct BatchSize { int numOps{0}; int sizeBytes{0}; @@ -214,7 +219,7 @@ void trackErrors(const ShardEndpoint& endpoint, } // namespace BatchWriteOp::BatchWriteOp(OperationContext* opCtx, const BatchedCommandRequest& clientRequest) - : _opCtx(opCtx), _clientRequest(clientRequest) { + : _opCtx(opCtx), _clientRequest(clientRequest), _batchTxnNum(_opCtx->getTxnNumber()) { _writeOps.reserve(_clientRequest.sizeWriteOps()); for (size_t i = 0; i < _clientRequest.sizeWriteOps(); ++i) { @@ -330,11 +335,12 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter, } } - // - // If this write will push us over some sort of size limit, stop targeting - // + // Account the array overhead once for the actual updates array and once for the statement + // ids array, if retryable writes are used + const int writeSizeBytes = getWriteSizeBytes(writeOp) + kBSONArrayPerElementOverheadBytes + + (_batchTxnNum ? kBSONArrayPerElementOverheadBytes + 4 : 0); - int writeSizeBytes = getWriteSizeBytes(writeOp); + // If this write will push us over some sort of size limit, stop targeting if (wouldMakeBatchesTooBig(writes, writeSizeBytes, batchSizes)) { invariant(!batchMap.empty()); writeOp.cancelWrites(NULL); @@ -362,6 +368,9 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter, BatchSize& batchSize = batchSizeIt->second; ++batchSize.numOps; + + // If the request contains transaction number, this means the end result will contain a + // statement ids array, so we need to account for that overhead. batchSize.sizeBytes += writeSizeBytes; batch->addWrite(write); } @@ -402,10 +411,9 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter, BatchedCommandRequest BatchWriteOp::buildBatchRequest( const TargetedWriteBatch& targetedBatch) const { const auto batchType = _clientRequest.getBatchType(); - const auto batchTxnNum = _opCtx->getTxnNumber(); boost::optional<std::vector<int32_t>> stmtIdsForOp; - if (batchTxnNum) { + if (_batchTxnNum) { stmtIdsForOp.emplace(); } @@ -476,7 +484,7 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest( _clientRequest.getWriteCommandBase().getBypassDocumentValidation()); wcb.setOrdered(_clientRequest.getWriteCommandBase().getOrdered()); - if (batchTxnNum) { + if (_batchTxnNum) { wcb.setStmtIds(std::move(stmtIdsForOp)); } diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h index ccda68f5ada..22d18b579ff 100644 --- a/src/mongo/s/write_ops/batch_write_op.h +++ b/src/mongo/s/write_ops/batch_write_op.h @@ -202,6 +202,9 @@ private: // The incoming client request const BatchedCommandRequest& _clientRequest; + // Cached transaction number (if one is present on the operation contex) + boost::optional<TxnNumber> _batchTxnNum; + // Array of ops being processed from the client request std::vector<WriteOp> _writeOps; @@ -240,6 +243,10 @@ public: return _endpoint; } + const std::vector<TargetedWrite*>& getWrites() const { + return _writes.vector(); + } + /** * TargetedWrite is owned here once given to the TargetedWriteBatch */ @@ -247,10 +254,6 @@ public: _writes.mutableVector().push_back(targetedWrite); } - const std::vector<TargetedWrite*>& getWrites() const { - return _writes.vector(); - } - private: // Where to send the batch const ShardEndpoint _endpoint; |