Diffstat (limited to 'src/mongo/db/s/migration_destination_manager.cpp')
 -rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp  |  92
 1 file changed, 69 insertions(+), 23 deletions(-)
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 27e767cd6a9..38d563023d5 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -161,6 +161,32 @@ bool opReplicatedEnough(OperationContext* txn,
return majorityStatus.isOK() && userStatus.isOK();
}
+/**
+ * Creates the migration clone request BSON object to send to the source
+ * shard.
+ *
+ * 'sessionId' is the unique identifier for this migration.
+ */
+BSONObj createMigrateCloneRequest(const MigrationSessionId& sessionId) {
+ BSONObjBuilder builder;
+ builder.append("_migrateClone", 1);
+ sessionId.append(&builder);
+ return builder.obj();
+}
+
+/**
+ * Creates the migration transfer mods request BSON object to send to the
+ * source shard.
+ *
+ * 'sessionId' is the unique identifier for this migration.
+ */
+BSONObj createTransferModsRequest(const MigrationSessionId& sessionId) {
+ BSONObjBuilder builder;
+ builder.append("_transferMods", 1);
+ sessionId.append(&builder);
+ return builder.obj();
+}
+
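Neither helper shows how MigrationSessionId serializes itself; assuming its append() writes the id under a "sessionId" string field (that code lives in migration_session_id.cpp, outside this diff), the requests built above would look roughly like this:

    // Sketch only; MigrationSessionId::generate() and the exact id format are assumptions.
    MigrationSessionId sessionId = MigrationSessionId::generate("shard0000", "shard0001");
    BSONObj cloneReq = createMigrateCloneRequest(sessionId);
    // cloneReq ~ { _migrateClone: 1, sessionId: "shard0000_shard0001_<OID>" }
    BSONObj modsReq = createTransferModsRequest(sessionId);
    // modsReq  ~ { _transferMods: 1, sessionId: "shard0000_shard0001_<OID>" }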
MONGO_FP_DECLARE(failMigrationReceivedOutOfRangeDelete);
} // namespace
@@ -174,13 +200,7 @@ MONGO_FP_DECLARE(migrateThreadHangAtStep4);
MONGO_FP_DECLARE(migrateThreadHangAtStep5);
-MigrationDestinationManager::MigrationDestinationManager()
- : _active(false),
- _numCloned(0),
- _clonedBytes(0),
- _numCatchup(0),
- _numSteady(0),
- _state(READY) {}
+MigrationDestinationManager::MigrationDestinationManager() = default;
MigrationDestinationManager::~MigrationDestinationManager() = default;
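The constructor can be defaulted only if the members now carry in-class initializers. The header is not part of this diff, so the following member sketch is an assumption about what migration_destination_manager.h now contains:

    // Assumed member declarations (replacing the old bool _active flag):
    long long _numCloned{0};
    long long _clonedBytes{0};
    long long _numCatchup{0};
    long long _numSteady{0};
    State _state{READY};
    boost::optional<MigrationSessionId> _sessionId;  // engaged iff a migration is active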
@@ -194,15 +214,19 @@ void MigrationDestinationManager::setState(State newState) {
_state = newState;
}
-bool MigrationDestinationManager::getActive() const {
+bool MigrationDestinationManager::isActive() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
+ return _sessionId.is_initialized();
}
void MigrationDestinationManager::report(BSONObjBuilder& b) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
- b.appendBool("active", _active);
+ b.appendBool("active", _sessionId.is_initialized());
+
+ if (_sessionId) {
+ b.append("sessionId", _sessionId->toString());
+ }
b.append("ns", _ns);
b.append("from", _from);
@@ -225,6 +249,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b) {
}
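For illustration, the status document built by report() now carries the session id alongside the existing fields; the values below are made up:

    BSONObjBuilder b;
    mdm.report(b);  // 'mdm' is a hypothetical MigrationDestinationManager instance
    // b.obj() ~ { active: true, sessionId: "shard0000_shard0001_<OID>",
    //             ns: "test.coll", from: "shard0000:27017", ... }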
Status MigrationDestinationManager::start(const string& ns,
+ const MigrationSessionId& sessionId,
const string& fromShard,
const BSONObj& min,
const BSONObj& max,
@@ -233,7 +258,7 @@ Status MigrationDestinationManager::start(const string& ns,
const WriteConcernOptions& writeConcern) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_active) {
+ if (_sessionId) {
return Status(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Active migration already in progress "
<< "ns: " << _ns << ", from: " << _from << ", min: " << _min
@@ -254,7 +279,7 @@ Status MigrationDestinationManager::start(const string& ns,
_numCatchup = 0;
_numSteady = 0;
- _active = true;
+ _sessionId = sessionId;
// TODO: If we are here, the migrate thread must have completed, otherwise the _sessionId check
// above would have returned, so this join would never block. There is no better place with the current implementation
@@ -263,9 +288,10 @@ Status MigrationDestinationManager::start(const string& ns,
_migrateThreadHandle.join();
}
- _migrateThreadHandle =
- stdx::thread([this, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern]() {
- _migrateThread(ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
+ _migrateThreadHandle = stdx::thread(
+ [this, ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern]() {
+ _migrateThread(
+ ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
});
return Status::OK();
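start() now requires the donor-issued session id. How it reaches this call is outside the diff; a plausible caller sketch in the _recvChunkStart command handler, assuming a MigrationSessionId::extractFromBSON() helper exists:

    // Hypothetical caller; the extraction helper and handler shape are assumptions.
    MigrationSessionId sessionId =
        uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj));
    uassertStatusOK(
        mdm->start(ns, sessionId, fromShard, min, max, shardKeyPattern, epoch, writeConcern));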
@@ -277,18 +303,31 @@ void MigrationDestinationManager::abort() {
_errmsg = "aborted";
}
-bool MigrationDestinationManager::startCommit() {
+bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionId) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
if (_state != STEADY) {
return false;
}
+ // In STEADY state we must have an active migration
+ invariant(_sessionId);
+
+ // This check guards against the (very unlikely) situation where the current donor shard has
+ // been stalled for some time, during which the recipient shard crashed or timed out and started
+ // serving as a recipient of chunks for another collection (note that it cannot be the same
+ // collection, because the old donor still holds the collection lock).
+ if (!_sessionId->matches(sessionId)) {
+ warning() << "startCommit received commit request from a stale session " << sessionId.toString()
+ << ". Current session is " << _sessionId->toString();
+ return false;
+ }
+
_state = COMMIT_START;
const auto deadline = stdx::chrono::system_clock::now() + Seconds(30);
- while (_active) {
+ while (_sessionId) {
if (stdx::cv_status::timeout == _isActiveCV.wait_until(lock, deadline)) {
_state = FAIL;
log() << "startCommit never finished!" << migrateLog;
@@ -305,6 +344,7 @@ bool MigrationDestinationManager::startCommit() {
}
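The stale-session guard leans on MigrationSessionId::matches(), which is defined elsewhere; a minimal sketch consistent with its use here, assuming the id is stored as a single string:

    // Assumed implementation (actual code lives in migration_session_id.h/.cpp):
    bool MigrationSessionId::matches(const MigrationSessionId& other) const {
        return _sessionId == other._sessionId;  // same donor/recipient/OID identifier
    }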
void MigrationDestinationManager::_migrateThread(std::string ns,
+ MigrationSessionId sessionId,
BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
@@ -321,7 +361,8 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
}
try {
- _migrateDriver(&txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
+ _migrateDriver(
+ &txn, ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
} catch (std::exception& e) {
{
stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -353,19 +394,20 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = false;
+ _sessionId = boost::none;
_isActiveCV.notify_all();
}
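Disengaging the optional and calling notify_all() here is what releases the wait loop in startCommit() above; that loop is equivalent to the predicate overload of wait_until, sketched for clarity (not the actual code):

    // Returns false on timeout, true once _sessionId has been disengaged.
    bool finished = _isActiveCV.wait_until(lock, deadline, [this] { return !_sessionId; });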
void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
const string& ns,
+ const MigrationSessionId& sessionId,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
const std::string& fromShard,
const OID& epoch,
const WriteConcernOptions& writeConcern) {
- invariant(getActive());
+ invariant(isActive());
invariant(getState() == READY);
invariant(!min.isEmpty());
invariant(!max.isEmpty());
@@ -563,10 +605,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
// 3. Initial bulk clone
setState(CLONE);
+ const BSONObj migrateCloneRequest = createMigrateCloneRequest(sessionId);
+
while (true) {
BSONObj res;
if (!conn->runCommand("admin",
- BSON("_migrateClone" << 1),
+ migrateCloneRequest,
res)) { // gets array of objects to copy, in disk order
setState(FAIL);
errmsg = "_migrateClone failed: ";
@@ -645,13 +689,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
// secondaries
repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+ const BSONObj xferModsRequest = createTransferModsRequest(sessionId);
+
{
// 4. Do bulk of mods
setState(CATCHUP);
while (true) {
BSONObj res;
- if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) {
+ if (!conn->runCommand("admin", xferModsRequest, res)) {
setState(FAIL);
errmsg = "_transferMods failed: ";
errmsg += res.toString();
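Similarly for the catch-up phase, the _transferMods reply presumably splits modifications into upserts and deletions; the field names below are assumptions:

    // Assumed reply shape consumed by the catch-up loop:
    // { reload: [ <docs to upsert> ], deleted: [ { _id: ... }, ... ], ok: 1 }
    // The loop re-polls until both arrays come back empty.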
@@ -752,7 +798,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
BSONObj res;
- if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) {
+ if (!conn->runCommand("admin", xferModsRequest, res)) {
log() << "_transferMods failed in STEADY state: " << res << migrateLog;
errmsg = res.toString();
setState(FAIL);