summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorEric Cox <eric.cox@mongodb.com>2020-05-06 15:26:02 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-06-01 20:27:40 +0000
commitcc318e18f225ccaae25b0e0c4c87d8ea91b59ba8 (patch)
treefd41fe5f08d3dc237dcadc762608b06e2775c032 /src/mongo
parent8bf94a6b123c4a8d60f7b7fd8fd860976a726fbf (diff)
downloadmongo-cc318e18f225ccaae25b0e0c4c87d8ea91b59ba8.tar.gz
SERVER-47740 Plumbing for construction of a full ExpressionContext in the sharded case
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/pipeline/expression_context_for_test.h30
-rw-r--r--src/mongo/db/query/canonical_query.cpp10
-rw-r--r--src/mongo/s/chunk_manager.cpp12
-rw-r--r--src/mongo/s/chunk_manager.h2
-rw-r--r--src/mongo/s/chunk_manager_query_test.cpp20
-rw-r--r--src/mongo/s/cluster_commands_helpers.cpp22
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp4
-rw-r--r--src/mongo/s/ns_targeter.h9
-rw-r--r--src/mongo/s/write_ops/SConscript2
-rw-r--r--src/mongo/s/write_ops/batch_write_exec_test.cpp183
-rw-r--r--src/mongo/s/write_ops/batch_write_op.cpp7
-rw-r--r--src/mongo/s/write_ops/batched_command_request.cpp50
-rw-r--r--src/mongo/s/write_ops/batched_command_request.h44
-rw-r--r--src/mongo/s/write_ops/chunk_manager_targeter.cpp90
-rw-r--r--src/mongo/s/write_ops/chunk_manager_targeter.h16
-rw-r--r--src/mongo/s/write_ops/chunk_manager_targeter_test.cpp110
-rw-r--r--src/mongo/s/write_ops/mock_ns_targeter.h12
-rw-r--r--src/mongo/s/write_ops/write_op.cpp4
18 files changed, 496 insertions, 131 deletions
diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h
index 4e18ccb6f39..80e4f2ea772 100644
--- a/src/mongo/db/pipeline/expression_context_for_test.h
+++ b/src/mongo/db/pipeline/expression_context_for_test.h
@@ -149,6 +149,36 @@ public:
}
/**
+ * Constructor which sets the given OperationContext on the ExpressionContextForTest. This will
+ * also resolve the ExpressionContextForTest's ServiceContext from the OperationContext and
+ * accepts letParameters.
+ */
+ ExpressionContextForTest(OperationContext* opCtx,
+ const NamespaceString& nss,
+ std::unique_ptr<CollatorInterface> collator,
+ const boost::optional<BSONObj>& letParameters = boost::none)
+ : ExpressionContext(opCtx,
+ boost::none, // explain
+ false, // fromMongos,
+ false, // needsMerge,
+ false, // allowDiskUse,
+ false, // bypassDocumentValidation,
+ false, // isMapReduce
+ nss,
+ RuntimeConstants(Date_t::now(), Timestamp(1, 0)),
+ std::move(collator),
+ std::make_shared<StubMongoProcessInterface>(),
+ {}, // resolvedNamespaces
+ {}, // collUUID
+ letParameters,
+ false // mayDbProfile
+ ),
+ _serviceContext(opCtx->getServiceContext()) {
+ // Resolve the TimeZoneDatabase to be used by this ExpressionContextForTest.
+ _setTimeZoneDatabase();
+ }
+
+ /**
* Sets the resolved definition for an involved namespace.
*/
void setResolvedNamespace(const NamespaceString& nss, ResolvedNamespace resolvedNamespace) {
diff --git a/src/mongo/db/query/canonical_query.cpp b/src/mongo/db/query/canonical_query.cpp
index d8b34b3b9e1..38efced1c0c 100644
--- a/src/mongo/db/query/canonical_query.cpp
+++ b/src/mongo/db/query/canonical_query.cpp
@@ -98,10 +98,16 @@ StatusWith<std::unique_ptr<CanonicalQuery>> CanonicalQuery::canonicalize(
// Make MatchExpression.
boost::intrusive_ptr<ExpressionContext> newExpCtx;
if (!expCtx.get()) {
- newExpCtx = make_intrusive<ExpressionContext>(
- opCtx, std::move(collator), qr->nss(), qr->getRuntimeConstants());
+ newExpCtx = make_intrusive<ExpressionContext>(opCtx,
+ std::move(collator),
+ qr->nss(),
+ qr->getRuntimeConstants(),
+ qr->getLetParameters());
} else {
newExpCtx = expCtx;
+ // A collator can enter through both the QueryRequest and ExpressionContext arguments.
+ // This invariant ensures that both collators are the same because downstream we
+ // pull the collator from only one of the two carriers (the ExpressionContext).
invariant(CollatorInterface::collatorsMatch(collator.get(), expCtx->getCollator()));
}
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index 51acbcf0dc7..9cff650a255 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -303,22 +303,26 @@ bool ChunkManager::keyBelongsToShard(const BSONObj& shardKey, const ShardId& sha
return chunkInfo->getShardIdAt(_clusterTime) == shardId;
}
-void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
+void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> expCtx,
const BSONObj& query,
const BSONObj& collation,
std::set<ShardId>* shardIds) const {
auto qr = std::make_unique<QueryRequest>(_rt->getns());
qr->setFilter(query);
+ if (auto uuid = getUUID())
+ expCtx->uuid = uuid;
+
if (!collation.isEmpty()) {
qr->setCollation(collation);
} else if (_rt->getDefaultCollator()) {
- qr->setCollation(_rt->getDefaultCollator()->getSpec().toBSON());
+ auto defaultCollator = _rt->getDefaultCollator();
+ qr->setCollation(defaultCollator->getSpec().toBSON());
+ expCtx->setCollator(defaultCollator->clone());
}
- const boost::intrusive_ptr<ExpressionContext> expCtx;
auto cq = uassertStatusOK(
- CanonicalQuery::canonicalize(opCtx,
+ CanonicalQuery::canonicalize(expCtx->opCtx,
std::move(qr),
expCtx,
ExtensionsCallbackNoop(),
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index 67cdfb15fa6..b91750c4073 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -412,7 +412,7 @@ public:
* Finds the shard IDs for a given filter and collation. If collation is empty, we use the
* collection default collation for targeting.
*/
- void getShardIdsForQuery(OperationContext* opCtx,
+ void getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> expCtx,
const BSONObj& query,
const BSONObj& collation,
std::set<ShardId>* shardIds) const;
diff --git a/src/mongo/s/chunk_manager_query_test.cpp b/src/mongo/s/chunk_manager_query_test.cpp
index 5ec10372524..fc710bde923 100644
--- a/src/mongo/s/chunk_manager_query_test.cpp
+++ b/src/mongo/s/chunk_manager_query_test.cpp
@@ -33,9 +33,13 @@
#include <set>
+#include "mongo/db/catalog/catalog_test_fixture.h"
+#include "mongo/db/pipeline/expression_context_for_test.h"
+#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/s/catalog_cache_test_fixture.h"
#include "mongo/s/chunk_manager.h"
+#include "mongo/util/assert_util.h"
namespace mongo {
namespace {
@@ -71,8 +75,18 @@ protected:
makeChunkManager(kNss, shardKeyPattern, std::move(defaultCollator), false, splitPoints);
std::set<ShardId> shardIds;
- chunkManager->getShardIdsForQuery(operationContext(), query, queryCollation, &shardIds);
+ auto&& cif = [&]() {
+ if (queryCollation.isEmpty()) {
+ return std::unique_ptr<CollatorInterface>{};
+ } else {
+ return uassertStatusOK(CollatorFactoryInterface::get(getServiceContext())
+ ->makeFromBSON(queryCollation));
+ }
+ }();
+ auto expCtx =
+ make_intrusive<ExpressionContextForTest>(operationContext(), kNss, std::move(cif));
+ chunkManager->getShardIdsForQuery(expCtx, query, queryCollation, &shardIds);
_assertShardIdsMatch(expectedShardIds, shardIds);
}
@@ -515,9 +529,9 @@ TEST_F(ChunkManagerQueryTest, SnapshotQueryWithMoreShardsThanLatestMetadata) {
chunkManager.getShardIdsForRange(BSON("x" << MINKEY), BSON("x" << MAXKEY), &shardIds);
ASSERT_EQ(2, shardIds.size());
+ const auto expCtx = make_intrusive<ExpressionContextForTest>();
shardIds.clear();
- chunkManager.getShardIdsForQuery(
- operationContext(), BSON("x" << BSON("$gt" << -20)), {}, &shardIds);
+ chunkManager.getShardIdsForQuery(expCtx, BSON("x" << BSON("$gt" << -20)), {}, &shardIds);
ASSERT_EQ(2, shardIds.size());
}
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index f7f24aa31c2..881542a5730 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -41,6 +41,8 @@
#include "mongo/db/error_labels.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/task_executor_pool.h"
@@ -365,7 +367,14 @@ std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShard
// The collection is sharded. Target all shards that own chunks that match the query.
std::set<ShardId> shardIds;
- routingInfo.cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
+ std::unique_ptr<CollatorInterface> collator;
+ if (!collation.isEmpty()) {
+ collator = uassertStatusOK(
+ CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation));
+ }
+
+ auto expCtx = make_intrusive<ExpressionContext>(opCtx, std::move(collator), nss);
+ routingInfo.cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds);
for (const ShardId& shardId : shardIds) {
if (shardsToSkip.find(shardId) == shardsToSkip.end()) {
@@ -694,7 +703,16 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx,
// The collection is sharded. Use the routing table to decide which shards to target
// based on the query and collation.
std::set<ShardId> shardIds;
- routingInfo.cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
+ auto&& cif = [&]() {
+ if (collation.isEmpty()) {
+ return std::unique_ptr<CollatorInterface>{};
+ } else {
+ return uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(collation));
+ }
+ }();
+ auto expCtx = make_intrusive<ExpressionContext>(opCtx, std::move(cif), NamespaceString());
+ routingInfo.cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds);
return shardIds;
}
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index f4c226fa461..229488dcb86 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -372,9 +372,9 @@ private:
if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) {
return std::vector{targeter.targetInsert(opCtx, targetingBatchItem.getDocument())};
} else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) {
- return targeter.targetUpdate(opCtx, targetingBatchItem.getUpdate());
+ return targeter.targetUpdate(opCtx, targetingBatchItem);
} else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete) {
- return targeter.targetDelete(opCtx, targetingBatchItem.getDelete());
+ return targeter.targetDelete(opCtx, targetingBatchItem);
}
MONGO_UNREACHABLE;
}();
diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h
index 36ce6269680..debfc1d9a69 100644
--- a/src/mongo/s/ns_targeter.h
+++ b/src/mongo/s/ns_targeter.h
@@ -37,6 +37,7 @@
#include "mongo/s/chunk_version.h"
#include "mongo/s/shard_id.h"
#include "mongo/s/stale_exception.h"
+#include "mongo/s/write_ops/batched_command_request.h"
namespace mongo {
@@ -99,15 +100,15 @@ public:
* Returns a vector of ShardEndpoints for a potentially multi-shard update or throws
* ShardKeyNotFound if 'updateOp' misses a shard key, but the type of update requires it.
*/
- virtual std::vector<ShardEndpoint> targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const = 0;
+ virtual std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const = 0;
/**
* Returns a vector of ShardEndpoints for a potentially multi-shard delete or throws
* ShardKeyNotFound if 'deleteOp' misses a shard key, but the type of delete requires it.
*/
- virtual std::vector<ShardEndpoint> targetDelete(
- OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const = 0;
+ virtual std::vector<ShardEndpoint> targetDelete(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const = 0;
/**
* Returns a vector of ShardEndpoints for all shards.
diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript
index 67633ae8d77..62d2ed4a9db 100644
--- a/src/mongo/s/write_ops/SConscript
+++ b/src/mongo/s/write_ops/SConscript
@@ -35,6 +35,8 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/commands/server_status_core',
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
+ '$BUILD_DIR/mongo/db/pipeline/process_interface/mongos_process_interface',
'$BUILD_DIR/mongo/s/sharding_router_api',
'batch_write_types',
],
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 2c05bee128d..4e230942e68 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -317,6 +317,169 @@ TEST_F(BatchWriteExecTest, SingleOpUnordered) {
future.default_timed_get();
}
+TEST_F(BatchWriteExecTest, SingleUpdateTargetsShardWithLet) {
+ // Try to update the single doc where a let param is used in the shard key.
+ const auto let = BSON("y" << 100);
+ const auto rtc = RuntimeConstants{Date_t::now(), Timestamp(1, 1)};
+ const auto q = BSON("x"
+ << "$$y");
+ BatchedCommandRequest updateRequest([&] {
+ write_ops::Update updateOp(nss);
+ updateOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ updateOp.setLet(let);
+ updateOp.setRuntimeConstants(rtc);
+ updateOp.setUpdates(std::vector{write_ops::UpdateOpEntry(q,
+ {BSON("Key"
+ << "100")})});
+ return updateOp;
+ }());
+ updateRequest.setWriteConcern(BSONObj());
+
+ const static auto epoch = OID::gen();
+
+ class MultiShardTargeter : public MockNSTargeter {
+ public:
+ using MockNSTargeter::MockNSTargeter;
+
+ std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const override {
+ return std::vector<ShardEndpoint>{
+ ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))};
+ }
+ };
+
+ MultiShardTargeter multiShardNSTargeter(
+ nss,
+ {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
+ BSON("x" << MINKEY),
+ BSON("x" << 0)),
+ MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)),
+ BSON("x" << 0),
+ BSON("x" << MAXKEY))});
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(
+ operationContext(), multiShardNSTargeter, updateRequest, &response, &stats);
+
+ return response;
+ });
+
+ // The update will hit the second shard.
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost2, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(1);
+
+ // Check that let params and runtimeConstants are propagated to shards.
+ const auto opMsgRequest(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj));
+ const auto actualBatchedUpdate(BatchedCommandRequest::parseUpdate(opMsgRequest));
+ ASSERT_BSONOBJ_EQ(let, actualBatchedUpdate.getLet().value_or(BSONObj()));
+ ASSERT_EQUALS(actualBatchedUpdate.getRuntimeConstants()->getLocalNow(), rtc.getLocalNow());
+ ASSERT_EQUALS(actualBatchedUpdate.getRuntimeConstants()->getClusterTime(),
+ rtc.getClusterTime());
+
+ // Check that let params are only forwarded and not evaluated.
+ auto expectedQ = BSON("x"
+ << "$$y");
+ for (auto&& u : actualBatchedUpdate.getUpdateRequest().getUpdates())
+ ASSERT_BSONOBJ_EQ(expectedQ, u.getQ());
+
+ return response.toBSON();
+ });
+
+ auto response = future.default_timed_get();
+ ASSERT_OK(response.getTopLevelStatus());
+ ASSERT_EQ(1, response.getNModified());
+}
+
+TEST_F(BatchWriteExecTest, SingleDeleteTargetsShardWithLet) {
+ // Try to delete the single doc where a let param is used in the shard key.
+ const auto let = BSON("y" << 100);
+ const auto rtc = RuntimeConstants{Date_t::now(), Timestamp(1, 1)};
+ const auto q = BSON("x"
+ << "$$y");
+ BatchedCommandRequest deleteRequest([&] {
+ write_ops::Delete deleteOp(nss);
+ deleteOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ deleteOp.setLet(let);
+ deleteOp.setRuntimeConstants(rtc);
+ deleteOp.setDeletes(std::vector{write_ops::DeleteOpEntry(q, false)});
+ return deleteOp;
+ }());
+ deleteRequest.setWriteConcern(BSONObj());
+
+ const static auto epoch = OID::gen();
+
+ class MultiShardTargeter : public MockNSTargeter {
+ public:
+ using MockNSTargeter::MockNSTargeter;
+
+ protected:
+ std::vector<ShardEndpoint> targetDelete(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const override {
+ return std::vector<ShardEndpoint>{
+ ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))};
+ }
+ };
+
+ MultiShardTargeter multiShardNSTargeter(
+ nss,
+ {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
+ BSON("x" << MINKEY),
+ BSON("x" << 0)),
+ MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)),
+ BSON("x" << 0),
+ BSON("x" << MAXKEY))});
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(
+ operationContext(), multiShardNSTargeter, deleteRequest, &response, &stats);
+
+ return response;
+ });
+
+ // The delete will hit the second shard.
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost2, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+
+ // Check that let params are propagated to shards.
+ const auto opMsgRequest(OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj));
+ const auto actualBatchedUpdate(BatchedCommandRequest::parseDelete(opMsgRequest));
+ ASSERT_BSONOBJ_EQ(let, actualBatchedUpdate.getLet().value_or(BSONObj()));
+ ASSERT_EQUALS(actualBatchedUpdate.getRuntimeConstants()->getLocalNow(), rtc.getLocalNow());
+ ASSERT_EQUALS(actualBatchedUpdate.getRuntimeConstants()->getClusterTime(),
+ rtc.getClusterTime());
+
+ // Check that let params are only forwarded and not evaluated.
+ auto expectedQ = BSON("x"
+ << "$$y");
+ for (auto&& u : actualBatchedUpdate.getDeleteRequest().getDeletes())
+ ASSERT_BSONOBJ_EQ(expectedQ, u.getQ());
+
+ return response.toBSON();
+ });
+
+ auto response = future.default_timed_get();
+ ASSERT_OK(response.getTopLevelStatus());
+}
+
TEST_F(BatchWriteExecTest, MultiOpLargeOrdered) {
const int kNumDocsToInsert = 100'000;
const std::string kDocValue(200, 'x');
@@ -450,8 +613,8 @@ TEST_F(BatchWriteExecTest, StaleShardVersionReturnedFromBatchWithSingleMultiWrit
public:
using MockNSTargeter::MockNSTargeter;
- std::vector<ShardEndpoint> targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
+ std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const override {
return std::vector<ShardEndpoint>{
ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))};
@@ -551,8 +714,8 @@ TEST_F(BatchWriteExecTest,
public:
using MockNSTargeter::MockNSTargeter;
- std::vector<ShardEndpoint> targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
+ std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const override {
return std::vector<ShardEndpoint>{
ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))};
@@ -667,8 +830,8 @@ TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1Firs) {
public:
using MockNSTargeter::MockNSTargeter;
- std::vector<ShardEndpoint> targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
+ std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const override {
return std::vector<ShardEndpoint>{
ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))};
@@ -794,8 +957,8 @@ TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1FirstOK
public:
using MockNSTargeter::MockNSTargeter;
- std::vector<ShardEndpoint> targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
+ std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const override {
return std::vector<ShardEndpoint>{
ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))};
@@ -919,8 +1082,8 @@ TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromWriteWithShard1SSVShard2OK)
public:
using MockNSTargeter::MockNSTargeter;
- std::vector<ShardEndpoint> targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
+ std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const override {
if (targetAll) {
return std::vector<ShardEndpoint>{
ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 511816bc441..595d547aaa8 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -513,8 +513,10 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(
return BatchedCommandRequest([&] {
write_ops::Update updateOp(_clientRequest.getNS());
updateOp.setUpdates(std::move(*updates));
- // Each child batch inherits its runtime constants from the parent batch.
+ // Each child batch inherits its let params/runtime constants from the parent
+ // batch.
updateOp.setRuntimeConstants(_clientRequest.getRuntimeConstants());
+ updateOp.setLet(_clientRequest.getLet());
return updateOp;
}());
}
@@ -522,6 +524,9 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(
return BatchedCommandRequest([&] {
write_ops::Delete deleteOp(_clientRequest.getNS());
deleteOp.setDeletes(std::move(*deletes));
+ // Each child batch inherits its let params from the parent batch.
+ deleteOp.setLet(_clientRequest.getLet());
+ deleteOp.setRuntimeConstants(_clientRequest.getRuntimeConstants());
return deleteOp;
}());
}
diff --git a/src/mongo/s/write_ops/batched_command_request.cpp b/src/mongo/s/write_ops/batched_command_request.cpp
index 08e4a1e0c25..6ae6516159d 100644
--- a/src/mongo/s/write_ops/batched_command_request.cpp
+++ b/src/mongo/s/write_ops/batched_command_request.cpp
@@ -29,7 +29,9 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/pipeline/variables.h"
#include "mongo/s/write_ops/batched_command_request.h"
+#include "mongo/util/visit_helper.h"
#include "mongo/bson/bsonobj.h"
@@ -67,6 +69,10 @@ BatchedCommandRequest constructBatchedCommandRequest(const OpMsgRequest& request
} // namespace
+const boost::optional<RuntimeConstants> BatchedCommandRequest::kEmptyRuntimeConstants =
+ boost::optional<RuntimeConstants>{};
+const boost::optional<BSONObj> BatchedCommandRequest::kEmptyLet = boost::optional<BSONObj>{};
+
BatchedCommandRequest BatchedCommandRequest::parseInsert(const OpMsgRequest& request) {
return constructBatchedCommandRequest<InsertOp>(request);
}
@@ -98,6 +104,50 @@ std::size_t BatchedCommandRequest::sizeWriteOps() const {
return _visit(Visitor{});
}
+bool BatchedCommandRequest::hasRuntimeConstants() const {
+ return _visit(visit_helper::Overloaded{
+ [](write_ops::Insert&) { return false; },
+ [&](write_ops::Update& op) { return op.getRuntimeConstants().has_value(); },
+ [&](write_ops::Delete& op) { return op.getRuntimeConstants().has_value(); }});
+}
+
+void BatchedCommandRequest::setRuntimeConstants(RuntimeConstants runtimeConstants) {
+ _visit(visit_helper::Overloaded{
+ [](write_ops::Insert&) {},
+ [&](write_ops::Update& op) { op.setRuntimeConstants(std::move(runtimeConstants)); },
+ [&](write_ops::Delete& op) { op.setRuntimeConstants(std::move(runtimeConstants)); }});
+}
+
+const boost::optional<RuntimeConstants>& BatchedCommandRequest::getRuntimeConstants() const {
+ struct Visitor {
+ auto& operator()(const write_ops::Insert& op) const {
+ return kEmptyRuntimeConstants;
+ }
+ auto& operator()(const write_ops::Update& op) const {
+ return op.getRuntimeConstants();
+ }
+ auto& operator()(const write_ops::Delete& op) const {
+ return op.getRuntimeConstants();
+ }
+ };
+ return _visit(Visitor{});
+};
+
+const boost::optional<BSONObj>& BatchedCommandRequest::getLet() const {
+ struct Visitor {
+ auto& operator()(const write_ops::Insert& op) const {
+ return kEmptyLet;
+ }
+ auto& operator()(const write_ops::Update& op) const {
+ return op.getLet();
+ }
+ auto& operator()(const write_ops::Delete& op) const {
+ return op.getLet();
+ }
+ };
+ return _visit(Visitor{});
+};
+
bool BatchedCommandRequest::isVerboseWC() const {
if (!hasWriteConcern()) {
return true;
diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h
index ee317b1346a..041a059fdda 100644
--- a/src/mongo/s/write_ops/batched_command_request.h
+++ b/src/mongo/s/write_ops/batched_command_request.h
@@ -36,6 +36,7 @@
#include "mongo/rpc/op_msg.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/database_version_helpers.h"
+#include "mongo/util/visit_helper.h"
namespace mongo {
@@ -133,32 +134,12 @@ public:
return *_dbVersion;
}
- void setRuntimeConstants(RuntimeConstants runtimeConstants) {
- invariant(_updateReq);
- _updateReq->setRuntimeConstants(std::move(runtimeConstants));
- }
-
- bool hasRuntimeConstants() const {
- invariant(_updateReq);
- return _updateReq->getRuntimeConstants().has_value();
- }
+ void setRuntimeConstants(RuntimeConstants runtimeConstants);
- const boost::optional<RuntimeConstants>& getRuntimeConstants() const {
- invariant(_updateReq);
- return _updateReq->getRuntimeConstants();
- }
+ bool hasRuntimeConstants() const;
- void setLet(BSONObj let) {
- _updateReq->setLet(std::move(let));
- }
-
- bool hasLet() const {
- return _updateReq->getLet().is_initialized();
- }
-
- const boost::optional<BSONObj>& getLet() const {
- return _updateReq->getLet();
- }
+ const boost::optional<RuntimeConstants>& getRuntimeConstants() const;
+ const boost::optional<BSONObj>& getLet() const;
const write_ops::WriteCommandBase& getWriteCommandBase() const;
void setWriteCommandBase(write_ops::WriteCommandBase writeCommandBase);
@@ -180,6 +161,12 @@ public:
*/
static BatchedCommandRequest cloneInsertWithIds(BatchedCommandRequest origCmdRequest);
+ /** These are used to return empty refs from Insert ops that don't carry runtimeConstants
+ * or let parameters in getLet and getRuntimeConstants.
+ */
+ const static boost::optional<RuntimeConstants> kEmptyRuntimeConstants;
+ const static boost::optional<BSONObj> kEmptyLet;
+
private:
template <typename Req, typename F, typename... As>
static decltype(auto) _visitImpl(Req&& r, F&& f, As&&... as) {
@@ -234,7 +221,6 @@ public:
const auto& getDocument() const {
return _request.getInsertRequest().getDocuments()[_index];
}
-
const auto& getUpdate() const {
return _request.getUpdateRequest().getUpdates()[_index];
}
@@ -243,6 +229,14 @@ public:
return _request.getDeleteRequest().getDeletes()[_index];
}
+ auto& getLet() const {
+ return _request.getLet();
+ }
+
+ auto& getRuntimeConstants() const {
+ return _request.getRuntimeConstants();
+ }
+
private:
const BatchedCommandRequest& _request;
const int _index;
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index c3f1cfd817b..e50ee00b3f4 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -31,20 +31,27 @@
#include "mongo/platform/basic.h"
-#include "mongo/s/write_ops/chunk_manager_targeter.h"
-
#include "mongo/base/counter.h"
#include "mongo/db/commands/server_status_metric.h"
+#include "mongo/db/curop.h"
#include "mongo/db/matcher/extensions_callback_noop.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/pipeline/process_interface/mongos_process_interface.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/collation/collation_index_key.h"
+#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/database_version_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
+#include "mongo/s/write_ops/chunk_manager_targeter.h"
+#include "mongo/util/intrusive_counter.h"
#include "mongo/util/str.h"
+#include "signal.h"
namespace mongo {
namespace {
@@ -347,6 +354,51 @@ bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA,
return false;
}
+/**
+ * Makes an expression context suitable for canonicalization of queries that contain let parameters
+ * and runtimeConstants on mongos.
+ */
+auto makeExpressionContextWithDefaultsForTargeter(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& collation,
+ const boost::optional<ExplainOptions::Verbosity>& verbosity,
+ const boost::optional<BSONObj>& letParameters,
+ const boost::optional<RuntimeConstants>& runtimeConstants) {
+
+ auto&& cif = [&]() {
+ if (collation.isEmpty()) {
+ return std::unique_ptr<CollatorInterface>{};
+ } else {
+ return uassertStatusOK(
+ CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation));
+ }
+ }();
+
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ resolvedNamespaces.emplace(nss.coll(),
+ ExpressionContext::ResolvedNamespace(nss, std::vector<BSONObj>{}));
+
+ return make_intrusive<ExpressionContext>(
+ opCtx,
+ verbosity,
+ true, // fromMongos
+ false, // needs merge
+ false, // disk use is banned on mongos
+ true, // bypass document validation, mongos isn't a storage node
+ false, // not mapReduce
+ nss,
+ runtimeConstants,
+ std::move(cif),
+ std::make_shared<MongosProcessInterface>(
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()),
+ std::move(resolvedNamespaces),
+ boost::none, // collection uuid
+ letParameters,
+ false // mongos has no profile collection
+ );
+}
+
} // namespace
ChunkManagerTargeter::ChunkManagerTargeter(OperationContext* opCtx,
@@ -398,8 +450,8 @@ ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
_routingInfo->db().databaseVersion());
}
-std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const {
+std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const {
// If the update is replacement-style:
// 1. Attempt to target using the query. If this fails, AND the query targets more than one
// shard,
@@ -413,6 +465,7 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(
// as if the the shard key values are specified as NULL. A replacement document is also allowed
// to have a missing '_id', and if the '_id' exists in the query, it will be emplaced in the
// replacement document for targeting purposes.
+ const auto& updateOp = itemRef.getUpdate();
const auto updateType = getUpdateExprType(updateOp);
// If the collection is not sharded, forward the update to the primary shard.
@@ -450,7 +503,13 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(
// We first try to target based on the update's query. It is always valid to forward any update
// or upsert to a single shard, so return immediately if we are able to target a single shard.
- auto endPoints = uassertStatusOK(_targetQuery(opCtx, query, collation));
+ auto expCtx = makeExpressionContextWithDefaultsForTargeter(opCtx,
+ _nss,
+ collation,
+ boost::none, // explain
+ itemRef.getLet(),
+ itemRef.getRuntimeConstants());
+ auto endPoints = uassertStatusOK(_targetQuery(expCtx, query, collation));
if (endPoints.size() == 1) {
return endPoints;
}
@@ -484,10 +543,10 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(
return endPoints;
}
-std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(
- OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const {
+std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const {
+ const auto& deleteOp = itemRef.getDelete();
BSONObj shardKey;
-
if (_routingInfo->cm()) {
// Sharded collections have the following further requirements for targeting:
//
@@ -515,7 +574,12 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(
if (!collation.isEmpty()) {
qr->setCollation(collation);
}
- const boost::intrusive_ptr<ExpressionContext> expCtx;
+ auto expCtx = makeExpressionContextWithDefaultsForTargeter(opCtx,
+ _nss,
+ collation,
+ boost::none, // explain
+ itemRef.getLet(),
+ itemRef.getRuntimeConstants());
auto cq = uassertStatusOKWithContext(
CanonicalQuery::canonicalize(opCtx,
std::move(qr),
@@ -534,11 +598,13 @@ std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(
!_routingInfo->cm() || deleteOp.getMulti() ||
isExactIdQuery(opCtx, *cq, _routingInfo->cm().get()));
- return uassertStatusOK(_targetQuery(opCtx, deleteOp.getQ(), collation));
+ return uassertStatusOK(_targetQuery(expCtx, deleteOp.getQ(), collation));
}
StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery(
- OperationContext* opCtx, const BSONObj& query, const BSONObj& collation) const {
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ const BSONObj& query,
+ const BSONObj& collation) const {
if (!_routingInfo->cm()) {
return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(),
ChunkVersion::UNSHARDED(),
@@ -547,7 +613,7 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery(
std::set<ShardId> shardIds;
try {
- _routingInfo->cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
+ _routingInfo->cm()->getShardIdsForQuery(expCtx, query, collation, &shardIds);
} catch (const DBException& ex) {
return ex.toStatus();
}
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h
index d280d6383b9..1e62bc2eeb5 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.h
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.h
@@ -37,6 +37,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/ns_targeter.h"
+#include "mongo/s/write_ops/batched_command_request.h"
namespace mongo {
@@ -75,11 +76,11 @@ public:
ShardEndpoint targetInsert(OperationContext* opCtx, const BSONObj& doc) const override;
- std::vector<ShardEndpoint> targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const override;
+ std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const override;
- std::vector<ShardEndpoint> targetDelete(
- OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const override;
+ std::vector<ShardEndpoint> targetDelete(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const override;
std::vector<ShardEndpoint> targetAllShards(OperationContext* opCtx) const override;
@@ -126,9 +127,10 @@ private:
*
* If 'collation' is empty, we use the collection default collation for targeting.
*/
- StatusWith<std::vector<ShardEndpoint>> _targetQuery(OperationContext* opCtx,
- const BSONObj& query,
- const BSONObj& collation) const;
+ StatusWith<std::vector<ShardEndpoint>> _targetQuery(
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ const BSONObj& query,
+ const BSONObj& collation) const;
/**
* Returns a ShardEndpoint for an exact shard key query.
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp b/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp
index 7da11f654f6..43751bd27cf 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/hasher.h"
+#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/s/catalog_cache_test_fixture.h"
#include "mongo/s/session_catalog_router.h"
@@ -45,19 +46,23 @@ using unittest::assertGet;
const NamespaceString kNss("TestDB", "TestColl");
-write_ops::UpdateOpEntry buildUpdate(BSONObj query, BSONObj update, bool upsert) {
+auto buildUpdate(const NamespaceString& nss, BSONObj query, BSONObj update, bool upsert) {
+ write_ops::Update updateOp(nss);
write_ops::UpdateOpEntry entry;
entry.setQ(query);
entry.setU(update);
entry.setUpsert(upsert);
- return entry;
+ updateOp.setUpdates(std::vector{entry});
+ return BatchedCommandRequest{std::move(updateOp)};
}
-write_ops::DeleteOpEntry buildDelete(BSONObj query) {
+auto buildDelete(const NamespaceString& nss, BSONObj query) {
+ write_ops::Delete deleteOp(nss);
write_ops::DeleteOpEntry entry;
entry.setQ(query);
entry.setMulti(false);
- return entry;
+ deleteOp.setDeletes(std::vector{entry});
+ return BatchedCommandRequest{std::move(deleteOp)};
}
class ChunkManagerTargeterTest : public CatalogCacheTestFixture {
@@ -191,54 +196,56 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithRangePrefixHashedShardKey) {
splitPoints);
// When update targets using replacement object.
- auto res = cmTargeter.targetUpdate(
- operationContext(),
- buildUpdate(fromjson("{'a.b': {$gt : 2}}"), fromjson("{a: {b: -1}}"), false));
+ auto request =
+ buildUpdate(kNss, fromjson("{'a.b': {$gt : 2}}"), fromjson("{a: {b: -1}}"), false);
+ auto res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&request, 0));
ASSERT_EQUALS(res.size(), 1);
ASSERT_EQUALS(res[0].shardName, "2");
// When update targets using query.
- res = cmTargeter.targetUpdate(
- operationContext(),
- buildUpdate(fromjson("{$and: [{'a.b': {$gte : 0}}, {'a.b': {$lt: 99}}]}}"),
- fromjson("{$set: {p : 1}}"),
- false));
+ auto requestAndSet = buildUpdate(kNss,
+ fromjson("{$and: [{'a.b': {$gte : 0}}, {'a.b': {$lt: 99}}]}}"),
+ fromjson("{$set: {p : 1}}"),
+ false);
+ res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestAndSet, 0));
ASSERT_EQUALS(res.size(), 1);
ASSERT_EQUALS(res[0].shardName, "3");
- res = cmTargeter.targetUpdate(
- operationContext(),
- buildUpdate(fromjson("{'a.b': {$lt : -101}}"), fromjson("{a: {b: 111}}"), false));
+ auto requestLT =
+ buildUpdate(kNss, fromjson("{'a.b': {$lt : -101}}"), fromjson("{a: {b: 111}}"), false);
+ res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestLT, 0));
ASSERT_EQUALS(res.size(), 1);
ASSERT_EQUALS(res[0].shardName, "1");
// For op-style updates, query on _id gets targeted to all shards.
- res = cmTargeter.targetUpdate(
- operationContext(), buildUpdate(fromjson("{_id: 1}"), fromjson("{$set: {p: 111}}"), false));
+ auto requestOpUpdate =
+ buildUpdate(kNss, fromjson("{_id: 1}"), fromjson("{$set: {p: 111}}"), false);
+ res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestOpUpdate, 0));
ASSERT_EQUALS(res.size(), 5);
// For replacement style updates, query on _id uses replacement doc to target. If the
// replacement doc doesn't have shard key fields, then update should be routed to the shard
// holding 'null' shard key documents.
- res = cmTargeter.targetUpdate(operationContext(),
- buildUpdate(fromjson("{_id: 1}"), fromjson("{p: 111}}"), false));
+ auto requestReplUpdate = buildUpdate(kNss, fromjson("{_id: 1}"), fromjson("{p: 111}}"), false);
+ res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestReplUpdate, 0));
ASSERT_EQUALS(res.size(), 1);
ASSERT_EQUALS(res[0].shardName, "1");
// Upsert requires full shard key in query, even if the query can target a single shard.
+ auto requestFullKey = buildUpdate(kNss,
+ fromjson("{'a.b': 100, 'c.d' : {$exists: false}}}"),
+ fromjson("{a: {b: -111}}"),
+ true);
ASSERT_THROWS_CODE(
- cmTargeter.targetUpdate(operationContext(),
- buildUpdate(fromjson("{'a.b': 100, 'c.d' : {$exists: false}}}"),
- fromjson("{a: {b: -111}}"),
- true)),
+ cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestFullKey, 0)),
DBException,
ErrorCodes::ShardKeyNotFound);
// Upsert success case.
- res = cmTargeter.targetUpdate(
- operationContext(),
- buildUpdate(fromjson("{'a.b': 100, 'c.d': 'val'}"), fromjson("{a: {b: -111}}"), true));
+ auto requestSuccess =
+ buildUpdate(kNss, fromjson("{'a.b': 100, 'c.d': 'val'}"), fromjson("{a: {b: -111}}"), true);
+ res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestSuccess, 0));
ASSERT_EQUALS(res.size(), 1);
ASSERT_EQUALS(res[0].shardName, "4");
}
@@ -263,8 +270,8 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithHashedPrefixHashedShardKey) {
// Verify that the given document is being routed based on hashed value of 'i' in
// 'updateQueryObj'.
- const auto res = cmTargeter.targetUpdate(
- operationContext(), buildUpdate(updateQueryObj, fromjson("{$set: {p: 1}}"), false));
+ auto request = buildUpdate(kNss, updateQueryObj, fromjson("{$set: {p: 1}}"), false);
+ const auto res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&request, 0));
ASSERT_EQUALS(res.size(), 1);
ASSERT_EQUALS(res[0].shardName, findChunk(updateQueryObj["a"]["b"]).getShardId());
}
@@ -272,16 +279,15 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithHashedPrefixHashedShardKey) {
// Range queries on hashed field cannot be used for targeting. In this case, update will be
// targeted based on update document.
const auto updateObj = fromjson("{a: {b: -1}}");
- auto res = cmTargeter.targetUpdate(
- operationContext(), buildUpdate(fromjson("{'a.b': {$gt : 101}}"), updateObj, false));
+ auto requestUpdate = buildUpdate(kNss, fromjson("{'a.b': {$gt : 101}}"), updateObj, false);
+ auto res = cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestUpdate, 0));
ASSERT_EQUALS(res.size(), 1);
ASSERT_EQUALS(res[0].shardName, findChunk(updateObj["a"]["b"]).getShardId());
- ASSERT_THROWS_CODE(
- cmTargeter.targetUpdate(
- operationContext(),
- buildUpdate(fromjson("{'a.b': {$gt : 101}}"), fromjson("{$set: {p: 1}}"), false)),
- DBException,
- ErrorCodes::InvalidOptions);
+ auto requestErr =
+ buildUpdate(kNss, fromjson("{'a.b': {$gt : 101}}"), fromjson("{$set: {p: 1}}"), false);
+ ASSERT_THROWS_CODE(cmTargeter.targetUpdate(operationContext(), BatchItemRef(&requestErr, 0)),
+ DBException,
+ ErrorCodes::InvalidOptions);
}
TEST_F(ChunkManagerTargeterTest, TargetDeleteWithRangePrefixHashedShardKey) {
@@ -295,30 +301,33 @@ TEST_F(ChunkManagerTargeterTest, TargetDeleteWithRangePrefixHashedShardKey) {
splitPoints);
// Cannot delete without full shardkey in the query.
+ auto requestPartialKey = buildDelete(kNss, fromjson("{'a.b': {$gt : 2}}"));
ASSERT_THROWS_CODE(
- cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 2}}"))),
+ cmTargeter.targetDelete(operationContext(), BatchItemRef(&requestPartialKey, 0)),
DBException,
ErrorCodes::ShardKeyNotFound);
+
+ auto requestPartialKey2 = buildDelete(kNss, fromjson("{'a.b': -101}"));
ASSERT_THROWS_CODE(
- cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': -101}"))),
+ cmTargeter.targetDelete(operationContext(), BatchItemRef(&requestPartialKey2, 0)),
DBException,
ErrorCodes::ShardKeyNotFound);
// Delete targeted correctly with full shard key in query.
- auto res = cmTargeter.targetDelete(operationContext(),
- buildDelete(fromjson("{'a.b': -101, 'c.d': 5}")));
+ auto requestFullKey = buildDelete(kNss, fromjson("{'a.b': -101, 'c.d': 5}"));
+ auto res = cmTargeter.targetDelete(operationContext(), BatchItemRef(&requestFullKey, 0));
ASSERT_EQUALS(res.size(), 1);
ASSERT_EQUALS(res[0].shardName, "1");
// Query with MinKey value should go to chunk '0' because MinKey is smaller than BSONNULL.
- res = cmTargeter.targetDelete(
- operationContext(),
- buildDelete(BSONObjBuilder().appendMinKey("a.b").append("c.d", 4).obj()));
+ auto requestMinKey =
+ buildDelete(kNss, BSONObjBuilder().appendMinKey("a.b").append("c.d", 4).obj());
+ res = cmTargeter.targetDelete(operationContext(), BatchItemRef(&requestMinKey, 0));
ASSERT_EQUALS(res.size(), 1);
ASSERT_EQUALS(res[0].shardName, "0");
- res =
- cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': 0, 'c.d': 5}")));
+ auto requestMinKey2 = buildDelete(kNss, fromjson("{'a.b': 0, 'c.d': 5}"));
+ res = cmTargeter.targetDelete(operationContext(), BatchItemRef(&requestMinKey2, 0));
ASSERT_EQUALS(res.size(), 1);
ASSERT_EQUALS(res[0].shardName, "3");
}
@@ -342,16 +351,17 @@ TEST_F(ChunkManagerTargeterTest, TargetDeleteWithHashedPrefixHashedShardKey) {
auto queryObj = BSON("a" << BSON("b" << i) << "c" << BSON("d" << 10));
// Verify that the given document is being routed based on hashed value of 'i' in
// 'queryObj'.
- const auto res = cmTargeter.targetDelete(operationContext(), buildDelete(queryObj));
+ auto request = buildDelete(kNss, queryObj);
+ const auto res = cmTargeter.targetDelete(operationContext(), BatchItemRef(&request, 0));
ASSERT_EQUALS(res.size(), 1);
ASSERT_EQUALS(res[0].shardName, findChunk(queryObj["a"]["b"]).getShardId());
}
// Range queries on hashed field cannot be used for targeting.
- ASSERT_THROWS_CODE(
- cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 101}}"))),
- DBException,
- ErrorCodes::ShardKeyNotFound);
+ auto request = buildDelete(kNss, fromjson("{'a.b': {$gt : 101}}"));
+ ASSERT_THROWS_CODE(cmTargeter.targetDelete(operationContext(), BatchItemRef(&request, 0)),
+ DBException,
+ ErrorCodes::ShardKeyNotFound);
}
} // namespace
diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h
index ec6fb892834..4cde0765dd1 100644
--- a/src/mongo/s/write_ops/mock_ns_targeter.h
+++ b/src/mongo/s/write_ops/mock_ns_targeter.h
@@ -74,18 +74,18 @@ public:
* Returns the first ShardEndpoint for the query from the mock ranges. Only can handle
* queries of the form { field : { $gte : <value>, $lt : <value> } }.
*/
- std::vector<ShardEndpoint> targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const override {
- return _targetQuery(updateOp.getQ());
+ std::vector<ShardEndpoint> targetUpdate(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const override {
+ return _targetQuery(itemRef.getUpdate().getQ());
}
/**
* Returns the first ShardEndpoint for the query from the mock ranges. Only can handle
* queries of the form { field : { $gte : <value>, $lt : <value> } }.
*/
- std::vector<ShardEndpoint> targetDelete(
- OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const override {
- return _targetQuery(deleteOp.getQ());
+ std::vector<ShardEndpoint> targetDelete(OperationContext* opCtx,
+ const BatchItemRef& itemRef) const override {
+ return _targetQuery(itemRef.getDelete().getQ());
}
std::vector<ShardEndpoint> targetAllShards(OperationContext* opCtx) const override {
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index 85501f2aedf..31d5e0ce31d 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -59,9 +59,9 @@ void WriteOp::targetWrites(OperationContext* opCtx,
if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert) {
return std::vector{targeter.targetInsert(opCtx, _itemRef.getDocument())};
} else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Update) {
- return targeter.targetUpdate(opCtx, _itemRef.getUpdate());
+ return targeter.targetUpdate(opCtx, _itemRef);
} else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete) {
- return targeter.targetDelete(opCtx, _itemRef.getDelete());
+ return targeter.targetDelete(opCtx, _itemRef);
}
MONGO_UNREACHABLE;
}();