diff options
author | Eric Cox <eric.cox@mongodb.com> | 2021-11-04 18:30:27 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-11-04 20:24:48 +0000 |
commit | bf031c7246810e873d7d9db14c57de1d771f1e56 (patch) | |
tree | bf5b4bac91ddef0b9c0b1f9a6970069b0ba2d27e | |
parent | 70bf440ea4967e8c1ea67daf8dd2d4f36beea41f (diff) | |
download | mongo-bf031c7246810e873d7d9db14c57de1d771f1e56.tar.gz |
SERVER-59924 Error executing aggregate with $out with "available" read concern on sharded clusters
10 files changed, 77 insertions, 102 deletions
diff --git a/jstests/sharding/agg_out_rc_available.js b/jstests/sharding/agg_out_rc_available.js new file mode 100644 index 00000000000..e6dfae35b8a --- /dev/null +++ b/jstests/sharding/agg_out_rc_available.js @@ -0,0 +1,43 @@ +/** + * Tests that executing aggregate with $out with "available" read concern on sharded clusters + * doesn't fail. + */ +(function() { +"use strict"; + +load('jstests/aggregation/extras/utils.js'); + +const st = new ShardingTest({shards: {rs0: {nodes: 1}}}); +const dbName = "test"; +db = st.getDB(dbName); + +assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + +// Setup and populate input collection. +const inputCollName = "input_coll"; +const inputColl = db[inputCollName]; + +const inputDocs = [{_id: 1, x: 11}, {_id: 2, x: 22}, {_id: 3, x: 33}]; +assert.commandWorked(inputColl.insert(inputDocs)); + +// Run a simple agg pipeline with $out and a readConcern of 'available' and assert that the command +// doesn't fail. +const outputCollName = "output_coll"; +assert.commandWorked(db.runCommand({ + aggregate: inputCollName, + pipeline: [{$out: outputCollName}], + cursor: {}, + readConcern: {level: "available"} +})); + +// Verify that the output collection contains the docments from the input collection. +const result = assert.commandWorked(db.runCommand({ + aggregate: outputCollName, + pipeline: [{$match: {}}], + cursor: {}, + readConcern: {level: "available"} +})); +assert(resultsEq(result.cursor.firstBatch, inputDocs), result.cursor); + +st.stop(); +})(); diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index 6bb56446ff9..8d5102762e6 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -424,8 +424,7 @@ public: * Sets the expected shard version for the given namespace. Invariants if the caller attempts to * change an existing shard version, or if the shard version for this namespace has already been * checked by the commands infrastructure. Used by $lookup and $graphLookup to enforce the - * constraint that the foreign collection must be unsharded. If the parent operation is - * unversioned, this method does nothing. + * constraint that the foreign collection must be unsharded. */ virtual void setExpectedShardVersion(OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp index 20d666ef96e..0405df29bed 100644 --- a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp +++ b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp @@ -48,7 +48,7 @@ std::shared_ptr<MongoProcessInterface> MongoProcessInterfaceCreateImpl(Operation (opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient); if (ShardingState::get(opCtx)->enabled() && isInternalClient) { return std::make_shared<ShardServerProcessInterface>( - opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); } else if (auto executor = ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(opCtx)) { return std::make_shared<ReplicaSetNodeProcessInterface>(std::move(executor)); } diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index ec1d83d2f27..b6c840b5846 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -57,12 +57,6 @@ namespace mongo { using namespace fmt::literals; -ShardServerProcessInterface::ShardServerProcessInterface( - OperationContext* opCtx, std::shared_ptr<executor::TaskExecutor> executor) - : CommonMongodProcessInterface(executor) { - _opIsVersioned = OperationShardingState::isOperationVersioned(opCtx); -} - bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); @@ -200,15 +194,14 @@ void ShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( newCmdWithWriteConcernBuilder.append(WriteConcernOptions::kWriteConcernField, opCtx->getWriteConcern().toBSON()); newCmdObj = newCmdWithWriteConcernBuilder.done(); - auto response = executeRawCommandAgainstDatabasePrimary( - opCtx, - // internalRenameIfOptionsAndIndexesMatch is adminOnly. - NamespaceString::kAdminDb, - cachedDbInfo, - // Only unsharded collections can be renamed. - _versionCommandIfAppropriate(newCmdObj, cachedDbInfo, ChunkVersion::UNSHARDED()), - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - Shard::RetryPolicy::kNoRetry); + auto response = + executeCommandAgainstDatabasePrimary(opCtx, + // internalRenameIfOptionsAndIndexesMatch is adminOnly. + NamespaceString::kAdminDb, + std::move(cachedDbInfo), + newCmdObj, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kNoRetry); uassertStatusOKWithContext(response.swResponse, str::stream() << "failed while running command " << newCmdObj); auto result = response.swResponse.getValue().data; @@ -238,7 +231,7 @@ BSONObj ShardServerProcessInterface::getCollectionOptions(OperationContext* opCt shard->runExhaustiveCursorCommand(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), nss.db().toString(), - _versionCommandIfAppropriate(cmdObj, cachedDbInfo), + appendDbVersionIfPresent(cmdObj, cachedDbInfo), Milliseconds(-1))); } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { return BSONObj{}; @@ -284,7 +277,7 @@ std::list<BSONObj> ShardServerProcessInterface::getIndexSpecs(OperationContext* shard->runExhaustiveCursorCommand(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), ns.db().toString(), - _versionCommandIfAppropriate(cmdObj, cachedDbInfo), + appendDbVersionIfPresent(cmdObj, cachedDbInfo), Milliseconds(-1))); } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>&) { return std::list<BSONObj>(); @@ -301,13 +294,13 @@ void ShardServerProcessInterface::createCollection(OperationContext* opCtx, finalCmdBuilder.append(WriteConcernOptions::kWriteConcernField, opCtx->getWriteConcern().toBSON()); BSONObj finalCmdObj = finalCmdBuilder.obj(); - auto response = executeRawCommandAgainstDatabasePrimary( - opCtx, - dbName, - cachedDbInfo, - _versionCommandIfAppropriate(finalCmdObj, cachedDbInfo), - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - Shard::RetryPolicy::kIdempotent); + auto response = + executeCommandAgainstDatabasePrimary(opCtx, + dbName, + std::move(cachedDbInfo), + finalCmdObj, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kIdempotent); uassertStatusOKWithContext(response.swResponse, str::stream() << "failed while running command " << finalCmdObj); auto result = response.swResponse.getValue().data; @@ -335,11 +328,11 @@ void ShardServerProcessInterface::createIndexesOnEmptyCollection( ns, "copying index for empty collection {}"_format(ns.ns()), [&] { - auto response = executeRawCommandAgainstDatabasePrimary( + auto response = executeCommandAgainstDatabasePrimary( opCtx, ns.db(), - cachedDbInfo, - _versionCommandIfAppropriate(cmdObj, cachedDbInfo), + std::move(cachedDbInfo), + cmdObj, ReadPreferenceSetting(ReadPreference::PrimaryOnly), Shard::RetryPolicy::kIdempotent); @@ -365,14 +358,13 @@ void ShardServerProcessInterface::dropCollection(OperationContext* opCtx, newCmdBuilder.append(WriteConcernOptions::kWriteConcernField, opCtx->getWriteConcern().toBSON()); auto cmdObj = newCmdBuilder.done(); - auto response = executeRawCommandAgainstDatabasePrimary( - opCtx, - ns.db(), - cachedDbInfo, - // Only unsharded collections can be dropped. - _versionCommandIfAppropriate(cmdObj, cachedDbInfo, ChunkVersion::UNSHARDED()), - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - Shard::RetryPolicy::kIdempotent); + auto response = + executeCommandAgainstDatabasePrimary(opCtx, + ns.db(), + std::move(cachedDbInfo), + cmdObj, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kIdempotent); uassertStatusOKWithContext(response.swResponse, str::stream() << "failed while running command " << cmdObj); auto result = response.swResponse.getValue().data; @@ -396,20 +388,10 @@ void ShardServerProcessInterface::setExpectedShardVersion( auto& oss = OperationShardingState::get(opCtx); if (oss.hasShardVersion(nss)) { invariant(oss.getShardVersion(nss) == chunkVersion); - } else if (_opIsVersioned) { - oss.initializeClientRoutingVersions(nss, chunkVersion, boost::none); - } -} - -BSONObj ShardServerProcessInterface::_versionCommandIfAppropriate( - BSONObj cmdObj, - const CachedDatabaseInfo& cachedDbInfo, - boost::optional<ChunkVersion> shardVersion) { - if (!_opIsVersioned) { - return cmdObj; + } else { + OperationShardingState::get(opCtx).initializeClientRoutingVersions( + nss, chunkVersion, boost::none); } - return appendDbVersionIfPresent( - shardVersion ? appendShardVersion(cmdObj, *shardVersion) : cmdObj, cachedDbInfo); } } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index f574ea938ac..5f7dab374a2 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -31,7 +31,6 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/process_interface/common_mongod_process_interface.h" -#include "mongo/s/catalog_cache.h" namespace mongo { @@ -42,9 +41,6 @@ class ShardServerProcessInterface final : public CommonMongodProcessInterface { public: using CommonMongodProcessInterface::CommonMongodProcessInterface; - ShardServerProcessInterface(OperationContext* opCtx, - std::shared_ptr<executor::TaskExecutor> executor); - /** * Note: Cannot be called while holding a lock. Refreshes from the config servers if the * metadata for the given namespace does not exist. Otherwise, will not automatically refresh, @@ -129,25 +125,6 @@ public: void setExpectedShardVersion(OperationContext* opCtx, const NamespaceString& nss, boost::optional<ChunkVersion> chunkVersion) final; - -private: - // If the current operation is versioned, then we attach the DB version to the command object; - // otherwise, it is returned unmodified. Used when running internal commands, as the parent - // operation may be unversioned if run by a client connecting directly to the shard. If a shard - // version is supplied it will be set on the command, otherwise no shard version is attached. - // This allows sub-operations to set a version where necessary (e.g. to enforce that only - // unsharded collections can be renamed) while omitting it in cases where it is not relevant - // (e.g. obtaining a list of indexes for a collection that may be sharded or unsharded). - BSONObj _versionCommandIfAppropriate(BSONObj cmdObj, - const CachedDatabaseInfo& cachedDbInfo, - boost::optional<ChunkVersion> shardVersion = boost::none); - - // Records whether the initial operation which creates this MongoProcessInterface is versioned. - // We want to avoid applying versions to sub-operations in cases where the client has connected - // directly to a shard. Since getMores are never versioned, we must retain the versioning state - // of the original operation so that we can decide whether we should version sub-operations - // across the entire lifetime of the pipeline that owns this MongoProcessInterface. - bool _opIsVersioned = false; }; } // namespace mongo diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp index 27d9a4546ee..365a08dcf8a 100644 --- a/src/mongo/db/s/config/initial_split_policy.cpp +++ b/src/mongo/db/s/config/initial_split_policy.cpp @@ -862,7 +862,7 @@ ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx, // ShardServerProcessInterface instead of getting it from the generic factory so the pipeline // can talk to the shards. auto pi = std::make_shared<ShardServerProcessInterface>( - opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); auto expCtx = make_intrusive<ExpressionContext>(opCtx, boost::none, /* explain */ diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp index 1b22d39f53c..322a6bab51f 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp +++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp @@ -342,7 +342,6 @@ protected: std::shared_ptr<MongoProcessInterface> makeMongoProcessInterface() { return std::make_shared<ShardServerProcessInterface>( - operationContext(), Grid::get(getServiceContext())->getExecutorPool()->getFixedExecutor()); } diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index 8dfb3f20b4c..43b83f2c233 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -506,18 +506,6 @@ AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary( return std::move(responses.front()); } -AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary( - OperationContext* opCtx, - StringData dbName, - const CachedDatabaseInfo& dbInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - Shard::RetryPolicy retryPolicy) { - auto responses = - gatherResponses(opCtx, dbName, readPref, retryPolicy, {{dbInfo.primaryId(), cmdObj}}); - return std::move(responses.front()); -} - AsyncRequestsSender::Response executeCommandAgainstShardWithMinKeyChunk( OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h index 9638972b5ab..d6443df0407 100644 --- a/src/mongo/s/cluster_commands_helpers.h +++ b/src/mongo/s/cluster_commands_helpers.h @@ -253,18 +253,6 @@ AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary( Shard::RetryPolicy retryPolicy); /** - * Utility for dispatching commands against the primary of a database. Does not attach a database or - * shard version to the command object, but instead issues it exactly as provided. Does not retry. - */ -AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary( - OperationContext* opCtx, - StringData dbName, - const CachedDatabaseInfo& dbInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - Shard::RetryPolicy retryPolicy); - -/** * Utility for dispatching commands against the shard with the MinKey chunk for the namespace and * attaching the appropriate shard version. * diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index c85caccb34d..dcd4ffde2f0 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -148,8 +148,7 @@ BSONObj createCommandForMergingShard(Document serializedCommand, } // Attach the IGNORED chunk version to the command. On the shard, this will skip the actual - // version check but will nonetheless mark the operation as versioned, indicating that any - // internal operations executed by the pipeline should also be appropriately versioned. + // version check but will nonetheless mark the operation as versioned. auto mergeCmdObj = appendShardVersion(mergeCmd.freeze().toBson(), ChunkVersion::IGNORED()); // Attach the read and write concerns if needed, and return the final command object. |