diff options
author | seanzimm <sean.zimmerman@mongodb.com> | 2023-05-02 19:03:06 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-02 20:31:13 +0000 |
commit | 63cc48d21b065e8ed09d672b5af6f8a69cec0dba (patch) | |
tree | c2f4dd6461f6e711b1aa5dedf9f39f28a8da609f | |
parent | c8404ce61457363daa0b225e4195af38b5d59dd6 (diff) | |
download | mongo-63cc48d21b065e8ed09d672b5af6f8a69cec0dba.tar.gz |
SERVER-72187 Support Tenant ID on bulkWrite
13 files changed, 72 insertions, 33 deletions
diff --git a/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml index 7f38d0151cb..892e5b06c82 100644 --- a/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/native_tenant_data_isolation_with_dollar_tenant_jscore_passthrough.yml @@ -77,11 +77,6 @@ selector: - jstests/core/txns/kill_op_on_txn_expiry.js # TODO SERVER-72357: cannot get the expected error due to an authorization contract issue. - jstests/core/txns/multi_statement_transaction_command_args.js - # TODO SERVER-72187: bulkWrite command does not support Tenant ID command - - jstests/core/write/bulk/bulk_write_insert_cursor.js - - jstests/core/write/bulk/bulk_write_update_cursor.js - - jstests/core/write/bulk/bulk_write_non_retryable_cursor.js - - jstests/core/write/bulk/bulk_write_getMore.js # This test looks for the presence of a log line that contains a db name. Injecting a tenantId in # the requests causes the test to fails due to a mismatch. - jstests/core/api//apitest_db_profile_level.js diff --git a/jstests/core/write/bulk/bulk_write.js b/jstests/core/write/bulk/bulk_write.js index f1767bd0338..f29e4077eb2 100644 --- a/jstests/core/write/bulk/bulk_write.js +++ b/jstests/core/write/bulk/bulk_write.js @@ -6,8 +6,6 @@ * assumes_against_mongod_not_mongos, * not_allowed_with_security_token, * command_not_supported_in_serverless, - * # Command is not yet compatible with tenant migration. - * tenant_migration_incompatible, * # TODO SERVER-52419 Remove this tag. * featureFlagBulkWriteCommand, * ] diff --git a/jstests/core/write/bulk/bulk_write_delete_cursor.js b/jstests/core/write/bulk/bulk_write_delete_cursor.js index 25beceb6d66..3a99750f08d 100644 --- a/jstests/core/write/bulk/bulk_write_delete_cursor.js +++ b/jstests/core/write/bulk/bulk_write_delete_cursor.js @@ -6,8 +6,6 @@ * assumes_against_mongod_not_mongos, * not_allowed_with_security_token, * command_not_supported_in_serverless, - * # Command is not yet compatible with tenant migration. - * tenant_migration_incompatible, * # TODO SERVER-52419 Remove this tag. * featureFlagBulkWriteCommand, * ] diff --git a/jstests/core/write/bulk/bulk_write_getMore.js b/jstests/core/write/bulk/bulk_write_getMore.js index 397e2847efa..ce2f98cd29b 100644 --- a/jstests/core/write/bulk/bulk_write_getMore.js +++ b/jstests/core/write/bulk/bulk_write_getMore.js @@ -14,8 +14,6 @@ * requires_getmore, * # Contains commands that fail which will fail the entire transaction * does_not_support_transactions, - * # Command is not yet compatible with tenant migration. - * tenant_migration_incompatible, * # TODO SERVER-52419 Remove this tag. * featureFlagBulkWriteCommand, * ] diff --git a/jstests/core/write/bulk/bulk_write_insert_cursor.js b/jstests/core/write/bulk/bulk_write_insert_cursor.js index a4168827bb0..494fd188dc3 100644 --- a/jstests/core/write/bulk/bulk_write_insert_cursor.js +++ b/jstests/core/write/bulk/bulk_write_insert_cursor.js @@ -6,8 +6,6 @@ * assumes_against_mongod_not_mongos, * not_allowed_with_security_token, * command_not_supported_in_serverless, - * # Command is not yet compatible with tenant migration. - * tenant_migration_incompatible, * # TODO SERVER-52419 Remove this tag. * featureFlagBulkWriteCommand, * ] diff --git a/jstests/core/write/bulk/bulk_write_non_retryable_cursor.js b/jstests/core/write/bulk/bulk_write_non_retryable_cursor.js index 3a21c96fe7b..60a555072fe 100644 --- a/jstests/core/write/bulk/bulk_write_non_retryable_cursor.js +++ b/jstests/core/write/bulk/bulk_write_non_retryable_cursor.js @@ -10,8 +10,6 @@ * requires_non_retryable_writes, * not_allowed_with_security_token, * command_not_supported_in_serverless, - * # Command is not yet compatible with tenant migration. - * tenant_migration_incompatible, * # TODO SERVER-52419 Remove this tag. * featureFlagBulkWriteCommand, * ] diff --git a/jstests/core/write/bulk/bulk_write_non_transaction.js b/jstests/core/write/bulk/bulk_write_non_transaction.js index 044f606dd63..081b96f27a5 100644 --- a/jstests/core/write/bulk/bulk_write_non_transaction.js +++ b/jstests/core/write/bulk/bulk_write_non_transaction.js @@ -11,8 +11,6 @@ * command_not_supported_in_serverless, * # Contains commands that fail which will fail the entire transaction * does_not_support_transactions, - * # Command is not yet compatible with tenant migration. - * tenant_migration_incompatible, * # TODO SERVER-52419 Remove this tag. * featureFlagBulkWriteCommand, * ] diff --git a/jstests/core/write/bulk/bulk_write_update_cursor.js b/jstests/core/write/bulk/bulk_write_update_cursor.js index 687d349e9c5..7c92195ac8b 100644 --- a/jstests/core/write/bulk/bulk_write_update_cursor.js +++ b/jstests/core/write/bulk/bulk_write_update_cursor.js @@ -6,8 +6,6 @@ * assumes_against_mongod_not_mongos, * not_allowed_with_security_token, * command_not_supported_in_serverless, - * # Command is not yet compatible with tenant migration. - * tenant_migration_incompatible, * # TODO SERVER-52419 Remove this tag. * featureFlagBulkWriteCommand, * ] diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js index 4e558024877..1911fff5cec 100644 --- a/jstests/libs/override_methods/inject_tenant_prefix.js +++ b/jstests/libs/override_methods/inject_tenant_prefix.js @@ -320,6 +320,19 @@ function extractTenantMigrationError(resObj, errorCode) { } } } + + // BulkWrite command has errors contained in a cursor response. The error will always be + // in the first batch of the cursor response since getMore is not allowed to run with + // tenant migration / shard merge suites. + if (resObj.cursor) { + if (resObj.cursor.firstBatch) { + for (let opRes of resObj.cursor.firstBatch) { + if (opRes.code && opRes.code == errorCode) { + return {code: opRes.code, errmsg: opRes.errmsg}; + } + } + } + } return null; } @@ -391,6 +404,15 @@ function modifyCmdObjForRetry(cmdObj, resObj) { } cmdObj.deletes = retryOps; } + + if (cmdObj.bulkWrite) { + let retryOps = []; + // For bulkWrite tenant migration errors always act as if they are executed as + // `ordered:true` meaning we will have to retry every op from the one that errored. + retryOps = + cmdObj.ops.slice(resObj.cursor.firstBatch[resObj.cursor.firstBatch.length - 1].idx); + cmdObj.ops = retryOps; + } } /** @@ -533,6 +555,7 @@ function runCommandRetryOnTenantMigrationErrors( let nModified = 0; let upserted = []; let nonRetryableWriteErrors = []; + let bulkWriteResponse = {}; const isRetryableWrite = cmdObjWithTenantId.txnNumber && !cmdObjWithTenantId.hasOwnProperty("autocommit"); @@ -575,6 +598,31 @@ function runCommandRetryOnTenantMigrationErrors( // Add/modify the shells's n, nModified, upserted, and writeErrors, unless this command is // part of a retryable write. if (!isRetryableWrite) { + // bulkWrite case. + if (cmdObjWithTenantId.bulkWrite) { + // First attempt store the whole response. + if (numAttempts == 1) { + bulkWriteResponse = resObj; + } else { + // The last item from the previous response is guaranteed to be a + // tenant migration error. Remove it to append the retried response. + let newIdx = bulkWriteResponse.cursor.firstBatch.pop().idx; + // Iterate over new response and change the indexes to start with newIdx. + for (let opRes of resObj.cursor.firstBatch) { + opRes.idx = newIdx; + newIdx += 1; + } + + // Add the new responses (with modified indexes) onto the original responses. + bulkWriteResponse.cursor.firstBatch = + bulkWriteResponse.cursor.firstBatch.concat(resObj.cursor.firstBatch); + + // Add new numErrors onto old numErrors. Subtract one to account for the + // tenant migration error that was popped off. + bulkWriteResponse.numErrors += resObj.numErrors - 1; + } + } + if (resObj.n) { n += resObj.n; } @@ -707,6 +755,9 @@ function runCommandRetryOnTenantMigrationErrors( if (nonRetryableWriteErrors.length > 0) { resObj.writeErrors = nonRetryableWriteErrors; } + if (cmdObjWithTenantId.bulkWrite) { + resObj = bulkWriteResponse; + } } return resObj; } diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp index 9f987b6afb0..0b2379f8897 100644 --- a/src/mongo/db/commands/bulk_write.cpp +++ b/src/mongo/db/commands/bulk_write.cpp @@ -175,19 +175,24 @@ public: _replies.emplace_back(replyItem); } - void addUpdateErrorReply(size_t currentOpIdx, const Status& status) { + void addUpdateErrorReply(OperationContext* opCtx, size_t currentOpIdx, const Status& status) { auto replyItem = BulkWriteReplyItem(currentOpIdx); replyItem.setNModified(0); - addErrorReply(replyItem, status); + addErrorReply(opCtx, replyItem, status); } - void addErrorReply(size_t currentOpIdx, const Status& status) { + void addErrorReply(OperationContext* opCtx, size_t currentOpIdx, const Status& status) { auto replyItem = BulkWriteReplyItem(currentOpIdx); - addErrorReply(replyItem, status); + addErrorReply(opCtx, replyItem, status); } - void addErrorReply(BulkWriteReplyItem& replyItem, const Status& status) { - replyItem.setStatus(status); + void addErrorReply(OperationContext* opCtx, + BulkWriteReplyItem& replyItem, + const Status& status) { + auto error = + write_ops_exec::generateError(opCtx, status, replyItem.getIdx(), 0 /* numErrors */); + invariant(error); + replyItem.setStatus(error.get().getStatus()); replyItem.setOk(status.isOK() ? 1.0 : 0.0); replyItem.setN(0); _replies.emplace_back(replyItem); @@ -524,7 +529,7 @@ bool handleInsertOp(OperationContext* opCtx, uassertStatusOK(fixedDoc.getStatus()); MONGO_UNREACHABLE; } catch (const DBException& ex) { - responses.addErrorReply(currentOpIdx, ex.toStatus()); + responses.addErrorReply(opCtx, currentOpIdx, ex.toStatus()); write_ops_exec::WriteResult out; // fixDocumentForInsert can only fail for validation reasons, we only use handleError // here to tell us if we are able to continue processing further ops or not. @@ -684,7 +689,7 @@ bool handleUpdateOp(OperationContext* opCtx, if (ex.code() == ErrorCodes::IncompleteTransactionHistory) { throw; } - responses.addUpdateErrorReply(currentOpIdx, ex.toStatus()); + responses.addUpdateErrorReply(opCtx, currentOpIdx, ex.toStatus()); write_ops_exec::WriteResult out; return write_ops_exec::handleError( opCtx, ex, nsInfo[idx].getNs(), req.getOrdered(), op->getMulti(), boost::none, &out); @@ -777,7 +782,7 @@ bool handleDeleteOp(OperationContext* opCtx, if (ex.code() == ErrorCodes::IncompleteTransactionHistory) { throw; } - responses.addErrorReply(currentOpIdx, ex.toStatus()); + responses.addErrorReply(opCtx, currentOpIdx, ex.toStatus()); write_ops_exec::WriteResult out; return write_ops_exec::handleError( opCtx, ex, nsInfo[idx].getNs(), req.getOrdered(), false, boost::none, &out); @@ -867,7 +872,8 @@ public: bulk_write::RetriedStmtIds retriedStmtIds, int numErrors) { auto reqObj = unparsedRequest().body; - const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS(); + const NamespaceString cursorNss = + NamespaceString::makeBulkWriteNSS(req.getDollarTenant()); auto expCtx = make_intrusive<ExpressionContext>( opCtx, std::unique_ptr<CollatorInterface>(nullptr), ns()); diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index b2547b5c5a1..d9a4c3daea5 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -175,8 +175,8 @@ bool NamespaceString::mustBeAppliedInOwnOplogBatch() const { ns == kConfigsvrShardsNamespace.ns(); } -NamespaceString NamespaceString::makeBulkWriteNSS() { - return NamespaceString(DatabaseName::kAdmin, bulkWriteCursorCol); +NamespaceString NamespaceString::makeBulkWriteNSS(const boost::optional<TenantId>& tenantId) { + return NamespaceString(DatabaseName::kAdmin.db(), bulkWriteCursorCol, tenantId); } NamespaceString NamespaceString::makeClusterParametersNSS( diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 9521bb9d874..d45198573d6 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -337,7 +337,7 @@ public: * Constructs a NamespaceString representing a BulkWrite namespace. The format for this * namespace is admin.$cmd.bulkWrite". */ - static NamespaceString makeBulkWriteNSS(); + static NamespaceString makeBulkWriteNSS(const boost::optional<TenantId>& tenantId); /** * Constructs the oplog buffer NamespaceString for the given migration id for movePrimary op. diff --git a/src/mongo/s/commands/cluster_bulk_write_cmd.cpp b/src/mongo/s/commands/cluster_bulk_write_cmd.cpp index b27b94890d9..9f3ce595590 100644 --- a/src/mongo/s/commands/cluster_bulk_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_bulk_write_cmd.cpp @@ -128,7 +128,8 @@ public: const auto& req = request(); auto reqObj = unparsedRequest().body; - const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS(); + const NamespaceString cursorNss = + NamespaceString::makeBulkWriteNSS(req.getDollarTenant()); ClusterClientCursorParams params(cursorNss, APIParameters::get(opCtx), ReadPreferenceSetting::get(opCtx), |