summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Cox <eric.cox@mongodb.com>2021-11-04 18:30:27 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-11-04 20:24:48 +0000
commitbf031c7246810e873d7d9db14c57de1d771f1e56 (patch)
treebf5b4bac91ddef0b9c0b1f9a6970069b0ba2d27e
parent70bf440ea4967e8c1ea67daf8dd2d4f36beea41f (diff)
downloadmongo-bf031c7246810e873d7d9db14c57de1d771f1e56.tar.gz
SERVER-59924 Error executing aggregate with $out with "available" read concern on sharded clusters
-rw-r--r--jstests/sharding/agg_out_rc_available.js43
-rw-r--r--src/mongo/db/pipeline/process_interface/mongo_process_interface.h3
-rw-r--r--src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp2
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp78
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h23
-rw-r--r--src/mongo/db/s/config/initial_split_policy.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp1
-rw-r--r--src/mongo/s/cluster_commands_helpers.cpp12
-rw-r--r--src/mongo/s/cluster_commands_helpers.h12
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp3
10 files changed, 77 insertions, 102 deletions
diff --git a/jstests/sharding/agg_out_rc_available.js b/jstests/sharding/agg_out_rc_available.js
new file mode 100644
index 00000000000..e6dfae35b8a
--- /dev/null
+++ b/jstests/sharding/agg_out_rc_available.js
@@ -0,0 +1,43 @@
+/**
+ * Tests that executing aggregate with $out with "available" read concern on sharded clusters
+ * doesn't fail.
+ */
+(function() {
+"use strict";
+
+load('jstests/aggregation/extras/utils.js');
+
+const st = new ShardingTest({shards: {rs0: {nodes: 1}}});
+const dbName = "test";
+db = st.getDB(dbName);
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+
+// Setup and populate input collection.
+const inputCollName = "input_coll";
+const inputColl = db[inputCollName];
+
+const inputDocs = [{_id: 1, x: 11}, {_id: 2, x: 22}, {_id: 3, x: 33}];
+assert.commandWorked(inputColl.insert(inputDocs));
+
+// Run a simple agg pipeline with $out and a readConcern of 'available' and assert that the command
+// doesn't fail.
+const outputCollName = "output_coll";
+assert.commandWorked(db.runCommand({
+ aggregate: inputCollName,
+ pipeline: [{$out: outputCollName}],
+ cursor: {},
+ readConcern: {level: "available"}
+}));
+
+// Verify that the output collection contains the docments from the input collection.
+const result = assert.commandWorked(db.runCommand({
+ aggregate: outputCollName,
+ pipeline: [{$match: {}}],
+ cursor: {},
+ readConcern: {level: "available"}
+}));
+assert(resultsEq(result.cursor.firstBatch, inputDocs), result.cursor);
+
+st.stop();
+})();
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 6bb56446ff9..8d5102762e6 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -424,8 +424,7 @@ public:
* Sets the expected shard version for the given namespace. Invariants if the caller attempts to
* change an existing shard version, or if the shard version for this namespace has already been
* checked by the commands infrastructure. Used by $lookup and $graphLookup to enforce the
- * constraint that the foreign collection must be unsharded. If the parent operation is
- * unversioned, this method does nothing.
+ * constraint that the foreign collection must be unsharded.
*/
virtual void setExpectedShardVersion(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
index 20d666ef96e..0405df29bed 100644
--- a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
@@ -48,7 +48,7 @@ std::shared_ptr<MongoProcessInterface> MongoProcessInterfaceCreateImpl(Operation
(opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient);
if (ShardingState::get(opCtx)->enabled() && isInternalClient) {
return std::make_shared<ShardServerProcessInterface>(
- opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
} else if (auto executor = ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(opCtx)) {
return std::make_shared<ReplicaSetNodeProcessInterface>(std::move(executor));
}
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index ec1d83d2f27..b6c840b5846 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -57,12 +57,6 @@ namespace mongo {
using namespace fmt::literals;
-ShardServerProcessInterface::ShardServerProcessInterface(
- OperationContext* opCtx, std::shared_ptr<executor::TaskExecutor> executor)
- : CommonMongodProcessInterface(executor) {
- _opIsVersioned = OperationShardingState::isOperationVersioned(opCtx);
-}
-
bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
const auto cm =
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
@@ -200,15 +194,14 @@ void ShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged(
newCmdWithWriteConcernBuilder.append(WriteConcernOptions::kWriteConcernField,
opCtx->getWriteConcern().toBSON());
newCmdObj = newCmdWithWriteConcernBuilder.done();
- auto response = executeRawCommandAgainstDatabasePrimary(
- opCtx,
- // internalRenameIfOptionsAndIndexesMatch is adminOnly.
- NamespaceString::kAdminDb,
- cachedDbInfo,
- // Only unsharded collections can be renamed.
- _versionCommandIfAppropriate(newCmdObj, cachedDbInfo, ChunkVersion::UNSHARDED()),
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- Shard::RetryPolicy::kNoRetry);
+ auto response =
+ executeCommandAgainstDatabasePrimary(opCtx,
+ // internalRenameIfOptionsAndIndexesMatch is adminOnly.
+ NamespaceString::kAdminDb,
+ std::move(cachedDbInfo),
+ newCmdObj,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kNoRetry);
uassertStatusOKWithContext(response.swResponse,
str::stream() << "failed while running command " << newCmdObj);
auto result = response.swResponse.getValue().data;
@@ -238,7 +231,7 @@ BSONObj ShardServerProcessInterface::getCollectionOptions(OperationContext* opCt
shard->runExhaustiveCursorCommand(opCtx,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
nss.db().toString(),
- _versionCommandIfAppropriate(cmdObj, cachedDbInfo),
+ appendDbVersionIfPresent(cmdObj, cachedDbInfo),
Milliseconds(-1)));
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
return BSONObj{};
@@ -284,7 +277,7 @@ std::list<BSONObj> ShardServerProcessInterface::getIndexSpecs(OperationContext*
shard->runExhaustiveCursorCommand(opCtx,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
ns.db().toString(),
- _versionCommandIfAppropriate(cmdObj, cachedDbInfo),
+ appendDbVersionIfPresent(cmdObj, cachedDbInfo),
Milliseconds(-1)));
} catch (ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
return std::list<BSONObj>();
@@ -301,13 +294,13 @@ void ShardServerProcessInterface::createCollection(OperationContext* opCtx,
finalCmdBuilder.append(WriteConcernOptions::kWriteConcernField,
opCtx->getWriteConcern().toBSON());
BSONObj finalCmdObj = finalCmdBuilder.obj();
- auto response = executeRawCommandAgainstDatabasePrimary(
- opCtx,
- dbName,
- cachedDbInfo,
- _versionCommandIfAppropriate(finalCmdObj, cachedDbInfo),
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- Shard::RetryPolicy::kIdempotent);
+ auto response =
+ executeCommandAgainstDatabasePrimary(opCtx,
+ dbName,
+ std::move(cachedDbInfo),
+ finalCmdObj,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotent);
uassertStatusOKWithContext(response.swResponse,
str::stream() << "failed while running command " << finalCmdObj);
auto result = response.swResponse.getValue().data;
@@ -335,11 +328,11 @@ void ShardServerProcessInterface::createIndexesOnEmptyCollection(
ns,
"copying index for empty collection {}"_format(ns.ns()),
[&] {
- auto response = executeRawCommandAgainstDatabasePrimary(
+ auto response = executeCommandAgainstDatabasePrimary(
opCtx,
ns.db(),
- cachedDbInfo,
- _versionCommandIfAppropriate(cmdObj, cachedDbInfo),
+ std::move(cachedDbInfo),
+ cmdObj,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
Shard::RetryPolicy::kIdempotent);
@@ -365,14 +358,13 @@ void ShardServerProcessInterface::dropCollection(OperationContext* opCtx,
newCmdBuilder.append(WriteConcernOptions::kWriteConcernField,
opCtx->getWriteConcern().toBSON());
auto cmdObj = newCmdBuilder.done();
- auto response = executeRawCommandAgainstDatabasePrimary(
- opCtx,
- ns.db(),
- cachedDbInfo,
- // Only unsharded collections can be dropped.
- _versionCommandIfAppropriate(cmdObj, cachedDbInfo, ChunkVersion::UNSHARDED()),
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- Shard::RetryPolicy::kIdempotent);
+ auto response =
+ executeCommandAgainstDatabasePrimary(opCtx,
+ ns.db(),
+ std::move(cachedDbInfo),
+ cmdObj,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotent);
uassertStatusOKWithContext(response.swResponse,
str::stream() << "failed while running command " << cmdObj);
auto result = response.swResponse.getValue().data;
@@ -396,20 +388,10 @@ void ShardServerProcessInterface::setExpectedShardVersion(
auto& oss = OperationShardingState::get(opCtx);
if (oss.hasShardVersion(nss)) {
invariant(oss.getShardVersion(nss) == chunkVersion);
- } else if (_opIsVersioned) {
- oss.initializeClientRoutingVersions(nss, chunkVersion, boost::none);
- }
-}
-
-BSONObj ShardServerProcessInterface::_versionCommandIfAppropriate(
- BSONObj cmdObj,
- const CachedDatabaseInfo& cachedDbInfo,
- boost::optional<ChunkVersion> shardVersion) {
- if (!_opIsVersioned) {
- return cmdObj;
+ } else {
+ OperationShardingState::get(opCtx).initializeClientRoutingVersions(
+ nss, chunkVersion, boost::none);
}
- return appendDbVersionIfPresent(
- shardVersion ? appendShardVersion(cmdObj, *shardVersion) : cmdObj, cachedDbInfo);
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index f574ea938ac..5f7dab374a2 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -31,7 +31,6 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/process_interface/common_mongod_process_interface.h"
-#include "mongo/s/catalog_cache.h"
namespace mongo {
@@ -42,9 +41,6 @@ class ShardServerProcessInterface final : public CommonMongodProcessInterface {
public:
using CommonMongodProcessInterface::CommonMongodProcessInterface;
- ShardServerProcessInterface(OperationContext* opCtx,
- std::shared_ptr<executor::TaskExecutor> executor);
-
/**
* Note: Cannot be called while holding a lock. Refreshes from the config servers if the
* metadata for the given namespace does not exist. Otherwise, will not automatically refresh,
@@ -129,25 +125,6 @@ public:
void setExpectedShardVersion(OperationContext* opCtx,
const NamespaceString& nss,
boost::optional<ChunkVersion> chunkVersion) final;
-
-private:
- // If the current operation is versioned, then we attach the DB version to the command object;
- // otherwise, it is returned unmodified. Used when running internal commands, as the parent
- // operation may be unversioned if run by a client connecting directly to the shard. If a shard
- // version is supplied it will be set on the command, otherwise no shard version is attached.
- // This allows sub-operations to set a version where necessary (e.g. to enforce that only
- // unsharded collections can be renamed) while omitting it in cases where it is not relevant
- // (e.g. obtaining a list of indexes for a collection that may be sharded or unsharded).
- BSONObj _versionCommandIfAppropriate(BSONObj cmdObj,
- const CachedDatabaseInfo& cachedDbInfo,
- boost::optional<ChunkVersion> shardVersion = boost::none);
-
- // Records whether the initial operation which creates this MongoProcessInterface is versioned.
- // We want to avoid applying versions to sub-operations in cases where the client has connected
- // directly to a shard. Since getMores are never versioned, we must retain the versioning state
- // of the original operation so that we can decide whether we should version sub-operations
- // across the entire lifetime of the pipeline that owns this MongoProcessInterface.
- bool _opIsVersioned = false;
};
} // namespace mongo
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index 27d9a4546ee..365a08dcf8a 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -862,7 +862,7 @@ ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx,
// ShardServerProcessInterface instead of getting it from the generic factory so the pipeline
// can talk to the shards.
auto pi = std::make_shared<ShardServerProcessInterface>(
- opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
auto expCtx = make_intrusive<ExpressionContext>(opCtx,
boost::none, /* explain */
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
index 1b22d39f53c..322a6bab51f 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
@@ -342,7 +342,6 @@ protected:
std::shared_ptr<MongoProcessInterface> makeMongoProcessInterface() {
return std::make_shared<ShardServerProcessInterface>(
- operationContext(),
Grid::get(getServiceContext())->getExecutorPool()->getFixedExecutor());
}
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index 8dfb3f20b4c..43b83f2c233 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -506,18 +506,6 @@ AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary(
return std::move(responses.front());
}
-AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary(
- OperationContext* opCtx,
- StringData dbName,
- const CachedDatabaseInfo& dbInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- Shard::RetryPolicy retryPolicy) {
- auto responses =
- gatherResponses(opCtx, dbName, readPref, retryPolicy, {{dbInfo.primaryId(), cmdObj}});
- return std::move(responses.front());
-}
-
AsyncRequestsSender::Response executeCommandAgainstShardWithMinKeyChunk(
OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h
index 9638972b5ab..d6443df0407 100644
--- a/src/mongo/s/cluster_commands_helpers.h
+++ b/src/mongo/s/cluster_commands_helpers.h
@@ -253,18 +253,6 @@ AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary(
Shard::RetryPolicy retryPolicy);
/**
- * Utility for dispatching commands against the primary of a database. Does not attach a database or
- * shard version to the command object, but instead issues it exactly as provided. Does not retry.
- */
-AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary(
- OperationContext* opCtx,
- StringData dbName,
- const CachedDatabaseInfo& dbInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- Shard::RetryPolicy retryPolicy);
-
-/**
* Utility for dispatching commands against the shard with the MinKey chunk for the namespace and
* attaching the appropriate shard version.
*
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index c85caccb34d..dcd4ffde2f0 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -148,8 +148,7 @@ BSONObj createCommandForMergingShard(Document serializedCommand,
}
// Attach the IGNORED chunk version to the command. On the shard, this will skip the actual
- // version check but will nonetheless mark the operation as versioned, indicating that any
- // internal operations executed by the pipeline should also be appropriately versioned.
+ // version check but will nonetheless mark the operation as versioned.
auto mergeCmdObj = appendShardVersion(mergeCmd.freeze().toBson(), ChunkVersion::IGNORED());
// Attach the read and write concerns if needed, and return the final command object.