summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorseanzimm <sean.zimmerman@mongodb.com>2023-05-02 19:03:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-05-02 20:31:13 +0000
commit63cc48d21b065e8ed09d672b5af6f8a69cec0dba (patch)
treec2f4dd6461f6e711b1aa5dedf9f39f28a8da609f
parentc8404ce61457363daa0b225e4195af38b5d59dd6 (diff)
downloadmongo-63cc48d21b065e8ed09d672b5af6f8a69cec0dba.tar.gz
SERVER-72187 Support Tenant ID on bulkWrite
-rw-r--r--buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml5
-rw-r--r--jstests/core/write/bulk/bulk_write.js2
-rw-r--r--jstests/core/write/bulk/bulk_write_delete_cursor.js2
-rw-r--r--jstests/core/write/bulk/bulk_write_getMore.js2
-rw-r--r--jstests/core/write/bulk/bulk_write_insert_cursor.js2
-rw-r--r--jstests/core/write/bulk/bulk_write_non_retryable_cursor.js2
-rw-r--r--jstests/core/write/bulk/bulk_write_non_transaction.js2
-rw-r--r--jstests/core/write/bulk/bulk_write_update_cursor.js2
-rw-r--r--jstests/libs/override_methods/inject_tenant_prefix.js51
-rw-r--r--src/mongo/db/commands/bulk_write.cpp26
-rw-r--r--src/mongo/db/namespace_string.cpp4
-rw-r--r--src/mongo/db/namespace_string.h2
-rw-r--r--src/mongo/s/commands/cluster_bulk_write_cmd.cpp3
13 files changed, 72 insertions, 33 deletions
diff --git a/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml
index 7f38d0151cb..892e5b06c82 100644
--- a/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml
@@ -77,11 +77,6 @@ selector:
- jstests/core/txns/kill_op_on_txn_expiry.js
# TODO SERVER-72357: cannot get the expected error due to an authorization contract issue.
- jstests/core/txns/multi_statement_transaction_command_args.js
- # TODO SERVER-72187: bulkWrite command does not support Tenant ID command
- - jstests/core/write/bulk/bulk_write_insert_cursor.js
- - jstests/core/write/bulk/bulk_write_update_cursor.js
- - jstests/core/write/bulk/bulk_write_non_retryable_cursor.js
- - jstests/core/write/bulk/bulk_write_getMore.js
# This test looks for the presence of a log line that contains a db name. Injecting a tenantId in
# the requests causes the test to fails due to a mismatch.
- jstests/core/api//apitest_db_profile_level.js
diff --git a/jstests/core/write/bulk/bulk_write.js b/jstests/core/write/bulk/bulk_write.js
index f1767bd0338..f29e4077eb2 100644
--- a/jstests/core/write/bulk/bulk_write.js
+++ b/jstests/core/write/bulk/bulk_write.js
@@ -6,8 +6,6 @@
* assumes_against_mongod_not_mongos,
* not_allowed_with_security_token,
* command_not_supported_in_serverless,
- * # Command is not yet compatible with tenant migration.
- * tenant_migration_incompatible,
* # TODO SERVER-52419 Remove this tag.
* featureFlagBulkWriteCommand,
* ]
diff --git a/jstests/core/write/bulk/bulk_write_delete_cursor.js b/jstests/core/write/bulk/bulk_write_delete_cursor.js
index 25beceb6d66..3a99750f08d 100644
--- a/jstests/core/write/bulk/bulk_write_delete_cursor.js
+++ b/jstests/core/write/bulk/bulk_write_delete_cursor.js
@@ -6,8 +6,6 @@
* assumes_against_mongod_not_mongos,
* not_allowed_with_security_token,
* command_not_supported_in_serverless,
- * # Command is not yet compatible with tenant migration.
- * tenant_migration_incompatible,
* # TODO SERVER-52419 Remove this tag.
* featureFlagBulkWriteCommand,
* ]
diff --git a/jstests/core/write/bulk/bulk_write_getMore.js b/jstests/core/write/bulk/bulk_write_getMore.js
index 397e2847efa..ce2f98cd29b 100644
--- a/jstests/core/write/bulk/bulk_write_getMore.js
+++ b/jstests/core/write/bulk/bulk_write_getMore.js
@@ -14,8 +14,6 @@
* requires_getmore,
* # Contains commands that fail which will fail the entire transaction
* does_not_support_transactions,
- * # Command is not yet compatible with tenant migration.
- * tenant_migration_incompatible,
* # TODO SERVER-52419 Remove this tag.
* featureFlagBulkWriteCommand,
* ]
diff --git a/jstests/core/write/bulk/bulk_write_insert_cursor.js b/jstests/core/write/bulk/bulk_write_insert_cursor.js
index a4168827bb0..494fd188dc3 100644
--- a/jstests/core/write/bulk/bulk_write_insert_cursor.js
+++ b/jstests/core/write/bulk/bulk_write_insert_cursor.js
@@ -6,8 +6,6 @@
* assumes_against_mongod_not_mongos,
* not_allowed_with_security_token,
* command_not_supported_in_serverless,
- * # Command is not yet compatible with tenant migration.
- * tenant_migration_incompatible,
* # TODO SERVER-52419 Remove this tag.
* featureFlagBulkWriteCommand,
* ]
diff --git a/jstests/core/write/bulk/bulk_write_non_retryable_cursor.js b/jstests/core/write/bulk/bulk_write_non_retryable_cursor.js
index 3a21c96fe7b..60a555072fe 100644
--- a/jstests/core/write/bulk/bulk_write_non_retryable_cursor.js
+++ b/jstests/core/write/bulk/bulk_write_non_retryable_cursor.js
@@ -10,8 +10,6 @@
* requires_non_retryable_writes,
* not_allowed_with_security_token,
* command_not_supported_in_serverless,
- * # Command is not yet compatible with tenant migration.
- * tenant_migration_incompatible,
* # TODO SERVER-52419 Remove this tag.
* featureFlagBulkWriteCommand,
* ]
diff --git a/jstests/core/write/bulk/bulk_write_non_transaction.js b/jstests/core/write/bulk/bulk_write_non_transaction.js
index 044f606dd63..081b96f27a5 100644
--- a/jstests/core/write/bulk/bulk_write_non_transaction.js
+++ b/jstests/core/write/bulk/bulk_write_non_transaction.js
@@ -11,8 +11,6 @@
* command_not_supported_in_serverless,
* # Contains commands that fail which will fail the entire transaction
* does_not_support_transactions,
- * # Command is not yet compatible with tenant migration.
- * tenant_migration_incompatible,
* # TODO SERVER-52419 Remove this tag.
* featureFlagBulkWriteCommand,
* ]
diff --git a/jstests/core/write/bulk/bulk_write_update_cursor.js b/jstests/core/write/bulk/bulk_write_update_cursor.js
index 687d349e9c5..7c92195ac8b 100644
--- a/jstests/core/write/bulk/bulk_write_update_cursor.js
+++ b/jstests/core/write/bulk/bulk_write_update_cursor.js
@@ -6,8 +6,6 @@
* assumes_against_mongod_not_mongos,
* not_allowed_with_security_token,
* command_not_supported_in_serverless,
- * # Command is not yet compatible with tenant migration.
- * tenant_migration_incompatible,
* # TODO SERVER-52419 Remove this tag.
* featureFlagBulkWriteCommand,
* ]
diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js
index 4e558024877..1911fff5cec 100644
--- a/jstests/libs/override_methods/inject_tenant_prefix.js
+++ b/jstests/libs/override_methods/inject_tenant_prefix.js
@@ -320,6 +320,19 @@ function extractTenantMigrationError(resObj, errorCode) {
}
}
}
+
+ // BulkWrite command has errors contained in a cursor response. The error will always be
+ // in the first batch of the cursor response since getMore is not allowed to run with
+ // tenant migration / shard merge suites.
+ if (resObj.cursor) {
+ if (resObj.cursor.firstBatch) {
+ for (let opRes of resObj.cursor.firstBatch) {
+ if (opRes.code && opRes.code == errorCode) {
+ return {code: opRes.code, errmsg: opRes.errmsg};
+ }
+ }
+ }
+ }
return null;
}
@@ -391,6 +404,15 @@ function modifyCmdObjForRetry(cmdObj, resObj) {
}
cmdObj.deletes = retryOps;
}
+
+ if (cmdObj.bulkWrite) {
+ let retryOps = [];
+ // For bulkWrite tenant migration errors always act as if they are executed as
+ // `ordered:true` meaning we will have to retry every op from the one that errored.
+ retryOps =
+ cmdObj.ops.slice(resObj.cursor.firstBatch[resObj.cursor.firstBatch.length - 1].idx);
+ cmdObj.ops = retryOps;
+ }
}
/**
@@ -533,6 +555,7 @@ function runCommandRetryOnTenantMigrationErrors(
let nModified = 0;
let upserted = [];
let nonRetryableWriteErrors = [];
+ let bulkWriteResponse = {};
const isRetryableWrite =
cmdObjWithTenantId.txnNumber && !cmdObjWithTenantId.hasOwnProperty("autocommit");
@@ -575,6 +598,31 @@ function runCommandRetryOnTenantMigrationErrors(
// Add/modify the shells's n, nModified, upserted, and writeErrors, unless this command is
// part of a retryable write.
if (!isRetryableWrite) {
+ // bulkWrite case.
+ if (cmdObjWithTenantId.bulkWrite) {
+ // First attempt store the whole response.
+ if (numAttempts == 1) {
+ bulkWriteResponse = resObj;
+ } else {
+ // The last item from the previous response is guaranteed to be a
+ // tenant migration error. Remove it to append the retried response.
+ let newIdx = bulkWriteResponse.cursor.firstBatch.pop().idx;
+ // Iterate over new response and change the indexes to start with newIdx.
+ for (let opRes of resObj.cursor.firstBatch) {
+ opRes.idx = newIdx;
+ newIdx += 1;
+ }
+
+ // Add the new responses (with modified indexes) onto the original responses.
+ bulkWriteResponse.cursor.firstBatch =
+ bulkWriteResponse.cursor.firstBatch.concat(resObj.cursor.firstBatch);
+
+ // Add new numErrors onto old numErrors. Subtract one to account for the
+ // tenant migration error that was popped off.
+ bulkWriteResponse.numErrors += resObj.numErrors - 1;
+ }
+ }
+
if (resObj.n) {
n += resObj.n;
}
@@ -707,6 +755,9 @@ function runCommandRetryOnTenantMigrationErrors(
if (nonRetryableWriteErrors.length > 0) {
resObj.writeErrors = nonRetryableWriteErrors;
}
+ if (cmdObjWithTenantId.bulkWrite) {
+ resObj = bulkWriteResponse;
+ }
}
return resObj;
}
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp
index 9f987b6afb0..0b2379f8897 100644
--- a/src/mongo/db/commands/bulk_write.cpp
+++ b/src/mongo/db/commands/bulk_write.cpp
@@ -175,19 +175,24 @@ public:
_replies.emplace_back(replyItem);
}
- void addUpdateErrorReply(size_t currentOpIdx, const Status& status) {
+ void addUpdateErrorReply(OperationContext* opCtx, size_t currentOpIdx, const Status& status) {
auto replyItem = BulkWriteReplyItem(currentOpIdx);
replyItem.setNModified(0);
- addErrorReply(replyItem, status);
+ addErrorReply(opCtx, replyItem, status);
}
- void addErrorReply(size_t currentOpIdx, const Status& status) {
+ void addErrorReply(OperationContext* opCtx, size_t currentOpIdx, const Status& status) {
auto replyItem = BulkWriteReplyItem(currentOpIdx);
- addErrorReply(replyItem, status);
+ addErrorReply(opCtx, replyItem, status);
}
- void addErrorReply(BulkWriteReplyItem& replyItem, const Status& status) {
- replyItem.setStatus(status);
+ void addErrorReply(OperationContext* opCtx,
+ BulkWriteReplyItem& replyItem,
+ const Status& status) {
+ auto error =
+ write_ops_exec::generateError(opCtx, status, replyItem.getIdx(), 0 /* numErrors */);
+ invariant(error);
+ replyItem.setStatus(error.get().getStatus());
replyItem.setOk(status.isOK() ? 1.0 : 0.0);
replyItem.setN(0);
_replies.emplace_back(replyItem);
@@ -524,7 +529,7 @@ bool handleInsertOp(OperationContext* opCtx,
uassertStatusOK(fixedDoc.getStatus());
MONGO_UNREACHABLE;
} catch (const DBException& ex) {
- responses.addErrorReply(currentOpIdx, ex.toStatus());
+ responses.addErrorReply(opCtx, currentOpIdx, ex.toStatus());
write_ops_exec::WriteResult out;
// fixDocumentForInsert can only fail for validation reasons, we only use handleError
// here to tell us if we are able to continue processing further ops or not.
@@ -684,7 +689,7 @@ bool handleUpdateOp(OperationContext* opCtx,
if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
throw;
}
- responses.addUpdateErrorReply(currentOpIdx, ex.toStatus());
+ responses.addUpdateErrorReply(opCtx, currentOpIdx, ex.toStatus());
write_ops_exec::WriteResult out;
return write_ops_exec::handleError(
opCtx, ex, nsInfo[idx].getNs(), req.getOrdered(), op->getMulti(), boost::none, &out);
@@ -777,7 +782,7 @@ bool handleDeleteOp(OperationContext* opCtx,
if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
throw;
}
- responses.addErrorReply(currentOpIdx, ex.toStatus());
+ responses.addErrorReply(opCtx, currentOpIdx, ex.toStatus());
write_ops_exec::WriteResult out;
return write_ops_exec::handleError(
opCtx, ex, nsInfo[idx].getNs(), req.getOrdered(), false, boost::none, &out);
@@ -867,7 +872,8 @@ public:
bulk_write::RetriedStmtIds retriedStmtIds,
int numErrors) {
auto reqObj = unparsedRequest().body;
- const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS();
+ const NamespaceString cursorNss =
+ NamespaceString::makeBulkWriteNSS(req.getDollarTenant());
auto expCtx = make_intrusive<ExpressionContext>(
opCtx, std::unique_ptr<CollatorInterface>(nullptr), ns());
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index b2547b5c5a1..d9a4c3daea5 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -175,8 +175,8 @@ bool NamespaceString::mustBeAppliedInOwnOplogBatch() const {
ns == kConfigsvrShardsNamespace.ns();
}
-NamespaceString NamespaceString::makeBulkWriteNSS() {
- return NamespaceString(DatabaseName::kAdmin, bulkWriteCursorCol);
+NamespaceString NamespaceString::makeBulkWriteNSS(const boost::optional<TenantId>& tenantId) {
+ return NamespaceString(DatabaseName::kAdmin.db(), bulkWriteCursorCol, tenantId);
}
NamespaceString NamespaceString::makeClusterParametersNSS(
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 9521bb9d874..d45198573d6 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -337,7 +337,7 @@ public:
* Constructs a NamespaceString representing a BulkWrite namespace. The format for this
* namespace is admin.$cmd.bulkWrite".
*/
- static NamespaceString makeBulkWriteNSS();
+ static NamespaceString makeBulkWriteNSS(const boost::optional<TenantId>& tenantId);
/**
* Constructs the oplog buffer NamespaceString for the given migration id for movePrimary op.
diff --git a/src/mongo/s/commands/cluster_bulk_write_cmd.cpp b/src/mongo/s/commands/cluster_bulk_write_cmd.cpp
index b27b94890d9..9f3ce595590 100644
--- a/src/mongo/s/commands/cluster_bulk_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_bulk_write_cmd.cpp
@@ -128,7 +128,8 @@ public:
const auto& req = request();
auto reqObj = unparsedRequest().body;
- const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS();
+ const NamespaceString cursorNss =
+ NamespaceString::makeBulkWriteNSS(req.getDollarTenant());
ClusterClientCursorParams params(cursorNss,
APIParameters::get(opCtx),
ReadPreferenceSetting::get(opCtx),