summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDenis Grebennicov <denis.grebennicov@mongodb.com>2022-11-24 12:13:35 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-24 11:50:14 +0000
commit4d9fe52b55759d11cca7a93dc5f85126dafae794 (patch)
treeea211e27d0ce2e450fa768ff4f3579c53929cbca
parent7db13d14754fadd9102f9ea2386241def8ac1bbf (diff)
downloadmongo-4d9fe52b55759d11cca7a93dc5f85126dafae794.tar.gz
SERVER-68686 Add new $_internalOwningShard agg expression
-rw-r--r--jstests/sharding/query/owning_shard_expression.js133
-rw-r--r--src/mongo/db/pipeline/abt/agg_expression_visitor.cpp4
-rw-r--r--src/mongo/db/pipeline/expression_dependencies.cpp1
-rw-r--r--src/mongo/db/pipeline/expression_visitor.h5
-rw-r--r--src/mongo/db/query/cqf_command_utils.cpp4
-rw-r--r--src/mongo/db/query/sbe_stage_builder_expression.cpp6
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/sharding_expressions.cpp101
-rw-r--r--src/mongo/db/s/sharding_expressions.h68
-rw-r--r--src/mongo/db/service_entry_point_common.cpp15
10 files changed, 336 insertions, 2 deletions
diff --git a/jstests/sharding/query/owning_shard_expression.js b/jstests/sharding/query/owning_shard_expression.js
new file mode 100644
index 00000000000..5bbd7721196
--- /dev/null
+++ b/jstests/sharding/query/owning_shard_expression.js
@@ -0,0 +1,133 @@
+/**
+ * Tests that $_internalOwningShard expression correctly computes the shard id the document belongs
+ * to, while executing on mongod.
+ *
+ * @tags: [requires_fcv_63]
+ */
+(function() {
+"use strict";
+
+load("jstests/sharding/libs/create_sharded_collection_util.js");
+
+const st = new ShardingTest({
+ mongos: 1,
+ config: 1,
+ shards: 3,
+});
+const mongos = st.s;
+const dbName = jsTestName();
+const db = st.getDB(dbName);
+const sourceColl = db["source"];
+const destinationColl = db["destination"];
+
+const shard0 = st.rs0;
+const shard1 = st.rs1;
+const shard2 = st.rs2;
+
+// Retrieves the current shard version for the 'destinationColl' and returns the ShardVersion
+// object.
+function getCurrentShardVersion() {
+ const shardVersionResult = assert.commandWorked(destinationColl.getShardVersion());
+ return {
+ v: shardVersionResult.version,
+ e: shardVersionResult.versionEpoch,
+ t: shardVersionResult.versionTimestamp,
+ };
+}
+
+// Returns a projection stage with the $_internalOwningShard expression.
+function buildProjectionStageWithOwningShardExpression(shardVersion) {
+ return {
+ $project: {
+ _id: 0,
+ shard: {
+ $_internalOwningShard: {
+ shardKeyVal: {_id: "$_id"},
+ ns: destinationColl.getFullName(),
+ shardVersion: shardVersion,
+ },
+ },
+ indexData: "$$ROOT",
+ }
+ };
+}
+
+// Asserts that $_internalOwningShard expression correctly computes the shard id.
+function assertOwningShardExpressionResults(shardVersion, expectedResult) {
+ const projectionStage = buildProjectionStageWithOwningShardExpression(shardVersion);
+ assert.eq(sourceColl.aggregate([projectionStage, {$sort: {"indexData._id": 1}}]).toArray(),
+ expectedResult);
+}
+
+// Asserts that $_internalOwningShard expression fails when routing information is stale.
+function assertOwningShardExpressionFailure(shardVersion) {
+ const projectionStage = buildProjectionStageWithOwningShardExpression(shardVersion);
+ assert.commandFailedWithCode(
+ db.runCommand({
+ aggregate: sourceColl.getName(),
+ pipeline: [projectionStage, {$sort: {"indexData._id": 1}}],
+ cursor: {}
+ }),
+ [ErrorCodes.StaleConfig, ErrorCodes.ShardCannotRefreshDueToLocksHeld]);
+
+ // Assert the expression fails while executing on the mongos.
+ assert.commandFailedWithCode(db.runCommand({
+ aggregate: sourceColl.getName(),
+ pipeline: [{$sort: {_id: 1}}, projectionStage],
+ cursor: {}
+ }),
+ 6868600);
+}
+
+// Create a sharded source collection with the shard key on '_id' attribute and two chunks.
+CreateShardedCollectionUtil.shardCollectionWithChunks(sourceColl, {_id: 1}, [
+ {min: {_id: MinKey}, max: {_id: 50}, shard: st.shard2.shardName},
+ {min: {_id: 50}, max: {_id: MaxKey}, shard: st.shard0.shardName},
+]);
+
+// Insert some data.
+const documentOnShard0 = {
+ _id: 1
+};
+const documentOnShard1 = {
+ _id: 50
+};
+const documentOnShard2 = {
+ _id: 100
+};
+assert.commandWorked(sourceColl.insert(documentOnShard0));
+assert.commandWorked(sourceColl.insert(documentOnShard1));
+assert.commandWorked(sourceColl.insert(documentOnShard2));
+
+// Create a sharded destination collection with the shard key on '_id' attribute and three chunks.
+CreateShardedCollectionUtil.shardCollectionWithChunks(destinationColl, {_id: 1}, [
+ {min: {_id: MinKey}, max: {_id: 33}, shard: st.shard0.shardName},
+ {min: {_id: 33}, max: {_id: 66}, shard: st.shard1.shardName},
+ {min: {_id: 66}, max: {_id: MaxKey}, shard: st.shard2.shardName},
+]);
+const expectedResult = [
+ {shard: `${dbName}-rs0`, indexData: documentOnShard0},
+ {shard: `${dbName}-rs1`, indexData: documentOnShard1},
+ {shard: `${dbName}-rs2`, indexData: documentOnShard2},
+];
+
+// Assert that every document belongs to a different shard.
+const shardVersion = getCurrentShardVersion();
+assertOwningShardExpressionResults(shardVersion, expectedResult);
+
+// Flush the router config and assert that every document still belongs to the different shard.
+[shard0, shard1, shard2].forEach(function(shard) {
+ shard.nodes.forEach(function(node) {
+ assert.commandWorked(node.adminCommand({flushRouterConfig: destinationColl.getFullName()}));
+ });
+});
+assertOwningShardExpressionResults(shardVersion, expectedResult);
+
+// Assert that $_internalOwningShard expression will fail when routing information is stale. This is
+// simulated by providing a sharding version with a timestamp from the future.
+const futureShardVersion =
+ Object.assign({}, shardVersion, {t: new Timestamp(Math.pow(2, 32) - 1, 0)});
+assertOwningShardExpressionFailure(futureShardVersion);
+
+st.stop();
+})();
diff --git a/src/mongo/db/pipeline/abt/agg_expression_visitor.cpp b/src/mongo/db/pipeline/abt/agg_expression_visitor.cpp
index c09c0d6b0ef..5f5948e37dd 100644
--- a/src/mongo/db/pipeline/abt/agg_expression_visitor.cpp
+++ b/src/mongo/db/pipeline/abt/agg_expression_visitor.cpp
@@ -764,6 +764,10 @@ public:
unsupportedExpression("tsIncrement");
}
+ void visit(const ExpressionInternalOwningShard* expr) override final {
+ unsupportedExpression("$_internalOwningShard");
+ }
+
private:
/**
* Shared logic for $and, $or. Converts each child into an EExpression that evaluates to Boolean
diff --git a/src/mongo/db/pipeline/expression_dependencies.cpp b/src/mongo/db/pipeline/expression_dependencies.cpp
index cd2927025a3..51455c3a7ae 100644
--- a/src/mongo/db/pipeline/expression_dependencies.cpp
+++ b/src/mongo/db/pipeline/expression_dependencies.cpp
@@ -181,6 +181,7 @@ public:
void visit(const ExpressionObject*) {}
void visit(const ExpressionInternalFLEEqual*) {}
void visit(const ExpressionInternalFLEBetween*) {}
+ void visit(const ExpressionInternalOwningShard*) {}
};
class DependencyVisitor : public DefaultDependencyVisitor {
diff --git a/src/mongo/db/pipeline/expression_visitor.h b/src/mongo/db/pipeline/expression_visitor.h
index 1c97b82ece5..9ff454f9088 100644
--- a/src/mongo/db/pipeline/expression_visitor.h
+++ b/src/mongo/db/pipeline/expression_visitor.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/pipeline/expression.h"
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/expression_walker.h"
@@ -156,6 +157,7 @@ class ExpressionInternalFindElemMatch;
class ExpressionInternalFLEBetween;
class ExpressionInternalFLEEqual;
class ExpressionInternalJsEmit;
+class ExpressionInternalOwningShard;
class ExpressionFunction;
class ExpressionDegreesToRadians;
class ExpressionRadiansToDegrees;
@@ -367,6 +369,8 @@ public:
virtual void visit(expression_walker::MaybeConstPtr<IsConst, ExpressionSetField>) = 0;
virtual void visit(expression_walker::MaybeConstPtr<IsConst, ExpressionTsSecond>) = 0;
virtual void visit(expression_walker::MaybeConstPtr<IsConst, ExpressionTsIncrement>) = 0;
+ virtual void visit(
+ expression_walker::MaybeConstPtr<IsConst, ExpressionInternalOwningShard>) = 0;
};
using ExpressionMutableVisitor = ExpressionVisitor<false>;
@@ -528,5 +532,6 @@ struct SelectiveConstExpressionVisitorBase : public ExpressionConstVisitor {
void visit(const ExpressionSetField*) override {}
void visit(const ExpressionTsSecond*) override {}
void visit(const ExpressionTsIncrement*) override {}
+ void visit(const ExpressionInternalOwningShard*) override {}
};
} // namespace mongo
diff --git a/src/mongo/db/query/cqf_command_utils.cpp b/src/mongo/db/query/cqf_command_utils.cpp
index 12e4080cded..2e7603514e2 100644
--- a/src/mongo/db/query/cqf_command_utils.cpp
+++ b/src/mongo/db/query/cqf_command_utils.cpp
@@ -926,6 +926,10 @@ public:
unsupportedExpression();
}
+ void visit(const ExpressionInternalOwningShard* expr) override final {
+ unsupportedExpression();
+ }
+
private:
void unsupportedExpression() {
_eligible = false;
diff --git a/src/mongo/db/query/sbe_stage_builder_expression.cpp b/src/mongo/db/query/sbe_stage_builder_expression.cpp
index 118f8f8c4fe..985349d6d8f 100644
--- a/src/mongo/db/query/sbe_stage_builder_expression.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_expression.cpp
@@ -474,6 +474,7 @@ public:
void visit(const ExpressionSetField* expr) final {}
void visit(const ExpressionTsSecond* expr) final {}
void visit(const ExpressionTsIncrement* expr) final {}
+ void visit(const ExpressionInternalOwningShard* expr) final {}
private:
void visitMultiBranchLogicExpression(const Expression* expr, sbe::EPrimBinary::Op logicOp) {
@@ -708,6 +709,7 @@ public:
void visit(const ExpressionSetField* expr) final {}
void visit(const ExpressionTsSecond* expr) final {}
void visit(const ExpressionTsIncrement* expr) final {}
+ void visit(const ExpressionInternalOwningShard* expr) final {}
private:
void visitMultiBranchLogicExpression(const Expression* expr, sbe::EPrimBinary::Op logicOp) {
@@ -3243,6 +3245,10 @@ public:
_context->pushExpr(std::move(tsIncrementExpr));
}
+ void visit(const ExpressionInternalOwningShard* expr) final {
+ unsupportedExpression("$_internalOwningShard");
+ }
+
private:
/**
* Shared logic for $and, $or. Converts each child into an EExpression that evaluates to Boolean
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 9e6f8d84f01..a97c5dabee0 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -20,6 +20,7 @@ env.Library(
'range_deletion_task.idl',
'shard_key_index_util.cpp',
'sharding_api_d_params.idl',
+ 'sharding_expressions.cpp',
'sharding_migration_critical_section.cpp',
'sharding_state.cpp',
'sharding_statistics.cpp',
diff --git a/src/mongo/db/s/sharding_expressions.cpp b/src/mongo/db/s/sharding_expressions.cpp
new file mode 100644
index 00000000000..595bc1b577f
--- /dev/null
+++ b/src/mongo/db/s/sharding_expressions.cpp
@@ -0,0 +1,101 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/sharding_expressions.h"
+
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/expression_visitor.h"
+#include "mongo/db/pipeline/variables.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/is_mongos.h"
+
+
+namespace mongo {
+
+Value ExpressionInternalOwningShard::evaluate(const Document& root, Variables* variables) const {
+ // TODO SERVER-71519: Add support for handling stale exception from mongos with
+ // enableFinerGrainedCatalogCacheRefresh.
+ uassert(6868600, "$_internalOwningShard is currently not supported on mongos", !isMongos());
+
+ Value input = _children[0]->evaluate(root, variables);
+ if (input.nullish()) {
+ return Value(BSONNULL);
+ }
+
+ // Retrieve the values from the incoming document.
+ NamespaceString ns(getExpressionContext()->ns.tenantId(), input["ns"_sd].getStringData());
+ const auto shardVersionObj = input["shardVersion"_sd].getDocument().toBson();
+ const auto shardVersion = ShardVersion::parse(BSON("" << shardVersionObj).firstElement());
+ const auto shardKeyVal = input["shardKeyVal"_sd].getDocument().toBson();
+
+ // Get the 'chunkManager' from the catalog cache.
+ auto opCtx = getExpressionContext()->opCtx;
+ const auto catalogCache = Grid::get(opCtx)->catalogCache();
+ uassert(6868602,
+ "$_internalOwningShard expression only makes sense in sharded environment",
+ catalogCache);
+
+ // Setting 'allowLocks' to true when evaluating on mongod, as otherwise an invariant is thrown.
+ // We can safely set it to true as there is no risk of deadlock, because the code still throws
+ // when a refresh would actually need to take place.
+ const auto chunkManager =
+ uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, ns, true /* allowLocks */))
+ .cm;
+
+ // Invalidate catalog cache if the chunk manager version is stale.
+ if (chunkManager.getVersion().isOlderThan(shardVersion.placementVersion())) {
+ boost::optional<CollectionIndexes> collIndexes;
+ ShardVersion currentShardVersion(chunkManager.getVersion(), collIndexes);
+ uasserted(StaleConfigInfo(ns,
+ currentShardVersion,
+ boost::none /* wanted */,
+ ShardingState::get(opCtx)->shardId()),
+ str::stream()
+ << "Sharding information of collection " << ns
+ << " is currently stale and needs to be recovered from the config server");
+ }
+
+ // Retrieve the shard id for the given shard key value.
+ std::set<ShardId> shardIds;
+ chunkManager.getShardIdsForRange(shardKeyVal, shardKeyVal, &shardIds);
+ uassert(6868601, "The value should belong to exactly one ShardId", shardIds.size() == 1);
+ const auto shardId = *(shardIds.begin());
+ return Value(shardId.toString());
+}
+
+REGISTER_STABLE_EXPRESSION(_internalOwningShard, ExpressionInternalOwningShard::parse);
+
+}; // namespace mongo
diff --git a/src/mongo/db/s/sharding_expressions.h b/src/mongo/db/s/sharding_expressions.h
new file mode 100644
index 00000000000..93ad733b55a
--- /dev/null
+++ b/src/mongo/db/s/sharding_expressions.h
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/expression_visitor.h"
+#include "mongo/db/pipeline/variables.h"
+
+namespace mongo {
+
+class ExpressionInternalOwningShard final
+ : public ExpressionFixedArity<ExpressionInternalOwningShard, 1> {
+public:
+ static constexpr const char* const opName = "$_internalOwningShard";
+
+ ExpressionInternalOwningShard(ExpressionContext* const expCtx)
+ : ExpressionFixedArity<ExpressionInternalOwningShard, 1>(expCtx) {
+ expCtx->sbeCompatible = false;
+ }
+
+ Value evaluate(const Document& root, Variables* variables) const final;
+
+ const char* getOpName() const final {
+ return opName;
+ }
+
+ void acceptVisitor(ExpressionMutableVisitor* visitor) final {
+ return visitor->visit(this);
+ }
+
+ void acceptVisitor(ExpressionConstVisitor* visitor) const final {
+ return visitor->visit(this);
+ }
+};
+
+}; // namespace mongo
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 84f46d900dd..84444a22f43 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -1849,7 +1849,13 @@ Future<void> ExecCommandDatabase::_commandExec() {
const auto refreshed = _execContext->behaviors->refreshCollection(opCtx, *sce);
if (refreshed) {
_refreshedCollection = true;
- if (!opCtx->isContinuingMultiDocumentTransaction() && !inCriticalSection) {
+
+ // Can not rerun the command when executing a GetMore command as the cursor
+ // is already lost.
+ const auto isRunningGetMoreCmd =
+ _execContext->getCommand()->getName() == "getMore";
+ if (!opCtx->isContinuingMultiDocumentTransaction() && !inCriticalSection &&
+ !isRunningGetMoreCmd) {
_resetLockerStateAfterShardingUpdate(opCtx);
return _commandExec();
}
@@ -1876,7 +1882,12 @@ Future<void> ExecCommandDatabase::_commandExec() {
if (refreshed) {
_refreshedCatalogCache = true;
- if (!opCtx->isContinuingMultiDocumentTransaction()) {
+
+ // Can not rerun the command when executing a GetMore command as the cursor is
+ // already lost.
+ const auto isRunningGetMoreCmd =
+ _execContext->getCommand()->getName() == "getMore";
+ if (!opCtx->isContinuingMultiDocumentTransaction() && !isRunningGetMoreCmd) {
_resetLockerStateAfterShardingUpdate(opCtx);
return _commandExec();
}