diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2023-04-17 19:53:00 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-17 21:05:47 +0000 |
commit | a45535c81c645dc190485e033a6ed6b91678bd50 (patch) | |
tree | 59ff4ee069e268721d3089b3966463442d17d254 /src/mongo/db/commands/bulk_write.cpp | |
parent | abc8a9c1f151bcdb3c61d3cd58a95cddfcf7966e (diff) | |
download | mongo-a45535c81c645dc190485e033a6ed6b91678bd50.tar.gz |
SERVER-72794: Implement cursor response for bulkWrite on mongos
Diffstat (limited to 'src/mongo/db/commands/bulk_write.cpp')
-rw-r--r-- | src/mongo/db/commands/bulk_write.cpp | 31 |
1 files changed, 12 insertions, 19 deletions
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp index a60240b36e1..6a5ab7b7039 100644 --- a/src/mongo/db/commands/bulk_write.cpp +++ b/src/mongo/db/commands/bulk_write.cpp @@ -57,6 +57,7 @@ #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/ops/write_ops_retryability.h" +#include "mongo/db/query/find_common.h" #include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/repl/oplog.h" @@ -755,17 +756,6 @@ bool handleDeleteOp(OperationContext* opCtx, } } -bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, size_t bytesBuffered) { - invariant(numDocs >= 0); - if (!numDocs) { - // Allow the first output document to exceed the limit to ensure we can always make - // progress. - return true; - } - - return (bytesBuffered + nextDoc.objsize()) <= BSONObjMaxUserSize; -} - class BulkWriteCmd : public BulkWriteCmdVersion1Gen<BulkWriteCmd> { public: bool adminOnly() const final { @@ -846,6 +836,7 @@ public: const BulkWriteCommandRequest& req, bulk_write::BulkWriteReplyItems replies, bulk_write::RetriedStmtIds retriedStmtIds) { + auto reqObj = unparsedRequest().body; const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS(); auto expCtx = make_intrusive<ExpressionContext>( opCtx, std::unique_ptr<CollatorInterface>(nullptr), ns()); @@ -879,8 +870,8 @@ public: batchSize = *req.getCursor()->getBatchSize(); } - size_t numReplies = 0; - size_t bytesBuffered = 0; + size_t numRepliesInFirstBatch = 0; + FindCommon::BSONArrayResponseSizeTracker responseSizeTracker; for (long long objCount = 0; objCount < batchSize; objCount++) { BSONObj nextDoc; PlanExecutor::ExecState state = exec->getNext(&nextDoc, nullptr); @@ -891,16 +882,17 @@ public: // If we can't fit this result inside the current batch, then we stash it for // later. - if (!haveSpaceForNext(nextDoc, objCount, bytesBuffered)) { + if (!responseSizeTracker.haveSpaceForNext(nextDoc)) { exec->stashResult(nextDoc); break; } - numReplies++; - bytesBuffered += nextDoc.objsize(); + numRepliesInFirstBatch++; + responseSizeTracker.add(nextDoc); } if (exec->isEOF()) { - invariant(numReplies == replies.size()); + invariant(numRepliesInFirstBatch == replies.size()); + collectTelemetryMongod(opCtx, reqObj, numRepliesInFirstBatch); auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor( 0, std::vector<BulkWriteReplyItem>(std::move(replies)))); if (!retriedStmtIds.empty()) { @@ -921,14 +913,15 @@ public: opCtx->getWriteConcern(), repl::ReadConcernArgs::get(opCtx), ReadPreferenceSetting::get(opCtx), - unparsedRequest().body, + reqObj, bulk_write_common::getPrivileges(req)}); auto cursorId = pinnedCursor.getCursor()->cursorid(); pinnedCursor->incNBatches(); pinnedCursor->incNReturnedSoFar(replies.size()); + collectTelemetryMongod(opCtx, pinnedCursor, numRepliesInFirstBatch); - replies.resize(numReplies); + replies.resize(numRepliesInFirstBatch); auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor( cursorId, std::vector<BulkWriteReplyItem>(std::move(replies)))); if (!retriedStmtIds.empty()) { |