diff options
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/db/s/analyze_shard_key_cmd_util.cpp | 102 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_coordinator.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_coordinator.h | 2 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_op_observer.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_writer.cpp | 154 | ||||
-rw-r--r-- | src/mongo/db/s/query_analysis_writer.h | 2 |
7 files changed, 16 insertions, 252 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index dba15fd4fcb..9bd202acd40 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -83,7 +83,8 @@ env.Library( '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/shard_role', '$BUILD_DIR/mongo/idl/idl_parser', - '$BUILD_DIR/mongo/s/analyze_shard_key_common', + '$BUILD_DIR/mongo/s/analyze_shard_key_idl', + '$BUILD_DIR/mongo/s/analyze_shard_key_util', ], ) @@ -588,6 +589,7 @@ env.Library( '$BUILD_DIR/mongo/db/timeseries/timeseries_options', '$BUILD_DIR/mongo/db/transaction/transaction_api', '$BUILD_DIR/mongo/idl/cluster_server_parameter', + '$BUILD_DIR/mongo/s/analyze_shard_key_util', '$BUILD_DIR/mongo/s/commands/cluster_commands_common', '$BUILD_DIR/mongo/s/commands/sharded_cluster_sharding_commands', '$BUILD_DIR/mongo/s/common_s', diff --git a/src/mongo/db/s/analyze_shard_key_cmd_util.cpp b/src/mongo/db/s/analyze_shard_key_cmd_util.cpp index b714d85817e..6e5fced53f1 100644 --- a/src/mongo/db/s/analyze_shard_key_cmd_util.cpp +++ b/src/mongo/db/s/analyze_shard_key_cmd_util.cpp @@ -42,15 +42,12 @@ #include "mongo/logv2/log.h" #include "mongo/rpc/factory.h" #include "mongo/s/analyze_shard_key_server_parameters_gen.h" -#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/analyze_shard_key_util.h" #include "mongo/s/grid.h" -#include "mongo/s/query/cluster_aggregate.h" #include "mongo/s/service_entry_point_mongos.h" -#include "mongo/s/stale_shard_version_helpers.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding - namespace mongo { namespace analyze_shard_key { @@ -147,69 +144,6 @@ AggregateCommandRequest makeAggregateRequestForCardinalityAndFrequency( return aggRequest; } -/** - * Runs the given aggregate command request and applies 'callbackFn' to each returned document. On a - * sharded cluster, automatically retries on shard versioning errors. Does not support runnning - * getMore commands for the aggregation. - */ -void runAggregate(OperationContext* opCtx, - const NamespaceString& nss, - AggregateCommandRequest aggRequest, - std::function<void(const BSONObj&)> callbackFn) { - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - const auto& catalogCache = Grid::get(opCtx)->catalogCache(); - bool succeeded = false; - - while (true) { - try { - shardVersionRetry(opCtx, catalogCache, nss, "AnalyzeShardKeyAggregation"_sd, [&] { - BSONObjBuilder responseBuilder; - uassertStatusOK( - ClusterAggregate::runAggregate(opCtx, - ClusterAggregate::Namespaces{nss, nss}, - aggRequest, - LiteParsedPipeline{aggRequest}, - PrivilegeVector(), - &responseBuilder)); - succeeded = true; - auto response = responseBuilder.obj(); - auto firstBatch = response.firstElement()["firstBatch"].Obj(); - BSONObjIterator it(firstBatch); - - while (it.more()) { - auto doc = it.next().Obj(); - callbackFn(doc); - } - }); - return; - } catch (const DBException& ex) { - if (ex.toStatus() == ErrorCodes::ShardNotFound) { - // 'callbackFn' should never trigger a ShardNotFound error. It is also incorrect - // to retry the aggregate command after some documents have already been - // processed. - invariant(!succeeded); - - LOGV2(6875200, - "Failed to run aggregate command to analyze shard key", - "error"_attr = ex.toStatus()); - continue; - } - throw; - } - } - - } else { - DBDirectClient client(opCtx); - auto cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest( - &client, aggRequest, true /* secondaryOk */, false /* useExhaust*/)); - - while (cursor->more()) { - auto doc = cursor->next(); - callbackFn(doc); - } - } -} - struct IndexSpec { BSONObj keyPattern; bool isUnique; @@ -290,7 +224,7 @@ CardinalityFrequencyMetricsBundle calculateCardinalityAndFrequency(OperationCont } auto aggRequest = makeAggregateRequestForCardinalityAndFrequency(nss, shardKey, hintIndexKey); - runAggregate(opCtx, nss, aggRequest, [&](const BSONObj& doc) { + runAggregate(opCtx, aggRequest, [&](const BSONObj& doc) { auto numDocs = doc.getField(kNumDocsFieldName).exactNumberLong(); auto cardinality = doc.getField(kCardinalityFieldName).exactNumberLong(); auto frequency = doc.getField(kFrequencyFieldName).exactNumberLong(); @@ -435,39 +369,11 @@ boost::optional<int64_t> getNumOrphanDocuments(OperationContext* opCtx, << BSON("$sum" << "$" + RangeDeletionTask::kNumOrphanDocsFieldName)))); AggregateCommandRequest aggRequest(NamespaceString::kRangeDeletionNamespace, pipeline); - auto cmdObj = applyReadWriteConcern( - opCtx, true /* appendRC */, true /* appendWC */, aggRequest.toBSON({})); - - std::set<ShardId> shardIds; - cm.getAllShardIds(&shardIds); - std::vector<AsyncRequestsSender::Request> requests; - for (const auto& shardId : shardIds) { - requests.emplace_back(shardId, cmdObj); - } - auto shardResults = gatherResponses(opCtx, - NamespaceString::kConfigDb, - ReadPreferenceSetting(ReadPreference::SecondaryPreferred), - Shard::RetryPolicy::kIdempotent, - requests); long long numOrphanDocs = 0; - for (const auto& shardResult : shardResults) { - const auto shardResponse = uassertStatusOK(std::move(shardResult.swResponse)); - uassertStatusOK(shardResponse.status); - const auto cmdResult = shardResponse.data; - uassertStatusOK(getStatusFromCommandResult(cmdResult)); - - auto firstBatch = cmdResult.firstElement()["firstBatch"].Obj(); - BSONObjIterator it(firstBatch); - - if (!it.more()) { - continue; - } - - auto doc = it.next().Obj(); - invariant(!it.more()); + runAggregate(opCtx, nss, aggRequest, [&](const BSONObj& doc) { numOrphanDocs += doc.getField(kNumOrphanDocsFieldName).exactNumberLong(); - } + }); return numOrphanDocs; } diff --git a/src/mongo/db/s/query_analysis_coordinator.cpp b/src/mongo/db/s/query_analysis_coordinator.cpp index aa427ec8025..85adc506fc5 100644 --- a/src/mongo/db/s/query_analysis_coordinator.cpp +++ b/src/mongo/db/s/query_analysis_coordinator.cpp @@ -65,8 +65,7 @@ QueryAnalysisCoordinator* QueryAnalysisCoordinator::get(ServiceContext* serviceC bool QueryAnalysisCoordinator::shouldRegisterReplicaSetAwareService() const { // This is invoked when the Register above is constructed which is before FCV is set so we need // to ignore FCV when checking if the feature flag is enabled. - return analyze_shard_key::isFeatureFlagEnabledIgnoreFCV() && - serverGlobalParams.clusterRole == ClusterRole::ConfigServer; + return supportsCoordinatingQueryAnalysisIgnoreFCV(); } void QueryAnalysisCoordinator::onConfigurationInsert(const BSONObj& doc) { diff --git a/src/mongo/db/s/query_analysis_coordinator.h b/src/mongo/db/s/query_analysis_coordinator.h index 282e8cba675..2525bdcbef5 100644 --- a/src/mongo/db/s/query_analysis_coordinator.h +++ b/src/mongo/db/s/query_analysis_coordinator.h @@ -33,7 +33,7 @@ #include "mongo/db/repl/replica_set_aware_service.h" #include "mongo/db/service_context.h" #include "mongo/s/analyze_shard_key_common_gen.h" -#include "mongo/s/analyze_shard_key_util.h" +#include "mongo/s/analyze_shard_key_role.h" #include "mongo/util/periodic_runner.h" namespace mongo { diff --git a/src/mongo/db/s/query_analysis_op_observer.cpp b/src/mongo/db/s/query_analysis_op_observer.cpp index 96ebcdf7d8d..499d3e33fab 100644 --- a/src/mongo/db/s/query_analysis_op_observer.cpp +++ b/src/mongo/db/s/query_analysis_op_observer.cpp @@ -33,7 +33,6 @@ #include "mongo/db/s/query_analysis_op_observer.h" #include "mongo/db/s/query_analysis_writer.h" #include "mongo/logv2/log.h" -#include "mongo/s/analyze_shard_key_util.h" #include "mongo/s/catalog/type_mongos.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault diff --git a/src/mongo/db/s/query_analysis_writer.cpp b/src/mongo/db/s/query_analysis_writer.cpp index dfa341f18db..f1c28e3143e 100644 --- a/src/mongo/db/s/query_analysis_writer.cpp +++ b/src/mongo/db/s/query_analysis_writer.cpp @@ -30,17 +30,15 @@ #include "mongo/db/s/query_analysis_writer.h" -#include "mongo/client/connpool.h" #include "mongo/db/catalog/collection_catalog.h" -#include "mongo/db/dbdirectclient.h" #include "mongo/db/ops/write_ops.h" -#include "mongo/db/server_options.h" #include "mongo/db/update/document_diff_calculator.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" #include "mongo/s/analyze_shard_key_documents_gen.h" #include "mongo/s/analyze_shard_key_server_parameters_gen.h" +#include "mongo/s/analyze_shard_key_util.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/concurrency/thread_pool.h" @@ -52,130 +50,9 @@ namespace analyze_shard_key { namespace { MONGO_FAIL_POINT_DEFINE(disableQueryAnalysisWriter); -MONGO_FAIL_POINT_DEFINE(hangQueryAnalysisWriterBeforeWritingLocally); -MONGO_FAIL_POINT_DEFINE(hangQueryAnalysisWriterBeforeWritingRemotely); const auto getQueryAnalysisWriter = ServiceContext::declareDecoration<QueryAnalysisWriter>(); -constexpr int kMaxRetriesOnRetryableErrors = 5; -const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority, - WriteConcernOptions::SyncMode::UNSET, - WriteConcernOptions::kWriteConcernTimeoutSystem}; - -// The size limit for the documents to an insert in a single batch. Leave some padding for other -// fields in the insert command. -constexpr int kMaxBSONObjSizeForInsert = BSONObjMaxUserSize - 500 * 1024; - -/* - * Returns true if this mongod can accept writes to the given collection. Unless the collection is - * in the "local" database, this will only return true if this mongod is a primary (or a - * standalone). - */ -bool canAcceptWrites(OperationContext* opCtx, const NamespaceString& ns) { - ShouldNotConflictWithSecondaryBatchApplicationBlock noPBWMBlock(opCtx->lockState()); - Lock::DBLock lk(opCtx, ns.dbName(), MODE_IS); - Lock::CollectionLock lock(opCtx, ns, MODE_IS); - return mongo::repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, - ns.db()); -} - -/* - * Runs the given write command against the given database locally, asserts that the top-level - * command is OK, then asserts the write status using the given 'uassertWriteStatusCb' callback. - * Returns the command response. - */ -BSONObj executeWriteCommandLocal(OperationContext* opCtx, - const std::string dbName, - const BSONObj& cmdObj, - const std::function<void(const BSONObj&)>& uassertWriteStatusCb) { - DBDirectClient client(opCtx); - BSONObj resObj; - - if (!client.runCommand(dbName, cmdObj, resObj)) { - uassertStatusOK(getStatusFromCommandResult(resObj)); - } - uassertWriteStatusCb(resObj); - - return resObj; -} - -/* - * Runs the given write command against the given database on the (remote) primary, asserts that the - * top-level command is OK, then asserts the write status using the given 'uassertWriteStatusCb' - * callback. Throws a PrimarySteppedDown error if no primary is found. Returns the command response. - */ -BSONObj executeWriteCommandRemote(OperationContext* opCtx, - const std::string dbName, - const BSONObj& cmdObj, - const std::function<void(const BSONObj&)>& uassertWriteStatusCb) { - auto hostAndPort = repl::ReplicationCoordinator::get(opCtx)->getCurrentPrimaryHostAndPort(); - - if (hostAndPort.empty()) { - uasserted(ErrorCodes::PrimarySteppedDown, "No primary exists currently"); - } - - auto conn = std::make_unique<ScopedDbConnection>(hostAndPort.toString()); - - if (auth::isInternalAuthSet()) { - uassertStatusOK(conn->get()->authenticateInternalUser()); - } - - DBClientBase* client = conn->get(); - ScopeGuard guard([&] { conn->done(); }); - try { - BSONObj resObj; - - if (!client->runCommand(dbName, cmdObj, resObj)) { - uassertStatusOK(getStatusFromCommandResult(resObj)); - } - uassertWriteStatusCb(resObj); - - return resObj; - } catch (...) { - guard.dismiss(); - conn->kill(); - throw; - } -} - -/* - * Runs the given write command against the given collection. If this mongod is currently the - * primary, runs the write command locally. Otherwise, runs the command on the remote primary. - * Internally asserts that the top-level command is OK, then asserts the write status using the - * given 'uassertWriteStatusCb' callback. Internally retries the write command on retryable errors - * (for kMaxRetriesOnRetryableErrors times) so the writes must be idempotent. Returns the - * command response. - */ -BSONObj executeWriteCommand(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& cmdObj, - const std::function<void(const BSONObj&)>& uassertWriteStatusCb) { - const auto dbName = ns.db().toString(); - auto numRetries = 0; - - while (true) { - try { - if (canAcceptWrites(opCtx, ns)) { - // There is a window here where this mongod may step down after check above. In this - // case, a NotWritablePrimary error would be thrown. However, this is preferable to - // running the command while holding locks. - hangQueryAnalysisWriterBeforeWritingLocally.pauseWhileSet(opCtx); - return executeWriteCommandLocal(opCtx, dbName, cmdObj, uassertWriteStatusCb); - } - - hangQueryAnalysisWriterBeforeWritingRemotely.pauseWhileSet(opCtx); - return executeWriteCommandRemote(opCtx, dbName, cmdObj, uassertWriteStatusCb); - } catch (DBException& ex) { - if (ErrorCodes::isRetriableError(ex) && numRetries < kMaxRetriesOnRetryableErrors) { - numRetries++; - continue; - } - throw; - } - } - - return {}; -} struct SampledWriteCommandRequest { UUID sampleId; @@ -359,6 +236,8 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx, } }); + LOGV2_DEBUG(6876101, 2, "Persisting sampled queries", "count"_attr = tmpBuffer.getCount()); + // Insert the documents in batches from the back of the buffer so that we don't need to move the // documents forward after each batch. size_t baseIndex = tmpBuffer.getCount() - 1; @@ -369,11 +248,10 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx, long long objSize = 0; size_t lastIndex = tmpBuffer.getCount(); // inclusive - LOGV2_DEBUG(6876101, 2, "Persisting sampled queries", "buffer"_attr = tmpBuffer.getCount()); while (lastIndex > 0 && docsToInsert.size() < maxBatchSize) { // Check if the next document can fit in the batch. auto doc = tmpBuffer.at(lastIndex - 1); - if (doc.objsize() + objSize >= kMaxBSONObjSizeForInsert) { + if (doc.objsize() + objSize >= kMaxBSONObjSizePerInsertBatch) { break; } lastIndex--; @@ -383,28 +261,8 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx, // We don't add a document that is above the size limit to the buffer so we should have // added at least one document to 'docsToInsert'. invariant(!docsToInsert.empty()); - LOGV2_DEBUG( - 6876102, 2, "Persisting sapmled queries", "docsToInsert"_attr = docsToInsert.size()); - - write_ops::InsertCommandRequest insertCmd(ns); - insertCmd.setDocuments(std::move(docsToInsert)); - insertCmd.setWriteCommandRequestBase([&] { - write_ops::WriteCommandRequestBase wcb; - wcb.setOrdered(false); - wcb.setBypassDocumentValidation(false); - return wcb; - }()); - auto insertCmdBson = insertCmd.toBSON( - {BSON(WriteConcernOptions::kWriteConcernField << kMajorityWriteConcern.toBSON())}); - - executeWriteCommand(opCtx, ns, std::move(insertCmdBson), [&](const BSONObj& resObj) { - BatchedCommandResponse response; - std::string errMsg; - - if (!response.parseBSON(resObj, &errMsg)) { - uasserted(ErrorCodes::FailedToParse, errMsg); - } + insertDocuments(opCtx, ns, docsToInsert, [&](const BatchedCommandResponse& response) { if (response.isErrDetailsSet() && response.sizeErrDetails() > 0) { boost::optional<write_ops::WriteError> firstWriteErr; @@ -439,7 +297,7 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx, } void QueryAnalysisWriter::Buffer::add(BSONObj doc) { - if (doc.objsize() > kMaxBSONObjSizeForInsert) { + if (doc.objsize() > kMaxBSONObjSizePerInsertBatch) { return; } _docs.push_back(std::move(doc)); diff --git a/src/mongo/db/s/query_analysis_writer.h b/src/mongo/db/s/query_analysis_writer.h index 55dcb520987..16261ca97b6 100644 --- a/src/mongo/db/s/query_analysis_writer.h +++ b/src/mongo/db/s/query_analysis_writer.h @@ -33,7 +33,7 @@ #include "mongo/db/service_context.h" #include "mongo/executor/task_executor.h" #include "mongo/s/analyze_shard_key_common_gen.h" -#include "mongo/s/analyze_shard_key_util.h" +#include "mongo/s/analyze_shard_key_role.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/util/periodic_runner.h" |