summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/bulk_write.cpp
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2023-04-17 19:53:00 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-17 21:05:47 +0000
commita45535c81c645dc190485e033a6ed6b91678bd50 (patch)
tree59ff4ee069e268721d3089b3966463442d17d254 /src/mongo/db/commands/bulk_write.cpp
parentabc8a9c1f151bcdb3c61d3cd58a95cddfcf7966e (diff)
downloadmongo-a45535c81c645dc190485e033a6ed6b91678bd50.tar.gz
SERVER-72794: Implement cursor response for bulkWrite on mongos
Diffstat (limited to 'src/mongo/db/commands/bulk_write.cpp')
-rw-r--r--src/mongo/db/commands/bulk_write.cpp31
1 files changed, 12 insertions, 19 deletions
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp
index a60240b36e1..6a5ab7b7039 100644
--- a/src/mongo/db/commands/bulk_write.cpp
+++ b/src/mongo/db/commands/bulk_write.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/ops/write_ops_retryability.h"
+#include "mongo/db/query/find_common.h"
#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/repl/oplog.h"
@@ -755,17 +756,6 @@ bool handleDeleteOp(OperationContext* opCtx,
}
}
-bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, size_t bytesBuffered) {
- invariant(numDocs >= 0);
- if (!numDocs) {
- // Allow the first output document to exceed the limit to ensure we can always make
- // progress.
- return true;
- }
-
- return (bytesBuffered + nextDoc.objsize()) <= BSONObjMaxUserSize;
-}
-
class BulkWriteCmd : public BulkWriteCmdVersion1Gen<BulkWriteCmd> {
public:
bool adminOnly() const final {
@@ -846,6 +836,7 @@ public:
const BulkWriteCommandRequest& req,
bulk_write::BulkWriteReplyItems replies,
bulk_write::RetriedStmtIds retriedStmtIds) {
+ auto reqObj = unparsedRequest().body;
const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS();
auto expCtx = make_intrusive<ExpressionContext>(
opCtx, std::unique_ptr<CollatorInterface>(nullptr), ns());
@@ -879,8 +870,8 @@ public:
batchSize = *req.getCursor()->getBatchSize();
}
- size_t numReplies = 0;
- size_t bytesBuffered = 0;
+ size_t numRepliesInFirstBatch = 0;
+ FindCommon::BSONArrayResponseSizeTracker responseSizeTracker;
for (long long objCount = 0; objCount < batchSize; objCount++) {
BSONObj nextDoc;
PlanExecutor::ExecState state = exec->getNext(&nextDoc, nullptr);
@@ -891,16 +882,17 @@ public:
// If we can't fit this result inside the current batch, then we stash it for
// later.
- if (!haveSpaceForNext(nextDoc, objCount, bytesBuffered)) {
+ if (!responseSizeTracker.haveSpaceForNext(nextDoc)) {
exec->stashResult(nextDoc);
break;
}
- numReplies++;
- bytesBuffered += nextDoc.objsize();
+ numRepliesInFirstBatch++;
+ responseSizeTracker.add(nextDoc);
}
if (exec->isEOF()) {
- invariant(numReplies == replies.size());
+ invariant(numRepliesInFirstBatch == replies.size());
+ collectTelemetryMongod(opCtx, reqObj, numRepliesInFirstBatch);
auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor(
0, std::vector<BulkWriteReplyItem>(std::move(replies))));
if (!retriedStmtIds.empty()) {
@@ -921,14 +913,15 @@ public:
opCtx->getWriteConcern(),
repl::ReadConcernArgs::get(opCtx),
ReadPreferenceSetting::get(opCtx),
- unparsedRequest().body,
+ reqObj,
bulk_write_common::getPrivileges(req)});
auto cursorId = pinnedCursor.getCursor()->cursorid();
pinnedCursor->incNBatches();
pinnedCursor->incNReturnedSoFar(replies.size());
+ collectTelemetryMongod(opCtx, pinnedCursor, numRepliesInFirstBatch);
- replies.resize(numReplies);
+ replies.resize(numRepliesInFirstBatch);
auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor(
cursorId, std::vector<BulkWriteReplyItem>(std::move(replies))));
if (!retriedStmtIds.empty()) {