summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/SConscript4
-rw-r--r--src/mongo/db/s/analyze_shard_key_cmd_util.cpp102
-rw-r--r--src/mongo/db/s/query_analysis_coordinator.cpp3
-rw-r--r--src/mongo/db/s/query_analysis_coordinator.h2
-rw-r--r--src/mongo/db/s/query_analysis_op_observer.cpp1
-rw-r--r--src/mongo/db/s/query_analysis_writer.cpp154
-rw-r--r--src/mongo/db/s/query_analysis_writer.h2
7 files changed, 16 insertions, 252 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index dba15fd4fcb..9bd202acd40 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -83,7 +83,8 @@ env.Library(
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/shard_role',
'$BUILD_DIR/mongo/idl/idl_parser',
- '$BUILD_DIR/mongo/s/analyze_shard_key_common',
+ '$BUILD_DIR/mongo/s/analyze_shard_key_idl',
+ '$BUILD_DIR/mongo/s/analyze_shard_key_util',
],
)
@@ -588,6 +589,7 @@ env.Library(
'$BUILD_DIR/mongo/db/timeseries/timeseries_options',
'$BUILD_DIR/mongo/db/transaction/transaction_api',
'$BUILD_DIR/mongo/idl/cluster_server_parameter',
+ '$BUILD_DIR/mongo/s/analyze_shard_key_util',
'$BUILD_DIR/mongo/s/commands/cluster_commands_common',
'$BUILD_DIR/mongo/s/commands/sharded_cluster_sharding_commands',
'$BUILD_DIR/mongo/s/common_s',
diff --git a/src/mongo/db/s/analyze_shard_key_cmd_util.cpp b/src/mongo/db/s/analyze_shard_key_cmd_util.cpp
index b714d85817e..6e5fced53f1 100644
--- a/src/mongo/db/s/analyze_shard_key_cmd_util.cpp
+++ b/src/mongo/db/s/analyze_shard_key_cmd_util.cpp
@@ -42,15 +42,12 @@
#include "mongo/logv2/log.h"
#include "mongo/rpc/factory.h"
#include "mongo/s/analyze_shard_key_server_parameters_gen.h"
-#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/analyze_shard_key_util.h"
#include "mongo/s/grid.h"
-#include "mongo/s/query/cluster_aggregate.h"
#include "mongo/s/service_entry_point_mongos.h"
-#include "mongo/s/stale_shard_version_helpers.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-
namespace mongo {
namespace analyze_shard_key {
@@ -147,69 +144,6 @@ AggregateCommandRequest makeAggregateRequestForCardinalityAndFrequency(
return aggRequest;
}
-/**
- * Runs the given aggregate command request and applies 'callbackFn' to each returned document. On a
- * sharded cluster, automatically retries on shard versioning errors. Does not support runnning
- * getMore commands for the aggregation.
- */
-void runAggregate(OperationContext* opCtx,
- const NamespaceString& nss,
- AggregateCommandRequest aggRequest,
- std::function<void(const BSONObj&)> callbackFn) {
- if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
- const auto& catalogCache = Grid::get(opCtx)->catalogCache();
- bool succeeded = false;
-
- while (true) {
- try {
- shardVersionRetry(opCtx, catalogCache, nss, "AnalyzeShardKeyAggregation"_sd, [&] {
- BSONObjBuilder responseBuilder;
- uassertStatusOK(
- ClusterAggregate::runAggregate(opCtx,
- ClusterAggregate::Namespaces{nss, nss},
- aggRequest,
- LiteParsedPipeline{aggRequest},
- PrivilegeVector(),
- &responseBuilder));
- succeeded = true;
- auto response = responseBuilder.obj();
- auto firstBatch = response.firstElement()["firstBatch"].Obj();
- BSONObjIterator it(firstBatch);
-
- while (it.more()) {
- auto doc = it.next().Obj();
- callbackFn(doc);
- }
- });
- return;
- } catch (const DBException& ex) {
- if (ex.toStatus() == ErrorCodes::ShardNotFound) {
- // 'callbackFn' should never trigger a ShardNotFound error. It is also incorrect
- // to retry the aggregate command after some documents have already been
- // processed.
- invariant(!succeeded);
-
- LOGV2(6875200,
- "Failed to run aggregate command to analyze shard key",
- "error"_attr = ex.toStatus());
- continue;
- }
- throw;
- }
- }
-
- } else {
- DBDirectClient client(opCtx);
- auto cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest(
- &client, aggRequest, true /* secondaryOk */, false /* useExhaust*/));
-
- while (cursor->more()) {
- auto doc = cursor->next();
- callbackFn(doc);
- }
- }
-}
-
struct IndexSpec {
BSONObj keyPattern;
bool isUnique;
@@ -290,7 +224,7 @@ CardinalityFrequencyMetricsBundle calculateCardinalityAndFrequency(OperationCont
}
auto aggRequest = makeAggregateRequestForCardinalityAndFrequency(nss, shardKey, hintIndexKey);
- runAggregate(opCtx, nss, aggRequest, [&](const BSONObj& doc) {
+ runAggregate(opCtx, aggRequest, [&](const BSONObj& doc) {
auto numDocs = doc.getField(kNumDocsFieldName).exactNumberLong();
auto cardinality = doc.getField(kCardinalityFieldName).exactNumberLong();
auto frequency = doc.getField(kFrequencyFieldName).exactNumberLong();
@@ -435,39 +369,11 @@ boost::optional<int64_t> getNumOrphanDocuments(OperationContext* opCtx,
<< BSON("$sum"
<< "$" + RangeDeletionTask::kNumOrphanDocsFieldName))));
AggregateCommandRequest aggRequest(NamespaceString::kRangeDeletionNamespace, pipeline);
- auto cmdObj = applyReadWriteConcern(
- opCtx, true /* appendRC */, true /* appendWC */, aggRequest.toBSON({}));
-
- std::set<ShardId> shardIds;
- cm.getAllShardIds(&shardIds);
- std::vector<AsyncRequestsSender::Request> requests;
- for (const auto& shardId : shardIds) {
- requests.emplace_back(shardId, cmdObj);
- }
- auto shardResults = gatherResponses(opCtx,
- NamespaceString::kConfigDb,
- ReadPreferenceSetting(ReadPreference::SecondaryPreferred),
- Shard::RetryPolicy::kIdempotent,
- requests);
long long numOrphanDocs = 0;
- for (const auto& shardResult : shardResults) {
- const auto shardResponse = uassertStatusOK(std::move(shardResult.swResponse));
- uassertStatusOK(shardResponse.status);
- const auto cmdResult = shardResponse.data;
- uassertStatusOK(getStatusFromCommandResult(cmdResult));
-
- auto firstBatch = cmdResult.firstElement()["firstBatch"].Obj();
- BSONObjIterator it(firstBatch);
-
- if (!it.more()) {
- continue;
- }
-
- auto doc = it.next().Obj();
- invariant(!it.more());
+ runAggregate(opCtx, nss, aggRequest, [&](const BSONObj& doc) {
numOrphanDocs += doc.getField(kNumOrphanDocsFieldName).exactNumberLong();
- }
+ });
return numOrphanDocs;
}
diff --git a/src/mongo/db/s/query_analysis_coordinator.cpp b/src/mongo/db/s/query_analysis_coordinator.cpp
index aa427ec8025..85adc506fc5 100644
--- a/src/mongo/db/s/query_analysis_coordinator.cpp
+++ b/src/mongo/db/s/query_analysis_coordinator.cpp
@@ -65,8 +65,7 @@ QueryAnalysisCoordinator* QueryAnalysisCoordinator::get(ServiceContext* serviceC
bool QueryAnalysisCoordinator::shouldRegisterReplicaSetAwareService() const {
// This is invoked when the Register above is constructed which is before FCV is set so we need
// to ignore FCV when checking if the feature flag is enabled.
- return analyze_shard_key::isFeatureFlagEnabledIgnoreFCV() &&
- serverGlobalParams.clusterRole == ClusterRole::ConfigServer;
+ return supportsCoordinatingQueryAnalysisIgnoreFCV();
}
void QueryAnalysisCoordinator::onConfigurationInsert(const BSONObj& doc) {
diff --git a/src/mongo/db/s/query_analysis_coordinator.h b/src/mongo/db/s/query_analysis_coordinator.h
index 282e8cba675..2525bdcbef5 100644
--- a/src/mongo/db/s/query_analysis_coordinator.h
+++ b/src/mongo/db/s/query_analysis_coordinator.h
@@ -33,7 +33,7 @@
#include "mongo/db/repl/replica_set_aware_service.h"
#include "mongo/db/service_context.h"
#include "mongo/s/analyze_shard_key_common_gen.h"
-#include "mongo/s/analyze_shard_key_util.h"
+#include "mongo/s/analyze_shard_key_role.h"
#include "mongo/util/periodic_runner.h"
namespace mongo {
diff --git a/src/mongo/db/s/query_analysis_op_observer.cpp b/src/mongo/db/s/query_analysis_op_observer.cpp
index 96ebcdf7d8d..499d3e33fab 100644
--- a/src/mongo/db/s/query_analysis_op_observer.cpp
+++ b/src/mongo/db/s/query_analysis_op_observer.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/s/query_analysis_op_observer.h"
#include "mongo/db/s/query_analysis_writer.h"
#include "mongo/logv2/log.h"
-#include "mongo/s/analyze_shard_key_util.h"
#include "mongo/s/catalog/type_mongos.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
diff --git a/src/mongo/db/s/query_analysis_writer.cpp b/src/mongo/db/s/query_analysis_writer.cpp
index dfa341f18db..f1c28e3143e 100644
--- a/src/mongo/db/s/query_analysis_writer.cpp
+++ b/src/mongo/db/s/query_analysis_writer.cpp
@@ -30,17 +30,15 @@
#include "mongo/db/s/query_analysis_writer.h"
-#include "mongo/client/connpool.h"
#include "mongo/db/catalog/collection_catalog.h"
-#include "mongo/db/dbdirectclient.h"
#include "mongo/db/ops/write_ops.h"
-#include "mongo/db/server_options.h"
#include "mongo/db/update/document_diff_calculator.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/s/analyze_shard_key_documents_gen.h"
#include "mongo/s/analyze_shard_key_server_parameters_gen.h"
+#include "mongo/s/analyze_shard_key_util.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -52,130 +50,9 @@ namespace analyze_shard_key {
namespace {
MONGO_FAIL_POINT_DEFINE(disableQueryAnalysisWriter);
-MONGO_FAIL_POINT_DEFINE(hangQueryAnalysisWriterBeforeWritingLocally);
-MONGO_FAIL_POINT_DEFINE(hangQueryAnalysisWriterBeforeWritingRemotely);
const auto getQueryAnalysisWriter = ServiceContext::declareDecoration<QueryAnalysisWriter>();
-constexpr int kMaxRetriesOnRetryableErrors = 5;
-const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority,
- WriteConcernOptions::SyncMode::UNSET,
- WriteConcernOptions::kWriteConcernTimeoutSystem};
-
-// The size limit for the documents to an insert in a single batch. Leave some padding for other
-// fields in the insert command.
-constexpr int kMaxBSONObjSizeForInsert = BSONObjMaxUserSize - 500 * 1024;
-
-/*
- * Returns true if this mongod can accept writes to the given collection. Unless the collection is
- * in the "local" database, this will only return true if this mongod is a primary (or a
- * standalone).
- */
-bool canAcceptWrites(OperationContext* opCtx, const NamespaceString& ns) {
- ShouldNotConflictWithSecondaryBatchApplicationBlock noPBWMBlock(opCtx->lockState());
- Lock::DBLock lk(opCtx, ns.dbName(), MODE_IS);
- Lock::CollectionLock lock(opCtx, ns, MODE_IS);
- return mongo::repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx,
- ns.db());
-}
-
-/*
- * Runs the given write command against the given database locally, asserts that the top-level
- * command is OK, then asserts the write status using the given 'uassertWriteStatusCb' callback.
- * Returns the command response.
- */
-BSONObj executeWriteCommandLocal(OperationContext* opCtx,
- const std::string dbName,
- const BSONObj& cmdObj,
- const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
- DBDirectClient client(opCtx);
- BSONObj resObj;
-
- if (!client.runCommand(dbName, cmdObj, resObj)) {
- uassertStatusOK(getStatusFromCommandResult(resObj));
- }
- uassertWriteStatusCb(resObj);
-
- return resObj;
-}
-
-/*
- * Runs the given write command against the given database on the (remote) primary, asserts that the
- * top-level command is OK, then asserts the write status using the given 'uassertWriteStatusCb'
- * callback. Throws a PrimarySteppedDown error if no primary is found. Returns the command response.
- */
-BSONObj executeWriteCommandRemote(OperationContext* opCtx,
- const std::string dbName,
- const BSONObj& cmdObj,
- const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
- auto hostAndPort = repl::ReplicationCoordinator::get(opCtx)->getCurrentPrimaryHostAndPort();
-
- if (hostAndPort.empty()) {
- uasserted(ErrorCodes::PrimarySteppedDown, "No primary exists currently");
- }
-
- auto conn = std::make_unique<ScopedDbConnection>(hostAndPort.toString());
-
- if (auth::isInternalAuthSet()) {
- uassertStatusOK(conn->get()->authenticateInternalUser());
- }
-
- DBClientBase* client = conn->get();
- ScopeGuard guard([&] { conn->done(); });
- try {
- BSONObj resObj;
-
- if (!client->runCommand(dbName, cmdObj, resObj)) {
- uassertStatusOK(getStatusFromCommandResult(resObj));
- }
- uassertWriteStatusCb(resObj);
-
- return resObj;
- } catch (...) {
- guard.dismiss();
- conn->kill();
- throw;
- }
-}
-
-/*
- * Runs the given write command against the given collection. If this mongod is currently the
- * primary, runs the write command locally. Otherwise, runs the command on the remote primary.
- * Internally asserts that the top-level command is OK, then asserts the write status using the
- * given 'uassertWriteStatusCb' callback. Internally retries the write command on retryable errors
- * (for kMaxRetriesOnRetryableErrors times) so the writes must be idempotent. Returns the
- * command response.
- */
-BSONObj executeWriteCommand(OperationContext* opCtx,
- const NamespaceString& ns,
- const BSONObj& cmdObj,
- const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
- const auto dbName = ns.db().toString();
- auto numRetries = 0;
-
- while (true) {
- try {
- if (canAcceptWrites(opCtx, ns)) {
- // There is a window here where this mongod may step down after check above. In this
- // case, a NotWritablePrimary error would be thrown. However, this is preferable to
- // running the command while holding locks.
- hangQueryAnalysisWriterBeforeWritingLocally.pauseWhileSet(opCtx);
- return executeWriteCommandLocal(opCtx, dbName, cmdObj, uassertWriteStatusCb);
- }
-
- hangQueryAnalysisWriterBeforeWritingRemotely.pauseWhileSet(opCtx);
- return executeWriteCommandRemote(opCtx, dbName, cmdObj, uassertWriteStatusCb);
- } catch (DBException& ex) {
- if (ErrorCodes::isRetriableError(ex) && numRetries < kMaxRetriesOnRetryableErrors) {
- numRetries++;
- continue;
- }
- throw;
- }
- }
-
- return {};
-}
struct SampledWriteCommandRequest {
UUID sampleId;
@@ -359,6 +236,8 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
}
});
+ LOGV2_DEBUG(6876101, 2, "Persisting sampled queries", "count"_attr = tmpBuffer.getCount());
+
// Insert the documents in batches from the back of the buffer so that we don't need to move the
// documents forward after each batch.
size_t baseIndex = tmpBuffer.getCount() - 1;
@@ -369,11 +248,10 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
long long objSize = 0;
size_t lastIndex = tmpBuffer.getCount(); // inclusive
- LOGV2_DEBUG(6876101, 2, "Persisting sampled queries", "buffer"_attr = tmpBuffer.getCount());
while (lastIndex > 0 && docsToInsert.size() < maxBatchSize) {
// Check if the next document can fit in the batch.
auto doc = tmpBuffer.at(lastIndex - 1);
- if (doc.objsize() + objSize >= kMaxBSONObjSizeForInsert) {
+ if (doc.objsize() + objSize >= kMaxBSONObjSizePerInsertBatch) {
break;
}
lastIndex--;
@@ -383,28 +261,8 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
// We don't add a document that is above the size limit to the buffer so we should have
// added at least one document to 'docsToInsert'.
invariant(!docsToInsert.empty());
- LOGV2_DEBUG(
- 6876102, 2, "Persisting sapmled queries", "docsToInsert"_attr = docsToInsert.size());
-
- write_ops::InsertCommandRequest insertCmd(ns);
- insertCmd.setDocuments(std::move(docsToInsert));
- insertCmd.setWriteCommandRequestBase([&] {
- write_ops::WriteCommandRequestBase wcb;
- wcb.setOrdered(false);
- wcb.setBypassDocumentValidation(false);
- return wcb;
- }());
- auto insertCmdBson = insertCmd.toBSON(
- {BSON(WriteConcernOptions::kWriteConcernField << kMajorityWriteConcern.toBSON())});
-
- executeWriteCommand(opCtx, ns, std::move(insertCmdBson), [&](const BSONObj& resObj) {
- BatchedCommandResponse response;
- std::string errMsg;
-
- if (!response.parseBSON(resObj, &errMsg)) {
- uasserted(ErrorCodes::FailedToParse, errMsg);
- }
+ insertDocuments(opCtx, ns, docsToInsert, [&](const BatchedCommandResponse& response) {
if (response.isErrDetailsSet() && response.sizeErrDetails() > 0) {
boost::optional<write_ops::WriteError> firstWriteErr;
@@ -439,7 +297,7 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
}
void QueryAnalysisWriter::Buffer::add(BSONObj doc) {
- if (doc.objsize() > kMaxBSONObjSizeForInsert) {
+ if (doc.objsize() > kMaxBSONObjSizePerInsertBatch) {
return;
}
_docs.push_back(std::move(doc));
diff --git a/src/mongo/db/s/query_analysis_writer.h b/src/mongo/db/s/query_analysis_writer.h
index 55dcb520987..16261ca97b6 100644
--- a/src/mongo/db/s/query_analysis_writer.h
+++ b/src/mongo/db/s/query_analysis_writer.h
@@ -33,7 +33,7 @@
#include "mongo/db/service_context.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/analyze_shard_key_common_gen.h"
-#include "mongo/s/analyze_shard_key_util.h"
+#include "mongo/s/analyze_shard_key_role.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/util/periodic_runner.h"