summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2023-04-17 19:53:00 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-17 21:05:47 +0000
commita45535c81c645dc190485e033a6ed6b91678bd50 (patch)
tree59ff4ee069e268721d3089b3966463442d17d254
parentabc8a9c1f151bcdb3c61d3cd58a95cddfcf7966e (diff)
downloadmongo-a45535c81c645dc190485e033a6ed6b91678bd50.tar.gz
SERVER-72794: Implement cursor response for bulkWrite on mongos
-rw-r--r--src/mongo/db/commands/bulk_write.cpp31
-rw-r--r--src/mongo/s/cluster_cursor_stats.cpp1
-rw-r--r--src/mongo/s/commands/cluster_bulk_write_cmd.cpp87
-rw-r--r--src/mongo/s/query/SConscript1
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp3
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h6
-rw-r--r--src/mongo/s/query/router_stage_queued_data.cpp75
-rw-r--r--src/mongo/s/query/router_stage_queued_data.h71
8 files changed, 251 insertions, 24 deletions
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp
index a60240b36e1..6a5ab7b7039 100644
--- a/src/mongo/db/commands/bulk_write.cpp
+++ b/src/mongo/db/commands/bulk_write.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/ops/write_ops_retryability.h"
+#include "mongo/db/query/find_common.h"
#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/repl/oplog.h"
@@ -755,17 +756,6 @@ bool handleDeleteOp(OperationContext* opCtx,
}
}
-bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, size_t bytesBuffered) {
- invariant(numDocs >= 0);
- if (!numDocs) {
- // Allow the first output document to exceed the limit to ensure we can always make
- // progress.
- return true;
- }
-
- return (bytesBuffered + nextDoc.objsize()) <= BSONObjMaxUserSize;
-}
-
class BulkWriteCmd : public BulkWriteCmdVersion1Gen<BulkWriteCmd> {
public:
bool adminOnly() const final {
@@ -846,6 +836,7 @@ public:
const BulkWriteCommandRequest& req,
bulk_write::BulkWriteReplyItems replies,
bulk_write::RetriedStmtIds retriedStmtIds) {
+ auto reqObj = unparsedRequest().body;
const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS();
auto expCtx = make_intrusive<ExpressionContext>(
opCtx, std::unique_ptr<CollatorInterface>(nullptr), ns());
@@ -879,8 +870,8 @@ public:
batchSize = *req.getCursor()->getBatchSize();
}
- size_t numReplies = 0;
- size_t bytesBuffered = 0;
+ size_t numRepliesInFirstBatch = 0;
+ FindCommon::BSONArrayResponseSizeTracker responseSizeTracker;
for (long long objCount = 0; objCount < batchSize; objCount++) {
BSONObj nextDoc;
PlanExecutor::ExecState state = exec->getNext(&nextDoc, nullptr);
@@ -891,16 +882,17 @@ public:
// If we can't fit this result inside the current batch, then we stash it for
// later.
- if (!haveSpaceForNext(nextDoc, objCount, bytesBuffered)) {
+ if (!responseSizeTracker.haveSpaceForNext(nextDoc)) {
exec->stashResult(nextDoc);
break;
}
- numReplies++;
- bytesBuffered += nextDoc.objsize();
+ numRepliesInFirstBatch++;
+ responseSizeTracker.add(nextDoc);
}
if (exec->isEOF()) {
- invariant(numReplies == replies.size());
+ invariant(numRepliesInFirstBatch == replies.size());
+ collectTelemetryMongod(opCtx, reqObj, numRepliesInFirstBatch);
auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor(
0, std::vector<BulkWriteReplyItem>(std::move(replies))));
if (!retriedStmtIds.empty()) {
@@ -921,14 +913,15 @@ public:
opCtx->getWriteConcern(),
repl::ReadConcernArgs::get(opCtx),
ReadPreferenceSetting::get(opCtx),
- unparsedRequest().body,
+ reqObj,
bulk_write_common::getPrivileges(req)});
auto cursorId = pinnedCursor.getCursor()->cursorid();
pinnedCursor->incNBatches();
pinnedCursor->incNReturnedSoFar(replies.size());
+ collectTelemetryMongod(opCtx, pinnedCursor, numRepliesInFirstBatch);
- replies.resize(numReplies);
+ replies.resize(numRepliesInFirstBatch);
auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor(
cursorId, std::vector<BulkWriteReplyItem>(std::move(replies))));
if (!retriedStmtIds.empty()) {
diff --git a/src/mongo/s/cluster_cursor_stats.cpp b/src/mongo/s/cluster_cursor_stats.cpp
index 46ae32f88cd..ba46c1d1e3c 100644
--- a/src/mongo/s/cluster_cursor_stats.cpp
+++ b/src/mongo/s/cluster_cursor_stats.cpp
@@ -52,6 +52,7 @@ public:
auto stats = grid->getCursorManager()->stats();
openBob.append("multiTarget", static_cast<long long>(stats.cursorsMultiTarget));
openBob.append("singleTarget", static_cast<long long>(stats.cursorsSingleTarget));
+ openBob.append("queuedData", static_cast<long long>(stats.cursorsQueuedData));
openBob.append("pinned", static_cast<long long>(stats.cursorsPinned));
openBob.append(
"total",
diff --git a/src/mongo/s/commands/cluster_bulk_write_cmd.cpp b/src/mongo/s/commands/cluster_bulk_write_cmd.cpp
index edf181fd7e7..1c62749c8c5 100644
--- a/src/mongo/s/commands/cluster_bulk_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_bulk_write_cmd.cpp
@@ -40,11 +40,16 @@
#include "mongo/db/commands/bulk_write_common.h"
#include "mongo/db/commands/bulk_write_gen.h"
#include "mongo/db/not_primary_error_tracker.h"
+#include "mongo/db/query/find_common.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/s/cluster_write.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/query/cluster_client_cursor_impl.h"
+#include "mongo/s/query/cluster_cursor_manager.h"
+#include "mongo/s/query/router_stage_queued_data.h"
namespace mongo {
namespace {
@@ -107,11 +112,7 @@ public:
bulk_write_common::validateRequest(request(), opCtx->isRetryableWrite());
auto replyItems = cluster::bulkWrite(opCtx, request());
-
- auto reply = Reply();
- // TODO(SERVER-72794): Support cursor response for bulkWrite on mongos.
- reply.setCursor(BulkWriteCommandResponseCursor(0, replyItems));
- return reply;
+ return _populateCursorReply(opCtx, std::move(replyItems));
}
void doCheckAuthorization(OperationContext* opCtx) const final try {
@@ -123,6 +124,82 @@ public:
NotPrimaryErrorTracker::get(opCtx->getClient()).recordError(e.code());
throw;
}
+
+ private:
+ Reply _populateCursorReply(OperationContext* opCtx,
+ std::vector<BulkWriteReplyItem> replyItems) {
+ const auto& req = request();
+ auto reqObj = unparsedRequest().body;
+
+ const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS();
+ ClusterClientCursorParams params(cursorNss,
+ APIParameters::get(opCtx),
+ ReadPreferenceSetting::get(opCtx),
+ repl::ReadConcernArgs::get(opCtx));
+
+ long long batchSize = std::numeric_limits<long long>::max();
+ if (req.getCursor() && req.getCursor()->getBatchSize()) {
+ params.batchSize = request().getCursor()->getBatchSize();
+ batchSize = *req.getCursor()->getBatchSize();
+ }
+ params.originatingCommandObj = reqObj.getOwned();
+ params.originatingPrivileges = bulk_write_common::getPrivileges(req);
+ params.lsid = opCtx->getLogicalSessionId();
+ params.txnNumber = opCtx->getTxnNumber();
+
+ auto queuedDataStage = std::make_unique<RouterStageQueuedData>(opCtx);
+ for (auto& replyItem : replyItems) {
+ queuedDataStage->queueResult(replyItem.toBSON());
+ }
+
+ auto ccc =
+ ClusterClientCursorImpl::make(opCtx, std::move(queuedDataStage), std::move(params));
+
+ size_t numRepliesInFirstBatch = 0;
+ FindCommon::BSONArrayResponseSizeTracker responseSizeTracker;
+ for (long long objCount = 0; objCount < batchSize; objCount++) {
+ auto next = uassertStatusOK(ccc->next());
+
+ if (next.isEOF()) {
+ break;
+ }
+
+ auto nextObj = *next.getResult();
+ if (!responseSizeTracker.haveSpaceForNext(nextObj)) {
+ ccc->queueResult(nextObj);
+ break;
+ }
+
+ numRepliesInFirstBatch++;
+ responseSizeTracker.add(nextObj);
+ }
+ if (numRepliesInFirstBatch == replyItems.size()) {
+ collectTelemetryMongos(opCtx, reqObj, numRepliesInFirstBatch);
+ return BulkWriteCommandReply(BulkWriteCommandResponseCursor(
+ 0, std::vector<BulkWriteReplyItem>(std::move(replyItems))));
+ }
+
+ ccc->detachFromOperationContext();
+ ccc->incNBatches();
+ collectTelemetryMongos(opCtx, ccc, numRepliesInFirstBatch);
+
+ auto authUser =
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName();
+ auto cursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
+ opCtx,
+ ccc.releaseCursor(),
+ cursorNss,
+ ClusterCursorManager::CursorType::QueuedData,
+ ClusterCursorManager::CursorLifetime::Mortal,
+ authUser));
+
+ // Record the cursorID in CurOp.
+ CurOp::get(opCtx)->debug().cursorid = cursorId;
+
+ replyItems.resize(numRepliesInFirstBatch);
+ return BulkWriteCommandReply(BulkWriteCommandResponseCursor(
+ cursorId, std::vector<BulkWriteReplyItem>(std::move(replyItems))));
+ }
};
} clusterBulkWriteCmd;
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 1664b79209d..55ca63a433d 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -62,6 +62,7 @@ env.Library(
'router_stage_limit.cpp',
'router_stage_mock.cpp',
'router_stage_pipeline.cpp',
+ 'router_stage_queued_data.cpp',
'router_stage_remove_metadata_fields.cpp',
'router_stage_skip.cpp',
],
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 9d6e3a49f7f..152c990f163 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -455,6 +455,9 @@ ClusterCursorManager::Stats ClusterCursorManager::stats() const {
case CursorType::MultiTarget:
++stats.cursorsMultiTarget;
break;
+ case CursorType::QueuedData:
+ ++stats.cursorsQueuedData;
+ break;
}
}
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index 8666b262c5d..ec8f9576e40 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -85,6 +85,9 @@ public:
// Represents a cursor retrieving data from multiple remote sources.
MultiTarget,
+
+ // Represents a cursor retrieving data queued in memory on the router.
+ QueuedData,
};
enum class CursorLifetime {
@@ -110,6 +113,9 @@ public:
// Count of open cursors registered with CursorType::SingleTarget.
size_t cursorsSingleTarget = 0;
+ // Count of open cursors registered with CursorType::QueuedData.
+ size_t cursorsQueuedData = 0;
+
// Count of pinned cursors.
size_t cursorsPinned = 0;
};
diff --git a/src/mongo/s/query/router_stage_queued_data.cpp b/src/mongo/s/query/router_stage_queued_data.cpp
new file mode 100644
index 00000000000..8e64665cf1b
--- /dev/null
+++ b/src/mongo/s/query/router_stage_queued_data.cpp
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/query/router_stage_queued_data.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+
+namespace mongo {
+
+void RouterStageQueuedData::queueResult(const ClusterQueryResult& result) {
+ auto resultObj = result.getResult();
+ if (resultObj) {
+ invariant(resultObj->isOwned());
+ }
+ _resultsQueue.push({result});
+}
+
+void RouterStageQueuedData::queueError(Status status) {
+ _resultsQueue.push({status});
+}
+
+StatusWith<ClusterQueryResult> RouterStageQueuedData::next() {
+ if (_resultsQueue.empty()) {
+ return {ClusterQueryResult()};
+ }
+
+ auto out = _resultsQueue.front();
+ _resultsQueue.pop();
+ return out;
+}
+
+void RouterStageQueuedData::kill(OperationContext* opCtx) {
+ // No child to kill.
+}
+
+bool RouterStageQueuedData::remotesExhausted() {
+ // No underlying remote cursor.
+ return true;
+}
+
+std::size_t RouterStageQueuedData::getNumRemotes() const {
+ return 0;
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_queued_data.h b/src/mongo/s/query/router_stage_queued_data.h
new file mode 100644
index 00000000000..bece74cdc86
--- /dev/null
+++ b/src/mongo/s/query/router_stage_queued_data.h
@@ -0,0 +1,71 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <queue>
+
+#include "mongo/s/query/cluster_query_result.h"
+#include "mongo/s/query/router_exec_stage.h"
+
+namespace mongo {
+
+/**
+ * Initialized by adding results to its results queue, it then passes through the results in its
+ * queue until the queue is empty.
+ */
+class RouterStageQueuedData final : public RouterExecStage {
+public:
+ RouterStageQueuedData(OperationContext* opCtx) : RouterExecStage(opCtx) {}
+ ~RouterStageQueuedData() final {}
+
+ StatusWith<ClusterQueryResult> next() final;
+
+ void kill(OperationContext* opCtx) final;
+
+ bool remotesExhausted() final;
+
+ std::size_t getNumRemotes() const final;
+
+ /**
+ * Queues a BSONObj to be returned.
+ */
+ void queueResult(const ClusterQueryResult& result);
+
+ /**
+ * Queues an error response.
+ */
+ void queueError(Status status);
+
+private:
+ std::queue<StatusWith<ClusterQueryResult>> _resultsQueue;
+};
+
+} // namespace mongo