author    seanzimm <sean.zimmerman@mongodb.com>    2023-01-30 18:58:14 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2023-01-30 21:17:40 +0000
commit    20d59bdd62b679d282ee9e038290caa6306b35ca
tree      365285e74f636ccf5d126b242bb3a4b86f0ced0b
parent    130b0081de32d24da58ed8aae2289d0ff00197be
download  mongo-20d59bdd62b679d282ee9e038290caa6306b35ca.tar.gz
SERVER-72602: Populate Cursor Response for BulkWrite on Mongod
 buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml | 2
 jstests/core/write/bulk/bulk_write.js        |  69
 jstests/core/write/bulk/bulk_write_cursor.js | 144
 src/mongo/db/commands/SConscript             |   1
 src/mongo/db/commands/bulk_write.cpp         | 202
 src/mongo/db/commands/bulk_write.idl         |  11
 src/mongo/db/commands/write_commands.cpp     |  87
 src/mongo/db/namespace_string.cpp            |   5
 src/mongo/db/namespace_string.h              |   6
 src/mongo/db/ops/SConscript                  |   1
 src/mongo/db/ops/write_ops_exec.cpp          |  51
 src/mongo/db/ops/write_ops_exec.h            |   8
 12 files changed, 443 insertions(+), 144 deletions(-)
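This commit makes bulkWrite return its per-operation results through a real cursor instead of the previous hard-coded single reply item. A minimal mongo-shell sketch of the new response shape, using only the fields asserted on by the bulk_write_cursor.js test below:

    var res = db.adminCommand({
        bulkWrite: 1,
        ops: [{insert: 0, document: {skey: "MongoDB"}}],
        nsInfo: [{ns: "test.coll"}]
    });
    // res.cursor.id is 0 when every reply item fits in the first batch, and
    // res.cursor.firstBatch[0] looks like {ok: 1, idx: 0, n: 1}.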
diff --git a/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml
index 31fc7cba7d2..8aa1a2a1858 100644
--- a/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml
@@ -76,6 +76,8 @@ selector:
- jstests/core/txns/write_conflicts_with_non_txns.js
# TODO SERVER-72357: cannot get the expected error due to an authorization contract issue.
- jstests/core/txns/multi_statement_transaction_command_args.js
+ # TODO SERVER-72187: bulkWrite command does not support Tenant ID command
+ - jstests/core/write/bulk/bulk_write_cursor.js
# TODO SERVER-73023 The tenantId is not attached to the namespace provided to failcommand
# failpoint
- jstests/core/failcommand_failpoint.js
diff --git a/jstests/core/write/bulk/bulk_write.js b/jstests/core/write/bulk/bulk_write.js
index a94744415ca..12ae3156542 100644
--- a/jstests/core/write/bulk/bulk_write.js
+++ b/jstests/core/write/bulk/bulk_write.js
@@ -5,7 +5,6 @@
* @tags: [
* assumes_against_mongod_not_mongos,
* not_allowed_with_security_token,
- * requires_fastcount,
* # Until bulkWrite is compatible with retryable writes.
* requires_non_retryable_writes,
* # Command is not yet compatible with tenant migration.
@@ -31,8 +30,8 @@ coll1.drop();
assert.commandWorked(db.adminCommand(
{bulkWrite: 1, ops: [{insert: 0, document: {skey: "MongoDB"}}], nsInfo: [{ns: "test.coll"}]}));
-assert.eq(coll.count(), 1);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 1);
+assert.eq(coll1.find().itcount(), 0);
coll.drop();
// Make sure non-adminDB request fails
@@ -43,8 +42,8 @@ assert.commandFailedWithCode(db.runCommand({
}),
[ErrorCodes.Unauthorized]);
-assert.eq(coll.count(), 0);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 0);
+assert.eq(coll1.find().itcount(), 0);
// Make sure optional fields are accepted
assert.commandWorked(db.adminCommand({
@@ -56,8 +55,8 @@ assert.commandWorked(db.adminCommand({
ordered: false
}));
-assert.eq(coll.count(), 1);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 1);
+assert.eq(coll1.find().itcount(), 0);
coll.drop();
// Make sure invalid fields are not accepted
@@ -72,8 +71,8 @@ assert.commandFailedWithCode(db.adminCommand({
}),
[40415]);
-assert.eq(coll.count(), 0);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 0);
+assert.eq(coll1.find().itcount(), 0);
// Make sure ops and nsInfo can take arrays properly
assert.commandWorked(db.adminCommand({
@@ -82,8 +81,8 @@ assert.commandWorked(db.adminCommand({
nsInfo: [{ns: "test.coll"}, {ns: "test.coll1"}]
}));
-assert.eq(coll.count(), 1);
-assert.eq(coll1.count(), 1);
+assert.eq(coll.find().itcount(), 1);
+assert.eq(coll1.find().itcount(), 1);
coll.drop();
coll1.drop();
@@ -94,8 +93,8 @@ assert.commandWorked(db.adminCommand({
nsInfo: [{ns: "test.coll"}]
}));
-assert.eq(coll.count(), 2);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 2);
+assert.eq(coll1.find().itcount(), 0);
coll.drop();
// Make sure we fail if index out of range of nsInfo
@@ -106,21 +105,21 @@ assert.commandFailedWithCode(db.adminCommand({
}),
[ErrorCodes.BadValue]);
-assert.eq(coll.count(), 0);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 0);
+assert.eq(coll1.find().itcount(), 0);
// Missing ops
assert.commandFailedWithCode(db.adminCommand({bulkWrite: 1, nsInfo: [{ns: "mydb.coll"}]}), [40414]);
-assert.eq(coll.count(), 0);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 0);
+assert.eq(coll1.find().itcount(), 0);
// Missing nsInfo
assert.commandFailedWithCode(
db.adminCommand({bulkWrite: 1, ops: [{insert: 0, document: {skey: "MongoDB"}}]}), [40414]);
-assert.eq(coll.count(), 0);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 0);
+assert.eq(coll1.find().itcount(), 0);
// Test valid arguments with invalid values
assert.commandFailedWithCode(db.adminCommand({
@@ -130,39 +129,39 @@ assert.commandFailedWithCode(db.adminCommand({
}),
[ErrorCodes.TypeMismatch]);
-assert.eq(coll.count(), 0);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 0);
+assert.eq(coll1.find().itcount(), 0);
assert.commandFailedWithCode(
db.adminCommand(
{bulkWrite: 1, ops: [{insert: 0, document: "test"}], nsInfo: [{ns: "test.coll"}]}),
[ErrorCodes.TypeMismatch]);
-assert.eq(coll.count(), 0);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 0);
+assert.eq(coll1.find().itcount(), 0);
assert.commandFailedWithCode(
db.adminCommand(
{bulkWrite: 1, ops: [{insert: 0, document: {skey: "MongoDB"}}], nsInfo: ["test"]}),
[ErrorCodes.TypeMismatch]);
-assert.eq(coll.count(), 0);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 0);
+assert.eq(coll1.find().itcount(), 0);
assert.commandFailedWithCode(
db.adminCommand({bulkWrite: 1, ops: "test", nsInfo: [{ns: "test.coll"}]}),
[ErrorCodes.TypeMismatch]);
-assert.eq(coll.count(), 0);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 0);
+assert.eq(coll1.find().itcount(), 0);
assert.commandFailedWithCode(
db.adminCommand(
{bulkWrite: 1, ops: [{insert: 0, document: {skey: "MongoDB"}}], nsInfo: "test"}),
[ErrorCodes.TypeMismatch]);
-assert.eq(coll.count(), 0);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 0);
+assert.eq(coll1.find().itcount(), 0);
// Test 2 inserts into the same namespace
assert.commandWorked(db.adminCommand({
@@ -171,8 +170,8 @@ assert.commandWorked(db.adminCommand({
nsInfo: [{ns: "test.coll"}]
}));
-assert.eq(coll.count(), 2);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 2);
+assert.eq(coll1.find().itcount(), 0);
coll.drop();
// Test that a bulkWrite can fail partway through and the writes preceding the failure execute.
@@ -186,8 +185,8 @@ assert.commandWorked(db.adminCommand({
nsInfo: [{ns: "test.coll"}, {ns: "test.coll1"}]
}));
-assert.eq(coll.count(), 1);
-assert.eq(coll1.count(), 0);
+assert.eq(coll.find().itcount(), 1);
+assert.eq(coll1.find().itcount(), 0);
coll.drop();
coll1.drop();
@@ -202,8 +201,8 @@ assert.commandWorked(db.adminCommand({
ordered: false
}));
-assert.eq(coll.count(), 1);
-assert.eq(coll1.count(), 1);
+assert.eq(coll.find().itcount(), 1);
+assert.eq(coll1.find().itcount(), 1);
coll.drop();
coll1.drop();
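The assertions in this file switch from count() to find().itcount(): itcount() exhausts a real cursor and counts the documents actually returned, while an unfiltered count() may be answered from collection metadata, which is why the requires_fastcount tag is dropped above. A sketch of the difference:

    // May be served from collection metadata (fast count):
    coll.count();
    // Iterates a cursor and counts the documents it returns:
    coll.find().itcount();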
diff --git a/jstests/core/write/bulk/bulk_write_cursor.js b/jstests/core/write/bulk/bulk_write_cursor.js
new file mode 100644
index 00000000000..a3a54aebe4e
--- /dev/null
+++ b/jstests/core/write/bulk/bulk_write_cursor.js
@@ -0,0 +1,144 @@
+/**
+ * Tests that the bulkWrite command returns correct cursor responses.
+ *
+ * The test runs commands that are not allowed with security token: bulkWrite.
+ * @tags: [
+ * assumes_against_mongod_not_mongos,
+ * not_allowed_with_security_token,
+ * # Until bulkWrite is compatible with retryable writes.
+ * requires_non_retryable_writes,
+ * # Command is not yet compatible with tenant migration.
+ * tenant_migration_incompatible,
+ * ]
+ */
+(function() {
+"use strict";
+load("jstests/libs/feature_flag_util.js");
+
+// Skip this test if the BulkWriteCommand feature flag is not enabled.
+if (!FeatureFlagUtil.isEnabled(db, "BulkWriteCommand")) {
+ jsTestLog('Skipping test because the BulkWriteCommand feature flag is disabled.');
+ return;
+}
+
+var coll = db.getCollection("coll");
+var coll1 = db.getCollection("coll1");
+coll.drop();
+coll1.drop();
+
+const cursorEntryValidator = function(entry, expectedEntry) {
+ assert(entry.ok == expectedEntry.ok);
+ assert(entry.idx == expectedEntry.idx);
+ assert(entry.n == expectedEntry.n);
+ assert(entry.code == expectedEntry.code);
+};
+
+// Make sure a properly formed request has a successful result.
+var res = db.adminCommand(
+ {bulkWrite: 1, ops: [{insert: 0, document: {skey: "MongoDB"}}], nsInfo: [{ns: "test.coll"}]});
+
+assert.commandWorked(res);
+
+assert(res.cursor.id == 0);
+cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0});
+assert(!res.cursor.firstBatch[1]);
+
+assert.eq(coll.find().itcount(), 1);
+assert.eq(coll1.find().itcount(), 0);
+
+coll.drop();
+
+// Test getMore by setting batch size to 1 and running 2 inserts.
+// Should end up with one insert reply per batch.
+res = db.adminCommand({
+ bulkWrite: 1,
+ ops: [{insert: 1, document: {skey: "MongoDB"}}, {insert: 0, document: {skey: "MongoDB"}}],
+ nsInfo: [{ns: "test.coll"}, {ns: "test.coll1"}],
+ cursor: {batchSize: 1},
+});
+
+assert.commandWorked(res);
+
+assert(res.cursor.id != 0);
+cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0});
+assert(!res.cursor.firstBatch[1]);
+
+// First batch only had 1 of 2 responses so run a getMore to get the next batch.
+var getMoreRes =
+ assert.commandWorked(db.adminCommand({getMore: res.cursor.id, collection: "$cmd.bulkWrite"}));
+
+assert(getMoreRes.cursor.id == 0);
+cursorEntryValidator(getMoreRes.cursor.nextBatch[0], {ok: 1, n: 1, idx: 1});
+assert(!getMoreRes.cursor.nextBatch[1]);
+
+assert.eq(coll.find().itcount(), 1);
+assert.eq(coll1.find().itcount(), 1);
+coll.drop();
+coll1.drop();
+
+// Test internal batch size > 1.
+res = db.adminCommand({
+ bulkWrite: 1,
+ ops: [{insert: 0, document: {skey: "MongoDB"}}, {insert: 0, document: {skey: "MongoDB"}}],
+ nsInfo: [{ns: "test.coll"}]
+});
+
+assert.commandWorked(res);
+
+assert(res.cursor.id == 0);
+cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0});
+cursorEntryValidator(res.cursor.firstBatch[1], {ok: 1, n: 1, idx: 1});
+assert(!res.cursor.firstBatch[2]);
+
+assert.eq(coll.find().itcount(), 2);
+assert.eq(coll1.find().itcount(), 0);
+coll.drop();
+
+// Test that a bulkWrite can fail partway through and the writes preceding the failure execute.
+res = db.adminCommand({
+ bulkWrite: 1,
+ ops: [
+ {insert: 0, document: {_id: 1, skey: "MongoDB"}},
+ {insert: 0, document: {_id: 1, skey: "MongoDB"}},
+ {insert: 1, document: {skey: "MongoDB"}}
+ ],
+ nsInfo: [{ns: "test.coll"}, {ns: "test.coll1"}]
+});
+
+assert.commandWorked(res);
+
+assert(res.cursor.id == 0);
+cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0});
+cursorEntryValidator(res.cursor.firstBatch[1], {ok: 0, idx: 1, code: 11000});
+assert(!res.cursor.firstBatch[2]);
+
+assert.eq(coll.find().itcount(), 1);
+assert.eq(coll1.find().itcount(), 0);
+coll.drop();
+coll1.drop();
+
+// Test that we continue processing after an error for ordered:false.
+res = db.adminCommand({
+ bulkWrite: 1,
+ ops: [
+ {insert: 0, document: {_id: 1, skey: "MongoDB"}},
+ {insert: 0, document: {_id: 1, skey: "MongoDB"}},
+ {insert: 1, document: {skey: "MongoDB"}}
+ ],
+ nsInfo: [{ns: "test.coll"}, {ns: "test.coll1"}],
+ ordered: false
+});
+
+assert.commandWorked(res);
+
+assert(res.cursor.id == 0);
+cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0});
+cursorEntryValidator(res.cursor.firstBatch[1], {ok: 0, idx: 1, code: 11000});
+cursorEntryValidator(res.cursor.firstBatch[2], {ok: 1, n: 1, idx: 2});
+assert(!res.cursor.firstBatch[3]);
+
+assert.eq(coll.find().itcount(), 1);
+assert.eq(coll1.find().itcount(), 1);
+coll.drop();
+coll1.drop();
+})();
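As the getMore case above shows, a bulkWrite whose replies do not all fit in the first batch returns a nonzero cursor id that is drained against the reserved $cmd.bulkWrite namespace on the admin database. A hypothetical drain loop in the shell, using only the fields the test exercises:

    var cursorId = res.cursor.id;
    while (cursorId != 0) {
        var next = assert.commandWorked(
            db.adminCommand({getMore: cursorId, collection: "$cmd.bulkWrite"}));
        // Each item is {ok, idx, n} on success or {ok, idx, code, errmsg} on error.
        next.cursor.nextBatch.forEach((item) => printjson(item));
        cursorId = next.cursor.id;
    }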
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index be378910ef2..c3ca11d290f 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -302,6 +302,7 @@ env.Library(
'$BUILD_DIR/mongo/db/ops/write_ops',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/ops/write_ops_parsers',
+ '$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/server_base',
'$BUILD_DIR/mongo/db/server_feature_flags',
'$BUILD_DIR/mongo/db/server_options_core',
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp
index 2a0774397a7..0500e2ad144 100644
--- a/src/mongo/db/commands/bulk_write.cpp
+++ b/src/mongo/db/commands/bulk_write.cpp
@@ -39,9 +39,12 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/bulk_write_gen.h"
+#include "mongo/db/cursor_manager.h"
+#include "mongo/db/exec/queued_data_stage.h"
#include "mongo/db/not_primary_error_tracker.h"
#include "mongo/db/ops/insert.h"
#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/server_feature_flags_gen.h"
@@ -61,11 +64,11 @@ namespace {
class InsertBatch {
public:
using ReplyHandler =
- std::function<void(OperationContext*, size_t, int, write_ops_exec::WriteResult&)>;
+ std::function<void(OperationContext*, size_t, write_ops_exec::WriteResult&)>;
InsertBatch() = delete;
InsertBatch(const BulkWriteCommandRequest& request, int capacity, ReplyHandler replyCallback)
- : _req(request), _replyFn(replyCallback), _currentNs(), _batch() {
+ : _req(request), _replyFn(replyCallback), _currentNs(), _batch(), _firstOpIdx() {
_batch.reserve(capacity);
}
@@ -74,11 +77,14 @@ public:
}
// Returns true if the write was successful and did not encounter errors.
- bool flush(OperationContext* opCtx, size_t currentOpIdx) {
+ bool flush(OperationContext* opCtx) {
if (empty()) {
return true;
}
+ invariant(_firstOpIdx);
+ invariant(_isDifferentFromSavedNamespace(NamespaceInfoEntry()));
+
write_ops_exec::WriteResult out;
auto size = _batch.size();
out.results.reserve(size);
@@ -94,7 +100,9 @@ public:
&out,
OperationSource::kStandard);
_batch.clear();
- _replyFn(opCtx, currentOpIdx, size, out);
+ _replyFn(opCtx, _firstOpIdx.get(), out);
+ _currentNs = NamespaceInfoEntry();
+ _firstOpIdx = boost::none;
return out.canContinue;
}
@@ -111,17 +119,18 @@ public:
// TODO SERVER-72682 refactor insertBatchAndHandleErrors to batch across namespaces.
if (_isDifferentFromSavedNamespace(nsInfo)) {
// Write the current batch since we have a different namespace to process.
- if (!flush(opCtx, currentOpIdx)) {
+ if (!flush(opCtx)) {
return false;
}
+ invariant(empty());
_currentNs = nsInfo;
+ _firstOpIdx = currentOpIdx;
}
if (_addInsertToBatch(opCtx, stmtId, op)) {
- if (!flush(opCtx, currentOpIdx)) {
+ if (!flush(opCtx)) {
return false;
}
- _currentNs = NamespaceInfoEntry();
}
return true;
}
@@ -131,6 +140,7 @@ private:
ReplyHandler _replyFn;
NamespaceInfoEntry _currentNs;
std::vector<InsertStatement> _batch;
+ boost::optional<int> _firstOpIdx;
bool _addInsertToBatch(OperationContext* opCtx, const int stmtId, const BSONObj& toInsert) {
_batch.emplace_back(stmtId, toInsert);
@@ -164,15 +174,35 @@ public:
}
void addInsertReplies(OperationContext* opCtx,
- size_t currentOpIdx,
- int numOps,
+ size_t firstOpIdx,
write_ops_exec::WriteResult& writes) {
- // TODO SERVER-72607
+ invariant(!writes.results.empty());
+
+ for (size_t i = 0; i < writes.results.size(); ++i) {
+ auto idx = firstOpIdx + i;
+ // We do not pass in a proper numErrors since it causes unwanted truncation in error
+ // message generation.
+ if (auto error = write_ops_exec::generateError(
+ opCtx, writes.results[i].getStatus(), i, 0 /* numErrors */)) {
+ auto replyItem = BulkWriteReplyItem(0, idx);
+ replyItem.setCode(error.get().getStatus().code());
+ replyItem.setErrmsg(StringData(error.get().getStatus().reason()));
+ _replies.emplace_back(replyItem);
+ } else {
+ auto replyItem = BulkWriteReplyItem(1, idx);
+ replyItem.setN(writes.results[i].getValue().getN());
+ _replies.emplace_back(replyItem);
+ }
+ }
}
- void addUpdateDeleteReply(OperationContext* opCtx,
- size_t currentOpIdx,
- const SingleWriteResult& write) {}
+ void addUpdateReply(OperationContext* opCtx,
+ size_t currentOpIdx,
+ const SingleWriteResult& write) {}
+
+ void addDeleteReply(OperationContext* opCtx,
+ size_t currentOpIdx,
+ const SingleWriteResult& write) {}
std::vector<BulkWriteReplyItem>& getReplies() {
return _replies;
@@ -258,13 +288,16 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
// Construct reply handler callbacks.
auto insertCB = [&responses](OperationContext* opCtx,
int currentOpIdx,
- int numOps,
write_ops_exec::WriteResult& writes) {
- responses.addInsertReplies(opCtx, currentOpIdx, numOps, writes);
+ responses.addInsertReplies(opCtx, currentOpIdx, writes);
};
- auto updateDeleteCB =
+ auto updateCB =
+ [&responses](OperationContext* opCtx, int currentOpIdx, const SingleWriteResult& write) {
+ responses.addUpdateReply(opCtx, currentOpIdx, write);
+ };
+ auto deleteCB =
[&responses](OperationContext* opCtx, int currentOpIdx, const SingleWriteResult& write) {
- responses.addUpdateDeleteReply(opCtx, currentOpIdx, write);
+ responses.addDeleteReply(opCtx, currentOpIdx, write);
};
// Create a current insert batch.
@@ -283,19 +316,19 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
}
} else if (opType == kUpdate) {
// Flush insert ops before handling update ops.
- if (!batch.flush(opCtx, idx)) {
+ if (!batch.flush(opCtx)) {
break;
}
- if (!handleUpdateOp(opCtx, req, idx, updateDeleteCB)) {
+ if (!handleUpdateOp(opCtx, req, idx, updateCB)) {
// Update write failed; we can no longer continue.
break;
}
} else {
// Flush insert ops before handling delete ops.
- if (!batch.flush(opCtx, idx)) {
+ if (!batch.flush(opCtx)) {
break;
}
- if (!handleDeleteOp(opCtx, req, idx, updateDeleteCB)) {
+ if (!handleDeleteOp(opCtx, req, idx, deleteCB)) {
// Delete write failed; we can no longer continue.
break;
}
@@ -304,13 +337,24 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
// It does not matter if this final flush had errors or not since we finished processing
// the last op already.
- batch.flush(opCtx, idx);
+ batch.flush(opCtx);
invariant(batch.empty());
return responses.getReplies();
}
+bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, size_t bytesBuffered) {
+ invariant(numDocs >= 0);
+ if (!numDocs) {
+ // Allow the first output document to exceed the limit to ensure we can always make
+ // progress.
+ return true;
+ }
+
+ return (bytesBuffered + nextDoc.objsize()) <= BSONObjMaxUserSize;
+}
+
class BulkWriteCmd : public BulkWriteCmdVersion1Gen<BulkWriteCmd> {
public:
bool adminOnly() const final {
@@ -379,16 +423,24 @@ public:
// Apply all of the write operations.
auto replies = performWrites(opCtx, req);
- // TODO SERVER-72607 break replies into multiple batches to create cursor.
- auto reply = Reply();
- replies.emplace_back(1, 0);
- reply.setCursor(BulkWriteCommandResponseCursor(0, replies));
-
- return reply;
+ return _populateCursorReply(opCtx, req, std::move(replies));
}
void doCheckAuthorization(OperationContext* opCtx) const final try {
auto session = AuthorizationSession::get(opCtx->getClient());
+ auto privileges = _getPrivileges();
+
+ // Make sure all privileges are authorized.
+ uassert(ErrorCodes::Unauthorized,
+ "unauthorized",
+ session->isAuthorizedForPrivileges(privileges));
+ } catch (const DBException& ex) {
+ NotPrimaryErrorTracker::get(opCtx->getClient()).recordError(ex.code());
+ throw;
+ }
+
+ private:
+ std::vector<Privilege> _getPrivileges() const {
const auto& ops = request().getOps();
const auto& nsInfo = request().getNsInfo();
@@ -420,13 +472,93 @@ public:
privilege.addActions(newActions);
}
- // Make sure all privileges are authorized.
- uassert(ErrorCodes::Unauthorized,
- "unauthorized",
- session->isAuthorizedForPrivileges(privileges));
- } catch (const DBException& ex) {
- NotPrimaryErrorTracker::get(opCtx->getClient()).recordError(ex.code());
- throw;
+ return privileges;
+ }
+
+ Reply _populateCursorReply(OperationContext* opCtx,
+ const BulkWriteCommandRequest& req,
+ std::vector<BulkWriteReplyItem> replies) {
+ const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS();
+ auto expCtx = make_intrusive<ExpressionContext>(
+ opCtx, std::unique_ptr<CollatorInterface>(nullptr), ns());
+
+ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
+ auto ws = std::make_unique<WorkingSet>();
+ auto root = std::make_unique<QueuedDataStage>(expCtx.get(), ws.get());
+
+ for (auto& reply : replies) {
+ WorkingSetID id = ws->allocate();
+ WorkingSetMember* member = ws->get(id);
+ member->keyData.clear();
+ member->recordId = RecordId();
+ member->resetDocument(SnapshotId(), reply.toBSON());
+ member->transitionToOwnedObj();
+ root->pushBack(id);
+ }
+
+ exec = uassertStatusOK(
+ plan_executor_factory::make(expCtx,
+ std::move(ws),
+ std::move(root),
+ &CollectionPtr::null,
+ PlanYieldPolicy::YieldPolicy::NO_YIELD,
+ false, /* whether owned BSON must be returned */
+ cursorNss));
+
+
+ long long batchSize = std::numeric_limits<long long>::max();
+ if (req.getCursor() && req.getCursor()->getBatchSize()) {
+ batchSize = *req.getCursor()->getBatchSize();
+ }
+
+ size_t numReplies = 0;
+ size_t bytesBuffered = 0;
+ for (long long objCount = 0; objCount < batchSize; objCount++) {
+ BSONObj nextDoc;
+ PlanExecutor::ExecState state = exec->getNext(&nextDoc, nullptr);
+ if (state == PlanExecutor::IS_EOF) {
+ break;
+ }
+ invariant(state == PlanExecutor::ADVANCED);
+
+ // If we can't fit this result inside the current batch, then we stash it for
+ // later.
+ if (!haveSpaceForNext(nextDoc, objCount, bytesBuffered)) {
+ exec->stashResult(nextDoc);
+ break;
+ }
+
+ numReplies++;
+ bytesBuffered += nextDoc.objsize();
+ }
+ if (exec->isEOF()) {
+ invariant(numReplies == replies.size());
+ return BulkWriteCommandReply(BulkWriteCommandResponseCursor(
+ 0, std::vector<BulkWriteReplyItem>(std::move(replies))));
+ }
+
+ exec->saveState();
+ exec->detachFromOperationContext();
+
+ auto pinnedCursor = CursorManager::get(opCtx)->registerCursor(
+ opCtx,
+ {std::move(exec),
+ cursorNss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName(),
+ APIParameters::get(opCtx),
+ opCtx->getWriteConcern(),
+ repl::ReadConcernArgs::get(opCtx),
+ ReadPreferenceSetting::get(opCtx),
+ unparsedRequest().body,
+ _getPrivileges()});
+ auto cursorId = pinnedCursor.getCursor()->cursorid();
+
+ pinnedCursor->incNBatches();
+ pinnedCursor->incNReturnedSoFar(replies.size());
+
+ replies.resize(numReplies);
+ return BulkWriteCommandReply(BulkWriteCommandResponseCursor(
+ cursorId, std::vector<BulkWriteReplyItem>(std::move(replies))));
}
};
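The first batch assembled by _populateCursorReply() above stops once the requested batchSize is reached or haveSpaceForNext() reports that buffering the next reply would exceed BSONObjMaxUserSize; the remaining replies stay behind the registered cursor for getMore. A JavaScript restatement of the size rule (the 16MB value for BSONObjMaxUserSize is an assumption here):

    const kBSONObjMaxUserSize = 16 * 1024 * 1024;  // assumed value of BSONObjMaxUserSize
    function haveSpaceForNext(nextDocSize, numDocs, bytesBuffered) {
        // The first document is always admitted so the batch can make progress.
        if (numDocs === 0)
            return true;
        return bytesBuffered + nextDocSize <= kBSONObjMaxUserSize;
    }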
diff --git a/src/mongo/db/commands/bulk_write.idl b/src/mongo/db/commands/bulk_write.idl
index 22edb378042..379023ef2eb 100644
--- a/src/mongo/db/commands/bulk_write.idl
+++ b/src/mongo/db/commands/bulk_write.idl
@@ -99,7 +99,7 @@ structs:
For update: number of documents that matched the query predicate.
For delete: number of documents deleted."
type: int
- default: 0
+ optional: true
stability: unstable
nModified:
description: "Number of updated documents."
@@ -114,10 +114,17 @@ structs:
type: IDLAnyTypeOwned
optional: true
stability: unstable
+ code:
+ type: int
+ optional: true
+ stability: unstable
+ errmsg:
+ type: string
+ optional: true
+ stability: unstable
BulkWriteCommandResponseCursor:
description: "Cursor holding results for a successful 'bulkWrite' command."
- strict: true
fields:
id:
type: long
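With n now optional and the new optional code and errmsg fields, a BulkWriteReplyItem takes one of two shapes, both asserted on by bulk_write_cursor.js (the errmsg text here is illustrative, not an exact server message):

    // Successful insert:
    {ok: 1, idx: 0, n: 1}
    // Failed insert, e.g. a duplicate key:
    {ok: 0, idx: 1, code: 11000, errmsg: "E11000 duplicate key error ..."}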
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index c4ba7d0c495..93658385367 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -61,8 +61,6 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/tenant_migration_access_blocker_registry.h"
-#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_conflict_info.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/s/collection_sharding_state.h"
@@ -96,7 +94,6 @@
namespace mongo {
namespace {
-MONGO_FAIL_POINT_DEFINE(hangWriteBeforeWaitingForMigrationDecision);
MONGO_FAIL_POINT_DEFINE(hangInsertBeforeWrite);
MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeCommit);
MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeWrite);
@@ -336,62 +333,6 @@ boost::optional<std::pair<Status, bool>> checkFailUnorderedTimeseriesInsertFailP
return boost::none;
}
-boost::optional<write_ops::WriteError> generateError(OperationContext* opCtx,
- const Status& status,
- int index,
- size_t numErrors) {
- if (status.isOK()) {
- return boost::none;
- }
-
- boost::optional<Status> overwrittenStatus;
-
- if (status == ErrorCodes::TenantMigrationConflict) {
- hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx);
-
- overwrittenStatus.emplace(
- tenant_migration_access_blocker::handleTenantMigrationConflict(opCtx, status));
-
- // Interruption errors encountered during batch execution fail the entire batch, so throw on
- // such errors here for consistency.
- if (ErrorCodes::isInterruption(*overwrittenStatus)) {
- uassertStatusOK(*overwrittenStatus);
- }
-
- // Tenant migration errors, similarly to migration errors consume too much space in the
- // ordered:false responses and get truncated. Since the call to
- // 'handleTenantMigrationConflict' above replaces the original status, we need to manually
- // truncate the new reason if the original 'status' was also truncated.
- if (status.reason().empty()) {
- overwrittenStatus = overwrittenStatus->withReason("");
- }
- }
-
- constexpr size_t kMaxErrorReasonsToReport = 1;
- constexpr size_t kMaxErrorSizeToReportAfterMaxReasonsReached = 1024 * 1024;
-
- if (numErrors > kMaxErrorReasonsToReport) {
- size_t errorSize =
- overwrittenStatus ? overwrittenStatus->reason().size() : status.reason().size();
- if (errorSize > kMaxErrorSizeToReportAfterMaxReasonsReached)
- overwrittenStatus =
- overwrittenStatus ? overwrittenStatus->withReason("") : status.withReason("");
- }
-
- if (overwrittenStatus)
- return write_ops::WriteError(index, std::move(*overwrittenStatus));
- else
- return write_ops::WriteError(index, status);
-}
-
-template <typename T>
-boost::optional<write_ops::WriteError> generateError(OperationContext* opCtx,
- const StatusWith<T>& result,
- int index,
- size_t numErrors) {
- return generateError(opCtx, result.getStatus(), index, numErrors);
-}
-
/**
* Contains hooks that are used by 'populateReply' method.
*/
@@ -439,7 +380,8 @@ void populateReply(OperationContext* opCtx,
long long nVal = 0;
std::vector<write_ops::WriteError> errors;
for (size_t i = 0; i < result.results.size(); ++i) {
- if (auto error = generateError(opCtx, result.results[i], i, errors.size())) {
+ if (auto error = write_ops_exec::generateError(
+ opCtx, result.results[i].getStatus(), i, errors.size())) {
errors.emplace_back(std::move(*error));
continue;
}
@@ -837,8 +779,8 @@ public:
if (performInsert) {
const auto output =
_performTimeseriesInsert(opCtx, batch, metadata, std::move(stmtIds));
- if (auto error =
- generateError(opCtx, output.result, start + index, errors->size())) {
+ if (auto error = write_ops_exec::generateError(
+ opCtx, output.result.getStatus(), start + index, errors->size())) {
errors->emplace_back(std::move(*error));
bucketCatalog.abort(batch, output.result.getStatus());
return output.canContinue;
@@ -865,8 +807,8 @@ public:
docsToRetry->push_back(index);
opCtx->recoveryUnit()->abandonSnapshot();
return true;
- } else if (auto error =
- generateError(opCtx, output.result, start + index, errors->size())) {
+ } else if (auto error = write_ops_exec::generateError(
+ opCtx, output.result.getStatus(), start + index, errors->size())) {
errors->emplace_back(std::move(*error));
bucketCatalog.abort(batch, output.result.getStatus());
return output.canContinue;
@@ -881,8 +823,8 @@ public:
if (closedBucket) {
// If this write closed a bucket, compress the bucket
auto output = _performTimeseriesBucketCompression(opCtx, *closedBucket);
- if (auto error =
- generateError(opCtx, output.result, start + index, errors->size())) {
+ if (auto error = write_ops_exec::generateError(
+ opCtx, output.result.getStatus(), start + index, errors->size())) {
errors->emplace_back(std::move(*error));
return output.canContinue;
}
@@ -1117,8 +1059,8 @@ public:
invariant(start + index < request().getDocuments().size());
if (rebuildOptionsError) {
- const auto error{
- generateError(opCtx, *rebuildOptionsError, start + index, errors->size())};
+ const auto error{write_ops_exec::generateError(
+ opCtx, *rebuildOptionsError, start + index, errors->size())};
errors->emplace_back(std::move(*error));
return false;
}
@@ -1245,7 +1187,8 @@ public:
} while (!swResult.isOK() &&
(swResult.getStatus().code() == ErrorCodes::WriteConflict));
- if (auto error = generateError(opCtx, swResult, start + index, errors->size())) {
+ if (auto error = write_ops_exec::generateError(
+ opCtx, swResult.getStatus(), start + index, errors->size())) {
invariant(swResult.getStatus().code() != ErrorCodes::WriteConflict);
errors->emplace_back(std::move(*error));
return false;
@@ -1267,8 +1210,8 @@ public:
// If this write closed a bucket, compress the bucket
auto ret = _performTimeseriesBucketCompression(opCtx, closedBucket);
- if (auto error =
- generateError(opCtx, ret.result, start + index, errors->size())) {
+ if (auto error = write_ops_exec::generateError(
+ opCtx, ret.result.getStatus(), start + index, errors->size())) {
// Bucket compression can only fail when we may not attempt any other write
// operation, i.e. when handleError() inside write_ops_exec.cpp returns
// false.
@@ -1339,7 +1282,7 @@ public:
opCtx->recoveryUnit()->abandonSnapshot();
continue;
}
- if (auto error = generateError(
+ if (auto error = write_ops_exec::generateError(
opCtx, swCommitInfo.getStatus(), start + index, errors->size())) {
errors->emplace_back(std::move(*error));
continue;
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index dab24a3c482..0f0f228d6c0 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -43,6 +43,7 @@ namespace mongo {
namespace {
constexpr auto listCollectionsCursorCol = "$cmd.listCollections"_sd;
+constexpr auto bulkWriteCursorCol = "$cmd.bulkWrite"_sd;
constexpr auto collectionlessAggregateCursorCol = "$cmd.aggregate"_sd;
constexpr auto dropPendingNSPrefix = "system.drop."_sd;
@@ -321,6 +322,10 @@ bool NamespaceString::mustBeAppliedInOwnOplogBatch() const {
_ns == kConfigsvrShardsNamespace.ns();
}
+NamespaceString NamespaceString::makeBulkWriteNSS() {
+ return NamespaceString(kAdminDb, bulkWriteCursorCol);
+}
+
NamespaceString NamespaceString::makeClusterParametersNSS(
const boost::optional<TenantId>& tenantId) {
return tenantId ? NamespaceString(tenantId, kConfigDb, "clusterParameters")
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 9c5049186a6..e9c93ea3876 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -397,6 +397,12 @@ public:
static NamespaceString makeClusterParametersNSS(const boost::optional<TenantId>& tenantId);
/**
+ * Constructs a NamespaceString representing a BulkWrite namespace. The format for this
+ * namespace is "admin.$cmd.bulkWrite".
+ */
+ static NamespaceString makeBulkWriteNSS();
+
+ /**
* NOTE: DollarInDbNameBehavior::allow is deprecated.
*
* Please use DollarInDbNameBehavior::disallow and check explicitly for any DB names that must
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript
index e9f3f42346f..7832627e9bd 100644
--- a/src/mongo/db/ops/SConscript
+++ b/src/mongo/db/ops/SConscript
@@ -63,6 +63,7 @@ env.Library(
'$BUILD_DIR/mongo/db/record_id_helpers',
'$BUILD_DIR/mongo/db/repl/oplog',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/s/query_analysis_writer',
'$BUILD_DIR/mongo/db/shard_role',
'$BUILD_DIR/mongo/db/stats/counters',
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index bd4084e0c32..89900de0cdd 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -61,6 +61,8 @@
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/record_id_helpers.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/tenant_migration_access_blocker_registry.h"
+#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_conflict_info.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/s/collection_sharding_state.h"
@@ -98,6 +100,7 @@ MONGO_FAIL_POINT_DEFINE(failAllRemoves);
MONGO_FAIL_POINT_DEFINE(hangBeforeChildRemoveOpFinishes);
MONGO_FAIL_POINT_DEFINE(hangBeforeChildRemoveOpIsPopped);
MONGO_FAIL_POINT_DEFINE(hangAfterAllChildRemoveOpsArePopped);
+MONGO_FAIL_POINT_DEFINE(hangWriteBeforeWaitingForMigrationDecision);
MONGO_FAIL_POINT_DEFINE(hangDuringBatchInsert);
MONGO_FAIL_POINT_DEFINE(hangDuringBatchUpdate);
MONGO_FAIL_POINT_DEFINE(hangAfterBatchUpdate);
@@ -549,6 +552,54 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
return true;
}
+boost::optional<write_ops::WriteError> generateError(OperationContext* opCtx,
+ const Status& status,
+ int index,
+ size_t numErrors) {
+ if (status.isOK()) {
+ return boost::none;
+ }
+
+ boost::optional<Status> overwrittenStatus;
+
+ if (status == ErrorCodes::TenantMigrationConflict) {
+ hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx);
+
+ overwrittenStatus.emplace(
+ tenant_migration_access_blocker::handleTenantMigrationConflict(opCtx, status));
+
+ // Interruption errors encountered during batch execution fail the entire batch, so throw on
+ // such errors here for consistency.
+ if (ErrorCodes::isInterruption(*overwrittenStatus)) {
+ uassertStatusOK(*overwrittenStatus);
+ }
+
+ // Tenant migration errors, similarly to migration errors consume too much space in the
+ // ordered:false responses and get truncated. Since the call to
+ // 'handleTenantMigrationConflict' above replaces the original status, we need to manually
+ // truncate the new reason if the original 'status' was also truncated.
+ if (status.reason().empty()) {
+ overwrittenStatus = overwrittenStatus->withReason("");
+ }
+ }
+
+ constexpr size_t kMaxErrorReasonsToReport = 1;
+ constexpr size_t kMaxErrorSizeToReportAfterMaxReasonsReached = 1024 * 1024;
+
+ if (numErrors > kMaxErrorReasonsToReport) {
+ size_t errorSize =
+ overwrittenStatus ? overwrittenStatus->reason().size() : status.reason().size();
+ if (errorSize > kMaxErrorSizeToReportAfterMaxReasonsReached)
+ overwrittenStatus =
+ overwrittenStatus ? overwrittenStatus->withReason("") : status.withReason("");
+ }
+
+ if (overwrittenStatus)
+ return write_ops::WriteError(index, std::move(*overwrittenStatus));
+ else
+ return write_ops::WriteError(index, status);
+}
+
WriteResult performInserts(OperationContext* opCtx,
const write_ops::InsertCommandRequest& wholeOp,
OperationSource source) {
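generateError() moves here from write_commands.cpp, together with its failpoint, so that bulk_write.cpp can reuse it via write_ops_exec. Its truncation rule, restated as a JavaScript sketch (constant names mirror the C++ above):

    const kMaxErrorReasonsToReport = 1;
    const kMaxErrorSizeToReportAfterMaxReasonsReached = 1024 * 1024;
    function maybeTruncateReason(reason, numErrors) {
        // After the first reported error, oversized reasons are blanked out.
        if (numErrors > kMaxErrorReasonsToReport &&
            reason.length > kMaxErrorSizeToReportAfterMaxReasonsReached)
            return "";
        return reason;
    }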
diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h
index 1e11b18ce81..4e8713bec0f 100644
--- a/src/mongo/db/ops/write_ops_exec.h
+++ b/src/mongo/db/ops/write_ops_exec.h
@@ -83,6 +83,14 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
OperationSource source);
/**
+ * Generates a WriteError for a given Status.
+ */
+boost::optional<write_ops::WriteError> generateError(OperationContext* opCtx,
+ const Status& status,
+ int index,
+ size_t numErrors);
+
+/**
* Performs a batch of inserts, updates, or deletes.
*
* These functions handle all of the work of doing the writes, including locking, incrementing