diff options
author | Blake Oler <blake.oler@mongodb.com> | 2019-04-09 18:08:53 -0400 |
---|---|---|
committer | Blake Oler <blake.oler@mongodb.com> | 2019-04-16 15:57:30 -0400 |
commit | 29ef1a415c74c883746325f13a8eaaa1831f8102 (patch) | |
tree | 13ce885258c58747b604a226215bdaca535361a5 /src/mongo/s/catalog | |
parent | e984f9781d2947e3b1fc10ae8535d630c49b5e94 (diff) | |
download | mongo-29ef1a415c74c883746325f13a8eaaa1831f8102.tar.gz |
SERVER-40346 Use AlternativeSessionRegion to insert config documents as retryable write
Diffstat (limited to 'src/mongo/s/catalog')
-rw-r--r-- | src/mongo/s/catalog/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client.h | 13 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.cpp | 106 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.h | 5 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_mock.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_mock.h | 5 |
6 files changed, 138 insertions, 0 deletions
diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript index 72d095847c9..f4618bcd300 100644 --- a/src/mongo/s/catalog/SConscript +++ b/src/mongo/s/catalog/SConscript @@ -84,6 +84,9 @@ env.Library( 'dist_lock_manager', 'sharding_catalog_client', ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/logical_session_id_helpers' + ] ) env.Library( diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h index cb16b5d26f9..febbf82fe06 100644 --- a/src/mongo/s/catalog/sharding_catalog_client.h +++ b/src/mongo/s/catalog/sharding_catalog_client.h @@ -322,6 +322,19 @@ public: const WriteConcernOptions& writeConcern) = 0; /** + * Directly inserts documents in the specified namespace on the config server. Inserts said + * documents using a retryable write. Underneath, a session is created and destroyed -- this + * ad-hoc session creation strategy should never be used outside of specific, non-performant + * code paths. + * + * Must only be used for insertions in the 'config' database. + */ + virtual void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + const WriteConcernOptions& writeConcern) = 0; + + /** * Updates a single document in the specified namespace on the config server. The document must * have an _id index. Must only be used for updates to the 'config' database. * diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index 063069922b0..b64f17d968c 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -42,6 +42,8 @@ #include "mongo/client/remote_command_targeter.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/logical_session_cache.h" +#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" @@ -64,6 +66,7 @@ #include "mongo/s/request_types/set_shard_version_request.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/shard_util.h" +#include "mongo/s/write_ops/batch_write_op.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/assert_util.h" @@ -87,12 +90,39 @@ using str::stream; namespace { +class AlternativeSessionRegion { +public: + AlternativeSessionRegion(OperationContext* opCtx) + : _alternateClient(opCtx->getServiceContext()->makeClient("alternative-session-region")), + _acr(_alternateClient), + _newOpCtx(cc().makeOperationContext()), + _lsid(makeLogicalSessionId(opCtx)) { + _newOpCtx->setLogicalSessionId(_lsid); + } + + ~AlternativeSessionRegion() { + LogicalSessionCache::get(opCtx())->endSessions({_lsid}); + } + + OperationContext* opCtx() { + return &*_newOpCtx; + } + +private: + ServiceContext::UniqueClient _alternateClient; + AlternativeClientRegion _acr; + ServiceContext::UniqueOperationContext _newOpCtx; + LogicalSessionId _lsid; +}; + const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred, TagSet{}); const int kMaxReadRetry = 3; const int kMaxWriteRetry = 3; +const int kRetryableBatchWriteBSONSizeOverhead = kWriteCommandBSONArrayPerElementOverheadBytes * 2; + const NamespaceString kSettingsNamespace("config", "settings"); void toBatchError(const Status& status, BatchedCommandResponse* response) { @@ -100,6 +130,38 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) { response->setStatus(status); } +void sendRetryableWriteBatchRequestToConfig(OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj>& docs, + TxnNumber txnNumber, + const WriteConcernOptions& writeConcern) { + auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + + BatchedCommandRequest request([&] { + write_ops::Insert insertOp(nss); + insertOp.setDocuments(docs); + return insertOp; + }()); + request.setWriteConcern(writeConcern.toBSON()); + + BSONObj cmdObj = request.toBSON(); + BSONObjBuilder bob(cmdObj); + bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber); + + BatchedCommandResponse batchResponse; + auto response = configShard->runCommand(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + nss.db().toString(), + bob.obj(), + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kIdempotent); + + auto writeStatus = Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse); + + uassertStatusOK(batchResponse.toStatus()); + uassertStatusOK(writeStatus); +} + } // namespace ShardingCatalogClientImpl::ShardingCatalogClientImpl( @@ -813,6 +875,50 @@ Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* opCtx, MONGO_UNREACHABLE; } +void ShardingCatalogClientImpl::insertConfigDocumentsAsRetryableWrite( + OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + const WriteConcernOptions& writeConcern) { + invariant(nss.db() == NamespaceString::kAdminDb || nss.db() == NamespaceString::kConfigDb); + + AlternativeSessionRegion asr(opCtx); + TxnNumber currentTxnNumber = 0; + + std::vector<BSONObj> workingBatch; + size_t workingBatchItemSize = 0; + + int workingBatchDocSize = kRetryableBatchWriteBSONSizeOverhead; + + while (!docs.empty()) { + BSONObj toAdd = docs.back(); + docs.pop_back(); + + int docSize = toAdd.objsize(); + bool batchAtSizeLimit = (workingBatchItemSize + 1 > write_ops::kMaxWriteBatchSize) || + (workingBatchDocSize + docSize > BSONObjMaxUserSize); + + if (batchAtSizeLimit) { + sendRetryableWriteBatchRequestToConfig( + asr.opCtx(), nss, workingBatch, currentTxnNumber, writeConcern); + ++currentTxnNumber; + + workingBatch.clear(); + workingBatchItemSize = 0; + workingBatchDocSize = kRetryableBatchWriteBSONSizeOverhead; + } + + workingBatch.push_back(toAdd); + ++workingBatchItemSize; + workingBatchDocSize += docSize; + } + + if (!workingBatch.empty()) { + sendRetryableWriteBatchRequestToConfig( + asr.opCtx(), nss, workingBatch, currentTxnNumber, writeConcern); + } +} + StatusWith<bool> ShardingCatalogClientImpl::updateConfigDocument( OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h index 3af675402fd..ab4e9506594 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.h +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h @@ -143,6 +143,11 @@ public: const BSONObj& doc, const WriteConcernOptions& writeConcern) override; + void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + const WriteConcernOptions& writeConcern) override; + StatusWith<bool> updateConfigDocument(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& query, diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp index 3c4bca683ce..abb2cd48a25 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp @@ -163,6 +163,12 @@ Status ShardingCatalogClientMock::insertConfigDocument(OperationContext* opCtx, return {ErrorCodes::InternalError, "Method not implemented"}; } +void ShardingCatalogClientMock::insertConfigDocumentsAsRetryableWrite( + OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + const WriteConcernOptions& writeConcern) {} + StatusWith<bool> ShardingCatalogClientMock::updateConfigDocument( OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index 5af200acd7e..f8c84300cc1 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -116,6 +116,11 @@ public: const BSONObj& doc, const WriteConcernOptions& writeConcern) override; + void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx, + const NamespaceString& nss, + std::vector<BSONObj> docs, + const WriteConcernOptions& writeConcern) override; + StatusWith<bool> updateConfigDocument(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& query, |