diff options
Diffstat (limited to 'src/mongo/db/commands/bulk_write.cpp')
-rw-r--r-- | src/mongo/db/commands/bulk_write.cpp | 31 |
1 files changed, 12 insertions, 19 deletions
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp index a60240b36e1..6a5ab7b7039 100644 --- a/src/mongo/db/commands/bulk_write.cpp +++ b/src/mongo/db/commands/bulk_write.cpp @@ -57,6 +57,7 @@ #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/ops/write_ops_retryability.h" +#include "mongo/db/query/find_common.h" #include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/repl/oplog.h" @@ -755,17 +756,6 @@ bool handleDeleteOp(OperationContext* opCtx, } } -bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, size_t bytesBuffered) { - invariant(numDocs >= 0); - if (!numDocs) { - // Allow the first output document to exceed the limit to ensure we can always make - // progress. - return true; - } - - return (bytesBuffered + nextDoc.objsize()) <= BSONObjMaxUserSize; -} - class BulkWriteCmd : public BulkWriteCmdVersion1Gen<BulkWriteCmd> { public: bool adminOnly() const final { @@ -846,6 +836,7 @@ public: const BulkWriteCommandRequest& req, bulk_write::BulkWriteReplyItems replies, bulk_write::RetriedStmtIds retriedStmtIds) { + auto reqObj = unparsedRequest().body; const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS(); auto expCtx = make_intrusive<ExpressionContext>( opCtx, std::unique_ptr<CollatorInterface>(nullptr), ns()); @@ -879,8 +870,8 @@ public: batchSize = *req.getCursor()->getBatchSize(); } - size_t numReplies = 0; - size_t bytesBuffered = 0; + size_t numRepliesInFirstBatch = 0; + FindCommon::BSONArrayResponseSizeTracker responseSizeTracker; for (long long objCount = 0; objCount < batchSize; objCount++) { BSONObj nextDoc; PlanExecutor::ExecState state = exec->getNext(&nextDoc, nullptr); @@ -891,16 +882,17 @@ public: // If we can't fit this result inside the current batch, then we stash it for // later. - if (!haveSpaceForNext(nextDoc, objCount, bytesBuffered)) { + if (!responseSizeTracker.haveSpaceForNext(nextDoc)) { exec->stashResult(nextDoc); break; } - numReplies++; - bytesBuffered += nextDoc.objsize(); + numRepliesInFirstBatch++; + responseSizeTracker.add(nextDoc); } if (exec->isEOF()) { - invariant(numReplies == replies.size()); + invariant(numRepliesInFirstBatch == replies.size()); + collectTelemetryMongod(opCtx, reqObj, numRepliesInFirstBatch); auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor( 0, std::vector<BulkWriteReplyItem>(std::move(replies)))); if (!retriedStmtIds.empty()) { @@ -921,14 +913,15 @@ public: opCtx->getWriteConcern(), repl::ReadConcernArgs::get(opCtx), ReadPreferenceSetting::get(opCtx), - unparsedRequest().body, + reqObj, bulk_write_common::getPrivileges(req)}); auto cursorId = pinnedCursor.getCursor()->cursorid(); pinnedCursor->incNBatches(); pinnedCursor->incNReturnedSoFar(replies.size()); + collectTelemetryMongod(opCtx, pinnedCursor, numRepliesInFirstBatch); - replies.resize(numReplies); + replies.resize(numRepliesInFirstBatch); auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor( cursorId, std::vector<BulkWriteReplyItem>(std::move(replies)))); if (!retriedStmtIds.empty()) { |