author    Cheahuychou Mao <mao.cheahuychou@gmail.com>    2023-01-20 21:10:39 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2023-01-20 22:42:30 +0000
commit    4384d9dd340ae38f5ba02c75bc4da14471a89edc (patch)
tree      a1e81a1087487d542490871335a064310adf7ad4 /src/mongo/db/s/query_analysis_writer.cpp
parent    39ff9fa6100370ca59f6ebea3342f0b1669023f0 (diff)
SERVER-72852 Move the aggregate and write helpers for analyzeShardKey into a util file
Diffstat (limited to 'src/mongo/db/s/query_analysis_writer.cpp')
-rw-r--r--    src/mongo/db/s/query_analysis_writer.cpp    154
1 file changed, 6 insertions(+), 148 deletions(-)
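
This commit deletes the writer-local primary-aware write helpers (canAcceptWrites, executeWriteCommandLocal, executeWriteCommandRemote, executeWriteCommand) together with the inline insert-command construction in QueryAnalysisWriter::_flush, and replaces them with a shared insertDocuments helper from the newly included mongo/s/analyze_shard_key_util.h. Judging only from the call site visible later in this diff (the util header itself is not part of this diff), the relocated helper plausibly has a shape like:

    // Hypothetical declaration, inferred from the call site in _flush below;
    // the real signature lives in analyze_shard_key_util.h, not shown here.
    void insertDocuments(OperationContext* opCtx,
                         const NamespaceString& ns,
                         std::vector<BSONObj> docs,
                         const std::function<void(const BatchedCommandResponse&)>& uassertWriteStatusCb);
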
diff --git a/src/mongo/db/s/query_analysis_writer.cpp b/src/mongo/db/s/query_analysis_writer.cpp
index dfa341f18db..f1c28e3143e 100644
--- a/src/mongo/db/s/query_analysis_writer.cpp
+++ b/src/mongo/db/s/query_analysis_writer.cpp
@@ -30,17 +30,15 @@
#include "mongo/db/s/query_analysis_writer.h"
-#include "mongo/client/connpool.h"
#include "mongo/db/catalog/collection_catalog.h"
-#include "mongo/db/dbdirectclient.h"
#include "mongo/db/ops/write_ops.h"
-#include "mongo/db/server_options.h"
#include "mongo/db/update/document_diff_calculator.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/s/analyze_shard_key_documents_gen.h"
#include "mongo/s/analyze_shard_key_server_parameters_gen.h"
+#include "mongo/s/analyze_shard_key_util.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -52,130 +50,9 @@ namespace analyze_shard_key {
namespace {
MONGO_FAIL_POINT_DEFINE(disableQueryAnalysisWriter);
-MONGO_FAIL_POINT_DEFINE(hangQueryAnalysisWriterBeforeWritingLocally);
-MONGO_FAIL_POINT_DEFINE(hangQueryAnalysisWriterBeforeWritingRemotely);
const auto getQueryAnalysisWriter = ServiceContext::declareDecoration<QueryAnalysisWriter>();
-constexpr int kMaxRetriesOnRetryableErrors = 5;
-const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority,
- WriteConcernOptions::SyncMode::UNSET,
- WriteConcernOptions::kWriteConcernTimeoutSystem};
-
-// The size limit for the documents to insert in a single batch. Leave some padding for other
-// fields in the insert command.
-constexpr int kMaxBSONObjSizeForInsert = BSONObjMaxUserSize - 500 * 1024;
-
-/*
- * Returns true if this mongod can accept writes to the given collection. Unless the collection is
- * in the "local" database, this will only return true if this mongod is a primary (or a
- * standalone).
- */
-bool canAcceptWrites(OperationContext* opCtx, const NamespaceString& ns) {
- ShouldNotConflictWithSecondaryBatchApplicationBlock noPBWMBlock(opCtx->lockState());
- Lock::DBLock lk(opCtx, ns.dbName(), MODE_IS);
- Lock::CollectionLock lock(opCtx, ns, MODE_IS);
- return mongo::repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx,
- ns.db());
-}
-
-/*
- * Runs the given write command against the given database locally, asserts that the top-level
- * command is OK, then asserts the write status using the given 'uassertWriteStatusCb' callback.
- * Returns the command response.
- */
-BSONObj executeWriteCommandLocal(OperationContext* opCtx,
- const std::string dbName,
- const BSONObj& cmdObj,
- const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
- DBDirectClient client(opCtx);
- BSONObj resObj;
-
- if (!client.runCommand(dbName, cmdObj, resObj)) {
- uassertStatusOK(getStatusFromCommandResult(resObj));
- }
- uassertWriteStatusCb(resObj);
-
- return resObj;
-}
-
-/*
- * Runs the given write command against the given database on the (remote) primary, asserts that the
- * top-level command is OK, then asserts the write status using the given 'uassertWriteStatusCb'
- * callback. Throws a PrimarySteppedDown error if no primary is found. Returns the command response.
- */
-BSONObj executeWriteCommandRemote(OperationContext* opCtx,
- const std::string dbName,
- const BSONObj& cmdObj,
- const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
- auto hostAndPort = repl::ReplicationCoordinator::get(opCtx)->getCurrentPrimaryHostAndPort();
-
- if (hostAndPort.empty()) {
- uasserted(ErrorCodes::PrimarySteppedDown, "No primary exists currently");
- }
-
- auto conn = std::make_unique<ScopedDbConnection>(hostAndPort.toString());
-
- if (auth::isInternalAuthSet()) {
- uassertStatusOK(conn->get()->authenticateInternalUser());
- }
-
- DBClientBase* client = conn->get();
- ScopeGuard guard([&] { conn->done(); });
- try {
- BSONObj resObj;
-
- if (!client->runCommand(dbName, cmdObj, resObj)) {
- uassertStatusOK(getStatusFromCommandResult(resObj));
- }
- uassertWriteStatusCb(resObj);
-
- return resObj;
- } catch (...) {
- guard.dismiss();
- conn->kill();
- throw;
- }
-}
-
-/*
- * Runs the given write command against the given collection. If this mongod is currently the
- * primary, runs the write command locally. Otherwise, runs the command on the remote primary.
- * Internally asserts that the top-level command is OK, then asserts the write status using the
- * given 'uassertWriteStatusCb' callback. Internally retries the write command on retryable errors
- * (for kMaxRetriesOnRetryableErrors times) so the writes must be idempotent. Returns the
- * command response.
- */
-BSONObj executeWriteCommand(OperationContext* opCtx,
- const NamespaceString& ns,
- const BSONObj& cmdObj,
- const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
- const auto dbName = ns.db().toString();
- auto numRetries = 0;
-
- while (true) {
- try {
- if (canAcceptWrites(opCtx, ns)) {
-            // There is a window here where this mongod may step down after the check above. In this
- // case, a NotWritablePrimary error would be thrown. However, this is preferable to
- // running the command while holding locks.
- hangQueryAnalysisWriterBeforeWritingLocally.pauseWhileSet(opCtx);
- return executeWriteCommandLocal(opCtx, dbName, cmdObj, uassertWriteStatusCb);
- }
-
- hangQueryAnalysisWriterBeforeWritingRemotely.pauseWhileSet(opCtx);
- return executeWriteCommandRemote(opCtx, dbName, cmdObj, uassertWriteStatusCb);
- } catch (DBException& ex) {
- if (ErrorCodes::isRetriableError(ex) && numRetries < kMaxRetriesOnRetryableErrors) {
- numRetries++;
- continue;
- }
- throw;
- }
- }
-
- return {};
-}
struct SampledWriteCommandRequest {
UUID sampleId;
@@ -359,6 +236,8 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
}
});
+ LOGV2_DEBUG(6876101, 2, "Persisting sampled queries", "count"_attr = tmpBuffer.getCount());
+
// Insert the documents in batches from the back of the buffer so that we don't need to move the
// documents forward after each batch.
size_t baseIndex = tmpBuffer.getCount() - 1;
@@ -369,11 +248,10 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
long long objSize = 0;
size_t lastIndex = tmpBuffer.getCount(); // inclusive
- LOGV2_DEBUG(6876101, 2, "Persisting sampled queries", "buffer"_attr = tmpBuffer.getCount());
while (lastIndex > 0 && docsToInsert.size() < maxBatchSize) {
// Check if the next document can fit in the batch.
auto doc = tmpBuffer.at(lastIndex - 1);
- if (doc.objsize() + objSize >= kMaxBSONObjSizeForInsert) {
+ if (doc.objsize() + objSize >= kMaxBSONObjSizePerInsertBatch) {
break;
}
lastIndex--;
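
The batch-packing logic is split across the surrounding hunks; consolidated, it peels documents off the back of the buffer (so nothing has to shift forward after each batch is cut) until the batch reaches maxBatchSize documents or the next document would push it past the per-insert byte budget (kMaxBSONObjSizePerInsertBatch, renamed from kMaxBSONObjSizeForInsert in this commit). A minimal standalone sketch, with a hypothetical function name:

    // Hypothetical standalone rendering of the loop above (not a function in
    // the actual source). Documents are taken from the back of the buffer so
    // no elements have to shift forward, bounded by a document count and a
    // per-insert byte budget such as kMaxBSONObjSizePerInsertBatch.
    std::vector<BSONObj> makeBatchFromBack(std::vector<BSONObj>& buffer,
                                           size_t maxBatchSize,
                                           int maxBytesPerBatch) {
        std::vector<BSONObj> batch;
        long long objSize = 0;
        while (!buffer.empty() && batch.size() < maxBatchSize) {
            if (objSize + buffer.back().objsize() >= maxBytesPerBatch) {
                break;  // The next document would overflow the byte budget.
            }
            objSize += buffer.back().objsize();
            batch.push_back(std::move(buffer.back()));
            buffer.pop_back();
        }
        return batch;
    }
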
@@ -383,28 +261,8 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
// We don't add a document that is above the size limit to the buffer so we should have
// added at least one document to 'docsToInsert'.
invariant(!docsToInsert.empty());
- LOGV2_DEBUG(
-        6876102, 2, "Persisting sampled queries", "docsToInsert"_attr = docsToInsert.size());
-
- write_ops::InsertCommandRequest insertCmd(ns);
- insertCmd.setDocuments(std::move(docsToInsert));
- insertCmd.setWriteCommandRequestBase([&] {
- write_ops::WriteCommandRequestBase wcb;
- wcb.setOrdered(false);
- wcb.setBypassDocumentValidation(false);
- return wcb;
- }());
- auto insertCmdBson = insertCmd.toBSON(
- {BSON(WriteConcernOptions::kWriteConcernField << kMajorityWriteConcern.toBSON())});
-
- executeWriteCommand(opCtx, ns, std::move(insertCmdBson), [&](const BSONObj& resObj) {
- BatchedCommandResponse response;
- std::string errMsg;
-
- if (!response.parseBSON(resObj, &errMsg)) {
- uasserted(ErrorCodes::FailedToParse, errMsg);
- }
+ insertDocuments(opCtx, ns, docsToInsert, [&](const BatchedCommandResponse& response) {
if (response.isErrDetailsSet() && response.sizeErrDetails() > 0) {
boost::optional<write_ops::WriteError> firstWriteErr;
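
Presumably the relocated helper reproduces the request construction this commit deletes above. A sketch of its body stitched together from the removed code, under the assumption that insertDocuments simply relocates that logic unchanged:

    // Assumed body of the shared helper, reassembled from the deleted code:
    // an unordered, majority-write-concern insert whose raw response is parsed
    // into a BatchedCommandResponse before invoking the caller's callback.
    void insertDocuments(OperationContext* opCtx,
                         const NamespaceString& ns,
                         std::vector<BSONObj> docs,
                         const std::function<void(const BatchedCommandResponse&)>& uassertWriteStatusCb) {
        write_ops::InsertCommandRequest insertCmd(ns);
        insertCmd.setDocuments(std::move(docs));
        insertCmd.setWriteCommandRequestBase([] {
            write_ops::WriteCommandRequestBase wcb;
            wcb.setOrdered(false);
            wcb.setBypassDocumentValidation(false);
            return wcb;
        }());
        auto insertCmdObj = insertCmd.toBSON(
            {BSON(WriteConcernOptions::kWriteConcernField << kMajorityWriteConcern.toBSON())});

        executeWriteCommand(opCtx, ns, std::move(insertCmdObj), [&](const BSONObj& resObj) {
            BatchedCommandResponse response;
            std::string errMsg;
            if (!response.parseBSON(resObj, &errMsg)) {
                uasserted(ErrorCodes::FailedToParse, errMsg);
            }
            uassertWriteStatusCb(response);
        });
    }
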
@@ -439,7 +297,7 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
}
void QueryAnalysisWriter::Buffer::add(BSONObj doc) {
- if (doc.objsize() > kMaxBSONObjSizeForInsert) {
+ if (doc.objsize() > kMaxBSONObjSizePerInsertBatch) {
return;
}
_docs.push_back(std::move(doc));