diff options
author | kauboy26 <vishnu.kaushik@mongodb.com> | 2023-04-03 20:42:55 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-03 22:48:14 +0000 |
commit | b4cacf7c95adbdf22c2e20b51bd0d0a31ea6e8d3 (patch) | |
tree | 0c0f3abeafb62e878e5b314c754a9738fbdc5c3e /src/mongo/db/commands/bulk_write.cpp | |
parent | bb7497ecfdd45b7e87c0015aa515e2838414318d (diff) | |
download | mongo-b4cacf7c95adbdf22c2e20b51bd0d0a31ea6e8d3.tar.gz |
SERVER-72789 Validate the database/shard versions for bulkWrite sent from mongos
Diffstat (limited to 'src/mongo/db/commands/bulk_write.cpp')
-rw-r--r-- | src/mongo/db/commands/bulk_write.cpp | 191 |
1 file changed, 103 insertions, 88 deletions
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp index 916ecd1a5ad..a91a7c9698f 100644 --- a/src/mongo/db/commands/bulk_write.cpp +++ b/src/mongo/db/commands/bulk_write.cpp @@ -38,6 +38,7 @@ #include "mongo/db/catalog/collection_operation_source.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/bulk_write.h" #include "mongo/db/commands/bulk_write_crud_op.h" #include "mongo/db/commands/bulk_write_gen.h" #include "mongo/db/commands/bulk_write_parser.h" @@ -57,6 +58,7 @@ #include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/server_feature_flags_gen.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" @@ -164,10 +166,10 @@ private: std::vector<InsertStatement> _batch; boost::optional<int> _firstOpIdx; + // Return true when the batch is at maximum capacity and should be flushed. bool _addInsertToBatch(OperationContext* opCtx, const int stmtId, const BSONObj& toInsert) { _batch.emplace_back(stmtId, toInsert); - // Return true when the batch is at maximum capacity and should be flushed. return _batch.size() == _batch.capacity(); } @@ -545,92 +547,6 @@ bool handleDeleteOp(OperationContext* opCtx, } } -std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx, - const BulkWriteCommandRequest& req) { - const auto& ops = req.getOps(); - const auto& bypassDocumentValidation = req.getBypassDocumentValidation(); - - DisableDocumentSchemaValidationIfTrue docSchemaValidationDisabler(opCtx, - bypassDocumentValidation); - - DisableSafeContentValidationIfTrue safeContentValidationDisabler( - opCtx, bypassDocumentValidation, false); - - auto responses = BulkWriteReplies(req, ops.size()); - - // Construct reply handler callbacks. 
- auto insertCB = [&responses](OperationContext* opCtx, - int currentOpIdx, - write_ops_exec::WriteResult& writes) { - responses.addInsertReplies(opCtx, currentOpIdx, writes); - }; - auto updateCB = [&responses](int currentOpIdx, - const UpdateResult& result, - const boost::optional<BSONObj>& value) { - responses.addUpdateReply(currentOpIdx, result, value); - }; - auto deleteCB = - [&responses](int currentOpIdx, long long nDeleted, const boost::optional<BSONObj>& value) { - responses.addDeleteReply(currentOpIdx, nDeleted, value); - }; - - auto errorCB = [&responses](int currentOpIdx, const Status& status) { - responses.addErrorReply(currentOpIdx, status); - }; - - // Create a current insert batch. - const size_t maxBatchSize = internalInsertMaxBatchSize.load(); - auto batch = InsertBatch(req, std::min(ops.size(), maxBatchSize), insertCB); - - size_t idx = 0; - - auto curOp = CurOp::get(opCtx); - - ON_BLOCK_EXIT([&] { - if (curOp) { - finishCurOp(opCtx, &*curOp); - } - }); - - for (; idx < ops.size(); ++idx) { - auto op = BulkWriteCRUDOp(ops[idx]); - auto opType = op.getType(); - - if (opType == BulkWriteCRUDOp::kInsert) { - if (!handleInsertOp(opCtx, op.getInsert(), req, idx, errorCB, batch)) { - // Insert write failed can no longer continue. - break; - } - } else if (opType == BulkWriteCRUDOp::kUpdate) { - // Flush insert ops before handling update ops. - if (!batch.flush(opCtx)) { - break; - } - if (!handleUpdateOp(opCtx, curOp, op.getUpdate(), req, idx, errorCB, updateCB)) { - // Update write failed can no longer continue. - break; - } - } else { - // Flush insert ops before handling delete ops. - if (!batch.flush(opCtx)) { - break; - } - if (!handleDeleteOp(opCtx, curOp, op.getDelete(), req, idx, errorCB, deleteCB)) { - // Delete write failed can no longer continue. - break; - } - } - } - - // It does not matter if this final flush had errors or not since we finished processing - // the last op already. 
- batch.flush(opCtx); - - invariant(batch.empty()); - - return responses.getReplies(); -} - bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, size_t bytesBuffered) { invariant(numDocs >= 0); if (!numDocs) { @@ -727,7 +643,7 @@ public: } // Apply all of the write operations. - auto replies = performWrites(opCtx, req); + auto replies = bulk_write::performWrites(opCtx, req); return _populateCursorReply(opCtx, req, std::move(replies)); } @@ -869,4 +785,103 @@ public: } bulkWriteCmd; } // namespace + +namespace bulk_write { + +std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx, + const BulkWriteCommandRequest& req) { + const auto& ops = req.getOps(); + const auto& bypassDocumentValidation = req.getBypassDocumentValidation(); + + DisableDocumentSchemaValidationIfTrue docSchemaValidationDisabler(opCtx, + bypassDocumentValidation); + + DisableSafeContentValidationIfTrue safeContentValidationDisabler( + opCtx, bypassDocumentValidation, false); + + auto responses = BulkWriteReplies(req, ops.size()); + + // Construct reply handler callbacks. + auto insertCB = [&responses](OperationContext* opCtx, + int currentOpIdx, + write_ops_exec::WriteResult& writes) { + responses.addInsertReplies(opCtx, currentOpIdx, writes); + }; + auto updateCB = [&responses](int currentOpIdx, + const UpdateResult& result, + const boost::optional<BSONObj>& value) { + responses.addUpdateReply(currentOpIdx, result, value); + }; + auto deleteCB = + [&responses](int currentOpIdx, long long nDeleted, const boost::optional<BSONObj>& value) { + responses.addDeleteReply(currentOpIdx, nDeleted, value); + }; + + auto errorCB = [&responses](int currentOpIdx, const Status& status) { + responses.addErrorReply(currentOpIdx, status); + }; + + // Create a current insert batch. 
+ const size_t maxBatchSize = internalInsertMaxBatchSize.load(); + auto batch = InsertBatch(req, std::min(ops.size(), maxBatchSize), insertCB); + + size_t idx = 0; + + auto curOp = CurOp::get(opCtx); + + ON_BLOCK_EXIT([&] { + if (curOp) { + finishCurOp(opCtx, &*curOp); + } + }); + + // Tell mongod what the shard and database versions are. This will cause writes to fail in case + // there is a mismatch in the mongos request provided versions and the local (shard's) + // understanding of the version. + for (const auto& nsInfo : req.getNsInfo()) { + // TODO (SERVER-72767, SERVER-72804, SERVER-72805): Support timeseries collections. + OperationShardingState::setShardRole( + opCtx, nsInfo.getNs(), nsInfo.getShardVersion(), nsInfo.getDatabaseVersion()); + } + + for (; idx < ops.size(); ++idx) { + auto op = BulkWriteCRUDOp(ops[idx]); + auto opType = op.getType(); + + if (opType == BulkWriteCRUDOp::kInsert) { + if (!handleInsertOp(opCtx, op.getInsert(), req, idx, errorCB, batch)) { + // Insert write failed can no longer continue. + break; + } + } else if (opType == BulkWriteCRUDOp::kUpdate) { + // Flush insert ops before handling update ops. + if (!batch.flush(opCtx)) { + break; + } + if (!handleUpdateOp(opCtx, curOp, op.getUpdate(), req, idx, errorCB, updateCB)) { + // Update write failed can no longer continue. + break; + } + } else { + // Flush insert ops before handling delete ops. + if (!batch.flush(opCtx)) { + break; + } + if (!handleDeleteOp(opCtx, curOp, op.getDelete(), req, idx, errorCB, deleteCB)) { + // Delete write failed can no longer continue. + break; + } + } + } + + // It does not matter if this final flush had errors or not since we finished processing + // the last op already. + batch.flush(opCtx); + + invariant(batch.empty()); + + return responses.getReplies(); +} + +} // namespace bulk_write } // namespace mongo |