diff options
| | | |
|---|---|---|
| author | Blake Oler <blake.oler@mongodb.com> | 2019-04-09 18:08:53 -0400 |
| committer | Blake Oler <blake.oler@mongodb.com> | 2019-04-16 15:57:30 -0400 |
| commit | 29ef1a415c74c883746325f13a8eaaa1831f8102 (patch) | |
| tree | 13ce885258c58747b604a226215bdaca535361a5 /src/mongo | |
| parent | e984f9781d2947e3b1fc10ae8535d630c49b5e94 (diff) | |
| download | mongo-29ef1a415c74c883746325f13a8eaaa1831f8102.tar.gz | |
SERVER-40346 Use AlternativeSessionRegion to insert config documents as retryable write
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/ops/write_ops_parsers.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/config/initial_split_policy.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/s/config/initial_split_policy.h | 10 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/s/shardsvr_shard_collection.cpp | 18 | ||||
-rw-r--r-- | src/mongo/s/catalog/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client.h | 13 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.cpp | 106 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.h | 5 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_mock.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_mock.h | 5 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.h | 5 |
13 files changed, 179 insertions, 26 deletions
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h index 77ccb46eded..f116b79b63e 100644 --- a/src/mongo/db/ops/write_ops_parsers.h +++ b/src/mongo/db/ops/write_ops_parsers.h @@ -40,7 +40,7 @@ namespace write_ops { // Conservative per array element overhead. This value was calculated as 1 byte (element type) + 5 // bytes (max string encoding of the array index encoded as string and the maximum key is 99999) + 1 // byte (zero terminator) = 7 bytes -constexpr int kBSONArrayPerElementOverheadBytes = 7; +constexpr int kWriteCommandBSONArrayPerElementOverheadBytes = 7; /** * Parses the 'limit' property of a delete entry, which has inverted meaning from the 'multi' @@ -90,10 +90,10 @@ public: int size = 0; std::for_each(_pipeline->begin(), _pipeline->end(), [&size](const BSONObj& obj) { - size += obj.objsize() + kBSONArrayPerElementOverheadBytes; + size += obj.objsize() + kWriteCommandBSONArrayPerElementOverheadBytes; }); - return size + kBSONArrayPerElementOverheadBytes; + return size + kWriteCommandBSONArrayPerElementOverheadBytes; } Type type() const { diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp index 61baebf030f..bd8ec905907 100644 --- a/src/mongo/db/s/config/initial_split_policy.cpp +++ b/src/mongo/db/s/config/initial_split_policy.cpp @@ -382,17 +382,6 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks( return initialChunks; } -void InitialSplitPolicy::writeFirstChunksToConfig( - OperationContext* opCtx, const InitialSplitPolicy::ShardCollectionConfig& initialChunks) { - for (const auto& chunk : initialChunks.chunks) { - uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument( - opCtx, - ChunkType::ConfigNS, - chunk.toConfigBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - } -} - boost::optional<CollectionType> InitialSplitPolicy::checkIfCollectionAlreadyShardedWithSameOptions( OperationContext* opCtx, const 
NamespaceString& nss, diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h index ab44a85216c..feeb2ca2902 100644 --- a/src/mongo/db/s/config/initial_split_policy.h +++ b/src/mongo/db/s/config/initial_split_policy.h @@ -136,16 +136,10 @@ public: int numContiguousChunksPerShard = 1); /** - * Writes to the config server the first chunks for a newly sharded collection. - */ - static void writeFirstChunksToConfig( - OperationContext* opCtx, const InitialSplitPolicy::ShardCollectionConfig& initialChunks); - - /** * Throws an exception if the collection is already sharded with different options. * - * If the collection is already sharded with the same options, returns the existing collection's - * full spec, else returns boost::none. + * If the collection is already sharded with the same options, returns the existing + * collection's full spec, else returns boost::none. */ static boost::optional<CollectionType> checkIfCollectionAlreadyShardedWithSameOptions( OperationContext* opCtx, diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index ddc0b54f29a..b1220cee3fc 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -128,6 +128,17 @@ boost::optional<UUID> checkCollectionOptions(OperationContext* opCtx, return uassertStatusOK(UUID::parse(collectionInfo["uuid"])); } +void writeFirstChunksForShardCollection( + OperationContext* opCtx, const InitialSplitPolicy::ShardCollectionConfig& initialChunks) { + for (const auto& chunk : initialChunks.chunks) { + uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument( + opCtx, + ChunkType::ConfigNS, + chunk.toConfigBSON(), + ShardingCatalogClient::kMajorityWriteConcern)); + } +} + } // namespace void checkForExistingChunks(OperationContext* opCtx, 
const NamespaceString& nss) { @@ -414,7 +425,7 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, treatAsNoZonesDefined, treatAsEmpty); - InitialSplitPolicy::writeFirstChunksToConfig(opCtx, initialChunks); + writeFirstChunksForShardCollection(opCtx, initialChunks); { CollectionType coll; diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp index 48b48dc7532..f8d4877b58c 100644 --- a/src/mongo/db/s/shardsvr_shard_collection.cpp +++ b/src/mongo/db/s/shardsvr_shard_collection.cpp @@ -415,6 +415,22 @@ void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) numChunks == 0); } +void writeFirstChunksToConfig(OperationContext* opCtx, + const InitialSplitPolicy::ShardCollectionConfig& initialChunks) { + + std::vector<BSONObj> chunkObjs; + chunkObjs.reserve(initialChunks.chunks.size()); + for (const auto& chunk : initialChunks.chunks) { + chunkObjs.push_back(chunk.toConfigBSON()); + } + + Grid::get(opCtx)->catalogClient()->insertConfigDocumentsAsRetryableWrite( + opCtx, + ChunkType::ConfigNS, + std::move(chunkObjs), + ShardingCatalogClient::kMajorityWriteConcern); +} + void shardCollection(OperationContext* opCtx, const NamespaceString& nss, const boost::optional<UUID> uuid, @@ -522,7 +538,7 @@ void shardCollection(OperationContext* opCtx, } // Insert chunk documents to config.chunks on the config server. 
- InitialSplitPolicy::writeFirstChunksToConfig(opCtx, initialChunks); + writeFirstChunksToConfig(opCtx, initialChunks); { CollectionType coll; diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript index 72d095847c9..f4618bcd300 100644 --- a/src/mongo/s/catalog/SConscript +++ b/src/mongo/s/catalog/SConscript @@ -84,6 +84,9 @@ env.Library( 'dist_lock_manager', 'sharding_catalog_client', ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/logical_session_id_helpers' + ] ) env.Library( diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h index cb16b5d26f9..febbf82fe06 100644 --- a/src/mongo/s/catalog/sharding_catalog_client.h +++ b/src/mongo/s/catalog/sharding_catalog_client.h @@ -322,6 +322,19 @@ public: const WriteConcernOptions& writeConcern) = 0; /** + * Directly inserts documents in the specified namespace on the config server. Inserts said + * documents using a retryable write. Underneath, a session is created and destroyed -- this + * ad-hoc session creation strategy should never be used outside of specific, non-performant + * code paths. + * + * Must only be used for insertions in the 'config' database. + */ + virtual void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + const WriteConcernOptions& writeConcern) = 0; + + /** * Updates a single document in the specified namespace on the config server. The document must * have an _id index. Must only be used for updates to the 'config' database. 
* diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index 063069922b0..b64f17d968c 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -42,6 +42,8 @@ #include "mongo/client/remote_command_targeter.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/logical_session_cache.h" +#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" @@ -64,6 +66,7 @@ #include "mongo/s/request_types/set_shard_version_request.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/shard_util.h" +#include "mongo/s/write_ops/batch_write_op.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/assert_util.h" @@ -87,12 +90,39 @@ using str::stream; namespace { +class AlternativeSessionRegion { +public: + AlternativeSessionRegion(OperationContext* opCtx) + : _alternateClient(opCtx->getServiceContext()->makeClient("alternative-session-region")), + _acr(_alternateClient), + _newOpCtx(cc().makeOperationContext()), + _lsid(makeLogicalSessionId(opCtx)) { + _newOpCtx->setLogicalSessionId(_lsid); + } + + ~AlternativeSessionRegion() { + LogicalSessionCache::get(opCtx())->endSessions({_lsid}); + } + + OperationContext* opCtx() { + return &*_newOpCtx; + } + +private: + ServiceContext::UniqueClient _alternateClient; + AlternativeClientRegion _acr; + ServiceContext::UniqueOperationContext _newOpCtx; + LogicalSessionId _lsid; +}; + const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred, TagSet{}); const int kMaxReadRetry = 3; const int kMaxWriteRetry = 3; +const int kRetryableBatchWriteBSONSizeOverhead = 
kWriteCommandBSONArrayPerElementOverheadBytes * 2; + const NamespaceString kSettingsNamespace("config", "settings"); void toBatchError(const Status& status, BatchedCommandResponse* response) { @@ -100,6 +130,38 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) { response->setStatus(status); } +void sendRetryableWriteBatchRequestToConfig(OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj>& docs, + TxnNumber txnNumber, + const WriteConcernOptions& writeConcern) { + auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + + BatchedCommandRequest request([&] { + write_ops::Insert insertOp(nss); + insertOp.setDocuments(docs); + return insertOp; + }()); + request.setWriteConcern(writeConcern.toBSON()); + + BSONObj cmdObj = request.toBSON(); + BSONObjBuilder bob(cmdObj); + bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber); + + BatchedCommandResponse batchResponse; + auto response = configShard->runCommand(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + nss.db().toString(), + bob.obj(), + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kIdempotent); + + auto writeStatus = Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse); + + uassertStatusOK(batchResponse.toStatus()); + uassertStatusOK(writeStatus); +} + } // namespace ShardingCatalogClientImpl::ShardingCatalogClientImpl( @@ -813,6 +875,50 @@ Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* opCtx, MONGO_UNREACHABLE; } +void ShardingCatalogClientImpl::insertConfigDocumentsAsRetryableWrite( + OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + const WriteConcernOptions& writeConcern) { + invariant(nss.db() == NamespaceString::kAdminDb || nss.db() == NamespaceString::kConfigDb); + + AlternativeSessionRegion asr(opCtx); + TxnNumber currentTxnNumber = 0; + + std::vector<BSONObj> workingBatch; + size_t workingBatchItemSize = 0; + + int 
workingBatchDocSize = kRetryableBatchWriteBSONSizeOverhead; + + while (!docs.empty()) { + BSONObj toAdd = docs.back(); + docs.pop_back(); + + int docSize = toAdd.objsize(); + bool batchAtSizeLimit = (workingBatchItemSize + 1 > write_ops::kMaxWriteBatchSize) || + (workingBatchDocSize + docSize > BSONObjMaxUserSize); + + if (batchAtSizeLimit) { + sendRetryableWriteBatchRequestToConfig( + asr.opCtx(), nss, workingBatch, currentTxnNumber, writeConcern); + ++currentTxnNumber; + + workingBatch.clear(); + workingBatchItemSize = 0; + workingBatchDocSize = kRetryableBatchWriteBSONSizeOverhead; + } + + workingBatch.push_back(toAdd); + ++workingBatchItemSize; + workingBatchDocSize += docSize; + } + + if (!workingBatch.empty()) { + sendRetryableWriteBatchRequestToConfig( + asr.opCtx(), nss, workingBatch, currentTxnNumber, writeConcern); + } +} + StatusWith<bool> ShardingCatalogClientImpl::updateConfigDocument( OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h index 3af675402fd..ab4e9506594 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.h +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h @@ -143,6 +143,11 @@ public: const BSONObj& doc, const WriteConcernOptions& writeConcern) override; + void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + const WriteConcernOptions& writeConcern) override; + StatusWith<bool> updateConfigDocument(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& query, diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp index 3c4bca683ce..abb2cd48a25 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp @@ -163,6 +163,12 @@ Status 
ShardingCatalogClientMock::insertConfigDocument(OperationContext* opCtx, return {ErrorCodes::InternalError, "Method not implemented"}; } +void ShardingCatalogClientMock::insertConfigDocumentsAsRetryableWrite( + OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + const WriteConcernOptions& writeConcern) {} + StatusWith<bool> ShardingCatalogClientMock::updateConfigDocument( OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index 5af200acd7e..f8c84300cc1 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -116,6 +116,11 @@ public: const BSONObj& doc, const WriteConcernOptions& writeConcern) override; + void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + const WriteConcernOptions& writeConcern) override; + StatusWith<bool> updateConfigDocument(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& query, diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index ae2a1832c40..dc5da6c1694 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -369,8 +369,8 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter, // Account the array overhead once for the actual updates array and once for the statement // ids array, if retryable writes are used const int writeSizeBytes = getWriteSizeBytes(writeOp) + - write_ops::kBSONArrayPerElementOverheadBytes + - (_batchTxnNum ? write_ops::kBSONArrayPerElementOverheadBytes + 4 : 0); + write_ops::kWriteCommandBSONArrayPerElementOverheadBytes + + (_batchTxnNum ? 
write_ops::kWriteCommandBSONArrayPerElementOverheadBytes + 4 : 0); if (wouldMakeBatchesTooBig(writes, writeSizeBytes, batchMap)) { invariant(!batchMap.empty()); diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h index b01814aa138..3c946f0f4fe 100644 --- a/src/mongo/s/write_ops/batch_write_op.h +++ b/src/mongo/s/write_ops/batch_write_op.h @@ -50,6 +50,11 @@ class OperationContext; class TargetedWriteBatch; class TrackedErrors; +// Conservative overhead per element contained in the write batch. This value was calculated as 1 +// byte (element type) + 5 bytes (max string encoding of the array index encoded as string and the +// maximum key is 99999) + 1 byte (zero terminator) = 7 bytes +const int kWriteCommandBSONArrayPerElementOverheadBytes = 7; + /** * Simple struct for storing an error with an endpoint. * |