diff options
author | Alex Taskov <alex.taskov@mongodb.com> | 2020-02-04 13:18:29 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-04 18:42:34 +0000 |
commit | 8d3429fed4e5307a9e79fe99494ccefc388e54f4 (patch) | |
tree | 615d6e74116fc32da0c1ac92227f22ad8f615f38 | |
parent | 0eaddad5bb19553974910d4957d18e1ad790a504 (diff) | |
download | mongo-8d3429fed4e5307a9e79fe99494ccefc388e54f4.tar.gz |
SERVER-45339 Make MigrationSourceManager send _recvChunkStart to recipient shard as a retryable write
-rw-r--r-- | jstests/sharding/addshard5.js | 12 | ||||
-rw-r--r-- | jstests/sharding/convert_to_and_from_sharded.js | 9 | ||||
-rw-r--r-- | jstests/ssl/libs/ssl_helpers.js | 13 | ||||
-rw-r--r-- | jstests/ssl/mixed_mode_sharded_transition.js | 9 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 76 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 1 | ||||
-rw-r--r-- | src/mongo/db/s/start_chunk_clone_request.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/s/start_chunk_clone_request.h | 13 | ||||
-rw-r--r-- | src/mongo/db/s/start_chunk_clone_request_test.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/transaction_validation.cpp | 11 | ||||
-rw-r--r-- | src/mongo/shell/shardingtest.js | 11 |
17 files changed, 177 insertions, 48 deletions
diff --git a/jstests/sharding/addshard5.js b/jstests/sharding/addshard5.js index 7a2b6866c8c..bb14780b72d 100644 --- a/jstests/sharding/addshard5.js +++ b/jstests/sharding/addshard5.js @@ -29,8 +29,11 @@ assert.commandWorked(mongos.adminCommand( assert.commandWorked(mongos.adminCommand({removeShard: st.shard1.shardName})); assert.commandWorked(mongos.adminCommand({removeShard: st.shard1.shardName})); -var shard2 = MongoRunner.runMongod({'shardsvr': ''}); -assert.commandWorked(mongos.adminCommand({addShard: shard2.host, name: st.shard1.shardName})); +let shard2 = new ReplSetTest({nodes: 2, nodeOptions: {shardsvr: ""}}); +shard2.startSet(); +shard2.initiate(); + +assert.commandWorked(mongos.adminCommand({addShard: shard2.getURL(), name: st.shard1.shardName})); jsTest.log('Shard was dropped and re-added with same name...'); st.printShardingStatus(); @@ -39,8 +42,9 @@ st.printShardingStatus(); assert.commandWorked( mongos.adminCommand({moveChunk: coll + '', find: {_id: 0}, to: st.shard1.shardName})); -assert.eq('world', shard2.getCollection(coll + '').findOne().hello); +let shard2Conn = shard2.getPrimary(); +assert.eq('world', shard2Conn.getCollection(coll + '').findOne().hello); st.stop(); -MongoRunner.stopMongod(shard2); +shard2.stopSet(); })(); diff --git a/jstests/sharding/convert_to_and_from_sharded.js b/jstests/sharding/convert_to_and_from_sharded.js index f615a277195..0202e70e605 100644 --- a/jstests/sharding/convert_to_and_from_sharded.js +++ b/jstests/sharding/convert_to_and_from_sharded.js @@ -80,9 +80,12 @@ for (x = 0; x < 4; x++) { assert.commandWorked(st.s.adminCommand({split: 'test.sharded', middle: {_id: x}})); } -var newMongod = MongoRunner.runMongod({shardsvr: ''}); +let newShard = + new ReplSetTest({name: "toRemoveLater", nodes: NUM_NODES, nodeOptions: {shardsvr: ""}}); +newShard.startSet(); +newShard.initiate(); -assert.commandWorked(st.s.adminCommand({addShard: newMongod.name, name: 'toRemoveLater'})); +assert.commandWorked(st.s.adminCommand({addShard: newShard.getURL(), name: 'toRemoveLater'})); for (x = 0; x < 2; x++) { assert.commandWorked( @@ -102,7 +105,7 @@ assert.soon(function() { return res.state == 'completed'; }); -MongoRunner.stopMongod(newMongod); +newShard.stopSet(); checkBasicCRUD(st.s.getDB('test').unsharded); checkBasicCRUD(st.s.getDB('test').sharded); diff --git a/jstests/ssl/libs/ssl_helpers.js b/jstests/ssl/libs/ssl_helpers.js index db53fa7c687..65ba385b321 100644 --- a/jstests/ssl/libs/ssl_helpers.js +++ b/jstests/ssl/libs/ssl_helpers.js @@ -110,7 +110,7 @@ function testShardedLookup(shardingTest) { * Takes in two mongod/mongos configuration options and runs a basic * sharding test to see if they can work together... */ -function mixedShardTest(options1, options2, shouldSucceed) { +function mixedShardTest(options1, options2, shouldSucceed, disableResumableRangeDeleter) { let authSucceeded = false; try { // Start ShardingTest with enableBalancer because ShardingTest attempts to turn @@ -123,12 +123,19 @@ function mixedShardTest(options1, options2, shouldSucceed) { // // Once SERVER-14017 is fixed the "enableBalancer" line can be removed. // TODO: SERVER-43899 Make sharding_with_x509.js and mixed_mode_sharded_transition.js start - // shards as replica sets. + // shards as replica sets and remove disableResumableRangeDeleter parameter. + let otherOptions = {enableBalancer: true}; + + if (disableResumableRangeDeleter) { + otherOptions.shardAsReplicaSet = false; + otherOptions.shardOptions = {setParameter: {"disableResumableRangeDeleter": true}}; + } + var st = new ShardingTest({ mongos: [options1], config: [options1], shards: [options1, options2], - other: {enableBalancer: true, shardAsReplicaSet: false} + other: otherOptions }); // Create admin user in case the options include auth diff --git a/jstests/ssl/mixed_mode_sharded_transition.js b/jstests/ssl/mixed_mode_sharded_transition.js index 8f9136b3e39..f6b0d7bbaec 100644 --- a/jstests/ssl/mixed_mode_sharded_transition.js +++ b/jstests/ssl/mixed_mode_sharded_transition.js @@ -11,6 +11,8 @@ load('jstests/ssl/libs/ssl_helpers.js'); (function() { 'use strict'; +const disableResumableRangeDeleter = true; + var transitionToX509AllowSSL = Object.merge(allowSSL, {transitionToAuth: '', clusterAuthMode: 'x509'}); var transitionToX509PreferSSL = @@ -18,15 +20,16 @@ var transitionToX509PreferSSL = var x509RequireSSL = Object.merge(requireSSL, {clusterAuthMode: 'x509'}); function testCombos(opt1, opt2, shouldSucceed) { - mixedShardTest(opt1, opt2, shouldSucceed); - mixedShardTest(opt2, opt1, shouldSucceed); + mixedShardTest(opt1, opt2, shouldSucceed, disableResumableRangeDeleter); + mixedShardTest(opt2, opt1, shouldSucceed, disableResumableRangeDeleter); } print('=== Testing transitionToAuth/allowSSL - transitionToAuth/preferSSL cluster ==='); testCombos(transitionToX509AllowSSL, transitionToX509PreferSSL, true); print('=== Testing transitionToAuth/preferSSL - transitionToAuth/preferSSL cluster ==='); -mixedShardTest(transitionToX509PreferSSL, transitionToX509PreferSSL, true); +mixedShardTest( + transitionToX509PreferSSL, transitionToX509PreferSSL, true, disableResumableRangeDeleter); print('=== Testing transitionToAuth/preferSSL - x509/requireSSL cluster ==='); testCombos(transitionToX509PreferSSL, x509RequireSSL, true); diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index f59efc0f16b..cb4ebb53a29 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/logical_session_id.h" #include "mongo/util/time_support.h" namespace mongo { @@ -72,7 +73,10 @@ public: * NOTE: Must be called without any locks and must succeed, before any other methods are called * (except for cancelClone and [insert/update/delete]Op). */ - virtual Status startClone(OperationContext* opCtx, const UUID& migrationId) = 0; + virtual Status startClone(OperationContext* opCtx, + const UUID& migrationId, + const LogicalSessionId& lsid, + TxnNumber txnNumber) = 0; /** * Blocking method, which uses some custom selected logic for deciding whether it is appropriate diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index b9dd891ed17..454fe2ac76b 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -238,7 +238,9 @@ MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { } Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx, - const UUID& migrationId) { + const UUID& migrationId, + const LogicalSessionId& lsid, + TxnNumber txnNumber) { invariant(_state == kNew); invariant(!opCtx->lockState()->isLocked()); @@ -285,6 +287,8 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx, StartChunkCloneRequest::appendAsCommand(&cmdBuilder, _args.getNss(), migrationId, + lsid, + txnNumber, _sessionId, _donorConnStr, _args.getFromShardId(), diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index a7236c136ce..d5f56e196ed 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -90,7 +90,10 @@ public: HostAndPort recipientHost); ~MigrationChunkClonerSourceLegacy(); - Status startClone(OperationContext* opCtx, const UUID& migrationId) override; + Status startClone(OperationContext* opCtx, + const UUID& migrationId, + const LogicalSessionId& lsid, + TxnNumber txnNumber) override; Status awaitUntilCriticalSectionIsAppropriate(OperationContext* opCtx, Milliseconds maxTimeToWait) override; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index 89ca8742cfc..8750348e8cc 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -32,6 +32,7 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/namespace_string.h" #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" @@ -101,6 +102,8 @@ protected: clockSource->advance(Seconds(1)); operationContext()->getServiceContext()->setFastClockSource(std::move(clockSource)); + + _lsid = makeLogicalSessionId(operationContext()); } void tearDown() override { @@ -167,6 +170,10 @@ protected: return BSON("_id" << value << "X" << value); } +protected: + LogicalSessionId _lsid; + TxnNumber _txnNumber{0}; + private: std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( std::unique_ptr<DistLockManager> distLockManager) override { @@ -214,7 +221,7 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) { onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); }); }); - ASSERT_OK(cloner.startClone(operationContext(), UUID::gen())); + ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber)); futureStartClone.default_timed_get(); } @@ -312,7 +319,7 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CollectionNotFound) { kDonorConnStr, kRecipientConnStr.getServers()[0]); - ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen())); + ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber)); cloner.cancelClone(operationContext()); } @@ -325,7 +332,7 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ShardKeyIndexNotFound) { kDonorConnStr, kRecipientConnStr.getServers()[0]); - ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen())); + ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber)); cloner.cancelClone(operationContext()); } @@ -351,7 +358,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) { }); }); - auto startCloneStatus = cloner.startClone(operationContext(), UUID::gen()); + auto startCloneStatus = + cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber); ASSERT_EQ(ErrorCodes::NetworkTimeout, startCloneStatus.code()); futureStartClone.default_timed_get(); } diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index a44e6bdda24..a11bb49b8a4 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -42,6 +42,7 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/index_builds_coordinator.h" +#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" #include "mongo/db/operation_context.h" @@ -59,7 +60,9 @@ #include "mongo/db/s/start_chunk_clone_request.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/db/storage/remove_saver.h" +#include "mongo/db/transaction_participant.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" @@ -327,7 +330,7 @@ BSONObj MigrationDestinationManager::getMigrationStatusReport() { Status MigrationDestinationManager::start(OperationContext* opCtx, const NamespaceString& nss, ScopedReceiveChunk scopedReceiveChunk, - const StartChunkCloneRequest cloneRequest, + const StartChunkCloneRequest& cloneRequest, const OID& epoch, const WriteConcernOptions& writeConcern) { stdx::lock_guard<Latch> lk(_mutex); @@ -357,6 +360,8 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, cloneRequest.hasMigrationId()); _migrationId = cloneRequest.getMigrationId(); + _lsid = cloneRequest.getLsid(); + _txnNumber = cloneRequest.getTxnNumber(); } _nss = nss; @@ -756,21 +761,42 @@ void MigrationDestinationManager::cloneCollectionIndexesAndOptions( void MigrationDestinationManager::_migrateThread() { Client::initKillableThread("migrateThread", getGlobalServiceContext()); - auto opCtx = Client::getCurrent()->makeOperationContext(); + auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); if (AuthorizationManager::get(opCtx->getServiceContext())->isAuthEnabled()) { - AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(opCtx.get()); + AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(opCtx); } try { - _migrateDriver(opCtx.get()); + // The outer OperationContext is used to hold the session checked out for the + // duration of the recipient's side of the migration. This guarantees that if the + // donor shard has failed over, then the new donor primary cannot bump the + // txnNumber on this session while this node is still executing the recipient side + //(which is important because otherwise, this node may create orphans after the + // range deletion task on this node has been processed). + if (_enableResumableRangeDeleter) { + opCtx->setLogicalSessionId(_lsid); + opCtx->setTxnNumber(_txnNumber); + + MongoDOperationContextSession sessionTxnState(opCtx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.beginOrContinue(opCtx, + *opCtx->getTxnNumber(), + boost::none /* autocommit */, + boost::none /* startTransaction */); + _migrateDriver(opCtx); + } else { + _migrateDriver(opCtx); + } } catch (...) { _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus())); } if (!_enableResumableRangeDeleter) { if (getState() != DONE) { - _forgetPending(opCtx.get(), ChunkRange(_min, _max)); + _forgetPending(opCtx, ChunkRange(_min, _max)); } } @@ -780,7 +806,7 @@ void MigrationDestinationManager::_migrateThread() { _isActiveCV.notify_all(); } -void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { +void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { invariant(isActive()); invariant(_sessionId); invariant(_scopedReceiveChunk); @@ -792,7 +818,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { << " at epoch " << _epoch.toString() << " with session id " << *_sessionId; MoveTimingHelper timing( - opCtx, "to", _nss.ns(), _min, _max, 6 /* steps */, &_errmsg, ShardId(), ShardId()); + outerOpCtx, "to", _nss.ns(), _min, _max, 6 /* steps */, &_errmsg, ShardId(), ShardId()); const auto initialState = getState(); @@ -803,10 +829,11 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { invariant(initialState == READY); - auto donorCollectionOptionsAndIndexes = getCollectionIndexesAndOptions(opCtx, _nss, _fromShard); + auto donorCollectionOptionsAndIndexes = + getCollectionIndexesAndOptions(outerOpCtx, _nss, _fromShard); auto fromShard = - uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, _fromShard)); + uassertStatusOK(Grid::get(outerOpCtx)->shardRegistry()->getShard(outerOpCtx, _fromShard)); { const ChunkRange range(_min, _max); @@ -815,20 +842,20 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { // deleted. if (_enableResumableRangeDeleter) { while (migrationutil::checkForConflictingDeletions( - opCtx, range, donorCollectionOptionsAndIndexes.uuid)) { + outerOpCtx, range, donorCollectionOptionsAndIndexes.uuid)) { LOG(0) << "Migration paused because range overlaps with a " "range that is scheduled for deletion: collection: " << _nss.ns() << " range: " << redact(range.toString()); auto status = CollectionShardingRuntime::waitForClean( - opCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range); + outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range); if (!status.isOK()) { _setStateFail(redact(status.reason())); return; } - opCtx->sleepFor(Milliseconds(1000)); + outerOpCtx->sleepFor(Milliseconds(1000)); } RangeDeletionTask recipientDeletionTask(_migrationId, @@ -839,13 +866,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { CleanWhenEnum::kNow); recipientDeletionTask.setPending(true); - migrationutil::persistRangeDeletionTaskLocally(opCtx, recipientDeletionTask); + migrationutil::persistRangeDeletionTaskLocally(outerOpCtx, recipientDeletionTask); } else { // Synchronously delete any data which might have been left orphaned in the range // being moved, and wait for completion - auto cleanupCompleteFuture = _notePending(opCtx, range); - auto cleanupStatus = cleanupCompleteFuture.getNoThrow(opCtx); + auto cleanupCompleteFuture = _notePending(outerOpCtx, range); + auto cleanupStatus = cleanupCompleteFuture.getNoThrow(outerOpCtx); // Wait for the range deletion to report back. Swallow // RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist error since the // collection could either never exist or get dropped directly from the shard after the @@ -859,7 +886,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { // Wait for any other, overlapping queued deletions to drain cleanupStatus = CollectionShardingRuntime::waitForClean( - opCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range); + outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range); if (!cleanupStatus.isOK()) { _setStateFail(redact(cleanupStatus.reason())); return; @@ -870,6 +897,23 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { migrateThreadHangAtStep1.pauseWhileSet(); } + // The conventional usage of retryable writes is to assign statement id's to all of + // the writes done as part of the data copying so that _recvChunkStart is + // conceptually a retryable write batch. However, we are using an alternate approach to do those + // writes under an AlternativeClientRegion because 1) threading the + // statement id's through to all the places where they are needed would make this code more + // complex, and 2) some of the operations, like creating the collection or building indexes, are + // not currently supported in retryable writes. + auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator"); + { + stdx::lock_guard<Client> lk(*newClient.get()); + newClient->setSystemOperationKillable(lk); + } + + AlternativeClientRegion acr(newClient); + auto newOpCtxPtr = cc().makeOperationContext(); + auto opCtx = newOpCtxPtr.get(); + { cloneCollectionIndexesAndOptions(opCtx, _nss, donorCollectionOptionsAndIndexes); diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 6be51ce2dd8..5182ace7ea9 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -109,7 +109,7 @@ public: Status start(OperationContext* opCtx, const NamespaceString& nss, ScopedReceiveChunk scopedReceiveChunk, - StartChunkCloneRequest cloneRequest, + const StartChunkCloneRequest& cloneRequest, const OID& epoch, const WriteConcernOptions& writeConcern); @@ -207,6 +207,8 @@ private: bool _enableResumableRangeDeleter{true}; UUID _migrationId; + LogicalSessionId _lsid; + TxnNumber _txnNumber; NamespaceString _nss; ConnectionString _fromShardConnString; ShardId _fromShard; diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index e8cc631b5d2..0ea9147e6dd 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -39,6 +39,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/logical_clock.h" +#include "mongo/db/logical_session_cache.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/op_observer.h" #include "mongo/db/operation_context.h" @@ -259,6 +260,7 @@ Status MigrationSourceManager::startClone() { auto replEnabled = replCoord->isReplEnabled(); UUID migrationId = UUID::gen(); + _lsid = makeLogicalSessionId(_opCtx); { const auto metadata = _getCurrentMetadataAndCheckEpoch(); @@ -318,7 +320,7 @@ Status MigrationSourceManager::startClone() { _coordinator->startMigration(_opCtx, _args.getWaitForDelete()); } - Status startCloneStatus = _cloneDriver->startClone(_opCtx, migrationId); + Status startCloneStatus = _cloneDriver->startClone(_opCtx, migrationId, _lsid, TxnNumber{0}); if (!startCloneStatus.isOK()) { return startCloneStatus; } @@ -746,6 +748,8 @@ void MigrationSourceManager::_cleanup() { auto newOpCtx = newOpCtxPtr.get(); _coordinator->completeMigration(newOpCtx); } + + LogicalSessionCache::get(_opCtx)->endSessions({_lsid}); } _state = kDone; diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index b0aef2c7327..72da23a8b40 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -270,6 +270,7 @@ private: BSONObj _recipientCloneCounts; boost::optional<CollectionCriticalSection> _critSec; + LogicalSessionId _lsid; }; } // namespace mongo diff --git a/src/mongo/db/s/start_chunk_clone_request.cpp b/src/mongo/db/s/start_chunk_clone_request.cpp index 883bca6b9f7..0047cc3fc47 100644 --- a/src/mongo/db/s/start_chunk_clone_request.cpp +++ b/src/mongo/db/s/start_chunk_clone_request.cpp @@ -43,6 +43,8 @@ const char kRecvChunkStart[] = "_recvChunkStart"; const char kFromShardConnectionString[] = "from"; // Note: The UUID parsing code relies on this field being named 'uuid'. const char kMigrationId[] = "uuid"; +const char kLsid[] = "lsid"; +const char kTxnNumber[] = "txnNumber"; const char kFromShardId[] = "fromShardName"; const char kToShardId[] = "toShardName"; const char kChunkMinKey[] = "min"; @@ -76,8 +78,12 @@ StatusWith<StartChunkCloneRequest> StartChunkCloneRequest::createFromCommand(Nam // TODO (SERVER-44787): Remove this existence check after 4.4 is released and the // disableResumableRangeDeleter option is removed. - if (obj.getField("uuid")) + if (obj.getField("uuid")) { request._migrationId = UUID::parse(obj); + request._lsid = LogicalSessionId::parse(IDLParserErrorContext("StartChunkCloneRequest"), + obj[kLsid].Obj()); + request._txnNumber = obj.getField(kTxnNumber).Long(); + } { std::string fromShardConnectionString; @@ -162,6 +168,8 @@ void StartChunkCloneRequest::appendAsCommand( BSONObjBuilder* builder, const NamespaceString& nss, const UUID& migrationId, + const LogicalSessionId& lsid, + TxnNumber txnNumber, const MigrationSessionId& sessionId, const ConnectionString& fromShardConnectionString, const ShardId& fromShardId, @@ -176,6 +184,8 @@ void StartChunkCloneRequest::appendAsCommand( builder->append(kRecvChunkStart, nss.ns()); migrationId.appendToBuilder(builder, kMigrationId); + builder->append(kLsid, lsid.toBSON()); + builder->append(kTxnNumber, txnNumber); sessionId.append(builder); builder->append(kFromShardConnectionString, fromShardConnectionString.toString()); builder->append(kFromShardId, fromShardId.toString()); diff --git a/src/mongo/db/s/start_chunk_clone_request.h b/src/mongo/db/s/start_chunk_clone_request.h index 826c6e94371..17cc08f4460 100644 --- a/src/mongo/db/s/start_chunk_clone_request.h +++ b/src/mongo/db/s/start_chunk_clone_request.h @@ -32,6 +32,7 @@ #include <string> #include "mongo/client/connection_string.h" +#include "mongo/db/logical_session_id.h" #include "mongo/db/namespace_string.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/s/request_types/migration_secondary_throttle_options.h" @@ -62,6 +63,8 @@ public: static void appendAsCommand(BSONObjBuilder* builder, const NamespaceString& nss, const UUID& migrationId, + const LogicalSessionId& lsid, + TxnNumber txnNumber, const MigrationSessionId& sessionId, const ConnectionString& fromShardConnectionString, const ShardId& fromShardId, @@ -109,6 +112,14 @@ public: return *_migrationId; } + const LogicalSessionId& getLsid() const { + return _lsid; + } + + const TxnNumber getTxnNumber() const { + return _txnNumber; + } + const ShardId& getFromShardId() const { return _fromShardId; } @@ -148,6 +159,8 @@ private: * TODO (SERVER-44787): Make non-optional after 4.4 is released. */ boost::optional<UUID> _migrationId; + LogicalSessionId _lsid; + TxnNumber _txnNumber; // The session id of this migration MigrationSessionId _sessionId; diff --git a/src/mongo/db/s/start_chunk_clone_request_test.cpp b/src/mongo/db/s/start_chunk_clone_request_test.cpp index 0ff6f4ffb35..f498be0e366 100644 --- a/src/mongo/db/s/start_chunk_clone_request_test.cpp +++ b/src/mongo/db/s/start_chunk_clone_request_test.cpp @@ -33,7 +33,10 @@ #include "mongo/base/status_with.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/client.h" #include "mongo/db/jsobj.h" +#include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/service_context.h" #include "mongo/s/shard_id.h" #include "mongo/unittest/unittest.h" @@ -44,14 +47,22 @@ using unittest::assertGet; namespace { TEST(StartChunkCloneRequest, CreateAsCommandComplete) { + auto serviceContext = ServiceContext::make(); + auto client = serviceContext->makeClient("TestClient"); + auto opCtx = client->makeOperationContext(); + MigrationSessionId sessionId = MigrationSessionId::generate("shard0001", "shard0002"); UUID migrationId = UUID::gen(); + auto lsid = makeLogicalSessionId(opCtx.get()); + TxnNumber txnNumber = 0; BSONObjBuilder builder; StartChunkCloneRequest::appendAsCommand( &builder, NamespaceString("TestDB.TestColl"), migrationId, + lsid, + txnNumber, sessionId, assertGet(ConnectionString::parse("TestDonorRS/Donor1:12345,Donor2:12345,Donor3:12345")), ShardId("shard0001"), @@ -68,7 +79,9 @@ TEST(StartChunkCloneRequest, CreateAsCommandComplete) { ASSERT_EQ("TestDB.TestColl", request.getNss().ns()); ASSERT_EQ(sessionId.toString(), request.getSessionId().toString()); - ASSERT(migrationId == request.getMigrationId()); + ASSERT_EQ(migrationId, request.getMigrationId()); + ASSERT_EQ(lsid, request.getLsid()); + ASSERT_EQ(txnNumber, request.getTxnNumber()); ASSERT(sessionId.matches(request.getSessionId())); ASSERT_EQ( assertGet(ConnectionString::parse("TestDonorRS/Donor1:12345,Donor2:12345,Donor3:12345")) diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp index c036569dc47..9342e07d1b9 100644 --- a/src/mongo/db/transaction_validation.cpp +++ b/src/mongo/db/transaction_validation.cpp @@ -45,11 +45,16 @@ using namespace fmt::literals; namespace { -const StringMap<int> retryableWriteCommands = { - {"delete", 1}, {"findandmodify", 1}, {"findAndModify", 1}, {"insert", 1}, {"update", 1}}; +const StringMap<int> retryableWriteCommands = {{"delete", 1}, + {"findandmodify", 1}, + {"findAndModify", 1}, + {"insert", 1}, + {"update", 1}, + {"_recvChunkStart", 1}}; // Commands that can be sent with session info but should not check out a session. -const StringMap<int> skipSessionCheckoutList = {{"coordinateCommitTransaction", 1}}; +const StringMap<int> skipSessionCheckoutList = {{"coordinateCommitTransaction", 1}, + {"_recvChunkStart", 1}}; const StringMap<int> transactionCommands = {{"commitTransaction", 1}, {"coordinateCommitTransaction", 1}, diff --git a/src/mongo/shell/shardingtest.js b/src/mongo/shell/shardingtest.js index 76de57c8751..8f13e2fbc9f 100644 --- a/src/mongo/shell/shardingtest.js +++ b/src/mongo/shell/shardingtest.js @@ -1727,15 +1727,16 @@ var ShardingTest = function(params) { // TODO SERVER-45108: Enable support for x509 auth for _flushRoutingTableCacheUpdates. if (!otherParams.manualAddShard && !x509AuthRequired) { for (let i = 0; i < numShards; i++) { + const keyFileLocal = + (otherParams.shards && otherParams.shards[i] && otherParams.shards[i].keyFile) + ? otherParams.shards[i].keyFile + : this.keyFile; + if (otherParams.rs || otherParams["rs" + i] || startShardsAsRS) { const rs = this._rs[i].test; - flushRT(rs.getPrimary(), rs.nodes, this.keyFile); + flushRT(rs.getPrimary(), rs.nodes, keyFileLocal); } else { // If specified, use the keyFile for the standalone shard. - const keyFileLocal = (otherParams.shards && otherParams.shards[i] && - otherParams.shards[i].keyFile) - ? otherParams.shards[i].keyFile - : this.keyFile; flushRT(this["shard" + i], this["shard" + i], keyFileLocal); } } |