author     Dianna Hohensee <dianna.hohensee@10gen.com>  2016-02-02 18:13:48 -0500
committer  Dianna Hohensee <dianna.hohensee@10gen.com>  2016-02-02 19:06:15 -0500
commit     c91b1ea56a2619a123876970229556013cea5d9a (patch)
tree       608aca79591d3835af71ae02513eff27cedf4633
parent     3d236611718ccd164335c0edc649f34868d0072c (diff)
SERVER-20290 Introduce migration session id, with test
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_auth.yml  |  1
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_auth_audit.yml  |  1
-rw-r--r--  jstests/sharding/donor_shard_abort_and_start_new_migration.js  |  191
-rw-r--r--  src/mongo/db/s/SConscript  |  22
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp  |  92
-rw-r--r--  src/mongo/db/s/migration_destination_manager.h  |  25
-rw-r--r--  src/mongo/db/s/migration_impl.cpp  |  17
-rw-r--r--  src/mongo/db/s/migration_impl.h  |  5
-rw-r--r--  src/mongo/db/s/migration_session_id.cpp  |  95
-rw-r--r--  src/mongo/db/s/migration_session_id.h  |  83
-rw-r--r--  src/mongo/db/s/migration_session_id_test.cpp  |  82
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp  |  49
-rw-r--r--  src/mongo/db/s/migration_source_manager.h  |  17
-rw-r--r--  src/mongo/db/s/move_chunk_command.cpp  |  7
-rw-r--r--  src/mongo/s/d_migrate.cpp  |  35
15 files changed, 658 insertions(+), 64 deletions(-)
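The core of this change is the new MigrationSessionId type, which every donor/recipient migration command now carries so that a recipient cannot consume transfer data belonging to a different migration. A minimal sketch of the intended round trip, using only the API added in this commit (the shard names and the resulting id value are illustrative, not taken from the diff):

    #include "mongo/bson/bsonobjbuilder.h"
    #include "mongo/db/s/migration_session_id.h"
    #include "mongo/util/assert_util.h"

    // Donor side: mint an id for this moveChunk and attach it to the
    // _recvChunkStart command sent to the recipient (hypothetical shard names).
    MigrationSessionId sessionId = MigrationSessionId::generate("shard0000", "shard0001");

    BSONObjBuilder cmd;
    cmd.append("_recvChunkStart", "testDB.foo");
    sessionId.append(&cmd);  // adds sessionId: "shard0000_shard0001_<OID>"

    // Recipient side: parse the id back out and compare it against the id
    // carried by later _migrateClone/_transferMods/_recvChunkCommit requests.
    MigrationSessionId parsed = uassertStatusOK(MigrationSessionId::extractFromBSON(cmd.obj()));
    invariant(parsed.matches(sessionId));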
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth.yml b/buildscripts/resmokeconfig/suites/sharding_auth.yml
index a114442e28a..d99760820e7 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth.yml
@@ -23,6 +23,7 @@ selector:
- jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js # SERVER-21713
- jstests/sharding/migration_with_source_deletes.js # SERVER-21713
- jstests/sharding/migration_sets_fromMigrate_flag.js # SERVER-21713
+ - jstests/sharding/donor_shard_abort_and_start_new_migration.js # SERVER-21713
executor:
js_test:
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
index b8845773d5c..61e50340b93 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
@@ -23,6 +23,7 @@ selector:
- jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js # SERVER-21713
- jstests/sharding/migration_with_source_deletes.js # SERVER-21713
- jstests/sharding/migration_sets_fromMigrate_flag.js # SERVER-21713
+ - jstests/sharding/donor_shard_abort_and_start_new_migration.js # SERVER-21713
executor:
js_test:
diff --git a/jstests/sharding/donor_shard_abort_and_start_new_migration.js b/jstests/sharding/donor_shard_abort_and_start_new_migration.js
new file mode 100644
index 00000000000..d2ee79ed5f0
--- /dev/null
+++ b/jstests/sharding/donor_shard_abort_and_start_new_migration.js
@@ -0,0 +1,191 @@
+//
+// This test validates that when a donor shard aborts a migration before it sends a commit to
+// the recipient and then starts a different migration, the original recipient cannot retrieve
+// transfer documents from the donor that are intended for a different shard. For more
+// information see SERVER-20290.
+//
+// The sequence of events is:
+// - Start three shards (s0, s1, s2) and create two sharded collections with the same shard key,
+// each with 2 chunks on s0.
+// - Insert documents into each collection's 2 chunks, so all chunks have a document.
+// - On the recipient shard (s1) enable the migrateThreadHangAtStep3 failpoint to pause
+// the migration after document cloning.
+// - Start migration of the first collection's chunk from s0 to s1. The recipient shard s1 will
+// block when it reaches the failpoint, so execute the moveChunk command on a separate thread.
+// - Abort the migration on the donor shard, s0.
+// - On the other recipient shard (s2) enable the migrateThreadHangAtStep3 failpoint, to pause
+// the migration after document cloning.
+// - Start migration of the other collection's chunk from s0 to s2. The recipient shard s2 will
+// block when it reaches the failpoint, so execute the moveChunk command on a separate thread.
+// - Now insert 2 new documents in the chunk being moved to s2 so the migration's xfermods log is
+// populated, and unpause the migrateThreadHangAtStep3 on s1. This will cause s1 to resume
+// fetching documents from s0, and s0 will refuse s1 access to the xfermods after checking s1's
+// session ID, which no longer matches the current migration.
+//
+// This tests migration session IDs, the reason for which is explained in SERVER-20290.
+//
+
+load('./jstests/libs/chunk_manipulation_util.js');
+
+(function() {
+"use strict";
+
+var staticMongodFoo = MongoRunner.runMongod({}); // For startParallelOps.
+var staticMongodBaz = MongoRunner.runMongod({}); // For startParallelOps.
+
+/**
+ * Start up a new sharded cluster; the balancer defaults to off.
+ */
+
+var st = new ShardingTest({ shards : 3, mongos : 1 });
+
+var mongos = st.s0,
+ admin = mongos.getDB('admin'),
+ shards = mongos.getCollection('config.shards').find().toArray(),
+ dbName = "testDB",
+ fooNS = dbName + ".foo",
+ fooColl = mongos.getCollection(fooNS),
+ bazNS = dbName + ".baz",
+ bazColl = mongos.getCollection(bazNS),
+ donor = st.shard0,
+ fooRecipient = st.shard1,
+ bazRecipient = st.shard2,
+ fooDonorColl = donor.getCollection(fooNS),
+ bazDonorColl = donor.getCollection(bazNS),
+ fooRecipientColl = fooRecipient.getCollection(fooNS),
+ bazRecipientColl = bazRecipient.getCollection(bazNS);
+
+/**
+ * Enable sharding on both collections, and split each collection into two chunks.
+ */
+
+// Two chunks
+// Donor:
+// testDB.foo: [0, 10) [10, 20)
+// testDB.baz: [0, 10) [10, 20)
+// Recipient:
+assert.commandWorked(admin.runCommand({enableSharding: dbName}));
+st.ensurePrimaryShard(dbName, shards[0]._id);
+
+assert.commandWorked(admin.runCommand({shardCollection: fooNS, key: {a: 1}}));
+assert.commandWorked(admin.runCommand({split: fooNS, middle: {a: 10}}));
+assert.commandWorked(admin.runCommand({shardCollection: bazNS, key: {a: 1}}));
+assert.commandWorked(admin.runCommand({split: bazNS, middle: {a: 10}}));
+
+/**
+ * Insert one document into each of the chunks in the testDB.baz and testDB.foo collections.
+ */
+
+assert.writeOK(fooColl.insert({a: 0}));
+assert.writeOK(fooColl.insert({a: 10}));
+assert.eq(0, fooRecipientColl.count());
+assert.eq(2, fooDonorColl.count());
+assert.eq(2, fooColl.count());
+assert.writeOK(bazColl.insert({a: 0}));
+assert.writeOK(bazColl.insert({a: 10}));
+assert.eq(0, bazRecipientColl.count());
+assert.eq(2, bazDonorColl.count());
+assert.eq(2, bazColl.count());
+
+/**
+ * Set the failpoints. Both recipient shards will pause the migration after cloning chunk
+ * data from the donor and before checking the transfer mods log on the donor. Pause the
+ * donor shard before it checks for interrupts to the migration.
+ */
+
+pauseMigrateAtStep(fooRecipient, migrateStepNames.cloned);
+pauseMigrateAtStep(bazRecipient, migrateStepNames.cloned);
+pauseMoveChunkAtStep(donor, moveChunkStepNames.startedMoveChunk);
+
+/**
+ * Start first moveChunk operation in the background: moving chunk [10, 20) in testDB.foo
+ * from donor to fooRecipient. This will move one document, {a: 10}. Migration will pause
+ * after cloning step (when it reaches the failpoint).
+ */
+
+// Donor: testDB.foo [10, 20) -> FooRecipient
+// testDB.foo: [0, 10)
+jsTest.log('Starting first migration of collection foo, pause after cloning...');
+var joinFooMoveChunk = moveChunkParallel(
+ staticMongodFoo,
+ st.s0.host,
+ {a: 10},
+ null,
+ fooColl.getFullName(),
+ shards[1]._id);
+waitForMigrateStep(fooRecipient, migrateStepNames.cloned);
+
+/**
+ * Abort the migration on the donor shard by finding and killing the operation by operation
+ * ID. Release the donor shard failpoint so that the donor shard can discover the migration
+ * has received an interrupt signal. The recipient shard, fooRecipient, which is currently
+ * paused, will not yet be aware that the migration has been aborted.
+ */
+
+jsTest.log('Abort donor shard migration of foo collection....');
+var inProgressOps = admin.currentOp().inprog;
+for (var op in inProgressOps) {
+ if (inProgressOps[op].query.moveChunk) {
+ admin.killOp(inProgressOps[op].opid);
+ jsTest.log("Killing migration with opid: " + inProgressOps[op].opid);
+ }
+}
+unpauseMoveChunkAtStep(donor, moveChunkStepNames.startedMoveChunk);
+
+/**
+ * Start second moveChunk operation in the background: moving chunk [10, 20) in testDB.baz
+ * from donor to bazRecipient. This will move one document, {a: 10}. Migration will pause
+ * after the recipient cloning step (when it reaches the failpoint).
+ */
+
+// Donor: testDB.baz [10, 20) -> BazRecipient
+// testDB.baz: [0, 10)
+jsTest.log('Starting second migration of collection baz, pause after cloning...');
+var joinBazMoveChunk = moveChunkParallel(
+ staticMongodBaz,
+ st.s0.host,
+ {a: 10},
+ null,
+ bazColl.getFullName(),
+ shards[2]._id);
+waitForMigrateStep(bazRecipient, migrateStepNames.cloned);
+
+/**
+ * Insert documents into the testDB.baz collection's currently migrating chunk with range
+ * [10, 20) so as to populate the migration xfermods log.
+ */
+
+jsTest.log("Inserting 2 docs into donor shard's testDB.baz collection " +
+ "in the range of the currently migrating chunk....");
+assert.writeOK(bazColl.insert({a: 11}));
+assert.writeOK(bazColl.insert({a: 12}));
+assert.eq(4, bazColl.count(), "Failed to insert documents into baz collection!");
+
+/**
+ * Unpause fooRecipient (disable failpoint) and finish first migration, which should fail.
+ * FooRecipient will be attempting to access the donor shard's migration xfermods log,
+ * which has documents, but for a different migration. FooRecipient will fail to retrieve
+ * the documents and abort the migration.
+ */
+
+jsTest.log('Finishing first migration, which should fail....');
+unpauseMigrateAtStep(fooRecipient, migrateStepNames.cloned);
+assert.throws(function() {
+ joinFooMoveChunk();
+});
+
+/**
+ * Unpause bazRecipient (disable failpoint) and finish second migration, which should
+ * succeed normally.
+ */
+
+jsTest.log('Finishing second migration, which should succeed....');
+unpauseMigrateAtStep(bazRecipient, migrateStepNames.cloned);
+joinBazMoveChunk();
+assert.eq(3, bazRecipientColl.count(), 'BazRecipient does not have 3 documents.');
+assert.eq(1, bazDonorColl.count(), 'Donor does not have 1 document in the baz collection.');
+
+jsTest.log('DONE!');
+st.stop();
+
+})();
\ No newline at end of file
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index d752c9fed0a..891a891c4db 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -20,6 +20,17 @@ env.Library(
)
env.Library(
+ target='migration_types',
+ source=[
+ 'migration_session_id.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/bson/util/bson_extract',
+ ],
+)
+
+env.Library(
target='sharding',
source=[
'migration_destination_manager.cpp',
@@ -34,6 +45,7 @@ env.Library(
'$BUILD_DIR/mongo/bson/util/bson_extract',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/s/sharding_initialization',
+ 'migration_types',
],
LIBDEPS_TAGS=[
# Depends on symbols defined in files in serverOnlyFiles, and has
@@ -62,6 +74,16 @@ env.Library(
)
env.CppUnitTest(
+ target='migration_types_test',
+ source=[
+ 'migration_session_id_test.cpp',
+ ],
+ LIBDEPS=[
+ 'migration_types',
+ ]
+)
+
+env.CppUnitTest(
target='metadata_test',
source=[
'metadata_loader_test.cpp',
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 27e767cd6a9..38d563023d5 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -161,6 +161,32 @@ bool opReplicatedEnough(OperationContext* txn,
return majorityStatus.isOK() && userStatus.isOK();
}
+/**
+ * Create the migration clone request BSON object to send to the source
+ * shard.
+ *
+ * 'sessionId' is the unique identifier for this migration.
+ */
+BSONObj createMigrateCloneRequest(const MigrationSessionId& sessionId) {
+ BSONObjBuilder builder;
+ builder.append("_migrateClone", 1);
+ sessionId.append(&builder);
+ return builder.obj();
+}
+
+/**
+ * Create the migration transfer mods request BSON object to send to the source
+ * shard.
+ *
+ * 'sessionId' is the unique identifier for this migration.
+ */
+BSONObj createTransferModsRequest(const MigrationSessionId& sessionId) {
+ BSONObjBuilder builder;
+ builder.append("_transferMods", 1);
+ sessionId.append(&builder);
+ return builder.obj();
+}
+
MONGO_FP_DECLARE(failMigrationReceivedOutOfRangeDelete);
} // namespace
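For illustration, the objects these helpers hand to conn->runCommand() now pair the command name with the session id; a quick sketch of the result (the shard names and id value are hypothetical):

    BSONObj req = createTransferModsRequest(MigrationSessionId::generate("shard0000", "shard0001"));
    // req has the form: { _transferMods: 1, sessionId: "shard0000_shard0001_<OID>" }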
@@ -174,13 +200,7 @@ MONGO_FP_DECLARE(migrateThreadHangAtStep4);
MONGO_FP_DECLARE(migrateThreadHangAtStep5);
-MigrationDestinationManager::MigrationDestinationManager()
- : _active(false),
- _numCloned(0),
- _clonedBytes(0),
- _numCatchup(0),
- _numSteady(0),
- _state(READY) {}
+MigrationDestinationManager::MigrationDestinationManager() = default;
MigrationDestinationManager::~MigrationDestinationManager() = default;
@@ -194,15 +214,19 @@ void MigrationDestinationManager::setState(State newState) {
_state = newState;
}
-bool MigrationDestinationManager::getActive() const {
+bool MigrationDestinationManager::isActive() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
+ return _sessionId.is_initialized();
}
void MigrationDestinationManager::report(BSONObjBuilder& b) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
- b.appendBool("active", _active);
+ b.appendBool("active", _sessionId.is_initialized());
+
+ if (_sessionId) {
+ b.append("sessionId", _sessionId->toString());
+ }
b.append("ns", _ns);
b.append("from", _from);
@@ -225,6 +249,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b) {
}
Status MigrationDestinationManager::start(const string& ns,
+ const MigrationSessionId& sessionId,
const string& fromShard,
const BSONObj& min,
const BSONObj& max,
@@ -233,7 +258,7 @@ Status MigrationDestinationManager::start(const string& ns,
const WriteConcernOptions& writeConcern) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_active) {
+ if (_sessionId) {
return Status(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Active migration already in progress "
<< "ns: " << _ns << ", from: " << _from << ", min: " << _min
@@ -254,7 +279,7 @@ Status MigrationDestinationManager::start(const string& ns,
_numCatchup = 0;
_numSteady = 0;
- _active = true;
+ _sessionId = sessionId;
// TODO: If we are here, the migrate thread must have completed, otherwise _active above would
// be false, so this would never block. There is no better place with the current implementation
@@ -263,9 +288,10 @@ Status MigrationDestinationManager::start(const string& ns,
_migrateThreadHandle.join();
}
- _migrateThreadHandle =
- stdx::thread([this, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern]() {
- _migrateThread(ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
+ _migrateThreadHandle = stdx::thread(
+ [this, ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern]() {
+ _migrateThread(
+ ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
});
return Status::OK();
@@ -277,18 +303,31 @@ void MigrationDestinationManager::abort() {
_errmsg = "aborted";
}
-bool MigrationDestinationManager::startCommit() {
+bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionId) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
if (_state != STEADY) {
return false;
}
+    // In the STEADY state we must have an active migration
+ invariant(_sessionId);
+
+ // This check guards against the (very unlikely) situation where the current donor shard has
+ // been stalled for some time, during which the recipient shard crashed or timed out and started
+ // serving as a recipient of chunks for another collection (note that it cannot be the same
+ // collection, because the old donor still holds the collection lock).
+ if (!_sessionId->matches(sessionId)) {
+ warning() << "startCommit received commit request from a stale session " << sessionId.toString()
+ << ". Current session is " << _sessionId->toString();
+ return false;
+ }
+
_state = COMMIT_START;
const auto deadline = stdx::chrono::system_clock::now() + Seconds(30);
- while (_active) {
+ while (_sessionId) {
if (stdx::cv_status::timeout == _isActiveCV.wait_until(lock, deadline)) {
_state = FAIL;
log() << "startCommit never finished!" << migrateLog;
@@ -305,6 +344,7 @@ bool MigrationDestinationManager::startCommit() {
}
void MigrationDestinationManager::_migrateThread(std::string ns,
+ MigrationSessionId sessionId,
BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
@@ -321,7 +361,8 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
}
try {
- _migrateDriver(&txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
+ _migrateDriver(
+ &txn, ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
} catch (std::exception& e) {
{
stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -353,19 +394,20 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = false;
+ _sessionId = boost::none;
_isActiveCV.notify_all();
}
void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
const string& ns,
+ const MigrationSessionId& sessionId,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
const std::string& fromShard,
const OID& epoch,
const WriteConcernOptions& writeConcern) {
- invariant(getActive());
+ invariant(isActive());
invariant(getState() == READY);
invariant(!min.isEmpty());
invariant(!max.isEmpty());
@@ -563,10 +605,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
// 3. Initial bulk clone
setState(CLONE);
+ const BSONObj migrateCloneRequest = createMigrateCloneRequest(sessionId);
+
while (true) {
BSONObj res;
if (!conn->runCommand("admin",
- BSON("_migrateClone" << 1),
+ migrateCloneRequest,
res)) { // gets array of objects to copy, in disk order
setState(FAIL);
errmsg = "_migrateClone failed: ";
@@ -645,13 +689,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
// secondaries
repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+ const BSONObj xferModsRequest = createTransferModsRequest(sessionId);
+
{
// 4. Do bulk of mods
setState(CATCHUP);
while (true) {
BSONObj res;
- if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) {
+ if (!conn->runCommand("admin", xferModsRequest, res)) {
setState(FAIL);
errmsg = "_transferMods failed: ";
errmsg += res.toString();
@@ -752,7 +798,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
BSONObj res;
- if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) {
+ if (!conn->runCommand("admin", xferModsRequest, res)) {
log() << "_transferMods failed in STEADY state: " << res << migrateLog;
errmsg = res.toString();
setState(FAIL);
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index b349147f764..03c3d05d665 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -34,6 +34,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/oid.h"
+#include "mongo/db/s/migration_session_id.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
@@ -64,7 +65,7 @@ public:
State getState() const;
void setState(State newState);
- bool getActive() const;
+ bool isActive() const;
/**
* Reports the state of the migration manager as a BSON document.
@@ -75,6 +76,7 @@ public:
* Returns OK if migration started successfully.
*/
Status start(const std::string& ns,
+ const MigrationSessionId& sessionId,
const std::string& fromShard,
const BSONObj& min,
const BSONObj& max,
@@ -84,13 +86,14 @@ public:
void abort();
- bool startCommit();
+ bool startCommit(const MigrationSessionId& sessionId);
private:
/**
* Thread which drives the migration apply process on the recipient side.
*/
void _migrateThread(std::string ns,
+ MigrationSessionId sessionId,
BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
@@ -100,6 +103,7 @@ private:
void _migrateDriver(OperationContext* txn,
const std::string& ns,
+ const MigrationSessionId& sessionId,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
@@ -125,9 +129,10 @@ private:
// Mutex to guard all fields
mutable stdx::mutex _mutex;
- // Whether the prepare method has been called along with a condition variable on which to wait
- // for prepare to be called
- bool _active;
+ // Migration session ID uniquely identifies the migration and indicates whether the prepare
+ // method has been called.
+ boost::optional<MigrationSessionId> _sessionId{boost::none};
+ // A condition variable on which to wait for the prepare method to be called.
stdx::condition_variable _isActiveCV;
stdx::thread _migrateThreadHandle;
@@ -139,12 +144,12 @@ private:
BSONObj _max;
BSONObj _shardKeyPattern;
- long long _numCloned;
- long long _clonedBytes;
- long long _numCatchup;
- long long _numSteady;
+ long long _numCloned{0};
+ long long _clonedBytes{0};
+ long long _numCatchup{0};
+ long long _numSteady{0};
- State _state;
+ State _state{READY};
std::string _errmsg;
};
diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp
index 367bdc9e5c8..d33fc302d9b 100644
--- a/src/mongo/db/s/migration_impl.cpp
+++ b/src/mongo/db/s/migration_impl.cpp
@@ -80,6 +80,13 @@ WriteConcernOptions getDefaultWriteConcernForMigration() {
return WriteConcernOptions(1, WriteConcernOptions::NONE, 0);
}
+BSONObj createRecvChunkCommitRequest(const MigrationSessionId& sessionId) {
+ BSONObjBuilder builder;
+ builder.append("_recvChunkCommit", 1);
+ sessionId.append(&builder);
+ return builder.obj();
+}
+
MONGO_FP_DECLARE(failMigrationCommit);
MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection);
MONGO_FP_DECLARE(failMigrationConfigWritePrepare);
@@ -276,7 +283,7 @@ ChunkMoveOperationState::acquireMoveMetadata() {
return &_distLockStatus->getValue();
}
-Status ChunkMoveOperationState::commitMigration() {
+Status ChunkMoveOperationState::commitMigration(const MigrationSessionId& sessionId) {
invariant(_distLockStatus.is_initialized());
invariant(_distLockStatus->isOK());
@@ -321,7 +328,7 @@ Status ChunkMoveOperationState::commitMigration() {
try {
ScopedDbConnection connTo(_toShardCS, 35.0);
- connTo->runCommand("admin", BSON("_recvChunkCommit" << 1), res);
+ connTo->runCommand("admin", createRecvChunkCommitRequest(sessionId), res);
connTo.done();
recvChunkCommitStatus = getStatusFromCommandResult(res);
} catch (const DBException& e) {
@@ -578,9 +585,11 @@ std::shared_ptr<CollectionMetadata> ChunkMoveOperationState::getCollMetadata() c
return _collMetadata;
}
-Status ChunkMoveOperationState::start(BSONObj shardKeyPattern) {
+Status ChunkMoveOperationState::start(const MigrationSessionId& sessionId,
+ const BSONObj& shardKeyPattern) {
auto migrationSourceManager = ShardingState::get(_txn)->migrationSourceManager();
- if (!migrationSourceManager->start(_txn, _nss.ns(), _minKey, _maxKey, shardKeyPattern)) {
+ if (!migrationSourceManager->start(
+ _txn, sessionId, _nss.ns(), _minKey, _maxKey, shardKeyPattern)) {
return {ErrorCodes::ConflictingOperationInProgress,
"Not starting chunk migration because another migration is already in progress "
"from this shard"};
diff --git a/src/mongo/db/s/migration_impl.h b/src/mongo/db/s/migration_impl.h
index a5d86e8d544..e3d451de2db 100644
--- a/src/mongo/db/s/migration_impl.h
+++ b/src/mongo/db/s/migration_impl.h
@@ -42,6 +42,7 @@
namespace mongo {
class CollectionMetadata;
+class MigrationSessionId;
class OperationContext;
template <typename T>
class StatusWith;
@@ -114,7 +115,7 @@ public:
/**
* Starts the move chunk operation.
*/
- Status start(BSONObj shardKeyPattern);
+ Status start(const MigrationSessionId& sessionId, const BSONObj& shardKeyPattern);
/**
* Implements the migration critical section. Needs to be invoked after all data has been moved
@@ -125,7 +126,7 @@ public:
* Since some migration failures are non-recoverable, it may also shut down the server on
* certain errors.
*/
- Status commitMigration();
+ Status commitMigration(const MigrationSessionId& sessionId);
const NamespaceString& getNss() const {
return _nss;
diff --git a/src/mongo/db/s/migration_session_id.cpp b/src/mongo/db/s/migration_session_id.cpp
new file mode 100644
index 00000000000..25a0bea7aeb
--- /dev/null
+++ b/src/mongo/db/s/migration_session_id.cpp
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/migration_session_id.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/oid.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+using std::string;
+
+namespace {
+
+// Field name that the extractFromBSON method expects. Use this value when appending a migration
+// session id to BSON.
+const char kFieldName[] = "sessionId";
+
+} // namespace
+
+MigrationSessionId MigrationSessionId::generate(StringData donor, StringData recipient) {
+ invariant(!donor.empty());
+ invariant(!recipient.empty());
+
+ return MigrationSessionId(str::stream() << donor << "_" << recipient << "_"
+ << OID::gen().toString());
+}
+
+StatusWith<MigrationSessionId> MigrationSessionId::extractFromBSON(const BSONObj& obj) {
+ string sessionId;
+ Status status = bsonExtractStringField(obj, kFieldName, &sessionId);
+ if (status.isOK()) {
+ return MigrationSessionId(sessionId);
+ } else if (status == ErrorCodes::NoSuchKey) {
+ return MigrationSessionId();
+ }
+
+ return status;
+}
+
+MigrationSessionId::MigrationSessionId() = default;
+
+MigrationSessionId::MigrationSessionId(std::string sessionId) {
+ invariant(!sessionId.empty());
+ _sessionId = std::move(sessionId);
+}
+
+bool MigrationSessionId::matches(const MigrationSessionId& other) const {
+ if (_sessionId && other._sessionId)
+ return *_sessionId == *other._sessionId;
+
+ return !_sessionId && !other._sessionId;
+}
+
+void MigrationSessionId::append(BSONObjBuilder* builder) const {
+ if (_sessionId) {
+ builder->append(kFieldName, *_sessionId);
+ }
+}
+
+std::string MigrationSessionId::toString() const {
+ return (_sessionId ? *_sessionId : "");
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/migration_session_id.h b/src/mongo/db/s/migration_session_id.h
new file mode 100644
index 00000000000..dbfde922c88
--- /dev/null
+++ b/src/mongo/db/s/migration_session_id.h
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <string>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/string_data.h"
+
+namespace mongo {
+
+class BSONObj;
+class BSONObjBuilder;
+template <typename T>
+class StatusWith;
+
+/**
+ * Encapsulates the logic for generating, parsing and comparing migration sessions. The migration
+ * session id is a unique identifier for a particular moveChunk command and is exchanged as part of
+ * all communication between the donor and recipient shards.
+ */
+class MigrationSessionId {
+public:
+ /**
+ * Constructs a new migration session identifier with the following format:
+ * DonorId_RecipientId_UniqueIdentifier
+ */
+ static MigrationSessionId generate(StringData donor, StringData recipient);
+
+ /**
+ * Extracts the session id from BSON. If the session id is missing from the BSON contents,
+ * returns an empty MigrationSessionId.
+ */
+ static StatusWith<MigrationSessionId> extractFromBSON(const BSONObj& obj);
+
+ /**
+     * Compares two session identifiers. Two identifiers match if either both are empty (_sessionId
+ * is not set) or if the session ids match.
+ */
+ bool matches(const MigrationSessionId& other) const;
+
+ /**
+ * Appends the migration session id to the specified builder.
+ */
+ void append(BSONObjBuilder* builder) const;
+
+ std::string toString() const;
+
+private:
+ MigrationSessionId();
+ explicit MigrationSessionId(std::string sessionId);
+
+ boost::optional<std::string> _sessionId{boost::none};
+};
+
+} // namespace mongo
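On the donor side, the command handlers changed later in this diff consume the id with the same two calls. Roughly, a handler is expected to do the following (a sketch assembled from the _transferMods/_migrateClone changes below, where _sessionId is the donor's active session and cmdObj/errmsg come from the command invocation):

    // Reject a requester whose session id does not match the active migration.
    const MigrationSessionId requested =
        uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj));
    if (!_sessionId->matches(requested)) {
        errmsg = str::stream() << "requested migration session id " << requested.toString()
                               << " does not match active session id " << _sessionId->toString();
        return false;
    }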
diff --git a/src/mongo/db/s/migration_session_id_test.cpp b/src/mongo/db/s/migration_session_id_test.cpp
new file mode 100644
index 00000000000..24e26888c53
--- /dev/null
+++ b/src/mongo/db/s/migration_session_id_test.cpp
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2012 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/migration_session_id.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+using unittest::assertGet;
+
+namespace {
+
+TEST(MigrationSessionId, GenerateAndExtract) {
+ MigrationSessionId origSessionId = MigrationSessionId::generate("Source", "Dest");
+
+ BSONObjBuilder builder;
+ origSessionId.append(&builder);
+ BSONObj obj = builder.obj();
+
+ MigrationSessionId sessionIdAfter = assertGet(MigrationSessionId::extractFromBSON(obj));
+ ASSERT(origSessionId.matches(sessionIdAfter));
+ ASSERT_EQ(origSessionId.toString(), sessionIdAfter.toString());
+}
+
+TEST(MigrationSessionId, Comparison) {
+ MigrationSessionId emptySessionId =
+ assertGet(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1)));
+ MigrationSessionId nonEmptySessionId =
+ assertGet(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1 << "sessionId"
+ << "TestSessionID")));
+
+ ASSERT(!emptySessionId.matches(nonEmptySessionId));
+ ASSERT(!nonEmptySessionId.matches(emptySessionId));
+
+ MigrationSessionId sessionIdToCompare =
+ assertGet(MigrationSessionId::extractFromBSON(BSON("SomeOtherField" << 1 << "sessionId"
+ << "TestSessionID")));
+ ASSERT(nonEmptySessionId.matches(sessionIdToCompare));
+ ASSERT(sessionIdToCompare.matches(nonEmptySessionId));
+}
+
+TEST(MigrationSessionId, ErrorWhenTypeIsNotString) {
+ ASSERT_NOT_OK(MigrationSessionId::extractFromBSON(
+ BSON("SomeField" << 1 << "sessionId" << Date_t::now())).getStatus());
+ ASSERT_NOT_OK(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1 << "sessionId" << 2))
+ .getStatus());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 09680bae2a3..e65edff5711 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -162,6 +162,7 @@ MigrationSourceManager::MigrationSourceManager() = default;
MigrationSourceManager::~MigrationSourceManager() = default;
bool MigrationSourceManager::start(OperationContext* txn,
+ const MigrationSessionId& sessionId,
const std::string& ns,
const BSONObj& min,
const BSONObj& max,
@@ -176,7 +177,7 @@ bool MigrationSourceManager::start(OperationContext* txn,
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_active) {
+ if (_sessionId) {
return false;
}
@@ -189,7 +190,7 @@ bool MigrationSourceManager::start(OperationContext* txn,
invariant(_reload.size() == 0);
invariant(_memoryUsed == 0);
- _active = true;
+ _sessionId = sessionId;
stdx::lock_guard<stdx::mutex> tLock(_cloneLocsMutex);
invariant(_cloneLocs.size() == 0);
@@ -206,7 +207,7 @@ void MigrationSourceManager::done(OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = false;
+ _sessionId = boost::none;
_deleteNotifyExec.reset(NULL);
_inCriticalSection = false;
_inCriticalSectionCV.notify_all();
@@ -241,7 +242,7 @@ void MigrationSourceManager::logOp(OperationContext* txn,
dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
- if (!_active)
+ if (!_sessionId)
return;
if (_nss != ns)
@@ -289,7 +290,7 @@ void MigrationSourceManager::logOp(OperationContext* txn,
}
bool MigrationSourceManager::isInMigratingChunk(const NamespaceString& ns, const BSONObj& doc) {
- if (!_active)
+ if (!_sessionId)
return false;
if (ns != _nss)
return false;
@@ -297,6 +298,7 @@ bool MigrationSourceManager::isInMigratingChunk(const NamespaceString& ns, const
}
bool MigrationSourceManager::transferMods(OperationContext* txn,
+ const MigrationSessionId& sessionId,
string& errmsg,
BSONObjBuilder& b) {
long long size = 0;
@@ -305,11 +307,19 @@ bool MigrationSourceManager::transferMods(OperationContext* txn,
AutoGetCollectionForRead ctx(txn, _getNS());
stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_active) {
+
+ if (!_sessionId) {
errmsg = "no active migration!";
return false;
}
+ if (!_sessionId->matches(sessionId)) {
+ errmsg = str::stream() << "requested migration session id " << sessionId.toString()
+ << " does not match active session id "
+ << _sessionId->toString();
+ return false;
+ }
+
// TODO: fix SERVER-16540 race
_xfer(txn, _nss.ns(), ctx.getDb(), &_deleted, b, "deleted", size, false);
_xfer(txn, _nss.ns(), ctx.getDb(), &_reload, b, "reload", size, true);
@@ -442,7 +452,10 @@ bool MigrationSourceManager::storeCurrentLocs(OperationContext* txn,
return true;
}
-bool MigrationSourceManager::clone(OperationContext* txn, string& errmsg, BSONObjBuilder& result) {
+bool MigrationSourceManager::clone(OperationContext* txn,
+ const MigrationSessionId& sessionId,
+ string& errmsg,
+ BSONObjBuilder& result) {
ElapsedTracker tracker(internalQueryExecYieldIterations, internalQueryExecYieldPeriodMS);
int allocSize = 0;
@@ -451,11 +464,19 @@ bool MigrationSourceManager::clone(OperationContext* txn, string& errmsg, BSONOb
AutoGetCollection autoColl(txn, _getNS(), MODE_IS);
stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_active) {
+
+ if (!_sessionId) {
errmsg = "not active";
return false;
}
+ if (!_sessionId->matches(sessionId)) {
+ errmsg = str::stream() << "requested migration session id " << sessionId.toString()
+ << " does not match active session id "
+ << _sessionId->toString();
+ return false;
+ }
+
Collection* collection = autoColl.getCollection();
if (!collection) {
errmsg = str::stream() << "collection " << _nss.toString() << " does not exist";
@@ -473,11 +494,19 @@ bool MigrationSourceManager::clone(OperationContext* txn, string& errmsg, BSONOb
AutoGetCollection autoColl(txn, _getNS(), MODE_IS);
stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_active) {
+
+ if (!_sessionId) {
errmsg = "not active";
return false;
}
+ if (!_sessionId->matches(sessionId)) {
+ errmsg = str::stream() << "migration session id changed from " << sessionId.toString()
+ << " to " << _sessionId->toString()
+ << " while initial clone was active";
+ return false;
+ }
+
// TODO: fix SERVER-16540 race
Collection* collection = autoColl.getCollection();
if (!collection) {
@@ -569,7 +598,7 @@ bool MigrationSourceManager::waitTillNotInCriticalSection(int maxSecondsToWait)
bool MigrationSourceManager::isActive() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
+ return _sessionId.is_initialized();
}
void MigrationSourceManager::_xfer(OperationContext* txn,
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 076c14fa9f0..2677050a8d0 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -35,11 +35,11 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/migration_session_id.h"
#include "mongo/stdx/condition_variable.h"
namespace mongo {
-class BSONObj;
class Database;
class OperationContext;
class PlanExecutor;
@@ -57,6 +57,7 @@ public:
* already an existing migration in progress.
*/
bool start(OperationContext* txn,
+ const MigrationSessionId& sessionId,
const std::string& ns,
const BSONObj& min,
const BSONObj& max,
@@ -81,7 +82,10 @@ public:
* Called from the source of a migration process, this method transfers the accumulated local
* mods from source to destination.
*/
- bool transferMods(OperationContext* txn, std::string& errmsg, BSONObjBuilder& b);
+ bool transferMods(OperationContext* txn,
+ const MigrationSessionId& sessionId,
+ std::string& errmsg,
+ BSONObjBuilder& b);
/**
* Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs (to avoid
@@ -98,7 +102,10 @@ public:
std::string& errmsg,
BSONObjBuilder& result);
- bool clone(OperationContext* txn, std::string& errmsg, BSONObjBuilder& result);
+ bool clone(OperationContext* txn,
+ const MigrationSessionId& sessionId,
+ std::string& errmsg,
+ BSONObjBuilder& result);
void aboutToDelete(const RecordId& dl);
@@ -170,8 +177,8 @@ private:
// Bytes in _reload + _deleted
long long _memoryUsed{0}; // (M)
- // If a migration is currently active.
- bool _active{false}; // (MG)
+ // Uniquely identifies a migration and indicates a migration is active when set.
+ boost::optional<MigrationSessionId> _sessionId{boost::none}; // (MG)
NamespaceString _nss; // (MG)
BSONObj _min; // (MG)
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 12e93857473..1e1f14335d8 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -248,7 +248,9 @@ public:
// 3.
- auto moveChunkStartStatus = chunkMoveState.start(shardKeyPattern);
+ const auto migrationSessionId = MigrationSessionId::generate(chunkMoveState.getFromShard(),
+ chunkMoveState.getToShard());
+ auto moveChunkStartStatus = chunkMoveState.start(migrationSessionId, shardKeyPattern);
if (!moveChunkStartStatus.isOK()) {
warning() << moveChunkStartStatus.toString();
@@ -268,6 +270,7 @@ public:
BSONObjBuilder recvChunkStartBuilder;
recvChunkStartBuilder.append("_recvChunkStart", ns);
+ migrationSessionId.append(&recvChunkStartBuilder);
recvChunkStartBuilder.append("from", chunkMoveState.getFromShardCS().toString());
recvChunkStartBuilder.append("fromShardName", chunkMoveState.getFromShard());
recvChunkStartBuilder.append("toShardName", chunkMoveState.getToShard());
@@ -466,7 +469,7 @@ public:
return appendCommandStatus(result, Status(lockStatus.code(), msg));
}
- uassertStatusOK(chunkMoveState.commitMigration());
+ uassertStatusOK(chunkMoveState.commitMigration(migrationSessionId));
timing.done(5);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep5);
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index 9983ccd58df..370b50cb036 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -96,7 +96,10 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- return ShardingState::get(txn)->migrationSourceManager()->transferMods(txn, errmsg, result);
+        const MigrationSessionId migrationSessionId(
+            uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+        return ShardingState::get(txn)->migrationSourceManager()->transferMods(
+            txn, migrationSessionId, errmsg, result);
}
} transferModsCommand;
@@ -135,7 +138,10 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- return ShardingState::get(txn)->migrationSourceManager()->clone(txn, errmsg, result);
+        const MigrationSessionId migrationSessionId(
+            uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+        return ShardingState::get(txn)->migrationSourceManager()->clone(
+            txn, migrationSessionId, errmsg, result);
}
} initialCloneCommand;
@@ -210,7 +216,7 @@ public:
// Active state of TO-side migrations (MigrateStatus) is serialized by distributed
// collection lock.
- if (shardingState->migrationDestinationManager()->getActive()) {
+ if (shardingState->migrationDestinationManager()->isActive()) {
errmsg = "migrate already in progress";
return false;
}
@@ -293,9 +299,18 @@ public:
const string fromShard(cmdObj["from"].String());
- Status startStatus = shardingState->migrationDestinationManager()->start(
- ns, fromShard, min, max, shardKeyPattern, currentVersion.epoch(), writeConcern);
-
+ const MigrationSessionId migrationSessionId(
+ uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+
+ Status startStatus =
+ shardingState->migrationDestinationManager()->start(ns,
+ migrationSessionId,
+ fromShard,
+ min,
+ max,
+ shardKeyPattern,
+ currentVersion.epoch(),
+ writeConcern);
if (!startStatus.isOK()) {
return appendCommandStatus(result, startStatus);
}
@@ -341,7 +356,7 @@ public:
string& errmsg,
BSONObjBuilder& result) {
ShardingState::get(txn)->migrationDestinationManager()->report(result);
- return 1;
+ return true;
}
} recvChunkStatusCommand;
@@ -380,7 +395,11 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- bool ok = ShardingState::get(txn)->migrationDestinationManager()->startCommit();
+        const MigrationSessionId migrationSessionId(
+            uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+        const bool ok =
+            ShardingState::get(txn)->migrationDestinationManager()->startCommit(migrationSessionId);
+
ShardingState::get(txn)->migrationDestinationManager()->report(result);
return ok;
}