From 20d59bdd62b679d282ee9e038290caa6306b35ca Mon Sep 17 00:00:00 2001 From: seanzimm Date: Mon, 30 Jan 2023 18:58:14 +0000 Subject: SERVER-72602: Populate Cursor Response for BulkWrite on Mongod --- ...ation_with_dollar_tenant_jscore_passthrough.yml | 2 + jstests/core/write/bulk/bulk_write.js | 69 ++++--- jstests/core/write/bulk/bulk_write_cursor.js | 144 +++++++++++++++ src/mongo/db/commands/SConscript | 1 + src/mongo/db/commands/bulk_write.cpp | 202 +++++++++++++++++---- src/mongo/db/commands/bulk_write.idl | 11 +- src/mongo/db/commands/write_commands.cpp | 87 ++------- src/mongo/db/namespace_string.cpp | 5 + src/mongo/db/namespace_string.h | 6 + src/mongo/db/ops/SConscript | 1 + src/mongo/db/ops/write_ops_exec.cpp | 51 ++++++ src/mongo/db/ops/write_ops_exec.h | 8 + 12 files changed, 443 insertions(+), 144 deletions(-) create mode 100644 jstests/core/write/bulk/bulk_write_cursor.js diff --git a/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml index 31fc7cba7d2..8aa1a2a1858 100644 --- a/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml @@ -76,6 +76,8 @@ selector: - jstests/core/txns/write_conflicts_with_non_txns.js # TODO SERVER-72357: cannot get the expected error due to an authorization contract issue. - jstests/core/txns/multi_statement_transaction_command_args.js + # TODO SERVER-72187: bulkWrite command does not support Tenant ID command + - jstests/core/write/bulk/bulk_write_cursor.js # TODO SERVER-73023 The tenantId is not attached to the namespace provided to failcommand # failpoint - jstests/core/failcommand_failpoint.js diff --git a/jstests/core/write/bulk/bulk_write.js b/jstests/core/write/bulk/bulk_write.js index a94744415ca..12ae3156542 100644 --- a/jstests/core/write/bulk/bulk_write.js +++ b/jstests/core/write/bulk/bulk_write.js @@ -5,7 +5,6 @@ * @tags: [ * assumes_against_mongod_not_mongos, * not_allowed_with_security_token, - * requires_fastcount, * # Until bulkWrite is compatible with retryable writes. * requires_non_retryable_writes, * # Command is not yet compatible with tenant migration. 
@@ -31,8 +30,8 @@ coll1.drop(); assert.commandWorked(db.adminCommand( {bulkWrite: 1, ops: [{insert: 0, document: {skey: "MongoDB"}}], nsInfo: [{ns: "test.coll"}]})); -assert.eq(coll.count(), 1); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 1); +assert.eq(coll1.find().itcount(), 0); coll.drop(); // Make sure non-adminDB request fails @@ -43,8 +42,8 @@ assert.commandFailedWithCode(db.runCommand({ }), [ErrorCodes.Unauthorized]); -assert.eq(coll.count(), 0); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 0); +assert.eq(coll1.find().itcount(), 0); // Make sure optional fields are accepted assert.commandWorked(db.adminCommand({ @@ -56,8 +55,8 @@ assert.commandWorked(db.adminCommand({ ordered: false })); -assert.eq(coll.count(), 1); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 1); +assert.eq(coll1.find().itcount(), 0); coll.drop(); // Make sure invalid fields are not accepted @@ -72,8 +71,8 @@ assert.commandFailedWithCode(db.adminCommand({ }), [40415]); -assert.eq(coll.count(), 0); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 0); +assert.eq(coll1.find().itcount(), 0); // Make sure ops and nsInfo can take arrays properly assert.commandWorked(db.adminCommand({ @@ -82,8 +81,8 @@ assert.commandWorked(db.adminCommand({ nsInfo: [{ns: "test.coll"}, {ns: "test.coll1"}] })); -assert.eq(coll.count(), 1); -assert.eq(coll1.count(), 1); +assert.eq(coll.find().itcount(), 1); +assert.eq(coll1.find().itcount(), 1); coll.drop(); coll1.drop(); @@ -94,8 +93,8 @@ assert.commandWorked(db.adminCommand({ nsInfo: [{ns: "test.coll"}] })); -assert.eq(coll.count(), 2); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 2); +assert.eq(coll1.find().itcount(), 0); coll.drop(); // Make sure we fail if index out of range of nsInfo @@ -106,21 +105,21 @@ assert.commandFailedWithCode(db.adminCommand({ }), [ErrorCodes.BadValue]); -assert.eq(coll.count(), 0); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 0); +assert.eq(coll1.find().itcount(), 0); // Missing ops assert.commandFailedWithCode(db.adminCommand({bulkWrite: 1, nsInfo: [{ns: "mydb.coll"}]}), [40414]); -assert.eq(coll.count(), 0); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 0); +assert.eq(coll1.find().itcount(), 0); // Missing nsInfo assert.commandFailedWithCode( db.adminCommand({bulkWrite: 1, ops: [{insert: 0, document: {skey: "MongoDB"}}]}), [40414]); -assert.eq(coll.count(), 0); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 0); +assert.eq(coll1.find().itcount(), 0); // Test valid arguments with invalid values assert.commandFailedWithCode(db.adminCommand({ @@ -130,39 +129,39 @@ assert.commandFailedWithCode(db.adminCommand({ }), [ErrorCodes.TypeMismatch]); -assert.eq(coll.count(), 0); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 0); +assert.eq(coll1.find().itcount(), 0); assert.commandFailedWithCode( db.adminCommand( {bulkWrite: 1, ops: [{insert: 0, document: "test"}], nsInfo: [{ns: "test.coll"}]}), [ErrorCodes.TypeMismatch]); -assert.eq(coll.count(), 0); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 0); +assert.eq(coll1.find().itcount(), 0); assert.commandFailedWithCode( db.adminCommand( {bulkWrite: 1, ops: [{insert: 0, document: {skey: "MongoDB"}}], nsInfo: ["test"]}), [ErrorCodes.TypeMismatch]); -assert.eq(coll.count(), 0); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 0); +assert.eq(coll1.find().itcount(), 0); assert.commandFailedWithCode( db.adminCommand({bulkWrite: 1, ops: 
"test", nsInfo: [{ns: "test.coll"}]}), [ErrorCodes.TypeMismatch]); -assert.eq(coll.count(), 0); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 0); +assert.eq(coll1.find().itcount(), 0); assert.commandFailedWithCode( db.adminCommand( {bulkWrite: 1, ops: [{insert: 0, document: {skey: "MongoDB"}}], nsInfo: "test"}), [ErrorCodes.TypeMismatch]); -assert.eq(coll.count(), 0); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 0); +assert.eq(coll1.find().itcount(), 0); // Test 2 inserts into the same namespace assert.commandWorked(db.adminCommand({ @@ -171,8 +170,8 @@ assert.commandWorked(db.adminCommand({ nsInfo: [{ns: "test.coll"}] })); -assert.eq(coll.count(), 2); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 2); +assert.eq(coll1.find().itcount(), 0); coll.drop(); // Test that a write can fail part way through a write and the write partially executes. @@ -186,8 +185,8 @@ assert.commandWorked(db.adminCommand({ nsInfo: [{ns: "test.coll"}, {ns: "test.coll1"}] })); -assert.eq(coll.count(), 1); -assert.eq(coll1.count(), 0); +assert.eq(coll.find().itcount(), 1); +assert.eq(coll1.find().itcount(), 0); coll.drop(); coll1.drop(); @@ -202,8 +201,8 @@ assert.commandWorked(db.adminCommand({ ordered: false })); -assert.eq(coll.count(), 1); -assert.eq(coll1.count(), 1); +assert.eq(coll.find().itcount(), 1); +assert.eq(coll1.find().itcount(), 1); coll.drop(); coll1.drop(); diff --git a/jstests/core/write/bulk/bulk_write_cursor.js b/jstests/core/write/bulk/bulk_write_cursor.js new file mode 100644 index 00000000000..a3a54aebe4e --- /dev/null +++ b/jstests/core/write/bulk/bulk_write_cursor.js @@ -0,0 +1,144 @@ +/** + * Tests bulk write cursor response for correct responses. + * + * The test runs commands that are not allowed with security token: bulkWrite. + * @tags: [ + * assumes_against_mongod_not_mongos, + * not_allowed_with_security_token, + * # Until bulkWrite is compatible with retryable writes. + * requires_non_retryable_writes, + * # Command is not yet compatible with tenant migration. + * tenant_migration_incompatible, + * ] + */ +(function() { +"use strict"; +load("jstests/libs/feature_flag_util.js"); + +// Skip this test if the BulkWriteCommand feature flag is not enabled. +if (!FeatureFlagUtil.isEnabled(db, "BulkWriteCommand")) { + jsTestLog('Skipping test because the BulkWriteCommand feature flag is disabled.'); + return; +} + +var coll = db.getCollection("coll"); +var coll1 = db.getCollection("coll1"); +coll.drop(); +coll1.drop(); + +const cursorEntryValidator = function(entry, expectedEntry) { + assert(entry.ok == expectedEntry.ok); + assert(entry.idx == expectedEntry.idx); + assert(entry.n == expectedEntry.n); + assert(entry.code == expectedEntry.code); +}; + +// Make sure a properly formed request has successful result. +var res = db.adminCommand( + {bulkWrite: 1, ops: [{insert: 0, document: {skey: "MongoDB"}}], nsInfo: [{ns: "test.coll"}]}); + +assert.commandWorked(res); + +assert(res.cursor.id == 0); +cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0}); +assert(!res.cursor.firstBatch[1]); + +assert.eq(coll.find().itcount(), 1); +assert.eq(coll1.find().itcount(), 0); + +coll.drop(); + +// Test getMore by setting batch size to 1 and running 2 inserts. +// Should end up with 1 insert return per batch. 
+res = db.adminCommand({ + bulkWrite: 1, + ops: [{insert: 1, document: {skey: "MongoDB"}}, {insert: 0, document: {skey: "MongoDB"}}], + nsInfo: [{ns: "test.coll"}, {ns: "test.coll1"}], + cursor: {batchSize: 1}, +}); + +assert.commandWorked(res); + +assert(res.cursor.id != 0); +cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0}); +assert(!res.cursor.firstBatch[1]); + +// First batch only had 1 of 2 responses so run a getMore to get the next batch. +var getMoreRes = + assert.commandWorked(db.adminCommand({getMore: res.cursor.id, collection: "$cmd.bulkWrite"})); + +assert(getMoreRes.cursor.id == 0); +cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0}); +assert(!getMoreRes.cursor.nextBatch[1]); + +assert.eq(coll.find().itcount(), 1); +assert.eq(coll1.find().itcount(), 1); +coll.drop(); +coll1.drop(); + +// Test internal batch size > 1. +res = db.adminCommand({ + bulkWrite: 1, + ops: [{insert: 0, document: {skey: "MongoDB"}}, {insert: 0, document: {skey: "MongoDB"}}], + nsInfo: [{ns: "test.coll"}] +}); + +assert.commandWorked(res); + +assert(res.cursor.id == 0); +cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0}); +cursorEntryValidator(res.cursor.firstBatch[1], {ok: 1, n: 1, idx: 1}); +assert(!res.cursor.firstBatch[2]); + +assert.eq(coll.find().itcount(), 2); +assert.eq(coll1.find().itcount(), 0); +coll.drop(); + +// Test that a write can fail part way through a write and the write partially executes. +res = db.adminCommand({ + bulkWrite: 1, + ops: [ + {insert: 0, document: {_id: 1, skey: "MongoDB"}}, + {insert: 0, document: {_id: 1, skey: "MongoDB"}}, + {insert: 1, document: {skey: "MongoDB"}} + ], + nsInfo: [{ns: "test.coll"}, {ns: "test.coll1"}] +}); + +assert.commandWorked(res); + +assert(res.cursor.id == 0); +cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0}); +cursorEntryValidator(res.cursor.firstBatch[1], {ok: 0, idx: 1, code: 11000}); +assert(!res.cursor.firstBatch[2]); + +assert.eq(coll.find().itcount(), 1); +assert.eq(coll1.find().itcount(), 0); +coll.drop(); +coll1.drop(); + +// Test that we continue processing after an error for ordered:false. 
+res = db.adminCommand({ + bulkWrite: 1, + ops: [ + {insert: 0, document: {_id: 1, skey: "MongoDB"}}, + {insert: 0, document: {_id: 1, skey: "MongoDB"}}, + {insert: 1, document: {skey: "MongoDB"}} + ], + nsInfo: [{ns: "test.coll"}, {ns: "test.coll1"}], + ordered: false +}); + +assert.commandWorked(res); + +assert(res.cursor.id == 0); +cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0}); +cursorEntryValidator(res.cursor.firstBatch[1], {ok: 0, idx: 1, code: 11000}); +cursorEntryValidator(res.cursor.firstBatch[2], {ok: 1, n: 1, idx: 2}); +assert(!res.cursor.firstBatch[3]); + +assert.eq(coll.find().itcount(), 1); +assert.eq(coll1.find().itcount(), 1); +coll.drop(); +coll1.drop(); +})(); diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index be378910ef2..c3ca11d290f 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -302,6 +302,7 @@ env.Library( '$BUILD_DIR/mongo/db/ops/write_ops', '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/ops/write_ops_parsers', + '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/server_base', '$BUILD_DIR/mongo/db/server_feature_flags', '$BUILD_DIR/mongo/db/server_options_core', diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp index 2a0774397a7..0500e2ad144 100644 --- a/src/mongo/db/commands/bulk_write.cpp +++ b/src/mongo/db/commands/bulk_write.cpp @@ -39,9 +39,12 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/commands.h" #include "mongo/db/commands/bulk_write_gen.h" +#include "mongo/db/cursor_manager.h" +#include "mongo/db/exec/queued_data_stage.h" #include "mongo/db/not_primary_error_tracker.h" #include "mongo/db/ops/insert.h" #include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/server_feature_flags_gen.h" @@ -61,11 +64,11 @@ namespace { class InsertBatch { public: using ReplyHandler = - std::function; + std::function; InsertBatch() = delete; InsertBatch(const BulkWriteCommandRequest& request, int capacity, ReplyHandler replyCallback) - : _req(request), _replyFn(replyCallback), _currentNs(), _batch() { + : _req(request), _replyFn(replyCallback), _currentNs(), _batch(), _firstOpIdx() { _batch.reserve(capacity); } @@ -74,11 +77,14 @@ public: } // Returns true if the write was successful and did not encounter errors. - bool flush(OperationContext* opCtx, size_t currentOpIdx) { + bool flush(OperationContext* opCtx) { if (empty()) { return true; } + invariant(_firstOpIdx); + invariant(_isDifferentFromSavedNamespace(NamespaceInfoEntry())); + write_ops_exec::WriteResult out; auto size = _batch.size(); out.results.reserve(size); @@ -94,7 +100,9 @@ public: &out, OperationSource::kStandard); _batch.clear(); - _replyFn(opCtx, currentOpIdx, size, out); + _replyFn(opCtx, _firstOpIdx.get(), out); + _currentNs = NamespaceInfoEntry(); + _firstOpIdx = boost::none; return out.canContinue; } @@ -111,17 +119,18 @@ public: // TODO SERVER-72682 refactor insertBatchAndHandleErrors to batch across namespaces. if (_isDifferentFromSavedNamespace(nsInfo)) { // Write the current batch since we have a different namespace to process. 
- if (!flush(opCtx, currentOpIdx)) { + if (!flush(opCtx)) { return false; } + invariant(empty()); _currentNs = nsInfo; + _firstOpIdx = currentOpIdx; } if (_addInsertToBatch(opCtx, stmtId, op)) { - if (!flush(opCtx, currentOpIdx)) { + if (!flush(opCtx)) { return false; } - _currentNs = NamespaceInfoEntry(); } return true; } @@ -131,6 +140,7 @@ private: ReplyHandler _replyFn; NamespaceInfoEntry _currentNs; std::vector _batch; + boost::optional _firstOpIdx; bool _addInsertToBatch(OperationContext* opCtx, const int stmtId, const BSONObj& toInsert) { _batch.emplace_back(stmtId, toInsert); @@ -164,15 +174,35 @@ public: } void addInsertReplies(OperationContext* opCtx, - size_t currentOpIdx, - int numOps, + size_t firstOpIdx, write_ops_exec::WriteResult& writes) { - // TODO SERVER-72607 + invariant(!writes.results.empty()); + + for (size_t i = 0; i < writes.results.size(); ++i) { + auto idx = firstOpIdx + i; + // We do not pass in a proper numErrors since it causes unwanted truncation in error + // message generation. + if (auto error = write_ops_exec::generateError( + opCtx, writes.results[i].getStatus(), i, 0 /* numErrors */)) { + auto replyItem = BulkWriteReplyItem(0, idx); + replyItem.setCode(error.get().getStatus().code()); + replyItem.setErrmsg(StringData(error.get().getStatus().reason())); + _replies.emplace_back(replyItem); + } else { + auto replyItem = BulkWriteReplyItem(1, idx); + replyItem.setN(writes.results[i].getValue().getN()); + _replies.emplace_back(replyItem); + } + } } - void addUpdateDeleteReply(OperationContext* opCtx, - size_t currentOpIdx, - const SingleWriteResult& write) {} + void addUpdateReply(OperationContext* opCtx, + size_t currentOpIdx, + const SingleWriteResult& write) {} + + void addDeleteReply(OperationContext* opCtx, + size_t currentOpIdx, + const SingleWriteResult& write) {} std::vector& getReplies() { return _replies; @@ -258,13 +288,16 @@ std::vector performWrites(OperationContext* opCtx, // Construct reply handler callbacks. auto insertCB = [&responses](OperationContext* opCtx, int currentOpIdx, - int numOps, write_ops_exec::WriteResult& writes) { - responses.addInsertReplies(opCtx, currentOpIdx, numOps, writes); + responses.addInsertReplies(opCtx, currentOpIdx, writes); }; - auto updateDeleteCB = + auto updateCB = + [&responses](OperationContext* opCtx, int currentOpIdx, const SingleWriteResult& write) { + responses.addUpdateReply(opCtx, currentOpIdx, write); + }; + auto deleteCB = [&responses](OperationContext* opCtx, int currentOpIdx, const SingleWriteResult& write) { - responses.addUpdateDeleteReply(opCtx, currentOpIdx, write); + responses.addDeleteReply(opCtx, currentOpIdx, write); }; // Create a current insert batch. @@ -283,19 +316,19 @@ std::vector performWrites(OperationContext* opCtx, } } else if (opType == kUpdate) { // Flush insert ops before handling update ops. - if (!batch.flush(opCtx, idx)) { + if (!batch.flush(opCtx)) { break; } - if (!handleUpdateOp(opCtx, req, idx, updateDeleteCB)) { + if (!handleUpdateOp(opCtx, req, idx, updateCB)) { // Update write failed can no longer continue. break; } } else { // Flush insert ops before handling delete ops. - if (!batch.flush(opCtx, idx)) { + if (!batch.flush(opCtx)) { break; } - if (!handleDeleteOp(opCtx, req, idx, updateDeleteCB)) { + if (!handleDeleteOp(opCtx, req, idx, deleteCB)) { // Delete write failed can no longer continue. 
break; } @@ -304,13 +337,24 @@ std::vector performWrites(OperationContext* opCtx, // It does not matter if this final flush had errors or not since we finished processing // the last op already. - batch.flush(opCtx, idx); + batch.flush(opCtx); invariant(batch.empty()); return responses.getReplies(); } +bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, size_t bytesBuffered) { + invariant(numDocs >= 0); + if (!numDocs) { + // Allow the first output document to exceed the limit to ensure we can always make + // progress. + return true; + } + + return (bytesBuffered + nextDoc.objsize()) <= BSONObjMaxUserSize; +} + class BulkWriteCmd : public BulkWriteCmdVersion1Gen { public: bool adminOnly() const final { @@ -379,16 +423,24 @@ public: // Apply all of the write operations. auto replies = performWrites(opCtx, req); - // TODO SERVER-72607 break replies into multiple batches to create cursor. - auto reply = Reply(); - replies.emplace_back(1, 0); - reply.setCursor(BulkWriteCommandResponseCursor(0, replies)); - - return reply; + return _populateCursorReply(opCtx, req, std::move(replies)); } void doCheckAuthorization(OperationContext* opCtx) const final try { auto session = AuthorizationSession::get(opCtx->getClient()); + auto privileges = _getPrivileges(); + + // Make sure all privileges are authorized. + uassert(ErrorCodes::Unauthorized, + "unauthorized", + session->isAuthorizedForPrivileges(privileges)); + } catch (const DBException& ex) { + NotPrimaryErrorTracker::get(opCtx->getClient()).recordError(ex.code()); + throw; + } + + private: + std::vector _getPrivileges() const { const auto& ops = request().getOps(); const auto& nsInfo = request().getNsInfo(); @@ -420,13 +472,93 @@ public: privilege.addActions(newActions); } - // Make sure all privileges are authorized. - uassert(ErrorCodes::Unauthorized, - "unauthorized", - session->isAuthorizedForPrivileges(privileges)); - } catch (const DBException& ex) { - NotPrimaryErrorTracker::get(opCtx->getClient()).recordError(ex.code()); - throw; + return privileges; + } + + Reply _populateCursorReply(OperationContext* opCtx, + const BulkWriteCommandRequest& req, + std::vector replies) { + const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS(); + auto expCtx = make_intrusive( + opCtx, std::unique_ptr(nullptr), ns()); + + std::unique_ptr exec; + auto ws = std::make_unique(); + auto root = std::make_unique(expCtx.get(), ws.get()); + + for (auto& reply : replies) { + WorkingSetID id = ws->allocate(); + WorkingSetMember* member = ws->get(id); + member->keyData.clear(); + member->recordId = RecordId(); + member->resetDocument(SnapshotId(), reply.toBSON()); + member->transitionToOwnedObj(); + root->pushBack(id); + } + + exec = uassertStatusOK( + plan_executor_factory::make(expCtx, + std::move(ws), + std::move(root), + &CollectionPtr::null, + PlanYieldPolicy::YieldPolicy::NO_YIELD, + false, /* whether owned BSON must be returned */ + cursorNss)); + + + long long batchSize = std::numeric_limits::max(); + if (req.getCursor() && req.getCursor()->getBatchSize()) { + batchSize = *req.getCursor()->getBatchSize(); + } + + size_t numReplies = 0; + size_t bytesBuffered = 0; + for (long long objCount = 0; objCount < batchSize; objCount++) { + BSONObj nextDoc; + PlanExecutor::ExecState state = exec->getNext(&nextDoc, nullptr); + if (state == PlanExecutor::IS_EOF) { + break; + } + invariant(state == PlanExecutor::ADVANCED); + + // If we can't fit this result inside the current batch, then we stash it for + // later. 
+ if (!haveSpaceForNext(nextDoc, objCount, bytesBuffered)) { + exec->stashResult(nextDoc); + break; + } + + numReplies++; + bytesBuffered += nextDoc.objsize(); + } + if (exec->isEOF()) { + invariant(numReplies == replies.size()); + return BulkWriteCommandReply(BulkWriteCommandResponseCursor( + 0, std::vector(std::move(replies)))); + } + + exec->saveState(); + exec->detachFromOperationContext(); + + auto pinnedCursor = CursorManager::get(opCtx)->registerCursor( + opCtx, + {std::move(exec), + cursorNss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName(), + APIParameters::get(opCtx), + opCtx->getWriteConcern(), + repl::ReadConcernArgs::get(opCtx), + ReadPreferenceSetting::get(opCtx), + unparsedRequest().body, + _getPrivileges()}); + auto cursorId = pinnedCursor.getCursor()->cursorid(); + + pinnedCursor->incNBatches(); + pinnedCursor->incNReturnedSoFar(replies.size()); + + replies.resize(numReplies); + return BulkWriteCommandReply(BulkWriteCommandResponseCursor( + cursorId, std::vector(std::move(replies)))); } }; diff --git a/src/mongo/db/commands/bulk_write.idl b/src/mongo/db/commands/bulk_write.idl index 22edb378042..379023ef2eb 100644 --- a/src/mongo/db/commands/bulk_write.idl +++ b/src/mongo/db/commands/bulk_write.idl @@ -99,7 +99,7 @@ structs: For update: number of documents that matched the query predicate. For delete: number of documents deleted." type: int - default: 0 + optional: true stability: unstable nModified: description: "Number of updated documents." @@ -114,10 +114,17 @@ structs: type: IDLAnyTypeOwned optional: true stability: unstable + code: + type: int + optional: true + stability: unstable + errmsg: + type: string + optional: true + stability: unstable BulkWriteCommandResponseCursor: description: "Cursor holding results for a successful 'bulkWrite' command." - strict: true fields: id: type: long diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index c4ba7d0c495..93658385367 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -61,8 +61,6 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/tenant_migration_access_blocker_registry.h" -#include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/tenant_migration_conflict_info.h" #include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/s/collection_sharding_state.h" @@ -96,7 +94,6 @@ namespace mongo { namespace { -MONGO_FAIL_POINT_DEFINE(hangWriteBeforeWaitingForMigrationDecision); MONGO_FAIL_POINT_DEFINE(hangInsertBeforeWrite); MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeCommit); MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeWrite); @@ -336,62 +333,6 @@ boost::optional> checkFailUnorderedTimeseriesInsertFailP return boost::none; } -boost::optional generateError(OperationContext* opCtx, - const Status& status, - int index, - size_t numErrors) { - if (status.isOK()) { - return boost::none; - } - - boost::optional overwrittenStatus; - - if (status == ErrorCodes::TenantMigrationConflict) { - hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx); - - overwrittenStatus.emplace( - tenant_migration_access_blocker::handleTenantMigrationConflict(opCtx, status)); - - // Interruption errors encountered during batch execution fail the entire batch, so throw on - // such errors here for consistency. 
- if (ErrorCodes::isInterruption(*overwrittenStatus)) { - uassertStatusOK(*overwrittenStatus); - } - - // Tenant migration errors, similarly to migration errors consume too much space in the - // ordered:false responses and get truncated. Since the call to - // 'handleTenantMigrationConflict' above replaces the original status, we need to manually - // truncate the new reason if the original 'status' was also truncated. - if (status.reason().empty()) { - overwrittenStatus = overwrittenStatus->withReason(""); - } - } - - constexpr size_t kMaxErrorReasonsToReport = 1; - constexpr size_t kMaxErrorSizeToReportAfterMaxReasonsReached = 1024 * 1024; - - if (numErrors > kMaxErrorReasonsToReport) { - size_t errorSize = - overwrittenStatus ? overwrittenStatus->reason().size() : status.reason().size(); - if (errorSize > kMaxErrorSizeToReportAfterMaxReasonsReached) - overwrittenStatus = - overwrittenStatus ? overwrittenStatus->withReason("") : status.withReason(""); - } - - if (overwrittenStatus) - return write_ops::WriteError(index, std::move(*overwrittenStatus)); - else - return write_ops::WriteError(index, status); -} - -template -boost::optional generateError(OperationContext* opCtx, - const StatusWith& result, - int index, - size_t numErrors) { - return generateError(opCtx, result.getStatus(), index, numErrors); -} - /** * Contains hooks that are used by 'populateReply' method. */ @@ -439,7 +380,8 @@ void populateReply(OperationContext* opCtx, long long nVal = 0; std::vector errors; for (size_t i = 0; i < result.results.size(); ++i) { - if (auto error = generateError(opCtx, result.results[i], i, errors.size())) { + if (auto error = write_ops_exec::generateError( + opCtx, result.results[i].getStatus(), i, errors.size())) { errors.emplace_back(std::move(*error)); continue; } @@ -837,8 +779,8 @@ public: if (performInsert) { const auto output = _performTimeseriesInsert(opCtx, batch, metadata, std::move(stmtIds)); - if (auto error = - generateError(opCtx, output.result, start + index, errors->size())) { + if (auto error = write_ops_exec::generateError( + opCtx, output.result.getStatus(), start + index, errors->size())) { errors->emplace_back(std::move(*error)); bucketCatalog.abort(batch, output.result.getStatus()); return output.canContinue; @@ -865,8 +807,8 @@ public: docsToRetry->push_back(index); opCtx->recoveryUnit()->abandonSnapshot(); return true; - } else if (auto error = - generateError(opCtx, output.result, start + index, errors->size())) { + } else if (auto error = write_ops_exec::generateError( + opCtx, output.result.getStatus(), start + index, errors->size())) { errors->emplace_back(std::move(*error)); bucketCatalog.abort(batch, output.result.getStatus()); return output.canContinue; @@ -881,8 +823,8 @@ public: if (closedBucket) { // If this write closed a bucket, compress the bucket auto output = _performTimeseriesBucketCompression(opCtx, *closedBucket); - if (auto error = - generateError(opCtx, output.result, start + index, errors->size())) { + if (auto error = write_ops_exec::generateError( + opCtx, output.result.getStatus(), start + index, errors->size())) { errors->emplace_back(std::move(*error)); return output.canContinue; } @@ -1117,8 +1059,8 @@ public: invariant(start + index < request().getDocuments().size()); if (rebuildOptionsError) { - const auto error{ - generateError(opCtx, *rebuildOptionsError, start + index, errors->size())}; + const auto error{write_ops_exec::generateError( + opCtx, *rebuildOptionsError, start + index, errors->size())}; 
errors->emplace_back(std::move(*error)); return false; } @@ -1245,7 +1187,8 @@ public: } while (!swResult.isOK() && (swResult.getStatus().code() == ErrorCodes::WriteConflict)); - if (auto error = generateError(opCtx, swResult, start + index, errors->size())) { + if (auto error = write_ops_exec::generateError( + opCtx, swResult.getStatus(), start + index, errors->size())) { invariant(swResult.getStatus().code() != ErrorCodes::WriteConflict); errors->emplace_back(std::move(*error)); return false; @@ -1267,8 +1210,8 @@ public: // If this write closed a bucket, compress the bucket auto ret = _performTimeseriesBucketCompression(opCtx, closedBucket); - if (auto error = - generateError(opCtx, ret.result, start + index, errors->size())) { + if (auto error = write_ops_exec::generateError( + opCtx, ret.result.getStatus(), start + index, errors->size())) { // Bucket compression only fail when we may not try to perform any other // write operation. When handleError() inside write_ops_exec.cpp return // false. @@ -1339,7 +1282,7 @@ public: opCtx->recoveryUnit()->abandonSnapshot(); continue; } - if (auto error = generateError( + if (auto error = write_ops_exec::generateError( opCtx, swCommitInfo.getStatus(), start + index, errors->size())) { errors->emplace_back(std::move(*error)); continue; diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index dab24a3c482..0f0f228d6c0 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -43,6 +43,7 @@ namespace mongo { namespace { constexpr auto listCollectionsCursorCol = "$cmd.listCollections"_sd; +constexpr auto bulkWriteCursorCol = "$cmd.bulkWrite"_sd; constexpr auto collectionlessAggregateCursorCol = "$cmd.aggregate"_sd; constexpr auto dropPendingNSPrefix = "system.drop."_sd; @@ -321,6 +322,10 @@ bool NamespaceString::mustBeAppliedInOwnOplogBatch() const { _ns == kConfigsvrShardsNamespace.ns(); } +NamespaceString NamespaceString::makeBulkWriteNSS() { + return NamespaceString(kAdminDb, bulkWriteCursorCol); +} + NamespaceString NamespaceString::makeClusterParametersNSS( const boost::optional& tenantId) { return tenantId ? NamespaceString(tenantId, kConfigDb, "clusterParameters") diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 9c5049186a6..e9c93ea3876 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -396,6 +396,12 @@ public: */ static NamespaceString makeClusterParametersNSS(const boost::optional& tenantId); + /** + * Constructs a NamespaceString representing a BulkWrite namespace. The format for this + * namespace is admin.$cmd.bulkWrite". + */ + static NamespaceString makeBulkWriteNSS(); + /** * NOTE: DollarInDbNameBehavior::allow is deprecated. 
* diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript index e9f3f42346f..7832627e9bd 100644 --- a/src/mongo/db/ops/SConscript +++ b/src/mongo/db/ops/SConscript @@ -63,6 +63,7 @@ env.Library( '$BUILD_DIR/mongo/db/record_id_helpers', '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', + '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/db/s/query_analysis_writer', '$BUILD_DIR/mongo/db/shard_role', '$BUILD_DIR/mongo/db/stats/counters', diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index bd4084e0c32..89900de0cdd 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -61,6 +61,8 @@ #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/record_id_helpers.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/tenant_migration_access_blocker_registry.h" +#include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/tenant_migration_conflict_info.h" #include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/s/collection_sharding_state.h" @@ -98,6 +100,7 @@ MONGO_FAIL_POINT_DEFINE(failAllRemoves); MONGO_FAIL_POINT_DEFINE(hangBeforeChildRemoveOpFinishes); MONGO_FAIL_POINT_DEFINE(hangBeforeChildRemoveOpIsPopped); MONGO_FAIL_POINT_DEFINE(hangAfterAllChildRemoveOpsArePopped); +MONGO_FAIL_POINT_DEFINE(hangWriteBeforeWaitingForMigrationDecision); MONGO_FAIL_POINT_DEFINE(hangDuringBatchInsert); MONGO_FAIL_POINT_DEFINE(hangDuringBatchUpdate); MONGO_FAIL_POINT_DEFINE(hangAfterBatchUpdate); @@ -549,6 +552,54 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, return true; } +boost::optional generateError(OperationContext* opCtx, + const Status& status, + int index, + size_t numErrors) { + if (status.isOK()) { + return boost::none; + } + + boost::optional overwrittenStatus; + + if (status == ErrorCodes::TenantMigrationConflict) { + hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx); + + overwrittenStatus.emplace( + tenant_migration_access_blocker::handleTenantMigrationConflict(opCtx, status)); + + // Interruption errors encountered during batch execution fail the entire batch, so throw on + // such errors here for consistency. + if (ErrorCodes::isInterruption(*overwrittenStatus)) { + uassertStatusOK(*overwrittenStatus); + } + + // Tenant migration errors, similarly to migration errors consume too much space in the + // ordered:false responses and get truncated. Since the call to + // 'handleTenantMigrationConflict' above replaces the original status, we need to manually + // truncate the new reason if the original 'status' was also truncated. + if (status.reason().empty()) { + overwrittenStatus = overwrittenStatus->withReason(""); + } + } + + constexpr size_t kMaxErrorReasonsToReport = 1; + constexpr size_t kMaxErrorSizeToReportAfterMaxReasonsReached = 1024 * 1024; + + if (numErrors > kMaxErrorReasonsToReport) { + size_t errorSize = + overwrittenStatus ? overwrittenStatus->reason().size() : status.reason().size(); + if (errorSize > kMaxErrorSizeToReportAfterMaxReasonsReached) + overwrittenStatus = + overwrittenStatus ? 
overwrittenStatus->withReason("") : status.withReason(""); + } + + if (overwrittenStatus) + return write_ops::WriteError(index, std::move(*overwrittenStatus)); + else + return write_ops::WriteError(index, status); +} + WriteResult performInserts(OperationContext* opCtx, const write_ops::InsertCommandRequest& wholeOp, OperationSource source) { diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h index 1e11b18ce81..4e8713bec0f 100644 --- a/src/mongo/db/ops/write_ops_exec.h +++ b/src/mongo/db/ops/write_ops_exec.h @@ -82,6 +82,14 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, WriteResult* out, OperationSource source); +/** + * Generates a WriteError for a given Status. + */ +boost::optional generateError(OperationContext* opCtx, + const Status& status, + int index, + size_t numErrors); + /** * Performs a batch of inserts, updates, or deletes. * -- cgit v1.2.1
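
For readers skimming the patch, the end-to-end behavior it adds can be summarized with a short mongo-shell sketch. This is not part of the commit; it is distilled from the new jstests/core/write/bulk/bulk_write_cursor.js and assumes a `test` database and the BulkWriteCommand feature flag being enabled. With cursor.batchSize: 1 the first reply item comes back in firstBatch, and the remainder is fetched with getMore against the reserved admin.$cmd.bulkWrite namespace introduced by makeBulkWriteNSS().

// Two inserts, but ask for the reply items one at a time.
var res = assert.commandWorked(db.adminCommand({
    bulkWrite: 1,
    ops: [
        {insert: 0, document: {skey: "MongoDB"}},
        {insert: 1, document: {skey: "MongoDB"}}
    ],
    nsInfo: [{ns: "test.coll"}, {ns: "test.coll1"}],
    cursor: {batchSize: 1}
}));

assert.neq(res.cursor.id, 0);                  // a second reply item is still buffered server-side
assert.eq(res.cursor.firstBatch.length, 1);    // {ok: 1, idx: 0, n: 1}

// Drain the cursor; the getMore targets the admin.$cmd.bulkWrite namespace.
var getMoreRes = assert.commandWorked(
    db.adminCommand({getMore: res.cursor.id, collection: "$cmd.bulkWrite"}));
assert.eq(getMoreRes.cursor.id, 0);            // cursor exhausted
assert.eq(getMoreRes.cursor.nextBatch.length, 1);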
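
Similarly, a hedged sketch of consuming the per-operation reply items, using the duplicate _id case from the new test. The ok/idx/n fields and the optional code/errmsg error fields correspond to the BulkWriteReplyItem changes in bulk_write.idl; nothing beyond those fields is assumed here.

// Ordered batch where the second insert hits DuplicateKey (11000); the command itself
// still returns ok: 1 and reports the failure as a reply item in the cursor.
var res2 = assert.commandWorked(db.adminCommand({
    bulkWrite: 1,
    ops: [
        {insert: 0, document: {_id: 1, skey: "MongoDB"}},
        {insert: 0, document: {_id: 1, skey: "MongoDB"}}
    ],
    nsInfo: [{ns: "test.coll"}]
}));

res2.cursor.firstBatch.forEach(function(item) {
    if (item.ok == 1) {
        print("op " + item.idx + " succeeded, n = " + item.n);
    } else {
        // code and errmsg are the new optional error fields on BulkWriteReplyItem.
        print("op " + item.idx + " failed, code " + item.code + ": " + item.errmsg);
    }
});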