diff options
author | jannaerin <golden.janna@gmail.com> | 2020-09-16 15:57:28 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-21 18:23:56 +0000 |
commit | 4a362919e211b8f88853796c936b4c9157d05b16 (patch) | |
tree | d9fe4394defe3ef818ee8b58d530961c5373c8d1 | |
parent | 1ccff1fdee9edfd99109e1e508ecdddbe06c3bc4 (diff) | |
download | mongo-4a362919e211b8f88853796c936b4c9157d05b16.tar.gz |
SERVER-50957 Move anonymous functions which run writes to config docs in txns onto the ShardingCatalogManager
5 files changed, 247 insertions, 136 deletions
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h index 937b5853912..8de2835d193 100644 --- a/src/mongo/db/ops/write_ops_parsers.h +++ b/src/mongo/db/ops/write_ops_parsers.h @@ -44,6 +44,9 @@ namespace write_ops { // byte (zero terminator) = 7 bytes constexpr int kWriteCommandBSONArrayPerElementOverheadBytes = 7; +constexpr int kRetryableAndTxnBatchWriteBSONSizeOverhead = + kWriteCommandBSONArrayPerElementOverheadBytes * 2; + /** * Parses the 'limit' property of a delete entry, which has inverted meaning from the 'multi' * property of an update. diff --git a/src/mongo/db/s/config/sharding_catalog_manager.cpp b/src/mongo/db/s/config/sharding_catalog_manager.cpp index aba9afbc8fe..8ff5e20e6d2 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager.cpp @@ -33,8 +33,10 @@ #include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/db/auth/authorization_session_impl.h" #include "mongo/db/operation_context.h" #include "mongo/db/s/balancer/type_migration.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/config_server_version.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" @@ -46,6 +48,9 @@ #include "mongo/s/catalog/type_tags.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" +#include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/transport/service_entry_point.h" namespace mongo { namespace { @@ -56,6 +61,71 @@ const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode:: const auto getShardingCatalogManager = ServiceContext::declareDecoration<boost::optional<ShardingCatalogManager>>(); +OpMsg runCommandInLocalTxn(OperationContext* opCtx, + StringData db, + bool startTransaction, + TxnNumber txnNumber, + BSONObj cmdObj) { + BSONObjBuilder bob(std::move(cmdObj)); + if (startTransaction) { + bob.append("startTransaction", true); + } + bob.append("autocommit", false); + bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber); + + BSONObjBuilder lsidBuilder(bob.subobjStart("lsid")); + opCtx->getLogicalSessionId()->serialize(&bob); + lsidBuilder.doneFast(); + + return OpMsg::parseOwned( + opCtx->getServiceContext() + ->getServiceEntryPoint() + ->handleRequest(opCtx, + OpMsgRequest::fromDBAndBody(db.toString(), bob.obj()).serialize()) + .get() + .response); +} + +void runCommitOrAbortTxnForConfigDocument(OperationContext* opCtx, + TxnNumber txnNumber, + std::string cmdName) { + // Swap out the clients in order to get a fresh opCtx. Previous operations in this transaction + // that have been run on this opCtx would have set the timeout in the locker on the opCtx, but + // commit should not have a lock timeout. + auto newClient = getGlobalServiceContext()->makeClient("ShardingCatalogManager"); + AlternativeClientRegion acr(newClient); + auto newOpCtx = cc().makeOperationContext(); + AuthorizationSession::get(newOpCtx.get()->getClient()) + ->grantInternalAuthorization(newOpCtx.get()->getClient()); + newOpCtx.get()->setLogicalSessionId(opCtx->getLogicalSessionId().get()); + newOpCtx.get()->setTxnNumber(txnNumber); + + BSONObjBuilder bob; + bob.append(cmdName, true); + bob.append("autocommit", false); + bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber); + bob.append(WriteConcernOptions::kWriteConcernField, WriteConcernOptions::Majority); + + BSONObjBuilder lsidBuilder(bob.subobjStart("lsid")); + newOpCtx->getLogicalSessionId()->serialize(&bob); + lsidBuilder.doneFast(); + + const auto cmdObj = bob.obj(); + + const auto replyOpMsg = + OpMsg::parseOwned(newOpCtx->getServiceContext() + ->getServiceEntryPoint() + ->handleRequest(newOpCtx.get(), + OpMsgRequest::fromDBAndBody( + NamespaceString::kAdminDb.toString(), cmdObj) + .serialize()) + .get() + .response); + + uassertStatusOK(getStatusFromCommandResult(replyOpMsg.body)); + uassertStatusOK(getWriteConcernStatusFromCommandResult(replyOpMsg.body)); +} + } // namespace void ShardingCatalogManager::create(ServiceContext* serviceContext, @@ -395,4 +465,71 @@ StatusWith<bool> ShardingCatalogManager::_isShardRequiredByZoneStillInUse( return false; } +BSONObj ShardingCatalogManager::writeToConfigDocumentInTxn(OperationContext* opCtx, + const NamespaceString& nss, + const BatchedCommandRequest& request, + bool startTransaction, + TxnNumber txnNumber) { + invariant(nss.db() == NamespaceString::kConfigDb); + return runCommandInLocalTxn(opCtx, nss.db(), startTransaction, txnNumber, request.toBSON()) + .body; +} + +void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + bool startTransaction, + TxnNumber txnNumber) { + invariant(nss.db() == NamespaceString::kConfigDb); + + std::vector<BSONObj> workingBatch; + size_t workingBatchItemSize = 0; + int workingBatchDocSize = 0; + + auto doBatchInsert = [&]() { + BatchedCommandRequest request([&] { + write_ops::Insert insertOp(nss); + insertOp.setDocuments(workingBatch); + return insertOp; + }()); + + uassertStatusOK(getStatusFromWriteCommandReply( + writeToConfigDocumentInTxn(opCtx, nss, request, startTransaction, txnNumber))); + }; + + while (!docs.empty()) { + BSONObj toAdd = docs.back(); + docs.pop_back(); + + const int docSizePlusOverhead = + toAdd.objsize() + write_ops::kRetryableAndTxnBatchWriteBSONSizeOverhead; + // Check if pushing this object will exceed the batch size limit or the max object size + if ((workingBatchItemSize + 1 > write_ops::kMaxWriteBatchSize) || + (workingBatchDocSize + docSizePlusOverhead > BSONObjMaxUserSize)) { + doBatchInsert(); + + workingBatch.clear(); + workingBatchItemSize = 0; + workingBatchDocSize = 0; + } + + workingBatch.push_back(toAdd); + ++workingBatchItemSize; + workingBatchDocSize += docSizePlusOverhead; + } + + if (!workingBatch.empty()) + doBatchInsert(); +} + +void ShardingCatalogManager::commitTxnForConfigDocument(OperationContext* opCtx, + TxnNumber txnNumber) { + runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "commitTransaction"); +} + +void ShardingCatalogManager::abortTxnForConfigDocument(OperationContext* opCtx, + TxnNumber txnNumber) { + runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "abortTransaction"); +} + } // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 16772155643..a6f97372025 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -36,6 +36,7 @@ #include "mongo/executor/task_executor.h" #include "mongo/platform/mutex.h" #include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard.h" @@ -182,6 +183,37 @@ public: */ Lock::ExclusiveLock lockZoneMutex(OperationContext* opCtx); + /** + * Runs the write 'request' on namespace 'nss' in a transaction with 'txnNumber'. Write must be + * on a collection in the config database. + */ + BSONObj writeToConfigDocumentInTxn(OperationContext* opCtx, + const NamespaceString& nss, + const BatchedCommandRequest& request, + bool startTransaction, + TxnNumber txnNumber); + + /** + * Inserts 'docs' to namespace 'nss' in a transaction with 'txnNumber'. Breaks into multiple + * batches if 'docs' is larger than the max batch size. Write must be on a collection in the + * config database. + */ + void insertConfigDocumentsInTxn(OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + bool startTransaction, + TxnNumber txnNumber); + + /** + * Runs commit for the transaction with 'txnNumber'. + */ + void commitTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber); + + /** + * Runs abort for the transaction with 'txnNumber'. + */ + void abortTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber); + // // Chunk Operations // @@ -340,6 +372,19 @@ public: const ShardKeyPattern& newShardKey); /** + * Runs a replacement update on config.collections for the collection entry for 'nss' in a + * transaction with 'txnNumber'. 'coll' is used as the replacement doc. + * + * Throws exception on errors. + */ + void updateShardingCatalogEntryForCollectionInTxn(OperationContext* opCtx, + const NamespaceString& nss, + const CollectionType& coll, + const bool upsert, + const bool startTransaction, + TxnNumber txnNumber); + + /** * Creates a ScopedLock on the collection name in _namespaceSerializer. This is to prevent * timeouts waiting on the dist lock if multiple threads attempt to create or drop the same * collection. diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index 23b6688bf9e..8bad0744665 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -88,32 +88,6 @@ MONGO_FAIL_POINT_DEFINE(hangRefineCollectionShardKeyBeforeUpdatingChunks); MONGO_FAIL_POINT_DEFINE(hangRefineCollectionShardKeyBeforeCommit); namespace { - -OpMsg runCommandInLocalTxn(OperationContext* opCtx, - StringData db, - bool startTransaction, - TxnNumber txnNumber, - BSONObj cmdObj) { - BSONObjBuilder bob(std::move(cmdObj)); - if (startTransaction) { - bob.append("startTransaction", true); - } - bob.append("autocommit", false); - bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber); - - BSONObjBuilder lsidBuilder(bob.subobjStart("lsid")); - opCtx->getLogicalSessionId()->serialize(&bob); - lsidBuilder.doneFast(); - - return OpMsg::parseOwned( - opCtx->getServiceContext() - ->getServiceEntryPoint() - ->handleRequest(opCtx, - OpMsgRequest::fromDBAndBody(db.toString(), bob.obj()).serialize()) - .get() - .response); -} - const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); static constexpr int kMaxNumStaleShardVersionRetries = 10; const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)); @@ -163,16 +137,11 @@ boost::optional<UUID> checkCollectionOptions(OperationContext* opCtx, return uassertStatusOK(UUID::parse(collectionInfo["uuid"])); } -Status updateConfigDocumentInTxn(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& query, - const BSONObj& update, - bool upsert, - bool useMultiUpdate, - bool startTransaction, - TxnNumber txnNumber) { - invariant(nss.db() == NamespaceString::kConfigDb); - +BatchedCommandRequest buildUpdateOp(const NamespaceString& nss, + const BSONObj& query, + const BSONObj& update, + bool upsert, + bool useMultiUpdate) { BatchedCommandRequest request([&] { write_ops::Update updateOp(nss); updateOp.setUpdates({[&] { @@ -186,20 +155,14 @@ Status updateConfigDocumentInTxn(OperationContext* opCtx, return updateOp; }()); - return getStatusFromWriteCommandReply( - runCommandInLocalTxn(opCtx, nss.db(), startTransaction, txnNumber, request.toBSON()).body); + return request; } -Status pipelineUpdateConfigDocumentInTxn(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& query, - const std::vector<BSONObj>& updates, - bool upsert, - bool useMultiUpdate, - bool startTransaction, - TxnNumber txnNumber) { - invariant(nss.db() == NamespaceString::kConfigDb); - +BatchedCommandRequest buildPipelineUpdateOp(const NamespaceString& nss, + const BSONObj& query, + const std::vector<BSONObj>& updates, + bool upsert, + bool useMultiUpdate) { BatchedCommandRequest request([&] { write_ops::Update updateOp(nss); updateOp.setUpdates({[&] { @@ -213,68 +176,7 @@ Status pipelineUpdateConfigDocumentInTxn(OperationContext* opCtx, return updateOp; }()); - return getStatusFromWriteCommandReply( - runCommandInLocalTxn(opCtx, nss.db(), startTransaction, txnNumber, request.toBSON()).body); -} - -Status updateShardingCatalogEntryForCollectionInTxn(OperationContext* opCtx, - const NamespaceString& nss, - const CollectionType& coll, - const bool upsert, - const bool startTransaction, - TxnNumber txnNumber) { - fassert(51249, coll.validate()); - - auto status = updateConfigDocumentInTxn(opCtx, - CollectionType::ConfigNS, - BSON(CollectionType::fullNs(nss.ns())), - coll.toBSON(), - upsert, - false /* multi */, - startTransaction, - txnNumber); - return status.withContext(str::stream() << "Collection metadata write failed"); -} - -Status commitTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber) { - // Swap out the clients in order to get a fresh opCtx. Previous operations in this transaction - // that have been run on this opCtx would have set the timeout in the locker on the opCtx, but - // commit should not have a lock timeout. - auto newClient = getGlobalServiceContext()->makeClient("commitRefineShardKey"); - AlternativeClientRegion acr(newClient); - auto commitOpCtx = cc().makeOperationContext(); - AuthorizationSession::get(commitOpCtx.get()->getClient()) - ->grantInternalAuthorization(commitOpCtx.get()->getClient()); - commitOpCtx.get()->setLogicalSessionId(opCtx->getLogicalSessionId().get()); - commitOpCtx.get()->setTxnNumber(txnNumber); - - BSONObjBuilder bob; - bob.append("commitTransaction", true); - bob.append("autocommit", false); - bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber); - bob.append(WriteConcernOptions::kWriteConcernField, WriteConcernOptions::Majority); - - BSONObjBuilder lsidBuilder(bob.subobjStart("lsid")); - commitOpCtx->getLogicalSessionId()->serialize(&bob); - lsidBuilder.doneFast(); - - const auto cmdObj = bob.obj(); - - const auto replyOpMsg = - OpMsg::parseOwned(commitOpCtx->getServiceContext() - ->getServiceEntryPoint() - ->handleRequest(commitOpCtx.get(), - OpMsgRequest::fromDBAndBody( - NamespaceString::kAdminDb.toString(), cmdObj) - .serialize()) - .get() - .response); - - auto commandStatus = getStatusFromCommandResult(replyOpMsg.body); - if (!commandStatus.isOK()) { - return commandStatus; - } - return getWriteConcernStatusFromCommandResult(replyOpMsg.body); + return request; } void triggerFireAndForgetShardRefreshes(OperationContext* opCtx, const NamespaceString& nss) { @@ -747,12 +649,8 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx, ->grantInternalAuthorization(asr.opCtx()->getClient()); TxnNumber txnNumber = 0; - uassertStatusOK(updateShardingCatalogEntryForCollectionInTxn(asr.opCtx(), - nss, - collType, - false /* upsert */, - true /* startTransaction */, - txnNumber)); + updateShardingCatalogEntryForCollectionInTxn( + asr.opCtx(), nss, collType, false /* upsert */, true /* startTransaction */, txnNumber); LOGV2(21933, "refineCollectionShardKey updated collection entry for {namespace}: took " @@ -774,14 +672,17 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx, // to the newly-generated objectid, (ii) their bounds for each new field in the refined // key to MinKey (except for the global max chunk where the max bounds are set to // MaxKey), and unsetting (iii) their jumbo field. - uassertStatusOK(pipelineUpdateConfigDocumentInTxn(asr.opCtx(), - ChunkType::ConfigNS, - BSON("ns" << nss.ns()), - chunkUpdates, - false, // upsert - true, // useMultiUpdate - false, // startTransaction - txnNumber)); + uassertStatusOK(getStatusFromWriteCommandReply( + writeToConfigDocumentInTxn(asr.opCtx(), + ChunkType::ConfigNS, + buildPipelineUpdateOp(ChunkType::ConfigNS, + BSON("ns" << nss.ns()), + chunkUpdates, + false, // upsert + true // useMultiUpdate + ), + false, // startTransaction + txnNumber))); LOGV2(21935, "refineCollectionShardKey: updated chunk entries for {namespace}: took " @@ -795,14 +696,17 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx, // Update all config.tags entries for the given namespace by setting their bounds for // each new field in the refined key to MinKey (except for the global max tag where the // max bounds are set to MaxKey). - uassertStatusOK(pipelineUpdateConfigDocumentInTxn(asr.opCtx(), - TagsType::ConfigNS, - BSON("ns" << nss.ns()), - tagUpdates, - false, // upsert - true, // useMultiUpdate - false, // startTransaction - txnNumber)); + uassertStatusOK(getStatusFromWriteCommandReply( + writeToConfigDocumentInTxn(asr.opCtx(), + TagsType::ConfigNS, + buildPipelineUpdateOp(TagsType::ConfigNS, + BSON("ns" << nss.ns()), + tagUpdates, + false, // upsert + true // useMultiUpdate + ), + false, // startTransaction + txnNumber))); LOGV2(21936, @@ -819,7 +723,7 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx, } // Note this will wait for majority write concern. - uassertStatusOK(commitTxnForConfigDocument(asr.opCtx(), txnNumber)); + commitTxnForConfigDocument(asr.opCtx(), txnNumber); } ShardingLogging::get(opCtx)->logChange(opCtx, @@ -843,4 +747,27 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx, } } +void ShardingCatalogManager::updateShardingCatalogEntryForCollectionInTxn( + OperationContext* opCtx, + const NamespaceString& nss, + const CollectionType& coll, + const bool upsert, + const bool startTransaction, + TxnNumber txnNumber) { + fassert(51249, coll.validate()); + + auto status = getStatusFromCommandResult( + writeToConfigDocumentInTxn(opCtx, + CollectionType::ConfigNS, + buildUpdateOp(CollectionType::ConfigNS, + BSON(CollectionType::fullNs(nss.ns())), + coll.toBSON(), + upsert, + false /* multi */ + ), + startTransaction, + txnNumber)); + uassertStatusOKWithContext(status, "Collection metadata write failed"); +} + } // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index 334bb82504d..42b374565bd 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -94,8 +94,6 @@ const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::Prim const int kMaxReadRetry = 3; const int kMaxWriteRetry = 3; -const int kRetryableBatchWriteBSONSizeOverhead = kWriteCommandBSONArrayPerElementOverheadBytes * 2; - const NamespaceString kSettingsNamespace("config", "settings"); void toBatchError(const Status& status, BatchedCommandResponse* response) { @@ -869,7 +867,8 @@ void ShardingCatalogClientImpl::insertConfigDocumentsAsRetryableWrite( BSONObj toAdd = docs.back(); docs.pop_back(); - const int docSizePlusOverhead = toAdd.objsize() + kRetryableBatchWriteBSONSizeOverhead; + const int docSizePlusOverhead = + toAdd.objsize() + write_ops::kRetryableAndTxnBatchWriteBSONSizeOverhead; // Check if pushing this object will exceed the batch size limit or the max object size if ((workingBatchItemSize + 1 > write_ops::kMaxWriteBatchSize) || (workingBatchDocSize + docSizePlusOverhead > BSONObjMaxUserSize)) { |