diff options
author | seanzimm <sean.zimmerman@mongodb.com> | 2023-01-30 18:58:14 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-30 21:17:40 +0000 |
commit | 20d59bdd62b679d282ee9e038290caa6306b35ca (patch) | |
tree | 365285e74f636ccf5d126b242bb3a4b86f0ced0b /src/mongo/db/commands/bulk_write.cpp | |
parent | 130b0081de32d24da58ed8aae2289d0ff00197be (diff) | |
download | mongo-20d59bdd62b679d282ee9e038290caa6306b35ca.tar.gz |
SERVER-72602: Populate Cursor Response for BulkWrite on Mongod
Diffstat (limited to 'src/mongo/db/commands/bulk_write.cpp')
-rw-r--r-- | src/mongo/db/commands/bulk_write.cpp | 202 |
1 files changed, 167 insertions, 35 deletions
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp index 2a0774397a7..0500e2ad144 100644 --- a/src/mongo/db/commands/bulk_write.cpp +++ b/src/mongo/db/commands/bulk_write.cpp @@ -39,9 +39,12 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/commands.h" #include "mongo/db/commands/bulk_write_gen.h" +#include "mongo/db/cursor_manager.h" +#include "mongo/db/exec/queued_data_stage.h" #include "mongo/db/not_primary_error_tracker.h" #include "mongo/db/ops/insert.h" #include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/server_feature_flags_gen.h" @@ -61,11 +64,11 @@ namespace { class InsertBatch { public: using ReplyHandler = - std::function<void(OperationContext*, size_t, int, write_ops_exec::WriteResult&)>; + std::function<void(OperationContext*, size_t, write_ops_exec::WriteResult&)>; InsertBatch() = delete; InsertBatch(const BulkWriteCommandRequest& request, int capacity, ReplyHandler replyCallback) - : _req(request), _replyFn(replyCallback), _currentNs(), _batch() { + : _req(request), _replyFn(replyCallback), _currentNs(), _batch(), _firstOpIdx() { _batch.reserve(capacity); } @@ -74,11 +77,14 @@ public: } // Returns true if the write was successful and did not encounter errors. - bool flush(OperationContext* opCtx, size_t currentOpIdx) { + bool flush(OperationContext* opCtx) { if (empty()) { return true; } + invariant(_firstOpIdx); + invariant(_isDifferentFromSavedNamespace(NamespaceInfoEntry())); + write_ops_exec::WriteResult out; auto size = _batch.size(); out.results.reserve(size); @@ -94,7 +100,9 @@ public: &out, OperationSource::kStandard); _batch.clear(); - _replyFn(opCtx, currentOpIdx, size, out); + _replyFn(opCtx, _firstOpIdx.get(), out); + _currentNs = NamespaceInfoEntry(); + _firstOpIdx = boost::none; return out.canContinue; } @@ -111,17 +119,18 @@ public: // TODO SERVER-72682 refactor insertBatchAndHandleErrors to batch across namespaces. if (_isDifferentFromSavedNamespace(nsInfo)) { // Write the current batch since we have a different namespace to process. - if (!flush(opCtx, currentOpIdx)) { + if (!flush(opCtx)) { return false; } + invariant(empty()); _currentNs = nsInfo; + _firstOpIdx = currentOpIdx; } if (_addInsertToBatch(opCtx, stmtId, op)) { - if (!flush(opCtx, currentOpIdx)) { + if (!flush(opCtx)) { return false; } - _currentNs = NamespaceInfoEntry(); } return true; } @@ -131,6 +140,7 @@ private: ReplyHandler _replyFn; NamespaceInfoEntry _currentNs; std::vector<InsertStatement> _batch; + boost::optional<int> _firstOpIdx; bool _addInsertToBatch(OperationContext* opCtx, const int stmtId, const BSONObj& toInsert) { _batch.emplace_back(stmtId, toInsert); @@ -164,15 +174,35 @@ public: } void addInsertReplies(OperationContext* opCtx, - size_t currentOpIdx, - int numOps, + size_t firstOpIdx, write_ops_exec::WriteResult& writes) { - // TODO SERVER-72607 + invariant(!writes.results.empty()); + + for (size_t i = 0; i < writes.results.size(); ++i) { + auto idx = firstOpIdx + i; + // We do not pass in a proper numErrors since it causes unwanted truncation in error + // message generation. + if (auto error = write_ops_exec::generateError( + opCtx, writes.results[i].getStatus(), i, 0 /* numErrors */)) { + auto replyItem = BulkWriteReplyItem(0, idx); + replyItem.setCode(error.get().getStatus().code()); + replyItem.setErrmsg(StringData(error.get().getStatus().reason())); + _replies.emplace_back(replyItem); + } else { + auto replyItem = BulkWriteReplyItem(1, idx); + replyItem.setN(writes.results[i].getValue().getN()); + _replies.emplace_back(replyItem); + } + } } - void addUpdateDeleteReply(OperationContext* opCtx, - size_t currentOpIdx, - const SingleWriteResult& write) {} + void addUpdateReply(OperationContext* opCtx, + size_t currentOpIdx, + const SingleWriteResult& write) {} + + void addDeleteReply(OperationContext* opCtx, + size_t currentOpIdx, + const SingleWriteResult& write) {} std::vector<BulkWriteReplyItem>& getReplies() { return _replies; @@ -258,13 +288,16 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx, // Construct reply handler callbacks. auto insertCB = [&responses](OperationContext* opCtx, int currentOpIdx, - int numOps, write_ops_exec::WriteResult& writes) { - responses.addInsertReplies(opCtx, currentOpIdx, numOps, writes); + responses.addInsertReplies(opCtx, currentOpIdx, writes); }; - auto updateDeleteCB = + auto updateCB = + [&responses](OperationContext* opCtx, int currentOpIdx, const SingleWriteResult& write) { + responses.addUpdateReply(opCtx, currentOpIdx, write); + }; + auto deleteCB = [&responses](OperationContext* opCtx, int currentOpIdx, const SingleWriteResult& write) { - responses.addUpdateDeleteReply(opCtx, currentOpIdx, write); + responses.addDeleteReply(opCtx, currentOpIdx, write); }; // Create a current insert batch. @@ -283,19 +316,19 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx, } } else if (opType == kUpdate) { // Flush insert ops before handling update ops. - if (!batch.flush(opCtx, idx)) { + if (!batch.flush(opCtx)) { break; } - if (!handleUpdateOp(opCtx, req, idx, updateDeleteCB)) { + if (!handleUpdateOp(opCtx, req, idx, updateCB)) { // Update write failed can no longer continue. break; } } else { // Flush insert ops before handling delete ops. - if (!batch.flush(opCtx, idx)) { + if (!batch.flush(opCtx)) { break; } - if (!handleDeleteOp(opCtx, req, idx, updateDeleteCB)) { + if (!handleDeleteOp(opCtx, req, idx, deleteCB)) { // Delete write failed can no longer continue. break; } @@ -304,13 +337,24 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx, // It does not matter if this final flush had errors or not since we finished processing // the last op already. - batch.flush(opCtx, idx); + batch.flush(opCtx); invariant(batch.empty()); return responses.getReplies(); } +bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, size_t bytesBuffered) { + invariant(numDocs >= 0); + if (!numDocs) { + // Allow the first output document to exceed the limit to ensure we can always make + // progress. + return true; + } + + return (bytesBuffered + nextDoc.objsize()) <= BSONObjMaxUserSize; +} + class BulkWriteCmd : public BulkWriteCmdVersion1Gen<BulkWriteCmd> { public: bool adminOnly() const final { @@ -379,16 +423,24 @@ public: // Apply all of the write operations. auto replies = performWrites(opCtx, req); - // TODO SERVER-72607 break replies into multiple batches to create cursor. - auto reply = Reply(); - replies.emplace_back(1, 0); - reply.setCursor(BulkWriteCommandResponseCursor(0, replies)); - - return reply; + return _populateCursorReply(opCtx, req, std::move(replies)); } void doCheckAuthorization(OperationContext* opCtx) const final try { auto session = AuthorizationSession::get(opCtx->getClient()); + auto privileges = _getPrivileges(); + + // Make sure all privileges are authorized. + uassert(ErrorCodes::Unauthorized, + "unauthorized", + session->isAuthorizedForPrivileges(privileges)); + } catch (const DBException& ex) { + NotPrimaryErrorTracker::get(opCtx->getClient()).recordError(ex.code()); + throw; + } + + private: + std::vector<Privilege> _getPrivileges() const { const auto& ops = request().getOps(); const auto& nsInfo = request().getNsInfo(); @@ -420,13 +472,93 @@ public: privilege.addActions(newActions); } - // Make sure all privileges are authorized. - uassert(ErrorCodes::Unauthorized, - "unauthorized", - session->isAuthorizedForPrivileges(privileges)); - } catch (const DBException& ex) { - NotPrimaryErrorTracker::get(opCtx->getClient()).recordError(ex.code()); - throw; + return privileges; + } + + Reply _populateCursorReply(OperationContext* opCtx, + const BulkWriteCommandRequest& req, + std::vector<BulkWriteReplyItem> replies) { + const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS(); + auto expCtx = make_intrusive<ExpressionContext>( + opCtx, std::unique_ptr<CollatorInterface>(nullptr), ns()); + + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec; + auto ws = std::make_unique<WorkingSet>(); + auto root = std::make_unique<QueuedDataStage>(expCtx.get(), ws.get()); + + for (auto& reply : replies) { + WorkingSetID id = ws->allocate(); + WorkingSetMember* member = ws->get(id); + member->keyData.clear(); + member->recordId = RecordId(); + member->resetDocument(SnapshotId(), reply.toBSON()); + member->transitionToOwnedObj(); + root->pushBack(id); + } + + exec = uassertStatusOK( + plan_executor_factory::make(expCtx, + std::move(ws), + std::move(root), + &CollectionPtr::null, + PlanYieldPolicy::YieldPolicy::NO_YIELD, + false, /* whether owned BSON must be returned */ + cursorNss)); + + + long long batchSize = std::numeric_limits<long long>::max(); + if (req.getCursor() && req.getCursor()->getBatchSize()) { + batchSize = *req.getCursor()->getBatchSize(); + } + + size_t numReplies = 0; + size_t bytesBuffered = 0; + for (long long objCount = 0; objCount < batchSize; objCount++) { + BSONObj nextDoc; + PlanExecutor::ExecState state = exec->getNext(&nextDoc, nullptr); + if (state == PlanExecutor::IS_EOF) { + break; + } + invariant(state == PlanExecutor::ADVANCED); + + // If we can't fit this result inside the current batch, then we stash it for + // later. + if (!haveSpaceForNext(nextDoc, objCount, bytesBuffered)) { + exec->stashResult(nextDoc); + break; + } + + numReplies++; + bytesBuffered += nextDoc.objsize(); + } + if (exec->isEOF()) { + invariant(numReplies == replies.size()); + return BulkWriteCommandReply(BulkWriteCommandResponseCursor( + 0, std::vector<BulkWriteReplyItem>(std::move(replies)))); + } + + exec->saveState(); + exec->detachFromOperationContext(); + + auto pinnedCursor = CursorManager::get(opCtx)->registerCursor( + opCtx, + {std::move(exec), + cursorNss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName(), + APIParameters::get(opCtx), + opCtx->getWriteConcern(), + repl::ReadConcernArgs::get(opCtx), + ReadPreferenceSetting::get(opCtx), + unparsedRequest().body, + _getPrivileges()}); + auto cursorId = pinnedCursor.getCursor()->cursorid(); + + pinnedCursor->incNBatches(); + pinnedCursor->incNReturnedSoFar(replies.size()); + + replies.resize(numReplies); + return BulkWriteCommandReply(BulkWriteCommandResponseCursor( + cursorId, std::vector<BulkWriteReplyItem>(std::move(replies)))); } }; |