summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/bulk_write.cpp
diff options
context:
space:
mode:
authorkauboy26 <vishnu.kaushik@mongodb.com>2023-04-03 20:42:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-03 22:48:14 +0000
commitb4cacf7c95adbdf22c2e20b51bd0d0a31ea6e8d3 (patch)
tree0c0f3abeafb62e878e5b314c754a9738fbdc5c3e /src/mongo/db/commands/bulk_write.cpp
parentbb7497ecfdd45b7e87c0015aa515e2838414318d (diff)
downloadmongo-b4cacf7c95adbdf22c2e20b51bd0d0a31ea6e8d3.tar.gz
SERVER-72789 Validate the database/shard versions for bulkWrite sent from mongos
Diffstat (limited to 'src/mongo/db/commands/bulk_write.cpp')
-rw-r--r--src/mongo/db/commands/bulk_write.cpp191
1 files changed, 103 insertions, 88 deletions
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp
index 916ecd1a5ad..a91a7c9698f 100644
--- a/src/mongo/db/commands/bulk_write.cpp
+++ b/src/mongo/db/commands/bulk_write.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/catalog/collection_operation_source.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/bulk_write.h"
#include "mongo/db/commands/bulk_write_crud_op.h"
#include "mongo/db/commands/bulk_write_gen.h"
#include "mongo/db/commands/bulk_write_parser.h"
@@ -57,6 +58,7 @@
#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
@@ -164,10 +166,10 @@ private:
std::vector<InsertStatement> _batch;
boost::optional<int> _firstOpIdx;
+ // Return true when the batch is at maximum capacity and should be flushed.
bool _addInsertToBatch(OperationContext* opCtx, const int stmtId, const BSONObj& toInsert) {
_batch.emplace_back(stmtId, toInsert);
- // Return true when the batch is at maximum capacity and should be flushed.
return _batch.size() == _batch.capacity();
}
@@ -545,92 +547,6 @@ bool handleDeleteOp(OperationContext* opCtx,
}
}
-std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
- const BulkWriteCommandRequest& req) {
- const auto& ops = req.getOps();
- const auto& bypassDocumentValidation = req.getBypassDocumentValidation();
-
- DisableDocumentSchemaValidationIfTrue docSchemaValidationDisabler(opCtx,
- bypassDocumentValidation);
-
- DisableSafeContentValidationIfTrue safeContentValidationDisabler(
- opCtx, bypassDocumentValidation, false);
-
- auto responses = BulkWriteReplies(req, ops.size());
-
- // Construct reply handler callbacks.
- auto insertCB = [&responses](OperationContext* opCtx,
- int currentOpIdx,
- write_ops_exec::WriteResult& writes) {
- responses.addInsertReplies(opCtx, currentOpIdx, writes);
- };
- auto updateCB = [&responses](int currentOpIdx,
- const UpdateResult& result,
- const boost::optional<BSONObj>& value) {
- responses.addUpdateReply(currentOpIdx, result, value);
- };
- auto deleteCB =
- [&responses](int currentOpIdx, long long nDeleted, const boost::optional<BSONObj>& value) {
- responses.addDeleteReply(currentOpIdx, nDeleted, value);
- };
-
- auto errorCB = [&responses](int currentOpIdx, const Status& status) {
- responses.addErrorReply(currentOpIdx, status);
- };
-
- // Create a current insert batch.
- const size_t maxBatchSize = internalInsertMaxBatchSize.load();
- auto batch = InsertBatch(req, std::min(ops.size(), maxBatchSize), insertCB);
-
- size_t idx = 0;
-
- auto curOp = CurOp::get(opCtx);
-
- ON_BLOCK_EXIT([&] {
- if (curOp) {
- finishCurOp(opCtx, &*curOp);
- }
- });
-
- for (; idx < ops.size(); ++idx) {
- auto op = BulkWriteCRUDOp(ops[idx]);
- auto opType = op.getType();
-
- if (opType == BulkWriteCRUDOp::kInsert) {
- if (!handleInsertOp(opCtx, op.getInsert(), req, idx, errorCB, batch)) {
- // Insert write failed can no longer continue.
- break;
- }
- } else if (opType == BulkWriteCRUDOp::kUpdate) {
- // Flush insert ops before handling update ops.
- if (!batch.flush(opCtx)) {
- break;
- }
- if (!handleUpdateOp(opCtx, curOp, op.getUpdate(), req, idx, errorCB, updateCB)) {
- // Update write failed can no longer continue.
- break;
- }
- } else {
- // Flush insert ops before handling delete ops.
- if (!batch.flush(opCtx)) {
- break;
- }
- if (!handleDeleteOp(opCtx, curOp, op.getDelete(), req, idx, errorCB, deleteCB)) {
- // Delete write failed can no longer continue.
- break;
- }
- }
- }
-
- // It does not matter if this final flush had errors or not since we finished processing
- // the last op already.
- batch.flush(opCtx);
-
- invariant(batch.empty());
-
- return responses.getReplies();
-}
-
bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, size_t bytesBuffered) {
invariant(numDocs >= 0);
if (!numDocs) {
@@ -727,7 +643,7 @@ public:
}
// Apply all of the write operations.
- auto replies = performWrites(opCtx, req);
+ auto replies = bulk_write::performWrites(opCtx, req);
return _populateCursorReply(opCtx, req, std::move(replies));
}
@@ -869,4 +785,103 @@ public:
} bulkWriteCmd;
} // namespace
+
+namespace bulk_write {
+
+std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
+ const BulkWriteCommandRequest& req) {
+ const auto& ops = req.getOps();
+ const auto& bypassDocumentValidation = req.getBypassDocumentValidation();
+
+ DisableDocumentSchemaValidationIfTrue docSchemaValidationDisabler(opCtx,
+ bypassDocumentValidation);
+
+ DisableSafeContentValidationIfTrue safeContentValidationDisabler(
+ opCtx, bypassDocumentValidation, false);
+
+ auto responses = BulkWriteReplies(req, ops.size());
+
+ // Construct reply handler callbacks.
+ auto insertCB = [&responses](OperationContext* opCtx,
+ int currentOpIdx,
+ write_ops_exec::WriteResult& writes) {
+ responses.addInsertReplies(opCtx, currentOpIdx, writes);
+ };
+ auto updateCB = [&responses](int currentOpIdx,
+ const UpdateResult& result,
+ const boost::optional<BSONObj>& value) {
+ responses.addUpdateReply(currentOpIdx, result, value);
+ };
+ auto deleteCB =
+ [&responses](int currentOpIdx, long long nDeleted, const boost::optional<BSONObj>& value) {
+ responses.addDeleteReply(currentOpIdx, nDeleted, value);
+ };
+
+ auto errorCB = [&responses](int currentOpIdx, const Status& status) {
+ responses.addErrorReply(currentOpIdx, status);
+ };
+
+ // Create a current insert batch.
+ const size_t maxBatchSize = internalInsertMaxBatchSize.load();
+ auto batch = InsertBatch(req, std::min(ops.size(), maxBatchSize), insertCB);
+
+ size_t idx = 0;
+
+ auto curOp = CurOp::get(opCtx);
+
+ ON_BLOCK_EXIT([&] {
+ if (curOp) {
+ finishCurOp(opCtx, &*curOp);
+ }
+ });
+
+ // Tell mongod what the shard and database versions are. This will cause writes to fail in case
+ // there is a mismatch in the mongos request provided versions and the local (shard's)
+ // understanding of the version.
+ for (const auto& nsInfo : req.getNsInfo()) {
+ // TODO (SERVER-72767, SERVER-72804, SERVER-72805): Support timeseries collections.
+ OperationShardingState::setShardRole(
+ opCtx, nsInfo.getNs(), nsInfo.getShardVersion(), nsInfo.getDatabaseVersion());
+ }
+
+ for (; idx < ops.size(); ++idx) {
+ auto op = BulkWriteCRUDOp(ops[idx]);
+ auto opType = op.getType();
+
+ if (opType == BulkWriteCRUDOp::kInsert) {
+ if (!handleInsertOp(opCtx, op.getInsert(), req, idx, errorCB, batch)) {
+ // Insert write failed can no longer continue.
+ break;
+ }
+ } else if (opType == BulkWriteCRUDOp::kUpdate) {
+ // Flush insert ops before handling update ops.
+ if (!batch.flush(opCtx)) {
+ break;
+ }
+ if (!handleUpdateOp(opCtx, curOp, op.getUpdate(), req, idx, errorCB, updateCB)) {
+ // Update write failed can no longer continue.
+ break;
+ }
+ } else {
+ // Flush insert ops before handling delete ops.
+ if (!batch.flush(opCtx)) {
+ break;
+ }
+ if (!handleDeleteOp(opCtx, curOp, op.getDelete(), req, idx, errorCB, deleteCB)) {
+ // Delete write failed can no longer continue.
+ break;
+ }
+ }
+ }
+
+ // It does not matter if this final flush had errors or not since we finished processing
+ // the last op already.
+ batch.flush(opCtx);
+
+ invariant(batch.empty());
+
+ return responses.getReplies();
+}
+
+} // namespace bulk_write
} // namespace mongo