diff options
-rw-r--r-- | src/mongo/db/pipeline/expression_context_for_test.h | 30 | ||||
-rw-r--r-- | src/mongo/db/query/canonical_query.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/chunk_manager.cpp | 12 | ||||
-rw-r--r-- | src/mongo/s/chunk_manager.h | 2 | ||||
-rw-r--r-- | src/mongo/s/chunk_manager_query_test.cpp | 20 | ||||
-rw-r--r-- | src/mongo/s/cluster_commands_helpers.cpp | 22 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_write_cmd.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/ns_targeter.h | 9 | ||||
-rw-r--r-- | src/mongo/s/write_ops/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec_test.cpp | 183 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batched_command_request.cpp | 50 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batched_command_request.h | 44 | ||||
-rw-r--r-- | src/mongo/s/write_ops/chunk_manager_targeter.cpp | 90 | ||||
-rw-r--r-- | src/mongo/s/write_ops/chunk_manager_targeter.h | 16 | ||||
-rw-r--r-- | src/mongo/s/write_ops/chunk_manager_targeter_test.cpp | 110 | ||||
-rw-r--r-- | src/mongo/s/write_ops/mock_ns_targeter.h | 12 | ||||
-rw-r--r-- | src/mongo/s/write_ops/write_op.cpp | 4 |
18 files changed, 496 insertions, 131 deletions
diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h index 4e18ccb6f39..80e4f2ea772 100644 --- a/src/mongo/db/pipeline/expression_context_for_test.h +++ b/src/mongo/db/pipeline/expression_context_for_test.h @@ -149,6 +149,36 @@ public: } /** + * Constructor which sets the given OperationContext on the ExpressionContextForTest. This will + * also resolve the ExpressionContextForTest's ServiceContext from the OperationContext and + * accepts letParameters. + */ + ExpressionContextForTest(OperationContext* opCtx, + const NamespaceString& nss, + std::unique_ptr<CollatorInterface> collator, + const boost::optional<BSONObj>& letParameters = boost::none) + : ExpressionContext(opCtx, + boost::none, // explain + false, // fromMongos, + false, // needsMerge, + false, // allowDiskUse, + false, // bypassDocumentValidation, + false, // isMapReduce + nss, + RuntimeConstants(Date_t::now(), Timestamp(1, 0)), + std::move(collator), + std::make_shared<StubMongoProcessInterface>(), + {}, // resolvedNamespaces + {}, // collUUID + letParameters, + false // mayDbProfile + ), + _serviceContext(opCtx->getServiceContext()) { + // Resolve the TimeZoneDatabase to be used by this ExpressionContextForTest. + _setTimeZoneDatabase(); + } + + /** * Sets the resolved definition for an involved namespace. */ void setResolvedNamespace(const NamespaceString& nss, ResolvedNamespace resolvedNamespace) { diff --git a/src/mongo/db/query/canonical_query.cpp b/src/mongo/db/query/canonical_query.cpp index d8b34b3b9e1..38efced1c0c 100644 --- a/src/mongo/db/query/canonical_query.cpp +++ b/src/mongo/db/query/canonical_query.cpp @@ -98,10 +98,16 @@ StatusWith<std::unique_ptr<CanonicalQuery>> CanonicalQuery::canonicalize( // Make MatchExpression. boost::intrusive_ptr<ExpressionContext> newExpCtx; if (!expCtx.get()) { - newExpCtx = make_intrusive<ExpressionContext>( - opCtx, std::move(collator), qr->nss(), qr->getRuntimeConstants()); + newExpCtx = make_intrusive<ExpressionContext>(opCtx, + std::move(collator), + qr->nss(), + qr->getRuntimeConstants(), + qr->getLetParameters()); } else { newExpCtx = expCtx; + // A collator can enter through both the QueryRequest and ExpressionContext arguments. + // This invariant ensures that both collators are the same because downstream we + // pull the collator from only one of the ExpressionContext carrier. invariant(CollatorInterface::collatorsMatch(collator.get(), expCtx->getCollator())); } diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index 51acbcf0dc7..9cff650a255 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -303,22 +303,26 @@ bool ChunkManager::keyBelongsToShard(const BSONObj& shardKey, const ShardId& sha return chunkInfo->getShardIdAt(_clusterTime) == shardId; } -void ChunkManager::getShardIdsForQuery(OperationContext* opCtx, +void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> expCtx, const BSONObj& query, const BSONObj& collation, std::set<ShardId>* shardIds) const { auto qr = std::make_unique<QueryRequest>(_rt->getns()); qr->setFilter(query); + if (auto uuid = getUUID()) + expCtx->uuid = uuid; + if (!collation.isEmpty()) { qr->setCollation(collation); } else if (_rt->getDefaultCollator()) { - qr->setCollation(_rt->getDefaultCollator()->getSpec().toBSON()); + auto defaultCollator = _rt->getDefaultCollator(); + qr->setCollation(defaultCollator->getSpec().toBSON()); + expCtx->setCollator(defaultCollator->clone()); } - const boost::intrusive_ptr<ExpressionContext> expCtx; auto cq = uassertStatusOK( - CanonicalQuery::canonicalize(opCtx, + CanonicalQuery::canonicalize(expCtx->opCtx, std::move(qr), expCtx, ExtensionsCallbackNoop(), diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index 67cdfb15fa6..b91750c4073 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -412,7 +412,7 @@ public: * Finds the shard IDs for a given filter and collation. If collation is empty, we use the * collection default collation for targeting. */ - void getShardIdsForQuery(OperationContext* opCtx, + void getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> expCtx, const BSONObj& query, const BSONObj& collation, std::set<ShardId>* shardIds) const; diff --git a/src/mongo/s/chunk_manager_query_test.cpp b/src/mongo/s/chunk_manager_query_test.cpp index 5ec10372524..fc710bde923 100644 --- a/src/mongo/s/chunk_manager_query_test.cpp +++ b/src/mongo/s/chunk_manager_query_test.cpp @@ -33,9 +33,13 @@ #include <set> +#include "mongo/db/catalog/catalog_test_fixture.h" +#include "mongo/db/pipeline/expression_context_for_test.h" +#include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/s/catalog_cache_test_fixture.h" #include "mongo/s/chunk_manager.h" +#include "mongo/util/assert_util.h" namespace mongo { namespace { @@ -71,8 +75,18 @@ protected: makeChunkManager(kNss, shardKeyPattern, std::move(defaultCollator), false, splitPoints); std::set<ShardId> shardIds; - chunkManager->getShardIdsForQuery(operationContext(), query, queryCollation, &shardIds); + auto&& cif = [&]() { + if (queryCollation.isEmpty()) { + return std::unique_ptr<CollatorInterface>{}; + } else { + return uassertStatusOK(CollatorFactoryInterface::get(getServiceContext()) + ->makeFromBSON(queryCollation)); + } + }(); + auto expCtx = + make_intrusive<ExpressionContextForTest>(operationContext(), kNss, std::move(cif)); + chunkManager->getShardIdsForQuery(expCtx, query, queryCollation, &shardIds); _assertShardIdsMatch(expectedShardIds, shardIds); } @@ -515,9 +529,9 @@ TEST_F(ChunkManagerQueryTest, SnapshotQueryWithMoreShardsThanLatestMetadata) { chunkManager.getShardIdsForRange(BSON("x" << MINKEY), BSON("x" << MAXKEY), &shardIds); ASSERT_EQ(2, shardIds.size()); + const auto expCtx = make_intrusive<ExpressionContextForTest>(); shardIds.clear(); - chunkManager.getShardIdsForQuery( - operationContext(), BSON("x" << BSON("$gt" << -20)), {}, &shardIds); + chunkManager.getShardIdsForQuery(expCtx, BSON("x" << BSON("$gt" << -20)), {}, &shardIds); ASSERT_EQ(2, shardIds.size()); } diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index f7f24aa31c2..881542a5730 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -41,6 +41,8 @@ #include "mongo/db/error_labels.h" #include "mongo/db/logical_clock.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/executor/task_executor_pool.h" @@ -365,7 +367,14 @@ std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShard // The collection is sharded. Target all shards that own chunks that match the query. std::set<ShardId> shardIds; - routingInfo.cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds); + std::unique_ptr<CollatorInterface> collator; + if (!collation.isEmpty()) { + collator = uassertStatusOK( + CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation)); + } + + auto expCtx = make_intrusive<ExpressionContext>(opCtx, std::move(collator), nss); + routingInfo.cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds); for (const ShardId& shardId : shardIds) { if (shardsToSkip.find(shardId) == shardsToSkip.end()) { @@ -694,7 +703,16 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx, // The collection is sharded. Use the routing table to decide which shards to target // based on the query and collation. std::set<ShardId> shardIds; - routingInfo.cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds); + auto&& cif = [&]() { + if (collation.isEmpty()) { + return std::unique_ptr<CollatorInterface>{}; + } else { + return uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) + ->makeFromBSON(collation)); + } + }(); + auto expCtx = make_intrusive<ExpressionContext>(opCtx, std::move(cif), NamespaceString()); + routingInfo.cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds); return shardIds; } diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index f4c226fa461..229488dcb86 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -372,9 +372,9 @@ private: if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) { return std::vector{targeter.targetInsert(opCtx, targetingBatchItem.getDocument())}; } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) { - return targeter.targetUpdate(opCtx, targetingBatchItem.getUpdate()); + return targeter.targetUpdate(opCtx, targetingBatchItem); } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete) { - return targeter.targetDelete(opCtx, targetingBatchItem.getDelete()); + return targeter.targetDelete(opCtx, targetingBatchItem); } MONGO_UNREACHABLE; }(); diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h index 36ce6269680..debfc1d9a69 100644 --- a/src/mongo/s/ns_targeter.h +++ b/src/mongo/s/ns_targeter.h @@ -37,6 +37,7 @@ #include "mongo/s/chunk_version.h" #include "mongo/s/shard_id.h" #include "mongo/s/stale_exception.h" +#include "mongo/s/write_ops/batched_command_request.h" namespace mongo { @@ -99,15 +100,15 @@ public: * Returns a vector of ShardEndpoints for a potentially multi-shard update or throws * ShardKeyNotFound if 'updateOp' misses a shard key, but the type of update requires it. */ - virtual std::vector<ShardEndpoint> targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const = 0; + virtual std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx, + const BatchItemRef& itemRef) const = 0; /** * Returns a vector of ShardEndpoints for a potentially multi-shard delete or throws * ShardKeyNotFound if 'deleteOp' misses a shard key, but the type of delete requires it. */ - virtual std::vector<ShardEndpoint> targetDelete( - OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const = 0; + virtual std::vector<ShardEndpoint> targetDelete(OperationContext* opCtx, + const BatchItemRef& itemRef) const = 0; /** * Returns a vector of ShardEndpoints for all shards. diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript index 67633ae8d77..62d2ed4a9db 100644 --- a/src/mongo/s/write_ops/SConscript +++ b/src/mongo/s/write_ops/SConscript @@ -35,6 +35,8 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/commands/server_status_core', + '$BUILD_DIR/mongo/db/pipeline/pipeline', + '$BUILD_DIR/mongo/db/pipeline/process_interface/mongos_process_interface', '$BUILD_DIR/mongo/s/sharding_router_api', 'batch_write_types', ], diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index 2c05bee128d..4e230942e68 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -317,6 +317,169 @@ TEST_F(BatchWriteExecTest, SingleOpUnordered) { future.default_timed_get(); } +TEST_F(BatchWriteExecTest, SingleUpdateTargetsShardWithLet) { + // Try to update the single doc where a let param is used in the shard key. + const auto let = BSON("y" << 100); + const auto rtc = RuntimeConstants{Date_t::now(), Timestamp(1, 1)}; + const auto q = BSON("x" + << "$$y"); + BatchedCommandRequest updateRequest([&] { + write_ops::Update updateOp(nss); + updateOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(false); + return writeCommandBase; + }()); + updateOp.setLet(let); + updateOp.setRuntimeConstants(rtc); + updateOp.setUpdates(std::vector{write_ops::UpdateOpEntry(q, + {BSON("Key" + << "100")})}); + return updateOp; + }()); + updateRequest.setWriteConcern(BSONObj()); + + const static auto epoch = OID::gen(); + + class MultiShardTargeter : public MockNSTargeter { + public: + using MockNSTargeter::MockNSTargeter; + + std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx, + const BatchItemRef& itemRef) const override { + return std::vector<ShardEndpoint>{ + ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))}; + } + }; + + MultiShardTargeter multiShardNSTargeter( + nss, + {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), + BSON("x" << MINKEY), + BSON("x" << 0)), + MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)), + BSON("x" << 0), + BSON("x" << MAXKEY))}); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + BatchWriteExec::executeBatch( + operationContext(), multiShardNSTargeter, updateRequest, &response, &stats); + + return response; + }); + + // The update will hit the first shard. + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost2, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(1); + + // Check that let params and runtimeConstants are propigated to shards. + const auto opMsgRequest(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj)); + const auto actualBatchedUpdate(BatchedCommandRequest::parseUpdate(opMsgRequest)); + ASSERT_BSONOBJ_EQ(let, actualBatchedUpdate.getLet().value_or(BSONObj())); + ASSERT_EQUALS(actualBatchedUpdate.getRuntimeConstants()->getLocalNow(), rtc.getLocalNow()); + ASSERT_EQUALS(actualBatchedUpdate.getRuntimeConstants()->getClusterTime(), + rtc.getClusterTime()); + + // Check that let params are only forwarded and not evaluated. + auto expectedQ = BSON("x" + << "$$y"); + for (auto&& u : actualBatchedUpdate.getUpdateRequest().getUpdates()) + ASSERT_BSONOBJ_EQ(expectedQ, u.getQ()); + + return response.toBSON(); + }); + + auto response = future.default_timed_get(); + ASSERT_OK(response.getTopLevelStatus()); + ASSERT_EQ(1, response.getNModified()); +} + +TEST_F(BatchWriteExecTest, SingleDeleteTargetsShardWithLet) { + // Try to update the single doc where a let param is used in the shard key. + const auto let = BSON("y" << 100); + const auto rtc = RuntimeConstants{Date_t::now(), Timestamp(1, 1)}; + const auto q = BSON("x" + << "$$y"); + BatchedCommandRequest deleteRequest([&] { + write_ops::Delete deleteOp(nss); + deleteOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(false); + return writeCommandBase; + }()); + deleteOp.setLet(let); + deleteOp.setRuntimeConstants(rtc); + deleteOp.setDeletes(std::vector{write_ops::DeleteOpEntry(q, false)}); + return deleteOp; + }()); + deleteRequest.setWriteConcern(BSONObj()); + + const static auto epoch = OID::gen(); + + class MultiShardTargeter : public MockNSTargeter { + public: + using MockNSTargeter::MockNSTargeter; + + protected: + std::vector<ShardEndpoint> targetDelete(OperationContext* opCtx, + const BatchItemRef& itemRef) const override { + return std::vector<ShardEndpoint>{ + ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))}; + } + }; + + MultiShardTargeter multiShardNSTargeter( + nss, + {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), + BSON("x" << MINKEY), + BSON("x" << 0)), + MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)), + BSON("x" << 0), + BSON("x" << MAXKEY))}); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + BatchWriteExec::executeBatch( + operationContext(), multiShardNSTargeter, deleteRequest, &response, &stats); + + return response; + }); + + // The update will hit the first shard. + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost2, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + + // Check that let params are propigated to shards. + const auto opMsgRequest(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj)); + const auto actualBatchedUpdate(BatchedCommandRequest::parseDelete(opMsgRequest)); + ASSERT_BSONOBJ_EQ(let, actualBatchedUpdate.getLet().value_or(BSONObj())); + ASSERT_EQUALS(actualBatchedUpdate.getRuntimeConstants()->getLocalNow(), rtc.getLocalNow()); + ASSERT_EQUALS(actualBatchedUpdate.getRuntimeConstants()->getClusterTime(), + rtc.getClusterTime()); + + // Check that let params are only forwarded and not evaluated. + auto expectedQ = BSON("x" + << "$$y"); + for (auto&& u : actualBatchedUpdate.getDeleteRequest().getDeletes()) + ASSERT_BSONOBJ_EQ(expectedQ, u.getQ()); + + return response.toBSON(); + }); + + auto response = future.default_timed_get(); + ASSERT_OK(response.getTopLevelStatus()); +} + TEST_F(BatchWriteExecTest, MultiOpLargeOrdered) { const int kNumDocsToInsert = 100'000; const std::string kDocValue(200, 'x'); @@ -450,8 +613,8 @@ TEST_F(BatchWriteExecTest, StaleShardVersionReturnedFromBatchWithSingleMultiWrit public: using MockNSTargeter::MockNSTargeter; - std::vector<ShardEndpoint> targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { + std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx, + const BatchItemRef& itemRef) const override { return std::vector<ShardEndpoint>{ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))}; @@ -551,8 +714,8 @@ TEST_F(BatchWriteExecTest, public: using MockNSTargeter::MockNSTargeter; - std::vector<ShardEndpoint> targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { + std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx, + const BatchItemRef& itemRef) const override { return std::vector<ShardEndpoint>{ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))}; @@ -667,8 +830,8 @@ TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1Firs) { public: using MockNSTargeter::MockNSTargeter; - std::vector<ShardEndpoint> targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { + std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx, + const BatchItemRef& itemRef) const override { return std::vector<ShardEndpoint>{ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))}; @@ -794,8 +957,8 @@ TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1FirstOK public: using MockNSTargeter::MockNSTargeter; - std::vector<ShardEndpoint> targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { + std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx, + const BatchItemRef& itemRef) const override { return std::vector<ShardEndpoint>{ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))}; @@ -919,8 +1082,8 @@ TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromWriteWithShard1SSVShard2OK) public: using MockNSTargeter::MockNSTargeter; - std::vector<ShardEndpoint> targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { + std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx, + const BatchItemRef& itemRef) const override { if (targetAll) { return std::vector<ShardEndpoint>{ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 511816bc441..595d547aaa8 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -513,8 +513,10 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest( return BatchedCommandRequest([&] { write_ops::Update updateOp(_clientRequest.getNS()); updateOp.setUpdates(std::move(*updates)); - // Each child batch inherits its runtime constants from the parent batch. + // Each child batch inherits its let params/runtime constants from the parent + // batch. updateOp.setRuntimeConstants(_clientRequest.getRuntimeConstants()); + updateOp.setLet(_clientRequest.getLet()); return updateOp; }()); } @@ -522,6 +524,9 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest( return BatchedCommandRequest([&] { write_ops::Delete deleteOp(_clientRequest.getNS()); deleteOp.setDeletes(std::move(*deletes)); + // Each child batch inherits its let params from the parent batch. + deleteOp.setLet(_clientRequest.getLet()); + deleteOp.setRuntimeConstants(_clientRequest.getRuntimeConstants()); return deleteOp; }()); } diff --git a/src/mongo/s/write_ops/batched_command_request.cpp b/src/mongo/s/write_ops/batched_command_request.cpp index 08e4a1e0c25..6ae6516159d 100644 --- a/src/mongo/s/write_ops/batched_command_request.cpp +++ b/src/mongo/s/write_ops/batched_command_request.cpp @@ -29,7 +29,9 @@ #include "mongo/platform/basic.h" +#include "mongo/db/pipeline/variables.h" #include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/util/visit_helper.h" #include "mongo/bson/bsonobj.h" @@ -67,6 +69,10 @@ BatchedCommandRequest constructBatchedCommandRequest(const OpMsgRequest& request } // namespace +const boost::optional<RuntimeConstants> BatchedCommandRequest::kEmptyRuntimeConstants = + boost::optional<RuntimeConstants>{}; +const boost::optional<BSONObj> BatchedCommandRequest::kEmptyLet = boost::optional<BSONObj>{}; + BatchedCommandRequest BatchedCommandRequest::parseInsert(const OpMsgRequest& request) { return constructBatchedCommandRequest<InsertOp>(request); } @@ -98,6 +104,50 @@ std::size_t BatchedCommandRequest::sizeWriteOps() const { return _visit(Visitor{}); } +bool BatchedCommandRequest::hasRuntimeConstants() const { + return _visit(visit_helper::Overloaded{ + [](write_ops::Insert&) { return false; }, + [&](write_ops::Update& op) { return op.getRuntimeConstants().has_value(); }, + [&](write_ops::Delete& op) { return op.getRuntimeConstants().has_value(); }}); +} + +void BatchedCommandRequest::setRuntimeConstants(RuntimeConstants runtimeConstants) { + _visit(visit_helper::Overloaded{ + [](write_ops::Insert&) {}, + [&](write_ops::Update& op) { op.setRuntimeConstants(std::move(runtimeConstants)); }, + [&](write_ops::Delete& op) { op.setRuntimeConstants(std::move(runtimeConstants)); }}); +} + +const boost::optional<RuntimeConstants>& BatchedCommandRequest::getRuntimeConstants() const { + struct Visitor { + auto& operator()(const write_ops::Insert& op) const { + return kEmptyRuntimeConstants; + } + auto& operator()(const write_ops::Update& op) const { + return op.getRuntimeConstants(); + } + auto& operator()(const write_ops::Delete& op) const { + return op.getRuntimeConstants(); + } + }; + return _visit(Visitor{}); +}; + +const boost::optional<BSONObj>& BatchedCommandRequest::getLet() const { + struct Visitor { + auto& operator()(const write_ops::Insert& op) const { + return kEmptyLet; + } + auto& operator()(const write_ops::Update& op) const { + return op.getLet(); + } + auto& operator()(const write_ops::Delete& op) const { + return op.getLet(); + } + }; + return _visit(Visitor{}); +}; + bool BatchedCommandRequest::isVerboseWC() const { if (!hasWriteConcern()) { return true; diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h index ee317b1346a..041a059fdda 100644 --- a/src/mongo/s/write_ops/batched_command_request.h +++ b/src/mongo/s/write_ops/batched_command_request.h @@ -36,6 +36,7 @@ #include "mongo/rpc/op_msg.h" #include "mongo/s/chunk_version.h" #include "mongo/s/database_version_helpers.h" +#include "mongo/util/visit_helper.h" namespace mongo { @@ -133,32 +134,12 @@ public: return *_dbVersion; } - void setRuntimeConstants(RuntimeConstants runtimeConstants) { - invariant(_updateReq); - _updateReq->setRuntimeConstants(std::move(runtimeConstants)); - } - - bool hasRuntimeConstants() const { - invariant(_updateReq); - return _updateReq->getRuntimeConstants().has_value(); - } + void setRuntimeConstants(RuntimeConstants runtimeConstants); - const boost::optional<RuntimeConstants>& getRuntimeConstants() const { - invariant(_updateReq); - return _updateReq->getRuntimeConstants(); - } + bool hasRuntimeConstants() const; - void setLet(BSONObj let) { - _updateReq->setLet(std::move(let)); - } - - bool hasLet() const { - return _updateReq->getLet().is_initialized(); - } - - const boost::optional<BSONObj>& getLet() const { - return _updateReq->getLet(); - } + const boost::optional<RuntimeConstants>& getRuntimeConstants() const; + const boost::optional<BSONObj>& getLet() const; const write_ops::WriteCommandBase& getWriteCommandBase() const; void setWriteCommandBase(write_ops::WriteCommandBase writeCommandBase); @@ -180,6 +161,12 @@ public: */ static BatchedCommandRequest cloneInsertWithIds(BatchedCommandRequest origCmdRequest); + /** These are used to return empty refs from Insert ops that don't carry runtimeConstants + * or let parameters in getLet and getRuntimeConstants. + */ + const static boost::optional<RuntimeConstants> kEmptyRuntimeConstants; + const static boost::optional<BSONObj> kEmptyLet; + private: template <typename Req, typename F, typename... As> static decltype(auto) _visitImpl(Req&& r, F&& f, As&&... as) { @@ -234,7 +221,6 @@ public: const auto& getDocument() const { return _request.getInsertRequest().getDocuments()[_index]; } - const auto& getUpdate() const { return _request.getUpdateRequest().getUpdates()[_index]; } @@ -243,6 +229,14 @@ public: return _request.getDeleteRequest().getDeletes()[_index]; } + auto& getLet() const { + return _request.getLet(); + } + + auto& getRuntimeConstants() const { + return _request.getRuntimeConstants(); + } + private: const BatchedCommandRequest& _request; const int _index; diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp index c3f1cfd817b..e50ee00b3f4 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp @@ -31,20 +31,27 @@ #include "mongo/platform/basic.h" -#include "mongo/s/write_ops/chunk_manager_targeter.h" - #include "mongo/base/counter.h" #include "mongo/db/commands/server_status_metric.h" +#include "mongo/db/curop.h" #include "mongo/db/matcher/extensions_callback_noop.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/pipeline_d.h" +#include "mongo/db/pipeline/process_interface/mongos_process_interface.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/collation/collation_index_key.h" +#include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/database_version_helpers.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" +#include "mongo/s/write_ops/chunk_manager_targeter.h" +#include "mongo/util/intrusive_counter.h" #include "mongo/util/str.h" +#include "signal.h" namespace mongo { namespace { @@ -347,6 +354,51 @@ bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA, return false; } +/** + * Makes an expression context suitable for canonicalization of queries that contain let parameters + * and runtimeConstants on mongos. + */ +auto makeExpressionContextWithDefaultsForTargeter( + OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& collation, + const boost::optional<ExplainOptions::Verbosity>& verbosity, + const boost::optional<BSONObj>& letParameters, + const boost::optional<RuntimeConstants>& runtimeConstants) { + + auto&& cif = [&]() { + if (collation.isEmpty()) { + return std::unique_ptr<CollatorInterface>{}; + } else { + return uassertStatusOK( + CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation)); + } + }(); + + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + resolvedNamespaces.emplace(nss.coll(), + ExpressionContext::ResolvedNamespace(nss, std::vector<BSONObj>{})); + + return make_intrusive<ExpressionContext>( + opCtx, + verbosity, + true, // fromMongos + false, // needs merge + false, // disk use is banned on mongos + true, // bypass document validation, mongos isn't a storage node + false, // not mapReduce + nss, + runtimeConstants, + std::move(cif), + std::make_shared<MongosProcessInterface>( + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()), + std::move(resolvedNamespaces), + boost::none, // collection uuid + letParameters, + false // mongos has no profile collection + ); +} + } // namespace ChunkManagerTargeter::ChunkManagerTargeter(OperationContext* opCtx, @@ -398,8 +450,8 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx, _routingInfo->db().databaseVersion()); } -std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const { +std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* opCtx, + const BatchItemRef& itemRef) const { // If the update is replacement-style: // 1. Attempt to target using the query. If this fails, AND the query targets more than one // shard, @@ -413,6 +465,7 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate( // as if the the shard key values are specified as NULL. A replacement document is also allowed // to have a missing '_id', and if the '_id' exists in the query, it will be emplaced in the // replacement document for targeting purposes. + const auto& updateOp = itemRef.getUpdate(); const auto updateType = getUpdateExprType(updateOp); // If the collection is not sharded, forward the update to the primary shard. @@ -450,7 +503,13 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate( // We first try to target based on the update's query. It is always valid to forward any update // or upsert to a single shard, so return immediately if we are able to target a single shard. - auto endPoints = uassertStatusOK(_targetQuery(opCtx, query, collation)); + auto expCtx = makeExpressionContextWithDefaultsForTargeter(opCtx, + _nss, + collation, + boost::none, // explain + itemRef.getLet(), + itemRef.getRuntimeConstants()); + auto endPoints = uassertStatusOK(_targetQuery(expCtx, query, collation)); if (endPoints.size() == 1) { return endPoints; } @@ -484,10 +543,10 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate( return endPoints; } -std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete( - OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const { +std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* opCtx, + const BatchItemRef& itemRef) const { + const auto& deleteOp = itemRef.getDelete(); BSONObj shardKey; - if (_routingInfo->cm()) { // Sharded collections have the following further requirements for targeting: // @@ -515,7 +574,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete( if (!collation.isEmpty()) { qr->setCollation(collation); } - const boost::intrusive_ptr<ExpressionContext> expCtx; + auto expCtx = makeExpressionContextWithDefaultsForTargeter(opCtx, + _nss, + collation, + boost::none, // explain + itemRef.getLet(), + itemRef.getRuntimeConstants()); auto cq = uassertStatusOKWithContext( CanonicalQuery::canonicalize(opCtx, std::move(qr), @@ -534,11 +598,13 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete( !_routingInfo->cm() || deleteOp.getMulti() || isExactIdQuery(opCtx, *cq, _routingInfo->cm().get())); - return uassertStatusOK(_targetQuery(opCtx, deleteOp.getQ(), collation)); + return uassertStatusOK(_targetQuery(expCtx, deleteOp.getQ(), collation)); } StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery( - OperationContext* opCtx, const BSONObj& query, const BSONObj& collation) const { + boost::intrusive_ptr<ExpressionContext> expCtx, + const BSONObj& query, + const BSONObj& collation) const { if (!_routingInfo->cm()) { return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(), ChunkVersion::UNSHARDED(), @@ -547,7 +613,7 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery( std::set<ShardId> shardIds; try { - _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds); + _routingInfo->cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds); } catch (const DBException& ex) { return ex.toStatus(); } diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h index d280d6383b9..1e62bc2eeb5 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.h +++ b/src/mongo/s/write_ops/chunk_manager_targeter.h @@ -37,6 +37,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/ns_targeter.h" +#include "mongo/s/write_ops/batched_command_request.h" namespace mongo { @@ -75,11 +76,11 @@ public: ShardEndpoint targetInsert(OperationContext* opCtx, const BSONObj& doc) const override; - std::vector<ShardEndpoint> targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const override; + std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx, + const BatchItemRef& itemRef) const override; - std::vector<ShardEndpoint> targetDelete( - OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const override; + std::vector<ShardEndpoint> targetDelete(OperationContext* opCtx, + const BatchItemRef& itemRef) const override; std::vector<ShardEndpoint> targetAllShards(OperationContext* opCtx) const override; @@ -126,9 +127,10 @@ private: * * If 'collation' is empty, we use the collection default collation for targeting. */ - StatusWith<std::vector<ShardEndpoint>> _targetQuery(OperationContext* opCtx, - const BSONObj& query, - const BSONObj& collation) const; + StatusWith<std::vector<ShardEndpoint>> _targetQuery( + boost::intrusive_ptr<ExpressionContext> expCtx, + const BSONObj& query, + const BSONObj& collation) const; /** * Returns a ShardEndpoint for an exact shard key query. diff --git a/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp b/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp index 7da11f654f6..43751bd27cf 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/hasher.h" +#include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/service_context_test_fixture.h" #include "mongo/s/catalog_cache_test_fixture.h" #include "mongo/s/session_catalog_router.h" @@ -45,19 +46,23 @@ using unittest::assertGet; const NamespaceString kNss("TestDB", "TestColl"); -write_ops::UpdateOpEntry buildUpdate(BSONObj query, BSONObj update, bool upsert) { +auto buildUpdate(const NamespaceString& nss, BSONObj query, BSONObj update, bool upsert) { + write_ops::Update updateOp(nss); write_ops::UpdateOpEntry entry; entry.setQ(query); entry.setU(update); entry.setUpsert(upsert); - return entry; + updateOp.setUpdates(std::vector{entry}); + return BatchedCommandRequest{std::move(updateOp)}; } -write_ops::DeleteOpEntry buildDelete(BSONObj query) { +auto buildDelete(const NamespaceString& nss, BSONObj query) { + write_ops::Delete deleteOp(nss); write_ops::DeleteOpEntry entry; entry.setQ(query); entry.setMulti(false); - return entry; + deleteOp.setDeletes(std::vector{entry}); + return BatchedCommandRequest{std::move(deleteOp)}; } class ChunkManagerTargeterTest : public CatalogCacheTestFixture { @@ -191,54 +196,56 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithRangePrefixHashedShardKey) { splitPoints); // When update targets using replacement object. - auto res = cmTargeter.targetUpdate( - operationContext(), - buildUpdate(fromjson("{'a.b': {$gt : 2}}"), fromjson("{a: {b: -1}}"), false)); + auto request = + buildUpdate(kNss, fromjson("{'a.b': {$gt : 2}}"), fromjson("{a: {b: -1}}"), false); + auto res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&request, 0)); ASSERT_EQUALS(res.size(), 1); ASSERT_EQUALS(res[0].shardName, "2"); // When update targets using query. - res = cmTargeter.targetUpdate( - operationContext(), - buildUpdate(fromjson("{$and: [{'a.b': {$gte : 0}}, {'a.b': {$lt: 99}}]}}"), - fromjson("{$set: {p : 1}}"), - false)); + auto requestAndSet = buildUpdate(kNss, + fromjson("{$and: [{'a.b': {$gte : 0}}, {'a.b': {$lt: 99}}]}}"), + fromjson("{$set: {p : 1}}"), + false); + res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestAndSet, 0)); ASSERT_EQUALS(res.size(), 1); ASSERT_EQUALS(res[0].shardName, "3"); - res = cmTargeter.targetUpdate( - operationContext(), - buildUpdate(fromjson("{'a.b': {$lt : -101}}"), fromjson("{a: {b: 111}}"), false)); + auto requestLT = + buildUpdate(kNss, fromjson("{'a.b': {$lt : -101}}"), fromjson("{a: {b: 111}}"), false); + res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestLT, 0)); ASSERT_EQUALS(res.size(), 1); ASSERT_EQUALS(res[0].shardName, "1"); // For op-style updates, query on _id gets targeted to all shards. - res = cmTargeter.targetUpdate( - operationContext(), buildUpdate(fromjson("{_id: 1}"), fromjson("{$set: {p: 111}}"), false)); + auto requestOpUpdate = + buildUpdate(kNss, fromjson("{_id: 1}"), fromjson("{$set: {p: 111}}"), false); + res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestOpUpdate, 0)); ASSERT_EQUALS(res.size(), 5); // For replacement style updates, query on _id uses replacement doc to target. If the // replacement doc doesn't have shard key fields, then update should be routed to the shard // holding 'null' shard key documents. - res = cmTargeter.targetUpdate(operationContext(), - buildUpdate(fromjson("{_id: 1}"), fromjson("{p: 111}}"), false)); + auto requestReplUpdate = buildUpdate(kNss, fromjson("{_id: 1}"), fromjson("{p: 111}}"), false); + res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestReplUpdate, 0)); ASSERT_EQUALS(res.size(), 1); ASSERT_EQUALS(res[0].shardName, "1"); // Upsert requires full shard key in query, even if the query can target a single shard. + auto requestFullKey = buildUpdate(kNss, + fromjson("{'a.b': 100, 'c.d' : {$exists: false}}}"), + fromjson("{a: {b: -111}}"), + true); ASSERT_THROWS_CODE( - cmTargeter.targetUpdate(operationContext(), - buildUpdate(fromjson("{'a.b': 100, 'c.d' : {$exists: false}}}"), - fromjson("{a: {b: -111}}"), - true)), + cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestFullKey, 0)), DBException, ErrorCodes::ShardKeyNotFound); // Upsert success case. - res = cmTargeter.targetUpdate( - operationContext(), - buildUpdate(fromjson("{'a.b': 100, 'c.d': 'val'}"), fromjson("{a: {b: -111}}"), true)); + auto requestSuccess = + buildUpdate(kNss, fromjson("{'a.b': 100, 'c.d': 'val'}"), fromjson("{a: {b: -111}}"), true); + res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestSuccess, 0)); ASSERT_EQUALS(res.size(), 1); ASSERT_EQUALS(res[0].shardName, "4"); } @@ -263,8 +270,8 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithHashedPrefixHashedShardKey) { // Verify that the given document is being routed based on hashed value of 'i' in // 'updateQueryObj'. - const auto res = cmTargeter.targetUpdate( - operationContext(), buildUpdate(updateQueryObj, fromjson("{$set: {p: 1}}"), false)); + auto request = buildUpdate(kNss, updateQueryObj, fromjson("{$set: {p: 1}}"), false); + const auto res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&request, 0)); ASSERT_EQUALS(res.size(), 1); ASSERT_EQUALS(res[0].shardName, findChunk(updateQueryObj["a"]["b"]).getShardId()); } @@ -272,16 +279,15 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithHashedPrefixHashedShardKey) { // Range queries on hashed field cannot be used for targeting. In this case, update will be // targeted based on update document. const auto updateObj = fromjson("{a: {b: -1}}"); - auto res = cmTargeter.targetUpdate( - operationContext(), buildUpdate(fromjson("{'a.b': {$gt : 101}}"), updateObj, false)); + auto requestUpdate = buildUpdate(kNss, fromjson("{'a.b': {$gt : 101}}"), updateObj, false); + auto res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestUpdate, 0)); ASSERT_EQUALS(res.size(), 1); ASSERT_EQUALS(res[0].shardName, findChunk(updateObj["a"]["b"]).getShardId()); - ASSERT_THROWS_CODE( - cmTargeter.targetUpdate( - operationContext(), - buildUpdate(fromjson("{'a.b': {$gt : 101}}"), fromjson("{$set: {p: 1}}"), false)), - DBException, - ErrorCodes::InvalidOptions); + auto requestErr = + buildUpdate(kNss, fromjson("{'a.b': {$gt : 101}}"), fromjson("{$set: {p: 1}}"), false); + ASSERT_THROWS_CODE(cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestErr, 0)), + DBException, + ErrorCodes::InvalidOptions); } TEST_F(ChunkManagerTargeterTest, TargetDeleteWithRangePrefixHashedShardKey) { @@ -295,30 +301,33 @@ TEST_F(ChunkManagerTargeterTest, TargetDeleteWithRangePrefixHashedShardKey) { splitPoints); // Cannot delete without full shardkey in the query. + auto requestPartialKey = buildDelete(kNss, fromjson("{'a.b': {$gt : 2}}")); ASSERT_THROWS_CODE( - cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 2}}"))), + cmTargeter.targetDelete(operationContext(), BatchItemRef(&requestPartialKey, 0)), DBException, ErrorCodes::ShardKeyNotFound); + + auto requestPartialKey2 = buildDelete(kNss, fromjson("{'a.b': -101}")); ASSERT_THROWS_CODE( - cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': -101}"))), + cmTargeter.targetDelete(operationContext(), BatchItemRef(&requestPartialKey2, 0)), DBException, ErrorCodes::ShardKeyNotFound); // Delete targeted correctly with full shard key in query. - auto res = cmTargeter.targetDelete(operationContext(), - buildDelete(fromjson("{'a.b': -101, 'c.d': 5}"))); + auto requestFullKey = buildDelete(kNss, fromjson("{'a.b': -101, 'c.d': 5}")); + auto res = cmTargeter.targetDelete(operationContext(), BatchItemRef(&requestFullKey, 0)); ASSERT_EQUALS(res.size(), 1); ASSERT_EQUALS(res[0].shardName, "1"); // Query with MinKey value should go to chunk '0' because MinKey is smaller than BSONNULL. - res = cmTargeter.targetDelete( - operationContext(), - buildDelete(BSONObjBuilder().appendMinKey("a.b").append("c.d", 4).obj())); + auto requestMinKey = + buildDelete(kNss, BSONObjBuilder().appendMinKey("a.b").append("c.d", 4).obj()); + res = cmTargeter.targetDelete(operationContext(), BatchItemRef(&requestMinKey, 0)); ASSERT_EQUALS(res.size(), 1); ASSERT_EQUALS(res[0].shardName, "0"); - res = - cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': 0, 'c.d': 5}"))); + auto requestMinKey2 = buildDelete(kNss, fromjson("{'a.b': 0, 'c.d': 5}")); + res = cmTargeter.targetDelete(operationContext(), BatchItemRef(&requestMinKey2, 0)); ASSERT_EQUALS(res.size(), 1); ASSERT_EQUALS(res[0].shardName, "3"); } @@ -342,16 +351,17 @@ TEST_F(ChunkManagerTargeterTest, TargetDeleteWithHashedPrefixHashedShardKey) { auto queryObj = BSON("a" << BSON("b" << i) << "c" << BSON("d" << 10)); // Verify that the given document is being routed based on hashed value of 'i' in // 'queryObj'. - const auto res = cmTargeter.targetDelete(operationContext(), buildDelete(queryObj)); + auto request = buildDelete(kNss, queryObj); + const auto res = cmTargeter.targetDelete(operationContext(), BatchItemRef(&request, 0)); ASSERT_EQUALS(res.size(), 1); ASSERT_EQUALS(res[0].shardName, findChunk(queryObj["a"]["b"]).getShardId()); } // Range queries on hashed field cannot be used for targeting. - ASSERT_THROWS_CODE( - cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 101}}"))), - DBException, - ErrorCodes::ShardKeyNotFound); + auto request = buildDelete(kNss, fromjson("{'a.b': {$gt : 101}}")); + ASSERT_THROWS_CODE(cmTargeter.targetDelete(operationContext(), BatchItemRef(&request, 0)), + DBException, + ErrorCodes::ShardKeyNotFound); } } // namespace diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h index ec6fb892834..4cde0765dd1 100644 --- a/src/mongo/s/write_ops/mock_ns_targeter.h +++ b/src/mongo/s/write_ops/mock_ns_targeter.h @@ -74,18 +74,18 @@ public: * Returns the first ShardEndpoint for the query from the mock ranges. Only can handle * queries of the form { field : { $gte : <value>, $lt : <value> } }. */ - std::vector<ShardEndpoint> targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const override { - return _targetQuery(updateOp.getQ()); + std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx, + const BatchItemRef& itemRef) const override { + return _targetQuery(itemRef.getUpdate().getQ()); } /** * Returns the first ShardEndpoint for the query from the mock ranges. Only can handle * queries of the form { field : { $gte : <value>, $lt : <value> } }. */ - std::vector<ShardEndpoint> targetDelete( - OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const override { - return _targetQuery(deleteOp.getQ()); + std::vector<ShardEndpoint> targetDelete(OperationContext* opCtx, + const BatchItemRef& itemRef) const override { + return _targetQuery(itemRef.getDelete().getQ()); } std::vector<ShardEndpoint> targetAllShards(OperationContext* opCtx) const override { diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index 85501f2aedf..31d5e0ce31d 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -59,9 +59,9 @@ void WriteOp::targetWrites(OperationContext* opCtx, if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert) { return std::vector{targeter.targetInsert(opCtx, _itemRef.getDocument())}; } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Update) { - return targeter.targetUpdate(opCtx, _itemRef.getUpdate()); + return targeter.targetUpdate(opCtx, _itemRef); } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete) { - return targeter.targetDelete(opCtx, _itemRef.getDelete()); + return targeter.targetDelete(opCtx, _itemRef); } MONGO_UNREACHABLE; }(); |