summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEric Cox <eric.cox@mongodb.com>2021-10-08 22:19:34 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-08 23:34:26 +0000
commit3f69e79c0ecda4c76efa25b66ea5cdc87acd4c98 (patch)
tree3a0cd26810cdd8008f809c40df6007a8670f3de5 /src
parent4aaacbdc5159eaf3365880f5a0b84db14c2f6941 (diff)
downloadmongo-3f69e79c0ecda4c76efa25b66ea5cdc87acd4c98.tar.gz
SERVER-59924 Error executing aggregate with $out with "available" read concern on sharded clusters
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/process_interface/mongo_process_interface.h2
-rw-r--r--src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp2
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp85
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h23
-rw-r--r--src/mongo/db/s/config/initial_split_policy.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp1
-rw-r--r--src/mongo/s/cluster_commands_helpers.cpp12
-rw-r--r--src/mongo/s/cluster_commands_helpers.h12
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp3
9 files changed, 37 insertions, 105 deletions
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 64d23c976a4..9cf6043dfc6 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -432,7 +432,7 @@ public:
* checked by the commands infrastructure. Used by $lookup and $graphLookup to enforce the
* constraint that the foreign collection must be unsharded if featureFlagShardedLookup is
* turned off. Also used to enforce that the catalog cache is up-to-date when doing a local
- * read.If the parent operation is unversioned, this method does nothing.
+ * read.
*/
virtual void setExpectedShardVersion(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
index 20d666ef96e..0405df29bed 100644
--- a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
@@ -48,7 +48,7 @@ std::shared_ptr<MongoProcessInterface> MongoProcessInterfaceCreateImpl(Operation
(opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient);
if (ShardingState::get(opCtx)->enabled() && isInternalClient) {
return std::make_shared<ShardServerProcessInterface>(
- opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
} else if (auto executor = ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(opCtx)) {
return std::make_shared<ReplicaSetNodeProcessInterface>(std::move(executor));
}
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 67bd2f2d4f9..379cee15e4a 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -58,12 +58,6 @@ namespace mongo {
using namespace fmt::literals;
-ShardServerProcessInterface::ShardServerProcessInterface(
- OperationContext* opCtx, std::shared_ptr<executor::TaskExecutor> executor)
- : CommonMongodProcessInterface(executor) {
- _opIsVersioned = OperationShardingState::isOperationVersioned(opCtx);
-}
-
bool ShardServerProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
const auto cm =
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
@@ -216,15 +210,14 @@ void ShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged(
newCmdWithWriteConcernBuilder.append(WriteConcernOptions::kWriteConcernField,
opCtx->getWriteConcern().toBSON());
newCmdObj = newCmdWithWriteConcernBuilder.done();
- auto response = executeRawCommandAgainstDatabasePrimary(
- opCtx,
- // internalRenameIfOptionsAndIndexesMatch is adminOnly.
- NamespaceString::kAdminDb,
- cachedDbInfo,
- // $out target collection must not exist or not be sharded.
- _versionCommandIfAppropriate(newCmdObj, cachedDbInfo, ChunkVersion::UNSHARDED()),
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- Shard::RetryPolicy::kNoRetry);
+ auto response =
+ executeCommandAgainstDatabasePrimary(opCtx,
+ // internalRenameIfOptionsAndIndexesMatch is adminOnly.
+ NamespaceString::kAdminDb,
+ std::move(cachedDbInfo),
+ newCmdObj,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kNoRetry);
uassertStatusOKWithContext(response.swResponse,
str::stream() << "failed while running command " << newCmdObj);
auto result = response.swResponse.getValue().data;
@@ -254,7 +247,7 @@ BSONObj ShardServerProcessInterface::getCollectionOptions(OperationContext* opCt
shard->runExhaustiveCursorCommand(opCtx,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
nss.db().toString(),
- _versionCommandIfAppropriate(cmdObj, cachedDbInfo),
+ appendDbVersionIfPresent(cmdObj, cachedDbInfo),
Milliseconds(-1)));
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
return BSONObj{};
@@ -300,7 +293,7 @@ std::list<BSONObj> ShardServerProcessInterface::getIndexSpecs(OperationContext*
shard->runExhaustiveCursorCommand(opCtx,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
ns.db().toString(),
- _versionCommandIfAppropriate(cmdObj, cachedDbInfo),
+ appendDbVersionIfPresent(cmdObj, cachedDbInfo),
Milliseconds(-1)));
} catch (ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
return std::list<BSONObj>();
@@ -317,13 +310,13 @@ void ShardServerProcessInterface::createCollection(OperationContext* opCtx,
finalCmdBuilder.append(WriteConcernOptions::kWriteConcernField,
opCtx->getWriteConcern().toBSON());
BSONObj finalCmdObj = finalCmdBuilder.obj();
- auto response = executeRawCommandAgainstDatabasePrimary(
- opCtx,
- dbName,
- cachedDbInfo,
- _versionCommandIfAppropriate(finalCmdObj, cachedDbInfo),
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- Shard::RetryPolicy::kIdempotent);
+ auto response =
+ executeCommandAgainstDatabasePrimary(opCtx,
+ dbName,
+ std::move(cachedDbInfo),
+ finalCmdObj,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotent);
uassertStatusOKWithContext(response.swResponse,
str::stream() << "failed while running command " << finalCmdObj);
auto result = response.swResponse.getValue().data;
@@ -351,11 +344,11 @@ void ShardServerProcessInterface::createIndexesOnEmptyCollection(
ns,
"copying index for empty collection {}"_format(ns.ns()),
[&] {
- auto response = executeRawCommandAgainstDatabasePrimary(
+ auto response = executeCommandAgainstDatabasePrimary(
opCtx,
ns.db(),
- cachedDbInfo,
- _versionCommandIfAppropriate(cmdObj, cachedDbInfo),
+ std::move(cachedDbInfo),
+ cmdObj,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
Shard::RetryPolicy::kIdempotent);
@@ -381,14 +374,13 @@ void ShardServerProcessInterface::dropCollection(OperationContext* opCtx,
newCmdBuilder.append(WriteConcernOptions::kWriteConcernField,
opCtx->getWriteConcern().toBSON());
auto cmdObj = newCmdBuilder.done();
- auto response = executeRawCommandAgainstDatabasePrimary(
- opCtx,
- ns.db(),
- cachedDbInfo,
- // Only unsharded collections can be dropped.
- _versionCommandIfAppropriate(cmdObj, cachedDbInfo, ChunkVersion::UNSHARDED()),
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- Shard::RetryPolicy::kIdempotent);
+ auto response =
+ executeCommandAgainstDatabasePrimary(opCtx,
+ ns.db(),
+ std::move(cachedDbInfo),
+ cmdObj,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotent);
uassertStatusOKWithContext(response.swResponse,
str::stream() << "failed while running command " << cmdObj);
auto result = response.swResponse.getValue().data;
@@ -422,9 +414,9 @@ bool ShardServerProcessInterface::setExpectedDbVersion(OperationContext* opCtx,
uassert(ErrorCodes::IllegalOperation,
"Expected db version must match known db version",
knownDBVersion == dbVersion);
- } else if (_opIsVersioned) {
- oss.initializeClientRoutingVersions(nss, boost::none, dbVersion);
- return true;
+ } else {
+ OperationShardingState::get(opCtx).initializeClientRoutingVersions(
+ nss, boost::none, dbVersion);
}
return false;
@@ -437,8 +429,9 @@ void ShardServerProcessInterface::setExpectedShardVersion(
auto& oss = OperationShardingState::get(opCtx);
if (oss.hasShardVersion(nss)) {
invariant(oss.getShardVersion(nss) == chunkVersion);
- } else if (_opIsVersioned) {
- oss.initializeClientRoutingVersions(nss, chunkVersion, boost::none);
+ } else {
+ OperationShardingState::get(opCtx).initializeClientRoutingVersions(
+ nss, chunkVersion, boost::none);
}
}
@@ -446,16 +439,4 @@ void ShardServerProcessInterface::checkOnPrimaryShardForDb(OperationContext* opC
const NamespaceString& nss) {
DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss.db());
}
-
-BSONObj ShardServerProcessInterface::_versionCommandIfAppropriate(
- BSONObj cmdObj,
- const CachedDatabaseInfo& cachedDbInfo,
- boost::optional<ChunkVersion> shardVersion) {
- if (!_opIsVersioned) {
- return cmdObj;
- }
- return appendDbVersionIfPresent(
- shardVersion ? appendShardVersion(cmdObj, *shardVersion) : cmdObj, cachedDbInfo);
-}
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index 59309fddcd4..dc4b26a971a 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -31,7 +31,6 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/process_interface/common_mongod_process_interface.h"
-#include "mongo/s/catalog_cache.h"
namespace mongo {
@@ -42,9 +41,6 @@ class ShardServerProcessInterface final : public CommonMongodProcessInterface {
public:
using CommonMongodProcessInterface::CommonMongodProcessInterface;
- ShardServerProcessInterface(OperationContext* opCtx,
- std::shared_ptr<executor::TaskExecutor> executor);
-
/**
* Note: Cannot be called while holding a lock. Refreshes from the config servers if the
* metadata for the given namespace does not exist. Otherwise, will not automatically refresh,
@@ -146,25 +142,6 @@ public:
void unsetExpectedDbVersion(OperationContext* opCtx, const NamespaceString& nss) final;
void checkOnPrimaryShardForDb(OperationContext* opCtx, const NamespaceString& nss) final;
-
-private:
- // If the current operation is versioned, then we attach the DB version to the command object;
- // otherwise, it is returned unmodified. Used when running internal commands, as the parent
- // operation may be unversioned if run by a client connecting directly to the shard. If a shard
- // version is supplied it will be set on the command, otherwise no shard version is attached.
- // This allows sub-operations to set a version where necessary (e.g. to enforce that only
- // unsharded collections can be renamed) while omitting it in cases where it is not relevant
- // (e.g. obtaining a list of indexes for a collection that may be sharded or unsharded).
- BSONObj _versionCommandIfAppropriate(BSONObj cmdObj,
- const CachedDatabaseInfo& cachedDbInfo,
- boost::optional<ChunkVersion> shardVersion = boost::none);
-
- // Records whether the initial operation which creates this MongoProcessInterface is versioned.
- // We want to avoid applying versions to sub-operations in cases where the client has connected
- // directly to a shard. Since getMores are never versioned, we must retain the versioning state
- // of the original operation so that we can decide whether we should version sub-operations
- // across the entire lifetime of the pipeline that owns this MongoProcessInterface.
- bool _opIsVersioned = false;
};
} // namespace mongo
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index 3bf9be92769..5bdfb55d6c3 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -827,7 +827,7 @@ ReshardingSplitPolicy::_makePipelineDocumentSource(OperationContext* opCtx,
// ShardServerProcessInterface instead of getting it from the generic factory so the pipeline
// can talk to the shards.
auto pi = std::make_shared<ShardServerProcessInterface>(
- opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
auto expCtx = make_intrusive<ExpressionContext>(opCtx,
boost::none, /* explain */
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
index ba1efd5a017..da8acbf4f2e 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
@@ -351,7 +351,6 @@ protected:
std::shared_ptr<MongoProcessInterface> makeMongoProcessInterface() {
return std::make_shared<ShardServerProcessInterface>(
- operationContext(),
Grid::get(getServiceContext())->getExecutorPool()->getFixedExecutor());
}
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index 577e3ab2a3e..0b6759c211c 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -506,18 +506,6 @@ AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary(
return std::move(responses.front());
}
-AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary(
- OperationContext* opCtx,
- StringData dbName,
- const CachedDatabaseInfo& dbInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- Shard::RetryPolicy retryPolicy) {
- auto responses =
- gatherResponses(opCtx, dbName, readPref, retryPolicy, {{dbInfo.primaryId(), cmdObj}});
- return std::move(responses.front());
-}
-
AsyncRequestsSender::Response executeCommandAgainstShardWithMinKeyChunk(
OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h
index 9638972b5ab..d6443df0407 100644
--- a/src/mongo/s/cluster_commands_helpers.h
+++ b/src/mongo/s/cluster_commands_helpers.h
@@ -253,18 +253,6 @@ AsyncRequestsSender::Response executeCommandAgainstDatabasePrimary(
Shard::RetryPolicy retryPolicy);
/**
- * Utility for dispatching commands against the primary of a database. Does not attach a database or
- * shard version to the command object, but instead issues it exactly as provided. Does not retry.
- */
-AsyncRequestsSender::Response executeRawCommandAgainstDatabasePrimary(
- OperationContext* opCtx,
- StringData dbName,
- const CachedDatabaseInfo& dbInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- Shard::RetryPolicy retryPolicy);
-
-/**
* Utility for dispatching commands against the shard with the MinKey chunk for the namespace and
* attaching the appropriate shard version.
*
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 507e9adf9e4..e5f0c36e552 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -148,8 +148,7 @@ BSONObj createCommandForMergingShard(Document serializedCommand,
}
// Attach the IGNORED chunk version to the command. On the shard, this will skip the actual
- // version check but will nonetheless mark the operation as versioned, indicating that any
- // internal operations executed by the pipeline should also be appropriately versioned.
+ // version check but will nonetheless mark the operation as versioned.
auto mergeCmdObj = appendShardVersion(mergeCmd.freeze().toBson(), ChunkVersion::IGNORED());
// Attach the read and write concerns if needed, and return the final command object.