author    jannaerin <golden.janna@gmail.com>    2020-09-16 15:57:28 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-09-21 18:23:56 +0000
commit    4a362919e211b8f88853796c936b4c9157d05b16 (patch)
tree      d9fe4394defe3ef818ee8b58d530961c5373c8d1
parent    1ccff1fdee9edfd99109e1e508ecdddbe06c3bc4 (diff)
download  mongo-4a362919e211b8f88853796c936b4c9157d05b16.tar.gz
SERVER-50957 Move the anonymous-namespace functions that run writes to config docs in txns onto the ShardingCatalogManager
-rw-r--r--  src/mongo/db/ops/write_ops_parsers.h                                        3
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager.cpp                        137
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager.h                           45
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp  193
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_impl.cpp                        5
5 files changed, 247 insertions(+), 136 deletions(-)
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h
index 937b5853912..8de2835d193 100644
--- a/src/mongo/db/ops/write_ops_parsers.h
+++ b/src/mongo/db/ops/write_ops_parsers.h
@@ -44,6 +44,9 @@ namespace write_ops {
// byte (zero terminator) = 7 bytes
constexpr int kWriteCommandBSONArrayPerElementOverheadBytes = 7;
+constexpr int kRetryableAndTxnBatchWriteBSONSizeOverhead =
+ kWriteCommandBSONArrayPerElementOverheadBytes * 2;
+
/**
* Parses the 'limit' property of a delete entry, which has inverted meaning from the 'multi'
* property of an update.
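
The doubled constant replaces a private kRetryableBatchWriteBSONSizeOverhead in sharding_catalog_client_impl.cpp (deleted at the bottom of this diff), so the retryable-write and transactional batch-insert paths now share one definition. A plausible reading of the factor of two, stated here as an assumption rather than taken from the commit: each statement in a retryable or transactional batch occupies a slot in two BSON arrays (the write's documents array and the per-statement id array), so both per-element overheads are budgeted.

    // Illustrative arithmetic only; the 7-byte figure comes from the comment above.
    // Per array element: 1 (type byte) + 5 (stringified index, at most "99999") + 1 (NUL) = 7.
    // Per document in a retryable/txn batch, two array slots are budgeted: 7 * 2 = 14 bytes.
    static_assert(write_ops::kRetryableAndTxnBatchWriteBSONSizeOverhead ==
                  2 * write_ops::kWriteCommandBSONArrayPerElementOverheadBytes);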
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.cpp b/src/mongo/db/s/config/sharding_catalog_manager.cpp
index aba9afbc8fe..8ff5e20e6d2 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager.cpp
@@ -33,8 +33,10 @@
#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/db/auth/authorization_session_impl.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/balancer/type_migration.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
@@ -46,6 +48,9 @@
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/s/write_ops/batched_command_request.h"
+#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/transport/service_entry_point.h"
namespace mongo {
namespace {
@@ -56,6 +61,71 @@ const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::
const auto getShardingCatalogManager =
ServiceContext::declareDecoration<boost::optional<ShardingCatalogManager>>();
+OpMsg runCommandInLocalTxn(OperationContext* opCtx,
+ StringData db,
+ bool startTransaction,
+ TxnNumber txnNumber,
+ BSONObj cmdObj) {
+ BSONObjBuilder bob(std::move(cmdObj));
+ if (startTransaction) {
+ bob.append("startTransaction", true);
+ }
+ bob.append("autocommit", false);
+ bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber);
+
+ BSONObjBuilder lsidBuilder(bob.subobjStart("lsid"));
+ opCtx->getLogicalSessionId()->serialize(&lsidBuilder);
+ lsidBuilder.doneFast();
+
+ return OpMsg::parseOwned(
+ opCtx->getServiceContext()
+ ->getServiceEntryPoint()
+ ->handleRequest(opCtx,
+ OpMsgRequest::fromDBAndBody(db.toString(), bob.obj()).serialize())
+ .get()
+ .response);
+}
+
+void runCommitOrAbortTxnForConfigDocument(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::string cmdName) {
+ // Swap out the clients in order to get a fresh opCtx. Previous operations in this transaction
+ // that have been run on this opCtx would have set the timeout in the locker on the opCtx, but
+ // commit should not have a lock timeout.
+ auto newClient = getGlobalServiceContext()->makeClient("ShardingCatalogManager");
+ AlternativeClientRegion acr(newClient);
+ auto newOpCtx = cc().makeOperationContext();
+ AuthorizationSession::get(newOpCtx.get()->getClient())
+ ->grantInternalAuthorization(newOpCtx.get()->getClient());
+ newOpCtx.get()->setLogicalSessionId(opCtx->getLogicalSessionId().get());
+ newOpCtx.get()->setTxnNumber(txnNumber);
+
+ BSONObjBuilder bob;
+ bob.append(cmdName, true);
+ bob.append("autocommit", false);
+ bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber);
+ bob.append(WriteConcernOptions::kWriteConcernField, WriteConcernOptions::Majority);
+
+ BSONObjBuilder lsidBuilder(bob.subobjStart("lsid"));
+ newOpCtx->getLogicalSessionId()->serialize(&lsidBuilder);
+ lsidBuilder.doneFast();
+
+ const auto cmdObj = bob.obj();
+
+ const auto replyOpMsg =
+ OpMsg::parseOwned(newOpCtx->getServiceContext()
+ ->getServiceEntryPoint()
+ ->handleRequest(newOpCtx.get(),
+ OpMsgRequest::fromDBAndBody(
+ NamespaceString::kAdminDb.toString(), cmdObj)
+ .serialize())
+ .get()
+ .response);
+
+ uassertStatusOK(getStatusFromCommandResult(replyOpMsg.body));
+ uassertStatusOK(getWriteConcernStatusFromCommandResult(replyOpMsg.body));
+}
+
} // namespace
void ShardingCatalogManager::create(ServiceContext* serviceContext,
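
For orientation, a sketch of the two request bodies these helpers build, reconstructed from the builder calls above rather than quoted from the commit. runCommandInLocalTxn decorates the caller's write command with session and transaction fields and dispatches it through the local ServiceEntryPoint; runCommitOrAbortTxnForConfigDocument sends the terminal command against the admin database on a fresh client so that commit is not subject to the lock timeout earlier statements set on the original opCtx.

    // Statement dispatched by runCommandInLocalTxn (startTransaction only on the first one):
    { update: "collections", updates: [ ... ],
      startTransaction: true, autocommit: false,
      txnNumber: NumberLong(<txnNumber>), lsid: { id: <session UUID> } }

    // Terminal command dispatched by runCommitOrAbortTxnForConfigDocument (database "admin"):
    { commitTransaction: true, autocommit: false,
      txnNumber: NumberLong(<txnNumber>),
      writeConcern: { w: "majority" },
      lsid: { id: <session UUID> } }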
@@ -395,4 +465,71 @@ StatusWith<bool> ShardingCatalogManager::_isShardRequiredByZoneStillInUse(
return false;
}
+BSONObj ShardingCatalogManager::writeToConfigDocumentInTxn(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BatchedCommandRequest& request,
+ bool startTransaction,
+ TxnNumber txnNumber) {
+ invariant(nss.db() == NamespaceString::kConfigDb);
+ return runCommandInLocalTxn(opCtx, nss.db(), startTransaction, txnNumber, request.toBSON())
+ .body;
+}
+
+void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx,
+ const NamespaceString& nss,
+ std::vector<BSONObj> docs,
+ bool startTransaction,
+ TxnNumber txnNumber) {
+ invariant(nss.db() == NamespaceString::kConfigDb);
+
+ std::vector<BSONObj> workingBatch;
+ size_t workingBatchItemSize = 0;
+ int workingBatchDocSize = 0;
+
+ auto doBatchInsert = [&]() {
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setDocuments(workingBatch);
+ return insertOp;
+ }());
+
+ uassertStatusOK(getStatusFromWriteCommandReply(
+ writeToConfigDocumentInTxn(opCtx, nss, request, startTransaction, txnNumber)));
+ };
+
+ while (!docs.empty()) {
+ BSONObj toAdd = docs.back();
+ docs.pop_back();
+
+ const int docSizePlusOverhead =
+ toAdd.objsize() + write_ops::kRetryableAndTxnBatchWriteBSONSizeOverhead;
+ // Check if pushing this object will exceed the batch size limit or the max object size
+ if ((workingBatchItemSize + 1 > write_ops::kMaxWriteBatchSize) ||
+ (workingBatchDocSize + docSizePlusOverhead > BSONObjMaxUserSize)) {
+ doBatchInsert();
+
+ workingBatch.clear();
+ workingBatchItemSize = 0;
+ workingBatchDocSize = 0;
+ }
+
+ workingBatch.push_back(toAdd);
+ ++workingBatchItemSize;
+ workingBatchDocSize += docSizePlusOverhead;
+ }
+
+ if (!workingBatch.empty())
+ doBatchInsert();
+}
+
+void ShardingCatalogManager::commitTxnForConfigDocument(OperationContext* opCtx,
+ TxnNumber txnNumber) {
+ runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "commitTransaction");
+}
+
+void ShardingCatalogManager::abortTxnForConfigDocument(OperationContext* opCtx,
+ TxnNumber txnNumber) {
+ runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "abortTransaction");
+}
+
} // namespace mongo
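
insertConfigDocumentsInTxn mirrors the batching already used by ShardingCatalogClientImpl::insertConfigDocumentsAsRetryableWrite (updated at the bottom of this diff): it drains the input vector from the back and flushes the working batch whenever appending the next document would exceed write_ops::kMaxWriteBatchSize items or BSONObjMaxUserSize bytes. A self-contained sketch of the same greedy policy, with small illustrative limits standing in for the server constants:

    #include <cstddef>
    #include <functional>
    #include <vector>

    constexpr std::size_t kMaxItems = 2;  // stands in for write_ops::kMaxWriteBatchSize
    constexpr int kMaxBytes = 64;         // stands in for BSONObjMaxUserSize
    constexpr int kOverhead = 14;         // kRetryableAndTxnBatchWriteBSONSizeOverhead

    void batchFromBack(std::vector<int> sizes,
                       const std::function<void(const std::vector<int>&)>& flush) {
        std::vector<int> batch;
        int batchBytes = 0;
        while (!sizes.empty()) {
            const int cost = sizes.back() + kOverhead;  // document size plus array overhead
            sizes.pop_back();
            // Flush first if adding this item would overflow either limit, as the server does.
            if (batch.size() + 1 > kMaxItems || batchBytes + cost > kMaxBytes) {
                flush(batch);
                batch.clear();
                batchBytes = 0;
            }
            batch.push_back(cost);
            batchBytes += cost;
        }
        if (!batch.empty())
            flush(batch);  // final partial batch
    }

Note that because both loops pop from the back, documents reach the config collection in reverse input order, and each flushed batch is issued with the same startTransaction flag the caller passed.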
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 16772155643..a6f97372025 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -36,6 +36,7 @@
#include "mongo/executor/task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard.h"
@@ -182,6 +183,37 @@ public:
*/
Lock::ExclusiveLock lockZoneMutex(OperationContext* opCtx);
+ /**
+ * Runs the write 'request' on namespace 'nss' in a transaction with 'txnNumber'. Write must be
+ * on a collection in the config database.
+ */
+ BSONObj writeToConfigDocumentInTxn(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BatchedCommandRequest& request,
+ bool startTransaction,
+ TxnNumber txnNumber);
+
+ /**
+ * Inserts 'docs' to namespace 'nss' in a transaction with 'txnNumber'. Breaks into multiple
+ * batches if 'docs' is larger than the max batch size. Write must be on a collection in the
+ * config database.
+ */
+ void insertConfigDocumentsInTxn(OperationContext* opCtx,
+ const NamespaceString& nss,
+ std::vector<BSONObj> docs,
+ bool startTransaction,
+ TxnNumber txnNumber);
+
+ /**
+ * Runs commit for the transaction with 'txnNumber'.
+ */
+ void commitTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber);
+
+ /**
+ * Runs abort for the transaction with 'txnNumber'.
+ */
+ void abortTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber);
+
//
// Chunk Operations
//
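
Taken together, these four methods are the transaction surface this commit centralizes. A minimal sketch of the intended call pattern (hypothetical caller: docs and updateRequest are placeholders, and opCtx must already carry a logical session id):

    TxnNumber txnNumber = 0;
    auto* catalogManager = ShardingCatalogManager::get(opCtx);

    // The first statement opens the transaction on the config server's local session.
    catalogManager->insertConfigDocumentsInTxn(
        opCtx, CollectionType::ConfigNS, std::move(docs), true /* startTransaction */, txnNumber);

    // Later statements join the same transaction through the shared txnNumber.
    catalogManager->writeToConfigDocumentInTxn(
        opCtx, ChunkType::ConfigNS, updateRequest, false /* startTransaction */, txnNumber);

    // Commit waits for majority write concern; on failure, call abortTxnForConfigDocument.
    catalogManager->commitTxnForConfigDocument(opCtx, txnNumber);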
@@ -340,6 +372,19 @@ public:
const ShardKeyPattern& newShardKey);
/**
+ * Runs a replacement update on config.collections for the collection entry for 'nss' in a
+ * transaction with 'txnNumber'. 'coll' is used as the replacement doc.
+ *
+ * Throws exception on errors.
+ */
+ void updateShardingCatalogEntryForCollectionInTxn(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionType& coll,
+ const bool upsert,
+ const bool startTransaction,
+ TxnNumber txnNumber);
+
+ /**
* Creates a ScopedLock on the collection name in _namespaceSerializer. This is to prevent
* timeouts waiting on the dist lock if multiple threads attempt to create or drop the same
* collection.
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index 23b6688bf9e..8bad0744665 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -88,32 +88,6 @@ MONGO_FAIL_POINT_DEFINE(hangRefineCollectionShardKeyBeforeUpdatingChunks);
MONGO_FAIL_POINT_DEFINE(hangRefineCollectionShardKeyBeforeCommit);
namespace {
-
-OpMsg runCommandInLocalTxn(OperationContext* opCtx,
- StringData db,
- bool startTransaction,
- TxnNumber txnNumber,
- BSONObj cmdObj) {
- BSONObjBuilder bob(std::move(cmdObj));
- if (startTransaction) {
- bob.append("startTransaction", true);
- }
- bob.append("autocommit", false);
- bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber);
-
- BSONObjBuilder lsidBuilder(bob.subobjStart("lsid"));
- opCtx->getLogicalSessionId()->serialize(&lsidBuilder);
- lsidBuilder.doneFast();
-
- return OpMsg::parseOwned(
- opCtx->getServiceContext()
- ->getServiceEntryPoint()
- ->handleRequest(opCtx,
- OpMsgRequest::fromDBAndBody(db.toString(), bob.obj()).serialize())
- .get()
- .response);
-}
-
const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
static constexpr int kMaxNumStaleShardVersionRetries = 10;
const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0));
@@ -163,16 +137,11 @@ boost::optional<UUID> checkCollectionOptions(OperationContext* opCtx,
return uassertStatusOK(UUID::parse(collectionInfo["uuid"]));
}
-Status updateConfigDocumentInTxn(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert,
- bool useMultiUpdate,
- bool startTransaction,
- TxnNumber txnNumber) {
- invariant(nss.db() == NamespaceString::kConfigDb);
-
+BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& update,
+ bool upsert,
+ bool useMultiUpdate) {
BatchedCommandRequest request([&] {
write_ops::Update updateOp(nss);
updateOp.setUpdates({[&] {
@@ -186,20 +155,14 @@ Status updateConfigDocumentInTxn(OperationContext* opCtx,
return updateOp;
}());
- return getStatusFromWriteCommandReply(
- runCommandInLocalTxn(opCtx, nss.db(), startTransaction, txnNumber, request.toBSON()).body);
+ return request;
}
-Status pipelineUpdateConfigDocumentInTxn(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& query,
- const std::vector<BSONObj>& updates,
- bool upsert,
- bool useMultiUpdate,
- bool startTransaction,
- TxnNumber txnNumber) {
- invariant(nss.db() == NamespaceString::kConfigDb);
-
+BatchedCommandRequest buildPipelineUpdateOp(const NamespaceString& nss,
+ const BSONObj& query,
+ const std::vector<BSONObj>& updates,
+ bool upsert,
+ bool useMultiUpdate) {
BatchedCommandRequest request([&] {
write_ops::Update updateOp(nss);
updateOp.setUpdates({[&] {
@@ -213,68 +176,7 @@ Status pipelineUpdateConfigDocumentInTxn(OperationContext* opCtx,
return updateOp;
}());
- return getStatusFromWriteCommandReply(
- runCommandInLocalTxn(opCtx, nss.db(), startTransaction, txnNumber, request.toBSON()).body);
-}
-
-Status updateShardingCatalogEntryForCollectionInTxn(OperationContext* opCtx,
- const NamespaceString& nss,
- const CollectionType& coll,
- const bool upsert,
- const bool startTransaction,
- TxnNumber txnNumber) {
- fassert(51249, coll.validate());
-
- auto status = updateConfigDocumentInTxn(opCtx,
- CollectionType::ConfigNS,
- BSON(CollectionType::fullNs(nss.ns())),
- coll.toBSON(),
- upsert,
- false /* multi */,
- startTransaction,
- txnNumber);
- return status.withContext(str::stream() << "Collection metadata write failed");
-}
-
-Status commitTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber) {
- // Swap out the clients in order to get a fresh opCtx. Previous operations in this transaction
- // that have been run on this opCtx would have set the timeout in the locker on the opCtx, but
- // commit should not have a lock timeout.
- auto newClient = getGlobalServiceContext()->makeClient("commitRefineShardKey");
- AlternativeClientRegion acr(newClient);
- auto commitOpCtx = cc().makeOperationContext();
- AuthorizationSession::get(commitOpCtx.get()->getClient())
- ->grantInternalAuthorization(commitOpCtx.get()->getClient());
- commitOpCtx.get()->setLogicalSessionId(opCtx->getLogicalSessionId().get());
- commitOpCtx.get()->setTxnNumber(txnNumber);
-
- BSONObjBuilder bob;
- bob.append("commitTransaction", true);
- bob.append("autocommit", false);
- bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber);
- bob.append(WriteConcernOptions::kWriteConcernField, WriteConcernOptions::Majority);
-
- BSONObjBuilder lsidBuilder(bob.subobjStart("lsid"));
- commitOpCtx->getLogicalSessionId()->serialize(&lsidBuilder);
- lsidBuilder.doneFast();
-
- const auto cmdObj = bob.obj();
-
- const auto replyOpMsg =
- OpMsg::parseOwned(commitOpCtx->getServiceContext()
- ->getServiceEntryPoint()
- ->handleRequest(commitOpCtx.get(),
- OpMsgRequest::fromDBAndBody(
- NamespaceString::kAdminDb.toString(), cmdObj)
- .serialize())
- .get()
- .response);
-
- auto commandStatus = getStatusFromCommandResult(replyOpMsg.body);
- if (!commandStatus.isOK()) {
- return commandStatus;
- }
- return getWriteConcernStatusFromCommandResult(replyOpMsg.body);
+ return request;
}
void triggerFireAndForgetShardRefreshes(OperationContext* opCtx, const NamespaceString& nss) {
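
With the transaction plumbing stripped out, buildUpdateOp and buildPipelineUpdateOp are pure request builders whose result the caller hands to writeToConfigDocumentInTxn. As a sketch of what they produce (shape inferred from the standard update write command, not spelled out in the commit), a pipeline update built for config.chunks serializes to roughly:

    { update: "chunks",
      updates: [ { q: { ns: "db.coll" },       // the 'query' argument
                   u: [ <pipeline stages> ],   // the 'updates' vector of BSONObj
                   multi: true,                // useMultiUpdate
                   upsert: false } ] }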
@@ -747,12 +649,8 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
->grantInternalAuthorization(asr.opCtx()->getClient());
TxnNumber txnNumber = 0;
- uassertStatusOK(updateShardingCatalogEntryForCollectionInTxn(asr.opCtx(),
- nss,
- collType,
- false /* upsert */,
- true /* startTransaction */,
- txnNumber));
+ updateShardingCatalogEntryForCollectionInTxn(
+ asr.opCtx(), nss, collType, false /* upsert */, true /* startTransaction */, txnNumber);
LOGV2(21933,
"refineCollectionShardKey updated collection entry for {namespace}: took "
@@ -774,14 +672,17 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
// to the newly-generated objectid, (ii) their bounds for each new field in the refined
// key to MinKey (except for the global max chunk where the max bounds are set to
// MaxKey), and unsetting (iii) their jumbo field.
- uassertStatusOK(pipelineUpdateConfigDocumentInTxn(asr.opCtx(),
- ChunkType::ConfigNS,
- BSON("ns" << nss.ns()),
- chunkUpdates,
- false, // upsert
- true, // useMultiUpdate
- false, // startTransaction
- txnNumber));
+ uassertStatusOK(getStatusFromWriteCommandReply(
+ writeToConfigDocumentInTxn(asr.opCtx(),
+ ChunkType::ConfigNS,
+ buildPipelineUpdateOp(ChunkType::ConfigNS,
+ BSON("ns" << nss.ns()),
+ chunkUpdates,
+ false, // upsert
+ true // useMultiUpdate
+ ),
+ false, // startTransaction
+ txnNumber)));
LOGV2(21935,
"refineCollectionShardKey: updated chunk entries for {namespace}: took "
@@ -795,14 +696,17 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
// Update all config.tags entries for the given namespace by setting their bounds for
// each new field in the refined key to MinKey (except for the global max tag where the
// max bounds are set to MaxKey).
- uassertStatusOK(pipelineUpdateConfigDocumentInTxn(asr.opCtx(),
- TagsType::ConfigNS,
- BSON("ns" << nss.ns()),
- tagUpdates,
- false, // upsert
- true, // useMultiUpdate
- false, // startTransaction
- txnNumber));
+ uassertStatusOK(getStatusFromWriteCommandReply(
+ writeToConfigDocumentInTxn(asr.opCtx(),
+ TagsType::ConfigNS,
+ buildPipelineUpdateOp(TagsType::ConfigNS,
+ BSON("ns" << nss.ns()),
+ tagUpdates,
+ false, // upsert
+ true // useMultiUpdate
+ ),
+ false, // startTransaction
+ txnNumber)));
LOGV2(21936,
@@ -819,7 +723,7 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
}
// Note this will wait for majority write concern.
- uassertStatusOK(commitTxnForConfigDocument(asr.opCtx(), txnNumber));
+ commitTxnForConfigDocument(asr.opCtx(), txnNumber);
}
ShardingLogging::get(opCtx)->logChange(opCtx,
@@ -843,4 +747,27 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
}
}
+void ShardingCatalogManager::updateShardingCatalogEntryForCollectionInTxn(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionType& coll,
+ const bool upsert,
+ const bool startTransaction,
+ TxnNumber txnNumber) {
+ fassert(51249, coll.validate());
+
+ auto status = getStatusFromWriteCommandReply(
+ writeToConfigDocumentInTxn(opCtx,
+ CollectionType::ConfigNS,
+ buildUpdateOp(CollectionType::ConfigNS,
+ BSON(CollectionType::fullNs(nss.ns())),
+ coll.toBSON(),
+ upsert,
+ false /* multi */
+ ),
+ startTransaction,
+ txnNumber));
+ uassertStatusOKWithContext(status, "Collection metadata write failed");
+}
+
} // namespace mongo
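
After this refactor, refineCollectionShardKey drives the whole metadata change through the ShardingCatalogManager surface. Condensed from the hunks above (elisions are mine), the transaction sequence is:

    TxnNumber txnNumber = 0;

    // config.collections: replacement update that opens the transaction.
    updateShardingCatalogEntryForCollectionInTxn(
        asr.opCtx(), nss, collType, false /* upsert */, true /* startTransaction */, txnNumber);

    // config.chunks, then config.tags: pipeline updates inside the same transaction.
    writeToConfigDocumentInTxn(asr.opCtx(), ChunkType::ConfigNS,
                               buildPipelineUpdateOp(ChunkType::ConfigNS, ...),
                               false /* startTransaction */, txnNumber);
    writeToConfigDocumentInTxn(asr.opCtx(), TagsType::ConfigNS,
                               buildPipelineUpdateOp(TagsType::ConfigNS, ...),
                               false /* startTransaction */, txnNumber);

    // Commit runs on a fresh client and waits for majority write concern.
    commitTxnForConfigDocument(asr.opCtx(), txnNumber);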
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index 334bb82504d..42b374565bd 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -94,8 +94,6 @@ const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::Prim
const int kMaxReadRetry = 3;
const int kMaxWriteRetry = 3;
-const int kRetryableBatchWriteBSONSizeOverhead = kWriteCommandBSONArrayPerElementOverheadBytes * 2;
-
const NamespaceString kSettingsNamespace("config", "settings");
void toBatchError(const Status& status, BatchedCommandResponse* response) {
@@ -869,7 +867,8 @@ void ShardingCatalogClientImpl::insertConfigDocumentsAsRetryableWrite(
BSONObj toAdd = docs.back();
docs.pop_back();
- const int docSizePlusOverhead = toAdd.objsize() + kRetryableBatchWriteBSONSizeOverhead;
+ const int docSizePlusOverhead =
+ toAdd.objsize() + write_ops::kRetryableAndTxnBatchWriteBSONSizeOverhead;
// Check if pushing this object will exceed the batch size limit or the max object size
if ((workingBatchItemSize + 1 > write_ops::kMaxWriteBatchSize) ||
(workingBatchDocSize + docSizePlusOverhead > BSONObjMaxUserSize)) {