summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/bulk_write.cpp
diff options
context:
space:
mode:
authorseanzimm <sean.zimmerman@mongodb.com>2023-01-30 18:58:14 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-01-30 21:17:40 +0000
commit20d59bdd62b679d282ee9e038290caa6306b35ca (patch)
tree365285e74f636ccf5d126b242bb3a4b86f0ced0b /src/mongo/db/commands/bulk_write.cpp
parent130b0081de32d24da58ed8aae2289d0ff00197be (diff)
downloadmongo-20d59bdd62b679d282ee9e038290caa6306b35ca.tar.gz
SERVER-72602: Populate Cursor Response for BulkWrite on Mongod
Diffstat (limited to 'src/mongo/db/commands/bulk_write.cpp')
-rw-r--r--src/mongo/db/commands/bulk_write.cpp202
1 files changed, 167 insertions, 35 deletions
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp
index 2a0774397a7..0500e2ad144 100644
--- a/src/mongo/db/commands/bulk_write.cpp
+++ b/src/mongo/db/commands/bulk_write.cpp
@@ -39,9 +39,12 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/bulk_write_gen.h"
+#include "mongo/db/cursor_manager.h"
+#include "mongo/db/exec/queued_data_stage.h"
#include "mongo/db/not_primary_error_tracker.h"
#include "mongo/db/ops/insert.h"
#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/server_feature_flags_gen.h"
@@ -61,11 +64,11 @@ namespace {
class InsertBatch {
public:
using ReplyHandler =
- std::function<void(OperationContext*, size_t, int, write_ops_exec::WriteResult&)>;
+ std::function<void(OperationContext*, size_t, write_ops_exec::WriteResult&)>;
InsertBatch() = delete;
InsertBatch(const BulkWriteCommandRequest& request, int capacity, ReplyHandler replyCallback)
- : _req(request), _replyFn(replyCallback), _currentNs(), _batch() {
+ : _req(request), _replyFn(replyCallback), _currentNs(), _batch(), _firstOpIdx() {
_batch.reserve(capacity);
}
@@ -74,11 +77,14 @@ public:
}
// Returns true if the write was successful and did not encounter errors.
- bool flush(OperationContext* opCtx, size_t currentOpIdx) {
+ bool flush(OperationContext* opCtx) {
if (empty()) {
return true;
}
+ invariant(_firstOpIdx);
+ invariant(_isDifferentFromSavedNamespace(NamespaceInfoEntry()));
+
write_ops_exec::WriteResult out;
auto size = _batch.size();
out.results.reserve(size);
@@ -94,7 +100,9 @@ public:
&out,
OperationSource::kStandard);
_batch.clear();
- _replyFn(opCtx, currentOpIdx, size, out);
+ _replyFn(opCtx, _firstOpIdx.get(), out);
+ _currentNs = NamespaceInfoEntry();
+ _firstOpIdx = boost::none;
return out.canContinue;
}
@@ -111,17 +119,18 @@ public:
// TODO SERVER-72682 refactor insertBatchAndHandleErrors to batch across namespaces.
if (_isDifferentFromSavedNamespace(nsInfo)) {
// Write the current batch since we have a different namespace to process.
- if (!flush(opCtx, currentOpIdx)) {
+ if (!flush(opCtx)) {
return false;
}
+ invariant(empty());
_currentNs = nsInfo;
+ _firstOpIdx = currentOpIdx;
}
if (_addInsertToBatch(opCtx, stmtId, op)) {
- if (!flush(opCtx, currentOpIdx)) {
+ if (!flush(opCtx)) {
return false;
}
- _currentNs = NamespaceInfoEntry();
}
return true;
}
@@ -131,6 +140,7 @@ private:
ReplyHandler _replyFn;
NamespaceInfoEntry _currentNs;
std::vector<InsertStatement> _batch;
+ boost::optional<int> _firstOpIdx;
bool _addInsertToBatch(OperationContext* opCtx, const int stmtId, const BSONObj& toInsert) {
_batch.emplace_back(stmtId, toInsert);
@@ -164,15 +174,35 @@ public:
}
void addInsertReplies(OperationContext* opCtx,
- size_t currentOpIdx,
- int numOps,
+ size_t firstOpIdx,
write_ops_exec::WriteResult& writes) {
- // TODO SERVER-72607
+ invariant(!writes.results.empty());
+
+ for (size_t i = 0; i < writes.results.size(); ++i) {
+ auto idx = firstOpIdx + i;
+ // We do not pass in a proper numErrors since it causes unwanted truncation in error
+ // message generation.
+ if (auto error = write_ops_exec::generateError(
+ opCtx, writes.results[i].getStatus(), i, 0 /* numErrors */)) {
+ auto replyItem = BulkWriteReplyItem(0, idx);
+ replyItem.setCode(error.get().getStatus().code());
+ replyItem.setErrmsg(StringData(error.get().getStatus().reason()));
+ _replies.emplace_back(replyItem);
+ } else {
+ auto replyItem = BulkWriteReplyItem(1, idx);
+ replyItem.setN(writes.results[i].getValue().getN());
+ _replies.emplace_back(replyItem);
+ }
+ }
}
- void addUpdateDeleteReply(OperationContext* opCtx,
- size_t currentOpIdx,
- const SingleWriteResult& write) {}
+ void addUpdateReply(OperationContext* opCtx,
+ size_t currentOpIdx,
+ const SingleWriteResult& write) {}
+
+ void addDeleteReply(OperationContext* opCtx,
+ size_t currentOpIdx,
+ const SingleWriteResult& write) {}
std::vector<BulkWriteReplyItem>& getReplies() {
return _replies;
@@ -258,13 +288,16 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
// Construct reply handler callbacks.
auto insertCB = [&responses](OperationContext* opCtx,
int currentOpIdx,
- int numOps,
write_ops_exec::WriteResult& writes) {
- responses.addInsertReplies(opCtx, currentOpIdx, numOps, writes);
+ responses.addInsertReplies(opCtx, currentOpIdx, writes);
};
- auto updateDeleteCB =
+ auto updateCB =
+ [&responses](OperationContext* opCtx, int currentOpIdx, const SingleWriteResult& write) {
+ responses.addUpdateReply(opCtx, currentOpIdx, write);
+ };
+ auto deleteCB =
[&responses](OperationContext* opCtx, int currentOpIdx, const SingleWriteResult& write) {
- responses.addUpdateDeleteReply(opCtx, currentOpIdx, write);
+ responses.addDeleteReply(opCtx, currentOpIdx, write);
};
// Create a current insert batch.
@@ -283,19 +316,19 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
}
} else if (opType == kUpdate) {
// Flush insert ops before handling update ops.
- if (!batch.flush(opCtx, idx)) {
+ if (!batch.flush(opCtx)) {
break;
}
- if (!handleUpdateOp(opCtx, req, idx, updateDeleteCB)) {
+ if (!handleUpdateOp(opCtx, req, idx, updateCB)) {
// Update write failed can no longer continue.
break;
}
} else {
// Flush insert ops before handling delete ops.
- if (!batch.flush(opCtx, idx)) {
+ if (!batch.flush(opCtx)) {
break;
}
- if (!handleDeleteOp(opCtx, req, idx, updateDeleteCB)) {
+ if (!handleDeleteOp(opCtx, req, idx, deleteCB)) {
// Delete write failed can no longer continue.
break;
}
@@ -304,13 +337,24 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
// It does not matter if this final flush had errors or not since we finished processing
// the last op already.
- batch.flush(opCtx, idx);
+ batch.flush(opCtx);
invariant(batch.empty());
return responses.getReplies();
}
+bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, size_t bytesBuffered) {
+ invariant(numDocs >= 0);
+ if (!numDocs) {
+ // Allow the first output document to exceed the limit to ensure we can always make
+ // progress.
+ return true;
+ }
+
+ return (bytesBuffered + nextDoc.objsize()) <= BSONObjMaxUserSize;
+}
+
class BulkWriteCmd : public BulkWriteCmdVersion1Gen<BulkWriteCmd> {
public:
bool adminOnly() const final {
@@ -379,16 +423,24 @@ public:
// Apply all of the write operations.
auto replies = performWrites(opCtx, req);
- // TODO SERVER-72607 break replies into multiple batches to create cursor.
- auto reply = Reply();
- replies.emplace_back(1, 0);
- reply.setCursor(BulkWriteCommandResponseCursor(0, replies));
-
- return reply;
+ return _populateCursorReply(opCtx, req, std::move(replies));
}
void doCheckAuthorization(OperationContext* opCtx) const final try {
auto session = AuthorizationSession::get(opCtx->getClient());
+ auto privileges = _getPrivileges();
+
+ // Make sure all privileges are authorized.
+ uassert(ErrorCodes::Unauthorized,
+ "unauthorized",
+ session->isAuthorizedForPrivileges(privileges));
+ } catch (const DBException& ex) {
+ NotPrimaryErrorTracker::get(opCtx->getClient()).recordError(ex.code());
+ throw;
+ }
+
+ private:
+ std::vector<Privilege> _getPrivileges() const {
const auto& ops = request().getOps();
const auto& nsInfo = request().getNsInfo();
@@ -420,13 +472,93 @@ public:
privilege.addActions(newActions);
}
- // Make sure all privileges are authorized.
- uassert(ErrorCodes::Unauthorized,
- "unauthorized",
- session->isAuthorizedForPrivileges(privileges));
- } catch (const DBException& ex) {
- NotPrimaryErrorTracker::get(opCtx->getClient()).recordError(ex.code());
- throw;
+ return privileges;
+ }
+
+ Reply _populateCursorReply(OperationContext* opCtx,
+ const BulkWriteCommandRequest& req,
+ std::vector<BulkWriteReplyItem> replies) {
+ const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS();
+ auto expCtx = make_intrusive<ExpressionContext>(
+ opCtx, std::unique_ptr<CollatorInterface>(nullptr), ns());
+
+ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
+ auto ws = std::make_unique<WorkingSet>();
+ auto root = std::make_unique<QueuedDataStage>(expCtx.get(), ws.get());
+
+ for (auto& reply : replies) {
+ WorkingSetID id = ws->allocate();
+ WorkingSetMember* member = ws->get(id);
+ member->keyData.clear();
+ member->recordId = RecordId();
+ member->resetDocument(SnapshotId(), reply.toBSON());
+ member->transitionToOwnedObj();
+ root->pushBack(id);
+ }
+
+ exec = uassertStatusOK(
+ plan_executor_factory::make(expCtx,
+ std::move(ws),
+ std::move(root),
+ &CollectionPtr::null,
+ PlanYieldPolicy::YieldPolicy::NO_YIELD,
+ false, /* whether owned BSON must be returned */
+ cursorNss));
+
+
+ long long batchSize = std::numeric_limits<long long>::max();
+ if (req.getCursor() && req.getCursor()->getBatchSize()) {
+ batchSize = *req.getCursor()->getBatchSize();
+ }
+
+ size_t numReplies = 0;
+ size_t bytesBuffered = 0;
+ for (long long objCount = 0; objCount < batchSize; objCount++) {
+ BSONObj nextDoc;
+ PlanExecutor::ExecState state = exec->getNext(&nextDoc, nullptr);
+ if (state == PlanExecutor::IS_EOF) {
+ break;
+ }
+ invariant(state == PlanExecutor::ADVANCED);
+
+ // If we can't fit this result inside the current batch, then we stash it for
+ // later.
+ if (!haveSpaceForNext(nextDoc, objCount, bytesBuffered)) {
+ exec->stashResult(nextDoc);
+ break;
+ }
+
+ numReplies++;
+ bytesBuffered += nextDoc.objsize();
+ }
+ if (exec->isEOF()) {
+ invariant(numReplies == replies.size());
+ return BulkWriteCommandReply(BulkWriteCommandResponseCursor(
+ 0, std::vector<BulkWriteReplyItem>(std::move(replies))));
+ }
+
+ exec->saveState();
+ exec->detachFromOperationContext();
+
+ auto pinnedCursor = CursorManager::get(opCtx)->registerCursor(
+ opCtx,
+ {std::move(exec),
+ cursorNss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName(),
+ APIParameters::get(opCtx),
+ opCtx->getWriteConcern(),
+ repl::ReadConcernArgs::get(opCtx),
+ ReadPreferenceSetting::get(opCtx),
+ unparsedRequest().body,
+ _getPrivileges()});
+ auto cursorId = pinnedCursor.getCursor()->cursorid();
+
+ pinnedCursor->incNBatches();
+ pinnedCursor->incNReturnedSoFar(replies.size());
+
+ replies.resize(numReplies);
+ return BulkWriteCommandReply(BulkWriteCommandResponseCursor(
+ cursorId, std::vector<BulkWriteReplyItem>(std::move(replies))));
}
};