| author | Blake Oler <blake.oler@mongodb.com> | 2019-04-09 18:08:53 -0400 |
|---|---|---|
| committer | Blake Oler <blake.oler@mongodb.com> | 2019-06-04 11:26:46 -0400 |
| commit | 0177acdb774e1808b01f1b2139420fabb304694d (patch) | |
| tree | 297e7d60d5b13d5f5a3d11cd896ae0ecdf263625 | |
| parent | da806668926bcbdddbdc3d1aa86216810e4041d5 (diff) | |
| download | mongo-0177acdb774e1808b01f1b2139420fabb304694d.tar.gz | |
SERVER-40346 Use AlternativeSessionRegion to insert config documents as retryable write
(cherry picked from commit 29ef1a415c74c883746325f13a8eaaa1831f8102)
17 files changed, 212 insertions, 50 deletions
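The substantive change is the new `ShardingCatalogClientImpl::insertConfigDocumentsAsRetryableWrite` (added in the diff below): it opens a short-lived logical session through the new `AlternativeSessionRegion` helper, splits the documents into batches bounded by both item count and cumulative BSON size, and sends each batch to the config server as a retryable write stamped with an increasing `txnNumber`. The following standalone sketch models only that batching loop; `Doc`, `sendBatch`, and the numeric limits are simplified stand-ins for `BSONObj`, `sendRetryableWriteBatchRequestToConfig`, `write_ops::kMaxWriteBatchSize`, and `BSONObjMaxUserSize`, not MongoDB's actual code.

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Simplified stand-ins for the limits used by the real code
// (write_ops::kMaxWriteBatchSize and BSONObjMaxUserSize).
constexpr std::size_t kMaxWriteBatchSize = 100000;
constexpr int kMaxBatchSizeBytes = 16 * 1024 * 1024;

// Initial overhead reserved per batch: the per-element array overhead counted
// once for the documents array and once for the statement-ids array.
constexpr int kPerElementOverheadBytes = 7;
constexpr int kBatchBaseOverheadBytes = kPerElementOverheadBytes * 2;

// Hypothetical stand-in for a BSON document; only its serialized size matters here.
struct Doc {
    int sizeBytes;
};

// Stand-in for sendRetryableWriteBatchRequestToConfig(): each call is one
// retryable write, identified by its own txnNumber on the shared session.
void sendBatch(const std::vector<Doc>& batch, std::int64_t txnNumber) {
    std::cout << "txnNumber " << txnNumber << ": " << batch.size() << " docs\n";
}

void insertAsRetryableWrite(std::vector<Doc> docs) {
    std::int64_t currentTxnNumber = 0;

    std::vector<Doc> workingBatch;
    std::size_t workingBatchItemSize = 0;
    int workingBatchDocSize = kBatchBaseOverheadBytes;

    while (!docs.empty()) {
        Doc toAdd = docs.back();
        docs.pop_back();

        // Flush the current batch before either limit would be exceeded.
        bool batchAtSizeLimit = (workingBatchItemSize + 1 > kMaxWriteBatchSize) ||
            (workingBatchDocSize + toAdd.sizeBytes > kMaxBatchSizeBytes);

        if (batchAtSizeLimit) {
            sendBatch(workingBatch, currentTxnNumber);
            ++currentTxnNumber;

            workingBatch.clear();
            workingBatchItemSize = 0;
            workingBatchDocSize = kBatchBaseOverheadBytes;
        }

        workingBatch.push_back(toAdd);
        ++workingBatchItemSize;
        workingBatchDocSize += toAdd.sizeBytes;
    }

    // The final, partially filled batch still has to be sent.
    if (!workingBatch.empty()) {
        sendBatch(workingBatch, currentTxnNumber);
    }
}

int main() {
    // 250,000 small documents force the loop to flush several intermediate batches.
    insertAsRetryableWrite(std::vector<Doc>(250000, Doc{200}));
    return 0;
}
```

Because every batch is sent on the same session with its own transaction number, a batch that is retried after a network error or config server failover is recognized as a duplicate statement rather than re-executed, which is what makes the chunk inserts in shardsvr_shard_collection.cpp safe to retry.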
diff --git a/jstests/auth/list_all_sessions.js b/jstests/auth/list_all_sessions.js
index 1337fc2e94c..7f077bee537 100644
--- a/jstests/auth/list_all_sessions.js
+++ b/jstests/auth/list_all_sessions.js
@@ -56,16 +56,12 @@
     runListAllSessionsTest(mongod);
     MongoRunner.stopMongod(mongod);
 
-    // TODO: Remove 'shardAsReplicaSet: false' when SERVER-32672 is fixed.
-    const st = new ShardingTest({
-        shards: 1,
-        mongos: 1,
-        config: 1,
-        other: {keyFile: 'jstests/libs/key1', shardAsReplicaSet: false}
-    });
+    const st =
+        new ShardingTest({shards: 1, mongos: 1, config: 1, other: {keyFile: 'jstests/libs/key1'}});
 
     // Ensure that the sessions collection exists.
     st.c0.getDB("admin").runCommand({refreshLogicalSessionCacheNow: 1});
+    st.rs0.getPrimary().getDB("admin").runCommand({refreshLogicalSessionCacheNow: 1});
 
     runListAllSessionsTest(st.s0);
     st.stop();
diff --git a/jstests/noPassthrough/transaction_reaper.js b/jstests/noPassthrough/transaction_reaper.js
index 71a0977f972..b0574c099c7 100644
--- a/jstests/noPassthrough/transaction_reaper.js
+++ b/jstests/noPassthrough/transaction_reaper.js
@@ -50,7 +50,10 @@
     this.st.s0.getDB("admin").runCommand({shardCollection: "test.test", key: {_id: 1}});
 
     // Ensure that the sessions collection exists.
-    this.st.c0.getDB("admin").runCommand({refreshLogicalSessionCacheNow: 1});
+    assert.commandWorked(
+        this.st.c0.getDB("admin").runCommand({refreshLogicalSessionCacheNow: 1}));
+    assert.commandWorked(
+        this.st.rs0.getPrimary().getDB("admin").runCommand({refreshLogicalSessionCacheNow: 1}));
 }
 
 Sharding.prototype.stop = function() {
diff --git a/jstests/noPassthrough/verify_session_cache_updates.js b/jstests/noPassthrough/verify_session_cache_updates.js
index 6fb6d36ea7c..48622ba7b95 100644
--- a/jstests/noPassthrough/verify_session_cache_updates.js
+++ b/jstests/noPassthrough/verify_session_cache_updates.js
@@ -68,6 +68,8 @@
     {
        var st = new ShardingTest({shards: 1, mongos: 1, config: 1});
+       st.rs0.getPrimary().getDB("admin").runCommand({refreshLogicalSessionCacheNow: 1});
+
        runTest(st.s0);
        st.stop();
    }
diff --git a/jstests/sharding/sessions_collection_auto_healing.js b/jstests/sharding/sessions_collection_auto_healing.js
index a9526628ee0..8d0c91f9d85 100644
--- a/jstests/sharding/sessions_collection_auto_healing.js
+++ b/jstests/sharding/sessions_collection_auto_healing.js
@@ -116,7 +116,10 @@ load('jstests/libs/sessions_collection.js');
         validateSessionsCollection(shard, true, true);
 
-        assert.eq(shardConfig.system.sessions.count(), 1, "did not flush config's sessions");
+        // We will have two sessions because of the session used in the shardCollection's retryable
+        // write to shard the sessions collection. It will disappear after we run the refresh
+        // function on the shard.
+        assert.eq(shardConfig.system.sessions.count(), 2, "did not flush config's sessions");
 
         // Now, if we do refreshes on the other servers, their in-mem records will
         // be written to the collection.
diff --git a/jstests/sharding/verify_sessions_expiration_sharded.js b/jstests/sharding/verify_sessions_expiration_sharded.js
index 3de71e881b2..fe743f147d6 100644
--- a/jstests/sharding/verify_sessions_expiration_sharded.js
+++ b/jstests/sharding/verify_sessions_expiration_sharded.js
@@ -27,9 +27,11 @@
     const startSession = {startSession: 1};
     const failPointName = "waitAfterPinningCursorBeforeGetMoreBatch";
 
-    function refreshSessionsAndVerifyCount(config, expectedCount) {
-        config.runCommand(refresh);
-        assert.eq(config.system.sessions.count(), expectedCount);
+    function refreshSessionsAndVerifyCount(mongosConfig, shardConfig, expectedCount) {
+        mongosConfig.runCommand(refresh);
+        shardConfig.runCommand(refresh);
+
+        assert.eq(mongosConfig.system.sessions.count(), expectedCount);
     }
 
     function verifyOpenCursorCount(db, expectedCount) {
@@ -49,18 +51,19 @@
     let mongos = shardingTest.s;
     let db = mongos.getDB(dbName);
-    let config = mongos.getDB("config");
+    let mongosConfig = mongos.getDB("config");
+    let shardConfig = shardingTest.rs0.getPrimary().getDB("config");
 
     // 1. Verify that sessions expire from config.system.sessions after the timeout has passed.
     for (let i = 0; i < 5; i++) {
         let res = db.runCommand(startSession);
         assert.commandWorked(res, "unable to start session");
     }
-    refreshSessionsAndVerifyCount(config, 5);
+    refreshSessionsAndVerifyCount(mongosConfig, shardConfig, 5);
 
     // Manually delete entries in config.system.sessions to simulate TTL expiration.
-    assert.commandWorked(config.system.sessions.remove({}));
-    refreshSessionsAndVerifyCount(config, 0);
+    assert.commandWorked(mongosConfig.system.sessions.remove({}));
+    refreshSessionsAndVerifyCount(mongosConfig, shardConfig, 0);
 
     // 2. Verify that getMores after finds will update the 'lastUse' field on documents in the
     // config.system.sessions collection.
@@ -77,8 +80,8 @@
         assert(cursors[i].hasNext());
     }
 
-    refreshSessionsAndVerifyCount(config, 5);
-    verifyOpenCursorCount(config, 5);
+    refreshSessionsAndVerifyCount(mongosConfig, shardConfig, 5);
+    verifyOpenCursorCount(mongosConfig, 5);
 
     let sessionsCollectionArray;
     let lastUseValues = [];
@@ -87,10 +90,10 @@
             cursors[j].next();
         }
 
-        refreshSessionsAndVerifyCount(config, 5);
-        verifyOpenCursorCount(config, 5);
+        refreshSessionsAndVerifyCount(mongosConfig, shardConfig, 5);
+        verifyOpenCursorCount(mongosConfig, 5);
 
-        sessionsCollectionArray = getSessions(config);
+        sessionsCollectionArray = getSessions(mongosConfig);
 
         if (i == 0) {
             for (let j = 0; j < sessionsCollectionArray.length; j++) {
@@ -106,10 +109,10 @@
     // 3. Verify that letting sessions expire (simulated by manual deletion) will kill their
     // cursors.
-    assert.commandWorked(config.system.sessions.remove({}));
+    assert.commandWorked(mongosConfig.system.sessions.remove({}));
 
-    refreshSessionsAndVerifyCount(config, 0);
-    verifyOpenCursorCount(config, 0);
+    refreshSessionsAndVerifyCount(mongosConfig, shardConfig, 0);
+    verifyOpenCursorCount(mongosConfig, 0);
 
     for (let i = 0; i < cursors.length; i++) {
         assert.commandFailedWithCode(
@@ -128,10 +131,10 @@
         sessionId: pinnedCursorSession,
         db: pinnedCursorDB,
         assertFunction: (cursorId, coll) => {
-            assert.commandWorked(config.system.sessions.remove({}));
+            assert.commandWorked(mongosConfig.system.sessions.remove({}));
+            verifyOpenCursorCount(mongosConfig, 1);
 
-            verifyOpenCursorCount(config, 1);
-            refreshSessionsAndVerifyCount(config, 1);
+            refreshSessionsAndVerifyCount(mongosConfig, shardConfig, 1);
 
             let db = coll.getDB();
             assert.commandWorked(db.runCommand({killCursors: coll.getName(), cursors: [cursorId]}));
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h
index cf5d87ee882..ef5a95eac41 100644
--- a/src/mongo/db/ops/write_ops_parsers.h
+++ b/src/mongo/db/ops/write_ops_parsers.h
@@ -37,6 +37,11 @@
 namespace mongo {
 namespace write_ops {
 
+// Conservative per array element overhead. This value was calculated as 1 byte (element type) + 5
+// bytes (max string encoding of the array index encoded as string and the maximum key is 99999) + 1
+// byte (zero terminator) = 7 bytes
+constexpr int kWriteCommandBSONArrayPerElementOverheadBytes = 7;
+
 /**
  * Parses the 'limit' property of a delete entry, which has inverted meaning from the 'multi'
  * property of an update.
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index 81d0ecb22ed..27910363707 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -387,17 +387,6 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
     return initialChunks;
 }
 
-void InitialSplitPolicy::writeFirstChunksToConfig(
-    OperationContext* opCtx, const InitialSplitPolicy::ShardCollectionConfig& initialChunks) {
-    for (const auto& chunk : initialChunks.chunks) {
-        uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument(
-            opCtx,
-            ChunkType::ConfigNS,
-            chunk.toConfigBSON(),
-            ShardingCatalogClient::kMajorityWriteConcern));
-    }
-}
-
 boost::optional<CollectionType> InitialSplitPolicy::checkIfCollectionAlreadyShardedWithSameOptions(
     OperationContext* opCtx,
     const NamespaceString& nss,
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index b7bf33c797a..26a991cf038 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -137,16 +137,10 @@ public:
         int numContiguousChunksPerShard = 1);
 
     /**
-     * Writes to the config server the first chunks for a newly sharded collection.
-     */
-    static void writeFirstChunksToConfig(
-        OperationContext* opCtx, const InitialSplitPolicy::ShardCollectionConfig& initialChunks);
-
-    /**
      * Throws an exception if the collection is already sharded with different options.
      *
-     * If the collection is already sharded with the same options, returns the existing collection's
-     * full spec, else returns boost::none.
+     * If the collection is already sharded with the same options, returns the existing
+     * collection's full spec, else returns boost::none.
      */
     static boost::optional<CollectionType> checkIfCollectionAlreadyShardedWithSameOptions(
         OperationContext* opCtx,
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index 9dbbc01b517..d70fd1b7915 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -128,6 +128,17 @@ boost::optional<UUID> checkCollectionOptions(OperationContext* opCtx,
     return uassertStatusOK(UUID::parse(collectionInfo["uuid"]));
 }
 
+void writeFirstChunksForShardCollection(
+    OperationContext* opCtx, const InitialSplitPolicy::ShardCollectionConfig& initialChunks) {
+    for (const auto& chunk : initialChunks.chunks) {
+        uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument(
+            opCtx,
+            ChunkType::ConfigNS,
+            chunk.toConfigBSON(),
+            ShardingCatalogClient::kMajorityWriteConcern));
+    }
+}
+
 }  // namespace
 
 void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) {
@@ -417,7 +428,7 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
             treatAsNoZonesDefined,
             treatAsEmpty);
 
-    InitialSplitPolicy::writeFirstChunksToConfig(opCtx, initialChunks);
+    writeFirstChunksForShardCollection(opCtx, initialChunks);
 
     {
         CollectionType coll;
diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp
index f7b8b55d7dd..693f06292fa 100644
--- a/src/mongo/db/s/shardsvr_shard_collection.cpp
+++ b/src/mongo/db/s/shardsvr_shard_collection.cpp
@@ -413,6 +413,22 @@ void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss)
              numChunks == 0);
 }
 
+void writeFirstChunksToConfig(OperationContext* opCtx,
+                              const InitialSplitPolicy::ShardCollectionConfig& initialChunks) {
+
+    std::vector<BSONObj> chunkObjs;
+    chunkObjs.reserve(initialChunks.chunks.size());
+    for (const auto& chunk : initialChunks.chunks) {
+        chunkObjs.push_back(chunk.toConfigBSON());
+    }
+
+    Grid::get(opCtx)->catalogClient()->insertConfigDocumentsAsRetryableWrite(
+        opCtx,
+        ChunkType::ConfigNS,
+        std::move(chunkObjs),
+        ShardingCatalogClient::kMajorityWriteConcern);
+}
+
 void shardCollection(OperationContext* opCtx,
                      const NamespaceString& nss,
                      const boost::optional<UUID> uuid,
@@ -520,7 +536,7 @@ void shardCollection(OperationContext* opCtx,
     }
 
     // Insert chunk documents to config.chunks on the config server.
-    InitialSplitPolicy::writeFirstChunksToConfig(opCtx, initialChunks);
+    writeFirstChunksToConfig(opCtx, initialChunks);
 
     {
         CollectionType coll;
diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript
index 88d3a3eb090..4ea24f8ceb6 100644
--- a/src/mongo/s/catalog/SConscript
+++ b/src/mongo/s/catalog/SConscript
@@ -83,6 +83,9 @@ env.Library(
         'dist_lock_manager',
         'sharding_catalog_client',
     ],
+    LIBDEPS_PRIVATE=[
+        '$BUILD_DIR/mongo/db/logical_session_id_helpers'
+    ]
 )
 
 env.Library(
diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index 35696c159be..962b179f14a 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -340,6 +340,19 @@ public:
                                         const WriteConcernOptions& writeConcern) = 0;
 
     /**
+     * Directly inserts documents in the specified namespace on the config server. Inserts said
+     * documents using a retryable write. Underneath, a session is created and destroyed -- this
+     * ad-hoc session creation strategy should never be used outside of specific, non-performant
+     * code paths.
+     *
+     * Must only be used for insertions in the 'config' database.
+     */
+    virtual void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx,
+                                                       const NamespaceString& nss,
+                                                       std::vector<BSONObj> docs,
+                                                       const WriteConcernOptions& writeConcern) = 0;
+
+    /**
      * Updates a single document in the specified namespace on the config server. The document must
     * have an _id index. Must only be used for updates to the 'config' database.
     *
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index 3c0ff06aeb7..d4f51de98b8 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -43,6 +43,8 @@
 #include "mongo/client/remote_command_targeter.h"
 #include "mongo/db/client.h"
 #include "mongo/db/commands.h"
+#include "mongo/db/logical_session_cache.h"
+#include "mongo/db/logical_session_id_helpers.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/repl/optime.h"
@@ -65,6 +67,7 @@
 #include "mongo/s/request_types/set_shard_version_request.h"
 #include "mongo/s/shard_key_pattern.h"
 #include "mongo/s/shard_util.h"
+#include "mongo/s/write_ops/batch_write_op.h"
 #include "mongo/s/write_ops/batched_command_request.h"
 #include "mongo/s/write_ops/batched_command_response.h"
 #include "mongo/util/assert_util.h"
@@ -88,12 +91,40 @@ using str::stream;
 
 namespace {
 
+class AlternativeSessionRegion {
+public:
+    AlternativeSessionRegion(OperationContext* opCtx)
+        : _alternateClient(opCtx->getServiceContext()->makeClient("alternative-session-region")),
+          _acr(_alternateClient),
+          _newOpCtx(cc().makeOperationContext()),
+          _lsid(makeLogicalSessionId(opCtx)) {
+        _newOpCtx->setLogicalSessionId(_lsid);
+    }
+
+    ~AlternativeSessionRegion() {
+        LogicalSessionCache::get(opCtx())->endSessions({_lsid});
+    }
+
+    OperationContext* opCtx() {
+        return &*_newOpCtx;
+    }
+
+private:
+    ServiceContext::UniqueClient _alternateClient;
+    AlternativeClientRegion _acr;
+    ServiceContext::UniqueOperationContext _newOpCtx;
+    LogicalSessionId _lsid;
+};
+
 const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
 const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred,
                                                             TagSet{});
 const int kMaxReadRetry = 3;
 const int kMaxWriteRetry = 3;
 
+const int kRetryableBatchWriteBSONSizeOverhead =
+    write_ops::kWriteCommandBSONArrayPerElementOverheadBytes * 2;
+
 const std::string kActionLogCollectionName("actionlog");
 const int kActionLogCollectionSizeMB = 20 * 1024 * 1024;
 
@@ -107,6 +138,38 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
     response->setStatus(status);
 }
 
+void sendRetryableWriteBatchRequestToConfig(OperationContext* opCtx,
+                                            const NamespaceString& nss,
+                                            std::vector<BSONObj>& docs,
+                                            TxnNumber txnNumber,
+                                            const WriteConcernOptions& writeConcern) {
+    auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
+    BatchedCommandRequest request([&] {
+        write_ops::Insert insertOp(nss);
+        insertOp.setDocuments(docs);
+        return insertOp;
+    }());
+    request.setWriteConcern(writeConcern.toBSON());
+
+    BSONObj cmdObj = request.toBSON();
+    BSONObjBuilder bob(cmdObj);
+    bob.append(OperationSessionInfo::kTxnNumberFieldName, txnNumber);
+
+    BatchedCommandResponse batchResponse;
+    auto response = configShard->runCommand(opCtx,
+                                            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                            nss.db().toString(),
+                                            bob.obj(),
+                                            Shard::kDefaultConfigCommandTimeout,
+                                            Shard::RetryPolicy::kIdempotent);
+
+    auto writeStatus = Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
+
+    uassertStatusOK(batchResponse.toStatus());
+    uassertStatusOK(writeStatus);
+}
+
 }  // namespace
 
 ShardingCatalogClientImpl::ShardingCatalogClientImpl(
@@ -898,6 +961,50 @@ Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* opCtx,
     MONGO_UNREACHABLE;
 }
 
+void ShardingCatalogClientImpl::insertConfigDocumentsAsRetryableWrite(
+    OperationContext* opCtx,
+    const NamespaceString& nss,
+    std::vector<BSONObj> docs,
+    const WriteConcernOptions& writeConcern) {
+    invariant(nss.db() == NamespaceString::kAdminDb || nss.db() == NamespaceString::kConfigDb);
+
+    AlternativeSessionRegion asr(opCtx);
+    TxnNumber currentTxnNumber = 0;
+
+    std::vector<BSONObj> workingBatch;
+    size_t workingBatchItemSize = 0;
+
+    int workingBatchDocSize = kRetryableBatchWriteBSONSizeOverhead;
+
+    while (!docs.empty()) {
+        BSONObj toAdd = docs.back();
+        docs.pop_back();
+
+        int docSize = toAdd.objsize();
+        bool batchAtSizeLimit = (workingBatchItemSize + 1 > write_ops::kMaxWriteBatchSize) ||
+            (workingBatchDocSize + docSize > BSONObjMaxUserSize);
+
+        if (batchAtSizeLimit) {
+            sendRetryableWriteBatchRequestToConfig(
+                asr.opCtx(), nss, workingBatch, currentTxnNumber, writeConcern);
+            ++currentTxnNumber;
+
+            workingBatch.clear();
+            workingBatchItemSize = 0;
+            workingBatchDocSize = kRetryableBatchWriteBSONSizeOverhead;
+        }
+
+        workingBatch.push_back(toAdd);
+        ++workingBatchItemSize;
+        workingBatchDocSize += docSize;
+    }
+
+    if (!workingBatch.empty()) {
+        sendRetryableWriteBatchRequestToConfig(
+            asr.opCtx(), nss, workingBatch, currentTxnNumber, writeConcern);
+    }
+}
+
 StatusWith<bool> ShardingCatalogClientImpl::updateConfigDocument(
     OperationContext* opCtx,
     const NamespaceString& nss,
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h
index 9d7c0979e3b..65b5c903bd2 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h
@@ -155,6 +155,11 @@ public:
                               const BSONObj& doc,
                               const WriteConcernOptions& writeConcern) override;
 
+    void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx,
+                                               const NamespaceString& nss,
+                                               std::vector<BSONObj> docs,
+                                               const WriteConcernOptions& writeConcern) override;
+
     StatusWith<bool> updateConfigDocument(OperationContext* opCtx,
                                           const NamespaceString& nss,
                                           const BSONObj& query,
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
index 1fe6a240757..5d80c9d8883 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
@@ -179,6 +179,12 @@ Status ShardingCatalogClientMock::insertConfigDocument(OperationContext* opCtx,
     return {ErrorCodes::InternalError, "Method not implemented"};
 }
 
+void ShardingCatalogClientMock::insertConfigDocumentsAsRetryableWrite(
+    OperationContext* opCtx,
+    const NamespaceString& nss,
+    std::vector<BSONObj> docs,
+    const WriteConcernOptions& writeConcern) {}
+
 StatusWith<bool> ShardingCatalogClientMock::updateConfigDocument(
     OperationContext* opCtx,
     const NamespaceString& nss,
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h
index a3c3c1ec07e..e01debc632d 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h
@@ -128,6 +128,11 @@ public:
                               const BSONObj& doc,
                               const WriteConcernOptions& writeConcern) override;
 
+    void insertConfigDocumentsAsRetryableWrite(OperationContext* opCtx,
+                                               const NamespaceString& nss,
+                                               std::vector<BSONObj> docs,
+                                               const WriteConcernOptions& writeConcern) override;
+
     StatusWith<bool> updateConfigDocument(OperationContext* opCtx,
                                           const NamespaceString& nss,
                                           const BSONObj& query,
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index c3ec4b07338..229ce247335 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -316,8 +316,9 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
 
         // Account the array overhead once for the actual updates array and once for the statement
         // ids array, if retryable writes are used
-        const int writeSizeBytes = getWriteSizeBytes(writeOp) + kBSONArrayPerElementOverheadBytes +
-            (_batchTxnNum ? kBSONArrayPerElementOverheadBytes + 4 : 0);
+        const int writeSizeBytes = getWriteSizeBytes(writeOp) +
+            write_ops::kWriteCommandBSONArrayPerElementOverheadBytes +
+            (_batchTxnNum ? write_ops::kWriteCommandBSONArrayPerElementOverheadBytes + 4 : 0);
 
         if (wouldMakeBatchesTooBig(writes, writeSizeBytes, batchMap)) {
             invariant(!batchMap.empty());
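A note on the `write_ops::kWriteCommandBSONArrayPerElementOverheadBytes` constant introduced above: each entry of a BSON array carries, besides the value itself, a one-byte type tag plus its array index encoded as a C-string key plus that key's NUL terminator. With write batches capped at 100,000 documents the largest index key is "99999", so the worst-case per-element overhead is 1 + 5 + 1 = 7 bytes. The snippet below only illustrates that arithmetic; `bsonArrayElementOverhead` is a hypothetical helper, not a MongoDB function.

```cpp
#include <iostream>
#include <string>

// Overhead of one BSON array element, excluding the value itself:
// 1 byte type tag + the array index rendered as a string key + 1 NUL byte.
int bsonArrayElementOverhead(int index) {
    return 1 + static_cast<int>(std::to_string(index).size()) + 1;
}

int main() {
    std::cout << bsonArrayElementOverhead(0) << "\n";      // 3
    std::cout << bsonArrayElementOverhead(99999) << "\n";  // 7 == kWriteCommandBSONArrayPerElementOverheadBytes
    return 0;
}
```

batch_write_op.cpp charges this overhead once for the write itself and, when a transaction number is present, once more for the statement-ids array plus four bytes, presumably for the int32 statement id value; `kRetryableBatchWriteBSONSizeOverhead` in sharding_catalog_client_impl.cpp reserves the same doubled overhead when sizing each retryable batch.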