diff options
author | Eric Cox <eric.cox@mongodb.com> | 2021-10-08 22:19:34 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-08 23:34:26 +0000 |
commit | 3f69e79c0ecda4c76efa25b66ea5cdc87acd4c98 (patch) | |
tree | 3a0cd26810cdd8008f809c40df6007a8670f3de5 /src | |
parent | 4aaacbdc5159eaf3365880f5a0b84db14c2f6941 (diff) | |
download | mongo-3f69e79c0ecda4c76efa25b66ea5cdc87acd4c98.tar.gz |
SERVER-59924 Error executing aggregate with $out with "available" read concern on sharded clusters
Diffstat (limited to 'src')
9 files changed, 37 insertions, 105 deletions
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index 64d23c976a4..9cf6043dfc6 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -432,7 +432,7 @@ public: * checked by the commands infrastructure. Used by $lookup and $graphLookup to enforce the * constraint that the foreign collection must be unsharded if featureFlagShardedLookup is * turned off. Also used to enforce that the catalog cache is up-to-date when doing a local - * read.If the parent operation is unversioned, this method does nothing. + * read. */ virtual void setExpectedShardVersion(OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp index 20d666ef96e..0405df29bed 100644 --- a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp +++ b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp @@ -48,7 +48,7 @@ std::shared_ptr<MongoProcessInterface> MongoProcessInterfaceCreateImpl(Operation (opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient); if (ShardingState::get(opCtx)->enabled() && isInternalClient) { return std::make_shared<ShardServerProcessInterface>( - opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); } else if (auto executor = ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(opCtx)) { return std::make_shared<ReplicaSetNodeProcessInterface>(std::move(executor)); } diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 67bd2f2d4f9..379cee15e4a 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -58,12 +58,6 @@ namespace mongo { using namespace fmt::literals; -ShardServerProcessInterface::ShardServerProcessInterface( - OperationContext* opCtx, std::shared_ptr<executor::TaskExecutor> executor) - : CommonMongodProcessInterface(executor) { - _opIsVersioned = OperationShardingState::isOperationVersioned(opCtx); -} - bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { const auto cm = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); @@ -216,15 +210,14 @@ void ShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( newCmdWithWriteConcernBuilder.append(WriteConcernOptions::kWriteConcernField, opCtx->getWriteConcern().toBSON()); newCmdObj = newCmdWithWriteConcernBuilder.done(); - auto response = executeRawCommandAgainstDatabasePrimary( - opCtx, - // internalRenameIfOptionsAndIndexesMatch is adminOnly. - NamespaceString::kAdminDb, - cachedDbInfo, - // $out target collection must not exist or not be sharded. - _versionCommandIfAppropriate(newCmdObj, cachedDbInfo, ChunkVersion::UNSHARDED()), - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - Shard::RetryPolicy::kNoRetry); + auto response = + executeCommandAgainstDatabasePrimary(opCtx, + // internalRenameIfOptionsAndIndexesMatch is adminOnly. + NamespaceString::kAdminDb, + std::move(cachedDbInfo), + newCmdObj, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kNoRetry); uassertStatusOKWithContext(response.swResponse, str::stream() << "failed while running command " << newCmdObj); auto result = response.swResponse.getValue().data; @@ -254,7 +247,7 @@ BSONObj ShardServerProcessInterface::getCollectionOptions(OperationContext* opCt shard->runExhaustiveCursorCommand(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), nss.db().toString(), - _versionCommandIfAppropriate(cmdObj, cachedDbInfo), + appendDbVersionIfPresent(cmdObj, cachedDbInfo), Milliseconds(-1))); } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { return BSONObj{}; @@ -300,7 +293,7 @@ std::list<BSONObj> ShardServerProcessInterface::getIndexSpecs(OperationContext* shard->runExhaustiveCursorCommand(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), ns.db().toString(), - _versionCommandIfAppropriate(cmdObj, cachedDbInfo), + appendDbVersionIfPresent(cmdObj, cachedDbInfo), Milliseconds(-1))); } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>&) { return std::list<BSONObj>(); @@ -317,13 +310,13 @@ void ShardServerProcessInterface::createCollection(OperationContext* opCtx, finalCmdBuilder.append(WriteConcernOptions::kWriteConcernField, opCtx->getWriteConcern().toBSON()); BSONObj finalCmdObj = finalCmdBuilder.obj(); - auto response = executeRawCommandAgainstDatabasePrimary( - opCtx, - dbName, - cachedDbInfo, - _versionCommandIfAppropriate(finalCmdObj, cachedDbInfo), - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - Shard::RetryPolicy::kIdempotent); + auto response = + executeCommandAgainstDatabasePrimary(opCtx, + dbName, + std::move(cachedDbInfo), + finalCmdObj, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kIdempotent); uassertStatusOKWithContext(response.swResponse, str::stream() << "failed while running command " << finalCmdObj); auto result = response.swResponse.getValue().data; @@ -351,11 +344,11 @@ void ShardServerProcessInterface::createIndexesOnEmptyCollection( ns, "copying index for empty collection {}"_format(ns.ns()), [&] { - auto response = executeRawCommandAgainstDatabasePrimary( + auto response = executeCommandAgainstDatabasePrimary( opCtx, ns.db(), - cachedDbInfo, - _versionCommandIfAppropriate(cmdObj, cachedDbInfo), + std::move(cachedDbInfo), + cmdObj, ReadPreferenceSetting(ReadPreference::PrimaryOnly), Shard::RetryPolicy::kIdempotent); @@ -381,14 +374,13 @@ void ShardServerProcessInterface::dropCollection(OperationContext* opCtx, newCmdBuilder.append(WriteConcernOptions::kWriteConcernField, opCtx->getWriteConcern().toBSON()); auto cmdObj = newCmdBuilder.done(); - auto response = executeRawCommandAgainstDatabasePrimary( - opCtx, - ns.db(), - cachedDbInfo, - // Only unsharded collections can be dropped. - _versionCommandIfAppropriate(cmdObj, cachedDbInfo, ChunkVersion::UNSHARDED()), - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - Shard::RetryPolicy::kIdempotent); + auto response = + executeCommandAgainstDatabasePrimary(opCtx, + ns.db(), + std::move(cachedDbInfo), + cmdObj, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kIdempotent); uassertStatusOKWithContext(response.swResponse, str::stream() << "failed while running command " << cmdObj); auto result = response.swResponse.getValue().data; @@ -422,9 +414,9 @@ bool ShardServerProcessInterface::setExpectedDbVersion(OperationContext* opCtx, uassert(ErrorCodes::IllegalOperation, "Expected db version must match known db version", knownDBVersion == dbVersion); - } else if (_opIsVersioned) { - oss.initializeClientRoutingVersions(nss, boost::none, dbVersion); - return true; + } else { + OperationShardingState::get(opCtx).initializeClientRoutingVersions( + nss, boost::none, dbVersion); } return false; @@ -437,8 +429,9 @@ void ShardServerProcessInterface::setExpectedShardVersion( auto& oss = OperationShardingState::get(opCtx); if (oss.hasShardVersion(nss)) { invariant(oss.getShardVersion(nss) == chunkVersion); - } else if (_opIsVersioned) { - oss.initializeClientRoutingVersions(nss, chunkVersion, boost::none); + } else { + OperationShardingState::get(opCtx).initializeClientRoutingVersions( + nss, chunkVersion, boost::none); } } @@ -446,16 +439,4 @@ void ShardServerProcessInterface::checkOnPrimaryShardForDb(OperationContext* opC const NamespaceString& nss) { DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss.db()); } - -BSONObj ShardServerProcessInterface::_versionCommandIfAppropriate( - BSONObj cmdObj, - const CachedDatabaseInfo& cachedDbInfo, - boost::optional<ChunkVersion> shardVersion) { - if (!_opIsVersioned) { - return cmdObj; - } - return appendDbVersionIfPresent( - shardVersion ? appendShardVersion(cmdObj, *shardVersion) : cmdObj, cachedDbInfo); -} - } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index 59309fddcd4..dc4b26a971a 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -31,7 +31,6 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/process_interface/common_mongod_process_interface.h" -#include "mongo/s/catalog_cache.h" namespace mongo { @@ -42,9 +41,6 @@ class ShardServerProcessInterface final : public CommonMongodProcessInterface { public: using CommonMongodProcessInterface::CommonMongodProcessInterface; - ShardServerProcessInterface(OperationContext* opCtx, - std::shared_ptr<executor::TaskExecutor> executor); - /** * Note: Cannot be called while holding a lock. Refreshes from the config servers if the * metadata for the given namespace does not exist. Otherwise, will not automatically refresh, @@ -146,25 +142,6 @@ public: void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) final; void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) final; - -private: - // If the current operation is versioned, then we attach the DB version to the command object; - // otherwise, it is returned unmodified. Used when running internal commands, as the parent - // operation may be unversioned if run by a client connecting directly to the shard. If a shard - // version is supplied it will be set on the command, otherwise no shard version is attached. - // This allows sub-operations to set a version where necessary (e.g. to enforce that only - // unsharded collections can be renamed) while omitting it in cases where it is not relevant - // (e.g. obtaining a list of indexes for a collection that may be sharded or unsharded). - BSONObj _versionCommandIfAppropriate(BSONObj cmdObj, - const CachedDatabaseInfo& cachedDbInfo, - boost::optional<ChunkVersion> shardVersion = boost::none); - - // Records whether the initial operation which creates this MongoProcessInterface is versioned. - // We want to avoid applying versions to sub-operations in cases where the client has connected - // directly to a shard. Since getMores are never versioned, we must retain the versioning state - // of the original operation so that we can decide whether we should version sub-operations - // across the entire lifetime of the pipeline that owns this MongoProcessInterface. - bool _opIsVersioned = false; }; } // namespace mongo diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp index 3bf9be92769..5bdfb55d6c3 100644 --- a/src/mongo/db/s/config/initial_split_policy.cpp +++ b/src/mongo/db/s/config/initial_split_policy.cpp @@ -827,7 +827,7 @@ ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx, // ShardServerProcessInterface instead of getting it from the generic factory so the pipeline // can talk to the shards. auto pi = std::make_shared<ShardServerProcessInterface>( - opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); auto expCtx = make_intrusive<ExpressionContext>(opCtx, boost::none, /* explain */ diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp index ba1efd5a017..da8acbf4f2e 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp +++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp @@ -351,7 +351,6 @@ protected: std::shared_ptr<MongoProcessInterface> makeMongoProcessInterface() { return std::make_shared<ShardServerProcessInterface>( - operationContext(), Grid::get(getServiceContext())->getExecutorPool()->getFixedExecutor()); } diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index 577e3ab2a3e..0b6759c211c 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -506,18 +506,6 @@ AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary( return std::move(responses.front()); } -AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary( - OperationContext* opCtx, - StringData dbName, - const CachedDatabaseInfo& dbInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - Shard::RetryPolicy retryPolicy) { - auto responses = - gatherResponses(opCtx, dbName, readPref, retryPolicy, {{dbInfo.primaryId(), cmdObj}}); - return std::move(responses.front()); -} - AsyncRequestsSender::Response executeCommandAgainstShardWithMinKeyChunk( OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h index 9638972b5ab..d6443df0407 100644 --- a/src/mongo/s/cluster_commands_helpers.h +++ b/src/mongo/s/cluster_commands_helpers.h @@ -253,18 +253,6 @@ AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary( Shard::RetryPolicy retryPolicy); /** - * Utility for dispatching commands against the primary of a database. Does not attach a database or - * shard version to the command object, but instead issues it exactly as provided. Does not retry. - */ -AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary( - OperationContext* opCtx, - StringData dbName, - const CachedDatabaseInfo& dbInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - Shard::RetryPolicy retryPolicy); - -/** * Utility for dispatching commands against the shard with the MinKey chunk for the namespace and * attaching the appropriate shard version. * diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 507e9adf9e4..e5f0c36e552 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -148,8 +148,7 @@ BSONObj createCommandForMergingShard(Document serializedCommand, } // Attach the IGNORED chunk version to the command. On the shard, this will skip the actual - // version check but will nonetheless mark the operation as versioned, indicating that any - // internal operations executed by the pipeline should also be appropriately versioned. + // version check but will nonetheless mark the operation as versioned. auto mergeCmdObj = appendShardVersion(mergeCmd.freeze().toBson(), ChunkVersion::IGNORED()); // Attach the read and write concerns if needed, and return the final command object. |