author     Alex Taskov <alex.taskov@mongodb.com>             2020-02-04 13:18:29 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-02-04 18:42:34 +0000
commit     8d3429fed4e5307a9e79fe99494ccefc388e54f4 (patch)
tree       615d6e74116fc32da0c1ac92227f22ad8f615f38
parent     0eaddad5bb19553974910d4957d18e1ad790a504 (diff)
download   mongo-8d3429fed4e5307a9e79fe99494ccefc388e54f4.tar.gz
SERVER-45339 Make MigrationSourceManager send _recvChunkStart to recipient shard as a retryable write
-rw-r--r--  jstests/sharding/addshard5.js                                  12
-rw-r--r--  jstests/sharding/convert_to_and_from_sharded.js                 9
-rw-r--r--  jstests/ssl/libs/ssl_helpers.js                                13
-rw-r--r--  jstests/ssl/mixed_mode_sharded_transition.js                    9
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source.h                  6
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp         6
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.h           5
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp   16
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp               76
-rw-r--r--  src/mongo/db/s/migration_destination_manager.h                  4
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp                     6
-rw-r--r--  src/mongo/db/s/migration_source_manager.h                       1
-rw-r--r--  src/mongo/db/s/start_chunk_clone_request.cpp                   12
-rw-r--r--  src/mongo/db/s/start_chunk_clone_request.h                     13
-rw-r--r--  src/mongo/db/s/start_chunk_clone_request_test.cpp              15
-rw-r--r--  src/mongo/db/transaction_validation.cpp                        11
-rw-r--r--  src/mongo/shell/shardingtest.js                                11
17 files changed, 177 insertions, 48 deletions
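
The heart of the change: the donor's MigrationSourceManager now generates a logical session id
for the migration and sends it, together with a txnNumber, in the _recvChunkStart command, so
the recipient shard can check out that session and treat the command as a retryable write. As a
rough sketch of the wire shape (illustrative only, not code from this commit; the helper name
below is hypothetical, while the field names match StartChunkCloneRequest::appendAsCommand in
the diff that follows):

    // Sketch: approximate shape of the retryable-write fields carried by _recvChunkStart.
    #include "mongo/bson/bsonobjbuilder.h"
    #include "mongo/db/logical_session_id.h"
    #include "mongo/db/namespace_string.h"
    #include "mongo/util/uuid.h"

    namespace mongo {
    BSONObj recvChunkStartSessionFieldsSketch(const NamespaceString& nss,
                                              const UUID& migrationId,
                                              const LogicalSessionId& lsid,
                                              TxnNumber txnNumber) {
        BSONObjBuilder builder;
        builder.append("_recvChunkStart", nss.ns());    // command name + target namespace
        migrationId.appendToBuilder(&builder, "uuid");  // migration id (see SERVER-44787 TODOs)
        builder.append("lsid", lsid.toBSON());          // session the recipient checks out
        builder.append("txnNumber", txnNumber);         // makes _recvChunkStart retryable
        return builder.obj();
    }
    }  // namespace mongo
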
diff --git a/jstests/sharding/addshard5.js b/jstests/sharding/addshard5.js
index 7a2b6866c8c..bb14780b72d 100644
--- a/jstests/sharding/addshard5.js
+++ b/jstests/sharding/addshard5.js
@@ -29,8 +29,11 @@ assert.commandWorked(mongos.adminCommand(
assert.commandWorked(mongos.adminCommand({removeShard: st.shard1.shardName}));
assert.commandWorked(mongos.adminCommand({removeShard: st.shard1.shardName}));
-var shard2 = MongoRunner.runMongod({'shardsvr': ''});
-assert.commandWorked(mongos.adminCommand({addShard: shard2.host, name: st.shard1.shardName}));
+let shard2 = new ReplSetTest({nodes: 2, nodeOptions: {shardsvr: ""}});
+shard2.startSet();
+shard2.initiate();
+
+assert.commandWorked(mongos.adminCommand({addShard: shard2.getURL(), name: st.shard1.shardName}));
jsTest.log('Shard was dropped and re-added with same name...');
st.printShardingStatus();
@@ -39,8 +42,9 @@ st.printShardingStatus();
assert.commandWorked(
mongos.adminCommand({moveChunk: coll + '', find: {_id: 0}, to: st.shard1.shardName}));
-assert.eq('world', shard2.getCollection(coll + '').findOne().hello);
+let shard2Conn = shard2.getPrimary();
+assert.eq('world', shard2Conn.getCollection(coll + '').findOne().hello);
st.stop();
-MongoRunner.stopMongod(shard2);
+shard2.stopSet();
})();
diff --git a/jstests/sharding/convert_to_and_from_sharded.js b/jstests/sharding/convert_to_and_from_sharded.js
index f615a277195..0202e70e605 100644
--- a/jstests/sharding/convert_to_and_from_sharded.js
+++ b/jstests/sharding/convert_to_and_from_sharded.js
@@ -80,9 +80,12 @@ for (x = 0; x < 4; x++) {
assert.commandWorked(st.s.adminCommand({split: 'test.sharded', middle: {_id: x}}));
}
-var newMongod = MongoRunner.runMongod({shardsvr: ''});
+let newShard =
+ new ReplSetTest({name: "toRemoveLater", nodes: NUM_NODES, nodeOptions: {shardsvr: ""}});
+newShard.startSet();
+newShard.initiate();
-assert.commandWorked(st.s.adminCommand({addShard: newMongod.name, name: 'toRemoveLater'}));
+assert.commandWorked(st.s.adminCommand({addShard: newShard.getURL(), name: 'toRemoveLater'}));
for (x = 0; x < 2; x++) {
assert.commandWorked(
@@ -102,7 +105,7 @@ assert.soon(function() {
return res.state == 'completed';
});
-MongoRunner.stopMongod(newMongod);
+newShard.stopSet();
checkBasicCRUD(st.s.getDB('test').unsharded);
checkBasicCRUD(st.s.getDB('test').sharded);
diff --git a/jstests/ssl/libs/ssl_helpers.js b/jstests/ssl/libs/ssl_helpers.js
index db53fa7c687..65ba385b321 100644
--- a/jstests/ssl/libs/ssl_helpers.js
+++ b/jstests/ssl/libs/ssl_helpers.js
@@ -110,7 +110,7 @@ function testShardedLookup(shardingTest) {
* Takes in two mongod/mongos configuration options and runs a basic
* sharding test to see if they can work together...
*/
-function mixedShardTest(options1, options2, shouldSucceed) {
+function mixedShardTest(options1, options2, shouldSucceed, disableResumableRangeDeleter) {
let authSucceeded = false;
try {
// Start ShardingTest with enableBalancer because ShardingTest attempts to turn
@@ -123,12 +123,19 @@ function mixedShardTest(options1, options2, shouldSucceed) {
//
// Once SERVER-14017 is fixed the "enableBalancer" line can be removed.
// TODO: SERVER-43899 Make sharding_with_x509.js and mixed_mode_sharded_transition.js start
- // shards as replica sets.
+ // shards as replica sets and remove disableResumableRangeDeleter parameter.
+ let otherOptions = {enableBalancer: true};
+
+ if (disableResumableRangeDeleter) {
+ otherOptions.shardAsReplicaSet = false;
+ otherOptions.shardOptions = {setParameter: {"disableResumableRangeDeleter": true}};
+ }
+
var st = new ShardingTest({
mongos: [options1],
config: [options1],
shards: [options1, options2],
- other: {enableBalancer: true, shardAsReplicaSet: false}
+ other: otherOptions
});
// Create admin user in case the options include auth
diff --git a/jstests/ssl/mixed_mode_sharded_transition.js b/jstests/ssl/mixed_mode_sharded_transition.js
index 8f9136b3e39..f6b0d7bbaec 100644
--- a/jstests/ssl/mixed_mode_sharded_transition.js
+++ b/jstests/ssl/mixed_mode_sharded_transition.js
@@ -11,6 +11,8 @@ load('jstests/ssl/libs/ssl_helpers.js');
(function() {
'use strict';
+const disableResumableRangeDeleter = true;
+
var transitionToX509AllowSSL =
Object.merge(allowSSL, {transitionToAuth: '', clusterAuthMode: 'x509'});
var transitionToX509PreferSSL =
@@ -18,15 +20,16 @@ var transitionToX509PreferSSL =
var x509RequireSSL = Object.merge(requireSSL, {clusterAuthMode: 'x509'});
function testCombos(opt1, opt2, shouldSucceed) {
- mixedShardTest(opt1, opt2, shouldSucceed);
- mixedShardTest(opt2, opt1, shouldSucceed);
+ mixedShardTest(opt1, opt2, shouldSucceed, disableResumableRangeDeleter);
+ mixedShardTest(opt2, opt1, shouldSucceed, disableResumableRangeDeleter);
}
print('=== Testing transitionToAuth/allowSSL - transitionToAuth/preferSSL cluster ===');
testCombos(transitionToX509AllowSSL, transitionToX509PreferSSL, true);
print('=== Testing transitionToAuth/preferSSL - transitionToAuth/preferSSL cluster ===');
-mixedShardTest(transitionToX509PreferSSL, transitionToX509PreferSSL, true);
+mixedShardTest(
+ transitionToX509PreferSSL, transitionToX509PreferSSL, true, disableResumableRangeDeleter);
print('=== Testing transitionToAuth/preferSSL - x509/requireSSL cluster ===');
testCombos(transitionToX509PreferSSL, x509RequireSSL, true);
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index f59efc0f16b..cb4ebb53a29 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/logical_session_id.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -72,7 +73,10 @@ public:
* NOTE: Must be called without any locks and must succeed, before any other methods are called
* (except for cancelClone and [insert/update/delete]Op).
*/
- virtual Status startClone(OperationContext* opCtx, const UUID& migrationId) = 0;
+ virtual Status startClone(OperationContext* opCtx,
+ const UUID& migrationId,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber) = 0;
/**
* Blocking method, which uses some custom selected logic for deciding whether it is appropriate
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index b9dd891ed17..454fe2ac76b 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -238,7 +238,9 @@ MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
}
Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx,
- const UUID& migrationId) {
+ const UUID& migrationId,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber) {
invariant(_state == kNew);
invariant(!opCtx->lockState()->isLocked());
@@ -285,6 +287,8 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx,
StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
_args.getNss(),
migrationId,
+ lsid,
+ txnNumber,
_sessionId,
_donorConnStr,
_args.getFromShardId(),
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index a7236c136ce..d5f56e196ed 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -90,7 +90,10 @@ public:
HostAndPort recipientHost);
~MigrationChunkClonerSourceLegacy();
- Status startClone(OperationContext* opCtx, const UUID& migrationId) override;
+ Status startClone(OperationContext* opCtx,
+ const UUID& migrationId,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber) override;
Status awaitUntilCriticalSectionIsAppropriate(OperationContext* opCtx,
Milliseconds maxTimeToWait) override;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index 89ca8742cfc..8750348e8cc 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
@@ -101,6 +102,8 @@ protected:
clockSource->advance(Seconds(1));
operationContext()->getServiceContext()->setFastClockSource(std::move(clockSource));
+
+ _lsid = makeLogicalSessionId(operationContext());
}
void tearDown() override {
@@ -167,6 +170,10 @@ protected:
return BSON("_id" << value << "X" << value);
}
+protected:
+ LogicalSessionId _lsid;
+ TxnNumber _txnNumber{0};
+
private:
std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
std::unique_ptr<DistLockManager> distLockManager) override {
@@ -214,7 +221,7 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) {
onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
});
- ASSERT_OK(cloner.startClone(operationContext(), UUID::gen()));
+ ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
futureStartClone.default_timed_get();
}
@@ -312,7 +319,7 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CollectionNotFound) {
kDonorConnStr,
kRecipientConnStr.getServers()[0]);
- ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen()));
+ ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
cloner.cancelClone(operationContext());
}
@@ -325,7 +332,7 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ShardKeyIndexNotFound) {
kDonorConnStr,
kRecipientConnStr.getServers()[0]);
- ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen()));
+ ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
cloner.cancelClone(operationContext());
}
@@ -351,7 +358,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) {
});
});
- auto startCloneStatus = cloner.startClone(operationContext(), UUID::gen());
+ auto startCloneStatus =
+ cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber);
ASSERT_EQ(ErrorCodes::NetworkTimeout, startCloneStatus.code());
futureStartClone.default_timed_get();
}
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index a44e6bdda24..a11bb49b8a4 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/index_builds_coordinator.h"
+#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
@@ -59,7 +60,9 @@
#include "mongo/db/s/start_chunk_clone_request.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/storage/remove_saver.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
@@ -327,7 +330,7 @@ BSONObj MigrationDestinationManager::getMigrationStatusReport() {
Status MigrationDestinationManager::start(OperationContext* opCtx,
const NamespaceString& nss,
ScopedReceiveChunk scopedReceiveChunk,
- const StartChunkCloneRequest cloneRequest,
+ const StartChunkCloneRequest& cloneRequest,
const OID& epoch,
const WriteConcernOptions& writeConcern) {
stdx::lock_guard<Latch> lk(_mutex);
@@ -357,6 +360,8 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
cloneRequest.hasMigrationId());
_migrationId = cloneRequest.getMigrationId();
+ _lsid = cloneRequest.getLsid();
+ _txnNumber = cloneRequest.getTxnNumber();
}
_nss = nss;
@@ -756,21 +761,42 @@ void MigrationDestinationManager::cloneCollectionIndexesAndOptions(
void MigrationDestinationManager::_migrateThread() {
Client::initKillableThread("migrateThread", getGlobalServiceContext());
- auto opCtx = Client::getCurrent()->makeOperationContext();
+ auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
+ auto opCtx = uniqueOpCtx.get();
if (AuthorizationManager::get(opCtx->getServiceContext())->isAuthEnabled()) {
- AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(opCtx.get());
+ AuthorizationSession::get(opCtx->getClient())->grantInternalAuthorization(opCtx);
}
try {
- _migrateDriver(opCtx.get());
+ // The outer OperationContext is used to hold the session checked out for the
+ // duration of the recipient's side of the migration. This guarantees that if the
+ // donor shard has failed over, then the new donor primary cannot bump the
+ // txnNumber on this session while this node is still executing the recipient side
+ //(which is important because otherwise, this node may create orphans after the
+ // range deletion task on this node has been processed).
+ if (_enableResumableRangeDeleter) {
+ opCtx->setLogicalSessionId(_lsid);
+ opCtx->setTxnNumber(_txnNumber);
+
+ MongoDOperationContextSession sessionTxnState(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(opCtx,
+ *opCtx->getTxnNumber(),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */);
+ _migrateDriver(opCtx);
+ } else {
+ _migrateDriver(opCtx);
+ }
} catch (...) {
_setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
}
if (!_enableResumableRangeDeleter) {
if (getState() != DONE) {
- _forgetPending(opCtx.get(), ChunkRange(_min, _max));
+ _forgetPending(opCtx, ChunkRange(_min, _max));
}
}
@@ -780,7 +806,7 @@ void MigrationDestinationManager::_migrateThread() {
_isActiveCV.notify_all();
}
-void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
+void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
invariant(isActive());
invariant(_sessionId);
invariant(_scopedReceiveChunk);
@@ -792,7 +818,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
<< " at epoch " << _epoch.toString() << " with session id " << *_sessionId;
MoveTimingHelper timing(
- opCtx, "to", _nss.ns(), _min, _max, 6 /* steps */, &_errmsg, ShardId(), ShardId());
+ outerOpCtx, "to", _nss.ns(), _min, _max, 6 /* steps */, &_errmsg, ShardId(), ShardId());
const auto initialState = getState();
@@ -803,10 +829,11 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
invariant(initialState == READY);
- auto donorCollectionOptionsAndIndexes = getCollectionIndexesAndOptions(opCtx, _nss, _fromShard);
+ auto donorCollectionOptionsAndIndexes =
+ getCollectionIndexesAndOptions(outerOpCtx, _nss, _fromShard);
auto fromShard =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, _fromShard));
+ uassertStatusOK(Grid::get(outerOpCtx)->shardRegistry()->getShard(outerOpCtx, _fromShard));
{
const ChunkRange range(_min, _max);
@@ -815,20 +842,20 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
// deleted.
if (_enableResumableRangeDeleter) {
while (migrationutil::checkForConflictingDeletions(
- opCtx, range, donorCollectionOptionsAndIndexes.uuid)) {
+ outerOpCtx, range, donorCollectionOptionsAndIndexes.uuid)) {
LOG(0) << "Migration paused because range overlaps with a "
"range that is scheduled for deletion: collection: "
<< _nss.ns() << " range: " << redact(range.toString());
auto status = CollectionShardingRuntime::waitForClean(
- opCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range);
+ outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range);
if (!status.isOK()) {
_setStateFail(redact(status.reason()));
return;
}
- opCtx->sleepFor(Milliseconds(1000));
+ outerOpCtx->sleepFor(Milliseconds(1000));
}
RangeDeletionTask recipientDeletionTask(_migrationId,
@@ -839,13 +866,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
CleanWhenEnum::kNow);
recipientDeletionTask.setPending(true);
- migrationutil::persistRangeDeletionTaskLocally(opCtx, recipientDeletionTask);
+ migrationutil::persistRangeDeletionTaskLocally(outerOpCtx, recipientDeletionTask);
} else {
// Synchronously delete any data which might have been left orphaned in the range
// being moved, and wait for completion
- auto cleanupCompleteFuture = _notePending(opCtx, range);
- auto cleanupStatus = cleanupCompleteFuture.getNoThrow(opCtx);
+ auto cleanupCompleteFuture = _notePending(outerOpCtx, range);
+ auto cleanupStatus = cleanupCompleteFuture.getNoThrow(outerOpCtx);
// Wait for the range deletion to report back. Swallow
// RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist error since the
// collection could either never exist or get dropped directly from the shard after the
@@ -859,7 +886,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
// Wait for any other, overlapping queued deletions to drain
cleanupStatus = CollectionShardingRuntime::waitForClean(
- opCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range);
+ outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range);
if (!cleanupStatus.isOK()) {
_setStateFail(redact(cleanupStatus.reason()));
return;
@@ -870,6 +897,23 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
migrateThreadHangAtStep1.pauseWhileSet();
}
+    // The conventional usage of retryable writes is to assign statement ids to all of the
+    // writes done as part of the data copying, so that _recvChunkStart is conceptually a
+    // retryable write batch. However, we instead do those writes under an
+    // AlternativeClientRegion, because 1) threading the statement ids through to all the
+    // places where they are needed would make this code more complex, and 2) some of the
+    // operations, like creating the collection or building indexes, are not currently
+    // supported in retryable writes.
+ auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator");
+ {
+ stdx::lock_guard<Client> lk(*newClient.get());
+ newClient->setSystemOperationKillable(lk);
+ }
+
+ AlternativeClientRegion acr(newClient);
+ auto newOpCtxPtr = cc().makeOperationContext();
+ auto opCtx = newOpCtxPtr.get();
+
{
cloneCollectionIndexesAndOptions(opCtx, _nss, donorCollectionOptionsAndIndexes);
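
Condensed, the recipient-side flow introduced in the hunks above (when the resumable range
deleter is enabled) is: check out the donor-provided session on the outer OperationContext for
the lifetime of the migration, then perform the actual cloning writes on a separate client. A
simplified sketch of that flow (illustrative only; error handling, authorization, and the
!_enableResumableRangeDeleter branch are omitted, and the function name is hypothetical):

    #include <boost/none.hpp>

    #include "mongo/db/client.h"
    #include "mongo/db/logical_session_id.h"
    #include "mongo/db/operation_context.h"
    #include "mongo/db/session_catalog_mongod.h"
    #include "mongo/db/transaction_participant.h"

    namespace mongo {
    void migrateThreadFlowSketch(OperationContext* outerOpCtx,
                                 const LogicalSessionId& lsid,
                                 TxnNumber txnNumber) {
        // 1. Check out the session sent with _recvChunkStart and bind its txnNumber, so a
        //    failed-over donor primary cannot advance the txnNumber while this recipient is
        //    still working (otherwise orphans could be created after the recipient's range
        //    deletion task has been processed).
        outerOpCtx->setLogicalSessionId(lsid);
        outerOpCtx->setTxnNumber(txnNumber);
        MongoDOperationContextSession sessionTxnState(outerOpCtx);
        TransactionParticipant::get(outerOpCtx)
            .beginOrContinue(outerOpCtx,
                             *outerOpCtx->getTxnNumber(),
                             boost::none /* autocommit */,
                             boost::none /* startTransaction */);

        // 2. Run the data copying on a separate Client/OperationContext so those writes are
        //    not themselves statements of the retryable write.
        auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator");
        AlternativeClientRegion acr(newClient);
        auto innerOpCtx = cc().makeOperationContext();
        // ... cloneCollectionIndexesAndOptions(innerOpCtx.get(), ...), then receive and apply
        // the cloned batches using innerOpCtx.
    }
    }  // namespace mongo
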
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 6be51ce2dd8..5182ace7ea9 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -109,7 +109,7 @@ public:
Status start(OperationContext* opCtx,
const NamespaceString& nss,
ScopedReceiveChunk scopedReceiveChunk,
- StartChunkCloneRequest cloneRequest,
+ const StartChunkCloneRequest& cloneRequest,
const OID& epoch,
const WriteConcernOptions& writeConcern);
@@ -207,6 +207,8 @@ private:
bool _enableResumableRangeDeleter{true};
UUID _migrationId;
+ LogicalSessionId _lsid;
+ TxnNumber _txnNumber;
NamespaceString _nss;
ConnectionString _fromShardConnString;
ShardId _fromShard;
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index e8cc631b5d2..0ea9147e6dd 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/logical_clock.h"
+#include "mongo/db/logical_session_cache.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
@@ -259,6 +260,7 @@ Status MigrationSourceManager::startClone() {
auto replEnabled = replCoord->isReplEnabled();
UUID migrationId = UUID::gen();
+ _lsid = makeLogicalSessionId(_opCtx);
{
const auto metadata = _getCurrentMetadataAndCheckEpoch();
@@ -318,7 +320,7 @@ Status MigrationSourceManager::startClone() {
_coordinator->startMigration(_opCtx, _args.getWaitForDelete());
}
- Status startCloneStatus = _cloneDriver->startClone(_opCtx, migrationId);
+ Status startCloneStatus = _cloneDriver->startClone(_opCtx, migrationId, _lsid, TxnNumber{0});
if (!startCloneStatus.isOK()) {
return startCloneStatus;
}
@@ -746,6 +748,8 @@ void MigrationSourceManager::_cleanup() {
auto newOpCtx = newOpCtxPtr.get();
_coordinator->completeMigration(newOpCtx);
}
+
+ LogicalSessionCache::get(_opCtx)->endSessions({_lsid});
}
_state = kDone;
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index b0aef2c7327..72da23a8b40 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -270,6 +270,7 @@ private:
BSONObj _recipientCloneCounts;
boost::optional<CollectionCriticalSection> _critSec;
+ LogicalSessionId _lsid;
};
} // namespace mongo
diff --git a/src/mongo/db/s/start_chunk_clone_request.cpp b/src/mongo/db/s/start_chunk_clone_request.cpp
index 883bca6b9f7..0047cc3fc47 100644
--- a/src/mongo/db/s/start_chunk_clone_request.cpp
+++ b/src/mongo/db/s/start_chunk_clone_request.cpp
@@ -43,6 +43,8 @@ const char kRecvChunkStart[] = "_recvChunkStart";
const char kFromShardConnectionString[] = "from";
// Note: The UUID parsing code relies on this field being named 'uuid'.
const char kMigrationId[] = "uuid";
+const char kLsid[] = "lsid";
+const char kTxnNumber[] = "txnNumber";
const char kFromShardId[] = "fromShardName";
const char kToShardId[] = "toShardName";
const char kChunkMinKey[] = "min";
@@ -76,8 +78,12 @@ StatusWith<StartChunkCloneRequest> StartChunkCloneRequest::createFromCommand(Nam
// TODO (SERVER-44787): Remove this existence check after 4.4 is released and the
// disableResumableRangeDeleter option is removed.
- if (obj.getField("uuid"))
+ if (obj.getField("uuid")) {
request._migrationId = UUID::parse(obj);
+ request._lsid = LogicalSessionId::parse(IDLParserErrorContext("StartChunkCloneRequest"),
+ obj[kLsid].Obj());
+ request._txnNumber = obj.getField(kTxnNumber).Long();
+ }
{
std::string fromShardConnectionString;
@@ -162,6 +168,8 @@ void StartChunkCloneRequest::appendAsCommand(
BSONObjBuilder* builder,
const NamespaceString& nss,
const UUID& migrationId,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
const MigrationSessionId& sessionId,
const ConnectionString& fromShardConnectionString,
const ShardId& fromShardId,
@@ -176,6 +184,8 @@ void StartChunkCloneRequest::appendAsCommand(
builder->append(kRecvChunkStart, nss.ns());
migrationId.appendToBuilder(builder, kMigrationId);
+ builder->append(kLsid, lsid.toBSON());
+ builder->append(kTxnNumber, txnNumber);
sessionId.append(builder);
builder->append(kFromShardConnectionString, fromShardConnectionString.toString());
builder->append(kFromShardId, fromShardId.toString());
diff --git a/src/mongo/db/s/start_chunk_clone_request.h b/src/mongo/db/s/start_chunk_clone_request.h
index 826c6e94371..17cc08f4460 100644
--- a/src/mongo/db/s/start_chunk_clone_request.h
+++ b/src/mongo/db/s/start_chunk_clone_request.h
@@ -32,6 +32,7 @@
#include <string>
#include "mongo/client/connection_string.h"
+#include "mongo/db/logical_session_id.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/s/request_types/migration_secondary_throttle_options.h"
@@ -62,6 +63,8 @@ public:
static void appendAsCommand(BSONObjBuilder* builder,
const NamespaceString& nss,
const UUID& migrationId,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
const MigrationSessionId& sessionId,
const ConnectionString& fromShardConnectionString,
const ShardId& fromShardId,
@@ -109,6 +112,14 @@ public:
return *_migrationId;
}
+ const LogicalSessionId& getLsid() const {
+ return _lsid;
+ }
+
+ const TxnNumber getTxnNumber() const {
+ return _txnNumber;
+ }
+
const ShardId& getFromShardId() const {
return _fromShardId;
}
@@ -148,6 +159,8 @@ private:
* TODO (SERVER-44787): Make non-optional after 4.4 is released.
*/
boost::optional<UUID> _migrationId;
+ LogicalSessionId _lsid;
+ TxnNumber _txnNumber;
// The session id of this migration
MigrationSessionId _sessionId;
diff --git a/src/mongo/db/s/start_chunk_clone_request_test.cpp b/src/mongo/db/s/start_chunk_clone_request_test.cpp
index 0ff6f4ffb35..f498be0e366 100644
--- a/src/mongo/db/s/start_chunk_clone_request_test.cpp
+++ b/src/mongo/db/s/start_chunk_clone_request_test.cpp
@@ -33,7 +33,10 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/client.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/logical_session_id_helpers.h"
+#include "mongo/db/service_context.h"
#include "mongo/s/shard_id.h"
#include "mongo/unittest/unittest.h"
@@ -44,14 +47,22 @@ using unittest::assertGet;
namespace {
TEST(StartChunkCloneRequest, CreateAsCommandComplete) {
+ auto serviceContext = ServiceContext::make();
+ auto client = serviceContext->makeClient("TestClient");
+ auto opCtx = client->makeOperationContext();
+
MigrationSessionId sessionId = MigrationSessionId::generate("shard0001", "shard0002");
UUID migrationId = UUID::gen();
+ auto lsid = makeLogicalSessionId(opCtx.get());
+ TxnNumber txnNumber = 0;
BSONObjBuilder builder;
StartChunkCloneRequest::appendAsCommand(
&builder,
NamespaceString("TestDB.TestColl"),
migrationId,
+ lsid,
+ txnNumber,
sessionId,
assertGet(ConnectionString::parse("TestDonorRS/Donor1:12345,Donor2:12345,Donor3:12345")),
ShardId("shard0001"),
@@ -68,7 +79,9 @@ TEST(StartChunkCloneRequest, CreateAsCommandComplete) {
ASSERT_EQ("TestDB.TestColl", request.getNss().ns());
ASSERT_EQ(sessionId.toString(), request.getSessionId().toString());
- ASSERT(migrationId == request.getMigrationId());
+ ASSERT_EQ(migrationId, request.getMigrationId());
+ ASSERT_EQ(lsid, request.getLsid());
+ ASSERT_EQ(txnNumber, request.getTxnNumber());
ASSERT(sessionId.matches(request.getSessionId()));
ASSERT_EQ(
assertGet(ConnectionString::parse("TestDonorRS/Donor1:12345,Donor2:12345,Donor3:12345"))
diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp
index c036569dc47..9342e07d1b9 100644
--- a/src/mongo/db/transaction_validation.cpp
+++ b/src/mongo/db/transaction_validation.cpp
@@ -45,11 +45,16 @@ using namespace fmt::literals;
namespace {
-const StringMap<int> retryableWriteCommands = {
- {"delete", 1}, {"findandmodify", 1}, {"findAndModify", 1}, {"insert", 1}, {"update", 1}};
+const StringMap<int> retryableWriteCommands = {{"delete", 1},
+ {"findandmodify", 1},
+ {"findAndModify", 1},
+ {"insert", 1},
+ {"update", 1},
+ {"_recvChunkStart", 1}};
// Commands that can be sent with session info but should not check out a session.
-const StringMap<int> skipSessionCheckoutList = {{"coordinateCommitTransaction", 1}};
+const StringMap<int> skipSessionCheckoutList = {{"coordinateCommitTransaction", 1},
+ {"_recvChunkStart", 1}};
const StringMap<int> transactionCommands = {{"commitTransaction", 1},
{"coordinateCommitTransaction", 1},
diff --git a/src/mongo/shell/shardingtest.js b/src/mongo/shell/shardingtest.js
index 76de57c8751..8f13e2fbc9f 100644
--- a/src/mongo/shell/shardingtest.js
+++ b/src/mongo/shell/shardingtest.js
@@ -1727,15 +1727,16 @@ var ShardingTest = function(params) {
// TODO SERVER-45108: Enable support for x509 auth for _flushRoutingTableCacheUpdates.
if (!otherParams.manualAddShard && !x509AuthRequired) {
for (let i = 0; i < numShards; i++) {
+ const keyFileLocal =
+ (otherParams.shards && otherParams.shards[i] && otherParams.shards[i].keyFile)
+ ? otherParams.shards[i].keyFile
+ : this.keyFile;
+
if (otherParams.rs || otherParams["rs" + i] || startShardsAsRS) {
const rs = this._rs[i].test;
- flushRT(rs.getPrimary(), rs.nodes, this.keyFile);
+ flushRT(rs.getPrimary(), rs.nodes, keyFileLocal);
} else {
// If specified, use the keyFile for the standalone shard.
- const keyFileLocal = (otherParams.shards && otherParams.shards[i] &&
- otherParams.shards[i].keyFile)
- ? otherParams.shards[i].keyFile
- : this.keyFile;
flushRT(this["shard" + i], this["shard" + i], keyFileLocal);
}
}