Diffstat (limited to 'src/mongo/db/s/query_analysis_writer.cpp')
-rw-r--r--  src/mongo/db/s/query_analysis_writer.cpp  | 154
1 file changed, 6 insertions, 148 deletions
diff --git a/src/mongo/db/s/query_analysis_writer.cpp b/src/mongo/db/s/query_analysis_writer.cpp
index dfa341f18db..f1c28e3143e 100644
--- a/src/mongo/db/s/query_analysis_writer.cpp
+++ b/src/mongo/db/s/query_analysis_writer.cpp
@@ -30,17 +30,15 @@
 #include "mongo/db/s/query_analysis_writer.h"
 
-#include "mongo/client/connpool.h"
 #include "mongo/db/catalog/collection_catalog.h"
-#include "mongo/db/dbdirectclient.h"
 #include "mongo/db/ops/write_ops.h"
-#include "mongo/db/server_options.h"
 #include "mongo/db/update/document_diff_calculator.h"
 #include "mongo/executor/network_interface_factory.h"
 #include "mongo/executor/thread_pool_task_executor.h"
 #include "mongo/logv2/log.h"
 #include "mongo/s/analyze_shard_key_documents_gen.h"
 #include "mongo/s/analyze_shard_key_server_parameters_gen.h"
+#include "mongo/s/analyze_shard_key_util.h"
 #include "mongo/s/write_ops/batched_command_response.h"
 #include "mongo/util/concurrency/thread_pool.h"
 
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -52,130 +50,9 @@ namespace analyze_shard_key {
 namespace {
 
 MONGO_FAIL_POINT_DEFINE(disableQueryAnalysisWriter);
-MONGO_FAIL_POINT_DEFINE(hangQueryAnalysisWriterBeforeWritingLocally);
-MONGO_FAIL_POINT_DEFINE(hangQueryAnalysisWriterBeforeWritingRemotely);
 
 const auto getQueryAnalysisWriter = ServiceContext::declareDecoration<QueryAnalysisWriter>();
 
-constexpr int kMaxRetriesOnRetryableErrors = 5;
-const WriteConcernOptions kMajorityWriteConcern{WriteConcernOptions::kMajority,
-                                                WriteConcernOptions::SyncMode::UNSET,
-                                                WriteConcernOptions::kWriteConcernTimeoutSystem};
-
-// The size limit for the documents to an insert in a single batch. Leave some padding for other
-// fields in the insert command.
-constexpr int kMaxBSONObjSizeForInsert = BSONObjMaxUserSize - 500 * 1024;
-
-/*
- * Returns true if this mongod can accept writes to the given collection. Unless the collection is
- * in the "local" database, this will only return true if this mongod is a primary (or a
- * standalone).
- */
-bool canAcceptWrites(OperationContext* opCtx, const NamespaceString& ns) {
-    ShouldNotConflictWithSecondaryBatchApplicationBlock noPBWMBlock(opCtx->lockState());
-    Lock::DBLock lk(opCtx, ns.dbName(), MODE_IS);
-    Lock::CollectionLock lock(opCtx, ns, MODE_IS);
-    return mongo::repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx,
-                                                                                       ns.db());
-}
-
-/*
- * Runs the given write command against the given database locally, asserts that the top-level
- * command is OK, then asserts the write status using the given 'uassertWriteStatusCb' callback.
- * Returns the command response.
- */
-BSONObj executeWriteCommandLocal(OperationContext* opCtx,
-                                 const std::string dbName,
-                                 const BSONObj& cmdObj,
-                                 const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
-    DBDirectClient client(opCtx);
-    BSONObj resObj;
-
-    if (!client.runCommand(dbName, cmdObj, resObj)) {
-        uassertStatusOK(getStatusFromCommandResult(resObj));
-    }
-    uassertWriteStatusCb(resObj);
-
-    return resObj;
-}
-
-/*
- * Runs the given write command against the given database on the (remote) primary, asserts that the
- * top-level command is OK, then asserts the write status using the given 'uassertWriteStatusCb'
- * callback. Throws a PrimarySteppedDown error if no primary is found. Returns the command response.
- */
-BSONObj executeWriteCommandRemote(OperationContext* opCtx,
-                                  const std::string dbName,
-                                  const BSONObj& cmdObj,
-                                  const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
-    auto hostAndPort = repl::ReplicationCoordinator::get(opCtx)->getCurrentPrimaryHostAndPort();
-
-    if (hostAndPort.empty()) {
-        uasserted(ErrorCodes::PrimarySteppedDown, "No primary exists currently");
-    }
-
-    auto conn = std::make_unique<ScopedDbConnection>(hostAndPort.toString());
-
-    if (auth::isInternalAuthSet()) {
-        uassertStatusOK(conn->get()->authenticateInternalUser());
-    }
-
-    DBClientBase* client = conn->get();
-    ScopeGuard guard([&] { conn->done(); });
-    try {
-        BSONObj resObj;
-
-        if (!client->runCommand(dbName, cmdObj, resObj)) {
-            uassertStatusOK(getStatusFromCommandResult(resObj));
-        }
-        uassertWriteStatusCb(resObj);
-
-        return resObj;
-    } catch (...) {
-        guard.dismiss();
-        conn->kill();
-        throw;
-    }
-}
-
-/*
- * Runs the given write command against the given collection. If this mongod is currently the
- * primary, runs the write command locally. Otherwise, runs the command on the remote primary.
- * Internally asserts that the top-level command is OK, then asserts the write status using the
- * given 'uassertWriteStatusCb' callback. Internally retries the write command on retryable errors
- * (for kMaxRetriesOnRetryableErrors times) so the writes must be idempotent. Returns the
- * command response.
- */
-BSONObj executeWriteCommand(OperationContext* opCtx,
-                            const NamespaceString& ns,
-                            const BSONObj& cmdObj,
-                            const std::function<void(const BSONObj&)>& uassertWriteStatusCb) {
-    const auto dbName = ns.db().toString();
-    auto numRetries = 0;
-
-    while (true) {
-        try {
-            if (canAcceptWrites(opCtx, ns)) {
-                // There is a window here where this mongod may step down after check above. In this
-                // case, a NotWritablePrimary error would be thrown. However, this is preferable to
-                // running the command while holding locks.
-                hangQueryAnalysisWriterBeforeWritingLocally.pauseWhileSet(opCtx);
-                return executeWriteCommandLocal(opCtx, dbName, cmdObj, uassertWriteStatusCb);
-            }
-
-            hangQueryAnalysisWriterBeforeWritingRemotely.pauseWhileSet(opCtx);
-            return executeWriteCommandRemote(opCtx, dbName, cmdObj, uassertWriteStatusCb);
-        } catch (DBException& ex) {
-            if (ErrorCodes::isRetriableError(ex) && numRetries < kMaxRetriesOnRetryableErrors) {
-                numRetries++;
-                continue;
-            }
-            throw;
-        }
-    }
-
-    return {};
-}
 
 struct SampledWriteCommandRequest {
     UUID sampleId;
@@ -359,6 +236,8 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
         }
     });
 
+    LOGV2_DEBUG(6876101, 2, "Persisting sampled queries", "count"_attr = tmpBuffer.getCount());
+
     // Insert the documents in batches from the back of the buffer so that we don't need to move the
     // documents forward after each batch.
     size_t baseIndex = tmpBuffer.getCount() - 1;
@@ -369,11 +248,10 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
         long long objSize = 0;
         size_t lastIndex = tmpBuffer.getCount();  // inclusive
 
-        LOGV2_DEBUG(6876101, 2, "Persisting sampled queries", "buffer"_attr = tmpBuffer.getCount());
         while (lastIndex > 0 && docsToInsert.size() < maxBatchSize) {
             // Check if the next document can fit in the batch.
             auto doc = tmpBuffer.at(lastIndex - 1);
-            if (doc.objsize() + objSize >= kMaxBSONObjSizeForInsert) {
+            if (doc.objsize() + objSize >= kMaxBSONObjSizePerInsertBatch) {
                 break;
             }
             lastIndex--;
@@ -383,28 +261,8 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
        // We don't add a document that is above the size limit to the buffer so we should have
        // added at least one document to 'docsToInsert'.
        invariant(!docsToInsert.empty());
 
-        LOGV2_DEBUG(
-            6876102, 2, "Persisting sapmled queries", "docsToInsert"_attr = docsToInsert.size());
-
-        write_ops::InsertCommandRequest insertCmd(ns);
-        insertCmd.setDocuments(std::move(docsToInsert));
-        insertCmd.setWriteCommandRequestBase([&] {
-            write_ops::WriteCommandRequestBase wcb;
-            wcb.setOrdered(false);
-            wcb.setBypassDocumentValidation(false);
-            return wcb;
-        }());
-        auto insertCmdBson = insertCmd.toBSON(
-            {BSON(WriteConcernOptions::kWriteConcernField << kMajorityWriteConcern.toBSON())});
-
-        executeWriteCommand(opCtx, ns, std::move(insertCmdBson), [&](const BSONObj& resObj) {
-            BatchedCommandResponse response;
-            std::string errMsg;
-
-            if (!response.parseBSON(resObj, &errMsg)) {
-                uasserted(ErrorCodes::FailedToParse, errMsg);
-            }
+        insertDocuments(opCtx, ns, docsToInsert, [&](const BatchedCommandResponse& response) {
 
             if (response.isErrDetailsSet() && response.sizeErrDetails() > 0) {
                 boost::optional<write_ops::WriteError> firstWriteErr;
@@ -439,7 +297,7 @@ void QueryAnalysisWriter::_flush(OperationContext* opCtx,
 }
 
 void QueryAnalysisWriter::Buffer::add(BSONObj doc) {
-    if (doc.objsize() > kMaxBSONObjSizeForInsert) {
+    if (doc.objsize() > kMaxBSONObjSizePerInsertBatch) {
        return;
     }
     _docs.push_back(std::move(doc));
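The bulk of this change is a consolidation: the writer's hand-rolled write path (primary check, local DBDirectClient write, remote ScopedDbConnection write, and the retry loop) moves into the shared analyze_shard_key utility library, and _flush() now makes a single insertDocuments() call. Below is a minimal sketch of what that helper could look like, assembled purely from the code this commit deletes. The actual definition lives in src/mongo/s/analyze_shard_key_util.h/.cpp, which this page does not show, so treat the signature (inferred from the new call site) and the body as assumptions rather than the real implementation.

// Hypothetical reconstruction -- not the actual contents of analyze_shard_key_util.cpp.
// Every call below appears verbatim in the code deleted by this commit.
void insertDocuments(OperationContext* opCtx,
                     const NamespaceString& ns,
                     const std::vector<BSONObj>& docs,
                     const std::function<void(const BatchedCommandResponse&)>& uassertWriteStatusCb) {
    // Build an unordered insert command, as the deleted _flush() code did.
    write_ops::InsertCommandRequest insertCmd(ns);
    insertCmd.setDocuments(docs);
    insertCmd.setWriteCommandRequestBase([&] {
        write_ops::WriteCommandRequestBase wcb;
        wcb.setOrdered(false);
        wcb.setBypassDocumentValidation(false);
        return wcb;
    }());
    auto insertCmdBson = insertCmd.toBSON(
        {BSON(WriteConcernOptions::kWriteConcernField << kMajorityWriteConcern.toBSON())});

    // The utility presumably also absorbed executeWriteCommand(): run the command locally
    // if this node is the primary, otherwise forward it to the remote primary, retrying on
    // retryable errors. It is invoked here exactly as the deleted code defined it.
    executeWriteCommand(opCtx, ns, std::move(insertCmdBson), [&](const BSONObj& resObj) {
        BatchedCommandResponse response;
        std::string errMsg;
        if (!response.parseBSON(resObj, &errMsg)) {
            uasserted(ErrorCodes::FailedToParse, errMsg);
        }
        // Hand the parsed response to the caller, which asserts per-document write statuses.
        uassertWriteStatusCb(response);
    });
}

Parsing into BatchedCommandResponse inside the helper is what lets the call site shrink: the lambda in _flush() now receives a typed response and only inspects errDetails, instead of parsing raw BSON itself. The rename from kMaxBSONObjSizeForInsert to kMaxBSONObjSizePerInsertBatch likewise suggests the batch-size constant moved into the shared header, so other sampled-query writers can batch against the same limit.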