diff options
author | Denis Grebennicov <denis.grebennicov@mongodb.com> | 2022-11-24 12:13:35 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-24 11:50:14 +0000 |
commit | 4d9fe52b55759d11cca7a93dc5f85126dafae794 (patch) | |
tree | ea211e27d0ce2e450fa768ff4f3579c53929cbca | |
parent | 7db13d14754fadd9102f9ea2386241def8ac1bbf (diff) | |
download | mongo-4d9fe52b55759d11cca7a93dc5f85126dafae794.tar.gz |
SERVER-68686 Add new $_internalOwningShard agg expression
-rw-r--r-- | jstests/sharding/query/owning_shard_expression.js | 133 | ||||
-rw-r--r-- | src/mongo/db/pipeline/abt/agg_expression_visitor.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_dependencies.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_visitor.h | 5 | ||||
-rw-r--r-- | src/mongo/db/query/cqf_command_utils.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_expression.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_expressions.cpp | 101 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_expressions.h | 68 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 15 |
10 files changed, 336 insertions, 2 deletions
diff --git a/jstests/sharding/query/owning_shard_expression.js b/jstests/sharding/query/owning_shard_expression.js new file mode 100644 index 00000000000..5bbd7721196 --- /dev/null +++ b/jstests/sharding/query/owning_shard_expression.js @@ -0,0 +1,133 @@ +/** + * Tests that $_internalOwningShard expression correctly computes the shard id the document belongs + * to, while executing on mongod. + * + * @tags: [requires_fcv_63] + */ +(function() { +"use strict"; + +load("jstests/sharding/libs/create_sharded_collection_util.js"); + +const st = new ShardingTest({ + mongos: 1, + config: 1, + shards: 3, +}); +const mongos = st.s; +const dbName = jsTestName(); +const db = st.getDB(dbName); +const sourceColl = db["source"]; +const destinationColl = db["destination"]; + +const shard0 = st.rs0; +const shard1 = st.rs1; +const shard2 = st.rs2; + +// Retrieves the current shard version for the 'destinationColl' and returns the ShardVersion +// object. +function getCurrentShardVersion() { + const shardVersionResult = assert.commandWorked(destinationColl.getShardVersion()); + return { + v: shardVersionResult.version, + e: shardVersionResult.versionEpoch, + t: shardVersionResult.versionTimestamp, + }; +} + +// Returns a projection stage with the $_internalOwningShard expression. +function buildProjectionStageWithOwningShardExpression(shardVersion) { + return { + $project: { + _id: 0, + shard: { + $_internalOwningShard: { + shardKeyVal: {_id: "$_id"}, + ns: destinationColl.getFullName(), + shardVersion: shardVersion, + }, + }, + indexData: "$$ROOT", + } + }; +} + +// Asserts that $_internalOwningShard expression correctly computes the shard id. +function assertOwningShardExpressionResults(shardVersion, expectedResult) { + const projectionStage = buildProjectionStageWithOwningShardExpression(shardVersion); + assert.eq(sourceColl.aggregate([projectionStage, {$sort: {"indexData._id": 1}}]).toArray(), + expectedResult); +} + +// Asserts that $_internalOwningShard expression fails when routing information is stale. +function assertOwningShardExpressionFailure(shardVersion) { + const projectionStage = buildProjectionStageWithOwningShardExpression(shardVersion); + assert.commandFailedWithCode( + db.runCommand({ + aggregate: sourceColl.getName(), + pipeline: [projectionStage, {$sort: {"indexData._id": 1}}], + cursor: {} + }), + [ErrorCodes.StaleConfig, ErrorCodes.ShardCannotRefreshDueToLocksHeld]); + + // Assert the expression fails while executing on the mongos. + assert.commandFailedWithCode(db.runCommand({ + aggregate: sourceColl.getName(), + pipeline: [{$sort: {_id: 1}}, projectionStage], + cursor: {} + }), + 6868600); +} + +// Create a sharded source collection with the shard key on '_id' attribute and two chunks. +CreateShardedCollectionUtil.shardCollectionWithChunks(sourceColl, {_id: 1}, [ + {min: {_id: MinKey}, max: {_id: 50}, shard: st.shard2.shardName}, + {min: {_id: 50}, max: {_id: MaxKey}, shard: st.shard0.shardName}, +]); + +// Insert some data. +const documentOnShard0 = { + _id: 1 +}; +const documentOnShard1 = { + _id: 50 +}; +const documentOnShard2 = { + _id: 100 +}; +assert.commandWorked(sourceColl.insert(documentOnShard0)); +assert.commandWorked(sourceColl.insert(documentOnShard1)); +assert.commandWorked(sourceColl.insert(documentOnShard2)); + +// Create a sharded destination collection with the shard key on '_id' attribute and three chunks. +CreateShardedCollectionUtil.shardCollectionWithChunks(destinationColl, {_id: 1}, [ + {min: {_id: MinKey}, max: {_id: 33}, shard: st.shard0.shardName}, + {min: {_id: 33}, max: {_id: 66}, shard: st.shard1.shardName}, + {min: {_id: 66}, max: {_id: MaxKey}, shard: st.shard2.shardName}, +]); +const expectedResult = [ + {shard: `${dbName}-rs0`, indexData: documentOnShard0}, + {shard: `${dbName}-rs1`, indexData: documentOnShard1}, + {shard: `${dbName}-rs2`, indexData: documentOnShard2}, +]; + +// Assert that every document belongs to a different shard. +const shardVersion = getCurrentShardVersion(); +assertOwningShardExpressionResults(shardVersion, expectedResult); + +// Flush the router config and assert that every document still belongs to the different shard. +[shard0, shard1, shard2].forEach(function(shard) { + shard.nodes.forEach(function(node) { + assert.commandWorked(node.adminCommand({flushRouterConfig: destinationColl.getFullName()})); + }); +}); +assertOwningShardExpressionResults(shardVersion, expectedResult); + +// Assert that $_internalOwningShard expression will fail when routing information is stale. This is +// simulated by providing a sharding version with a timestamp from the future. +const futureShardVersion = + Object.assign({}, shardVersion, {t: new Timestamp(Math.pow(2, 32) - 1, 0)}); +assertOwningShardExpressionFailure(futureShardVersion); + +st.stop(); +})(); diff --git a/src/mongo/db/pipeline/abt/agg_expression_visitor.cpp b/src/mongo/db/pipeline/abt/agg_expression_visitor.cpp index c09c0d6b0ef..5f5948e37dd 100644 --- a/src/mongo/db/pipeline/abt/agg_expression_visitor.cpp +++ b/src/mongo/db/pipeline/abt/agg_expression_visitor.cpp @@ -764,6 +764,10 @@ public: unsupportedExpression("tsIncrement"); } + void visit(const ExpressionInternalOwningShard* expr) override final { + unsupportedExpression("$_internalOwningShard"); + } + private: /** * Shared logic for $and, $or. Converts each child into an EExpression that evaluates to Boolean diff --git a/src/mongo/db/pipeline/expression_dependencies.cpp b/src/mongo/db/pipeline/expression_dependencies.cpp index cd2927025a3..51455c3a7ae 100644 --- a/src/mongo/db/pipeline/expression_dependencies.cpp +++ b/src/mongo/db/pipeline/expression_dependencies.cpp @@ -181,6 +181,7 @@ public: void visit(const ExpressionObject*) {} void visit(const ExpressionInternalFLEEqual*) {} void visit(const ExpressionInternalFLEBetween*) {} + void visit(const ExpressionInternalOwningShard*) {} }; class DependencyVisitor : public DefaultDependencyVisitor { diff --git a/src/mongo/db/pipeline/expression_visitor.h b/src/mongo/db/pipeline/expression_visitor.h index 1c97b82ece5..9ff454f9088 100644 --- a/src/mongo/db/pipeline/expression_visitor.h +++ b/src/mongo/db/pipeline/expression_visitor.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/pipeline/expression.h" #include "mongo/platform/basic.h" #include "mongo/db/pipeline/expression_walker.h" @@ -156,6 +157,7 @@ class ExpressionInternalFindElemMatch; class ExpressionInternalFLEBetween; class ExpressionInternalFLEEqual; class ExpressionInternalJsEmit; +class ExpressionInternalOwningShard; class ExpressionFunction; class ExpressionDegreesToRadians; class ExpressionRadiansToDegrees; @@ -367,6 +369,8 @@ public: virtual void visit(expression_walker::MaybeConstPtr<IsConst, ExpressionSetField>) = 0; virtual void visit(expression_walker::MaybeConstPtr<IsConst, ExpressionTsSecond>) = 0; virtual void visit(expression_walker::MaybeConstPtr<IsConst, ExpressionTsIncrement>) = 0; + virtual void visit( + expression_walker::MaybeConstPtr<IsConst, ExpressionInternalOwningShard>) = 0; }; using ExpressionMutableVisitor = ExpressionVisitor<false>; @@ -528,5 +532,6 @@ struct SelectiveConstExpressionVisitorBase : public ExpressionConstVisitor { void visit(const ExpressionSetField*) override {} void visit(const ExpressionTsSecond*) override {} void visit(const ExpressionTsIncrement*) override {} + void visit(const ExpressionInternalOwningShard*) override {} }; } // namespace mongo diff --git a/src/mongo/db/query/cqf_command_utils.cpp b/src/mongo/db/query/cqf_command_utils.cpp index 12e4080cded..2e7603514e2 100644 --- a/src/mongo/db/query/cqf_command_utils.cpp +++ b/src/mongo/db/query/cqf_command_utils.cpp @@ -926,6 +926,10 @@ public: unsupportedExpression(); } + void visit(const ExpressionInternalOwningShard* expr) override final { + unsupportedExpression(); + } + private: void unsupportedExpression() { _eligible = false; diff --git a/src/mongo/db/query/sbe_stage_builder_expression.cpp b/src/mongo/db/query/sbe_stage_builder_expression.cpp index 118f8f8c4fe..985349d6d8f 100644 --- a/src/mongo/db/query/sbe_stage_builder_expression.cpp +++ b/src/mongo/db/query/sbe_stage_builder_expression.cpp @@ -474,6 +474,7 @@ public: void visit(const ExpressionSetField* expr) final {} void visit(const ExpressionTsSecond* expr) final {} void visit(const ExpressionTsIncrement* expr) final {} + void visit(const ExpressionInternalOwningShard* expr) final {} private: void visitMultiBranchLogicExpression(const Expression* expr, sbe::EPrimBinary::Op logicOp) { @@ -708,6 +709,7 @@ public: void visit(const ExpressionSetField* expr) final {} void visit(const ExpressionTsSecond* expr) final {} void visit(const ExpressionTsIncrement* expr) final {} + void visit(const ExpressionInternalOwningShard* expr) final {} private: void visitMultiBranchLogicExpression(const Expression* expr, sbe::EPrimBinary::Op logicOp) { @@ -3243,6 +3245,10 @@ public: _context->pushExpr(std::move(tsIncrementExpr)); } + void visit(const ExpressionInternalOwningShard* expr) final { + unsupportedExpression("$_internalOwningShard"); + } + private: /** * Shared logic for $and, $or. Converts each child into an EExpression that evaluates to Boolean diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 9e6f8d84f01..a97c5dabee0 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -20,6 +20,7 @@ env.Library( 'range_deletion_task.idl', 'shard_key_index_util.cpp', 'sharding_api_d_params.idl', + 'sharding_expressions.cpp', 'sharding_migration_critical_section.cpp', 'sharding_state.cpp', 'sharding_statistics.cpp', diff --git a/src/mongo/db/s/sharding_expressions.cpp b/src/mongo/db/s/sharding_expressions.cpp new file mode 100644 index 00000000000..595bc1b577f --- /dev/null +++ b/src/mongo/db/s/sharding_expressions.cpp @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharding_expressions.h" + +#include "mongo/db/exec/document_value/document.h" +#include "mongo/db/exec/document_value/value.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/expression_visitor.h" +#include "mongo/db/pipeline/variables.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/s/grid.h" +#include "mongo/s/is_mongos.h" + + +namespace mongo { + +Value ExpressionInternalOwningShard::evaluate(const Document& root, Variables* variables) const { + // TODO SERVER-71519: Add support for handling stale exception from mongos with + // enableFinerGrainedCatalogCacheRefresh. + uassert(6868600, "$_internalOwningShard is currently not supported on mongos", !isMongos()); + + Value input = _children[0]->evaluate(root, variables); + if (input.nullish()) { + return Value(BSONNULL); + } + + // Retrieve the values from the incoming document. + NamespaceString ns(getExpressionContext()->ns.tenantId(), input["ns"_sd].getStringData()); + const auto shardVersionObj = input["shardVersion"_sd].getDocument().toBson(); + const auto shardVersion = ShardVersion::parse(BSON("" << shardVersionObj).firstElement()); + const auto shardKeyVal = input["shardKeyVal"_sd].getDocument().toBson(); + + // Get the 'chunkManager' from the catalog cache. + auto opCtx = getExpressionContext()->opCtx; + const auto catalogCache = Grid::get(opCtx)->catalogCache(); + uassert(6868602, + "$_internalOwningShard expression only makes sense in sharded environment", + catalogCache); + + // Setting 'allowLocks' to true when evaluating on mongod, as otherwise an invariant is thrown. + // We can safely set it to true as there is no risk of deadlock, because the code still throws + // when a refresh would actually need to take place. + const auto chunkManager = + uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, ns, true /* allowLocks */)) + .cm; + + // Invalidate catalog cache if the chunk manager version is stale. + if (chunkManager.getVersion().isOlderThan(shardVersion.placementVersion())) { + boost::optional<CollectionIndexes> collIndexes; + ShardVersion currentShardVersion(chunkManager.getVersion(), collIndexes); + uasserted(StaleConfigInfo(ns, + currentShardVersion, + boost::none /* wanted */, + ShardingState::get(opCtx)->shardId()), + str::stream() + << "Sharding information of collection " << ns + << " is currently stale and needs to be recovered from the config server"); + } + + // Retrieve the shard id for the given shard key value. + std::set<ShardId> shardIds; + chunkManager.getShardIdsForRange(shardKeyVal, shardKeyVal, &shardIds); + uassert(6868601, "The value should belong to exactly one ShardId", shardIds.size() == 1); + const auto shardId = *(shardIds.begin()); + return Value(shardId.toString()); +} + +REGISTER_STABLE_EXPRESSION(_internalOwningShard, ExpressionInternalOwningShard::parse); + +}; // namespace mongo diff --git a/src/mongo/db/s/sharding_expressions.h b/src/mongo/db/s/sharding_expressions.h new file mode 100644 index 00000000000..93ad733b55a --- /dev/null +++ b/src/mongo/db/s/sharding_expressions.h @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/document_value/document.h" +#include "mongo/db/exec/document_value/value.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/expression_visitor.h" +#include "mongo/db/pipeline/variables.h" + +namespace mongo { + +class ExpressionInternalOwningShard final + : public ExpressionFixedArity<ExpressionInternalOwningShard, 1> { +public: + static constexpr const char* const opName = "$_internalOwningShard"; + + ExpressionInternalOwningShard(ExpressionContext* const expCtx) + : ExpressionFixedArity<ExpressionInternalOwningShard, 1>(expCtx) { + expCtx->sbeCompatible = false; + } + + Value evaluate(const Document& root, Variables* variables) const final; + + const char* getOpName() const final { + return opName; + } + + void acceptVisitor(ExpressionMutableVisitor* visitor) final { + return visitor->visit(this); + } + + void acceptVisitor(ExpressionConstVisitor* visitor) const final { + return visitor->visit(this); + } +}; + +}; // namespace mongo diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 84f46d900dd..84444a22f43 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -1849,7 +1849,13 @@ Future<void> ExecCommandDatabase::_commandExec() { const auto refreshed = _execContext->behaviors->refreshCollection(opCtx, *sce); if (refreshed) { _refreshedCollection = true; - if (!opCtx->isContinuingMultiDocumentTransaction() && !inCriticalSection) { + + // Can not rerun the command when executing a GetMore command as the cursor + // is already lost. + const auto isRunningGetMoreCmd = + _execContext->getCommand()->getName() == "getMore"; + if (!opCtx->isContinuingMultiDocumentTransaction() && !inCriticalSection && + !isRunningGetMoreCmd) { _resetLockerStateAfterShardingUpdate(opCtx); return _commandExec(); } @@ -1876,7 +1882,12 @@ Future<void> ExecCommandDatabase::_commandExec() { if (refreshed) { _refreshedCatalogCache = true; - if (!opCtx->isContinuingMultiDocumentTransaction()) { + + // Can not rerun the command when executing a GetMore command as the cursor is + // already lost. + const auto isRunningGetMoreCmd = + _execContext->getCommand()->getName() == "getMore"; + if (!opCtx->isContinuingMultiDocumentTransaction() && !isRunningGetMoreCmd) { _resetLockerStateAfterShardingUpdate(opCtx); return _commandExec(); } |