summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorDianna Hohensee <dianna.hohensee@10gen.com>2016-02-02 18:13:48 -0500
committerDianna Hohensee <dianna.hohensee@10gen.com>2016-02-02 18:13:48 -0500
commitd0ae5688ea3083d2916c2213a262ed0ec2cf6b4f (patch)
treec747b7ca687122e0cea9740eecfb46d682cc17ff /src/mongo
parent031193b825245e1d56d09e41b4c10652ef012579 (diff)
downloadmongo-d0ae5688ea3083d2916c2213a262ed0ec2cf6b4f.tar.gz
SERVER-20290 Introduce migration session id
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/SConscript22
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp92
-rw-r--r--src/mongo/db/s/migration_destination_manager.h25
-rw-r--r--src/mongo/db/s/migration_impl.cpp17
-rw-r--r--src/mongo/db/s/migration_impl.h5
-rw-r--r--src/mongo/db/s/migration_session_id.cpp95
-rw-r--r--src/mongo/db/s/migration_session_id.h83
-rw-r--r--src/mongo/db/s/migration_session_id_test.cpp82
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp53
-rw-r--r--src/mongo/db/s/migration_source_manager.h17
-rw-r--r--src/mongo/db/s/move_chunk_command.cpp7
-rw-r--r--src/mongo/s/d_migrate.cpp35
12 files changed, 468 insertions, 65 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index d752c9fed0a..891a891c4db 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -20,6 +20,17 @@ env.Library(
)
env.Library(
+ target='migration_types',
+ source=[
+ 'migration_session_id.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/bson/util/bson_extract',
+ ],
+)
+
+env.Library(
target='sharding',
source=[
'migration_destination_manager.cpp',
@@ -34,6 +45,7 @@ env.Library(
'$BUILD_DIR/mongo/bson/util/bson_extract',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/s/sharding_initialization',
+ 'migration_types',
],
LIBDEPS_TAGS=[
# Depends on symbols defined in files in serverOnlyFiles, and has
@@ -62,6 +74,16 @@ env.Library(
)
env.CppUnitTest(
+ target='migration_types_test',
+ source=[
+ 'migration_session_id_test.cpp',
+ ],
+ LIBDEPS=[
+ 'migration_types',
+ ]
+)
+
+env.CppUnitTest(
target='metadata_test',
source=[
'metadata_loader_test.cpp',
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index c354ee5dd41..3fe6919a89a 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -161,6 +161,32 @@ bool opReplicatedEnough(OperationContext* txn,
return majorityStatus.isOK() && userStatus.isOK();
}
+/**
+ * Create the migration clone request BSON object to send to the source
+ * shard.
+ *
+ * 'sessionId' unique identifier for this migration.
+ */
+BSONObj createMigrateCloneRequest(const MigrationSessionId& sessionId) {
+ BSONObjBuilder builder;
+ builder.append("_migrateClone", 1);
+ sessionId.append(&builder);
+ return builder.obj();
+}
+
+/**
+ * Create the migration transfer mods request BSON object to send to the source
+ * shard.
+ *
+ * 'sessionId' unique identifier for this migration.
+ */
+BSONObj createTransferModsRequest(const MigrationSessionId& sessionId) {
+ BSONObjBuilder builder;
+ builder.append("_transferMods", 1);
+ sessionId.append(&builder);
+ return builder.obj();
+}
+
MONGO_FP_DECLARE(failMigrationReceivedOutOfRangeOperation);
} // namespace
@@ -174,13 +200,7 @@ MONGO_FP_DECLARE(migrateThreadHangAtStep4);
MONGO_FP_DECLARE(migrateThreadHangAtStep5);
-MigrationDestinationManager::MigrationDestinationManager()
- : _active(false),
- _numCloned(0),
- _clonedBytes(0),
- _numCatchup(0),
- _numSteady(0),
- _state(READY) {}
+MigrationDestinationManager::MigrationDestinationManager() = default;
MigrationDestinationManager::~MigrationDestinationManager() = default;
@@ -194,15 +214,19 @@ void MigrationDestinationManager::setState(State newState) {
_state = newState;
}
-bool MigrationDestinationManager::getActive() const {
+bool MigrationDestinationManager::isActive() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
+ return _sessionId.is_initialized();
}
void MigrationDestinationManager::report(BSONObjBuilder& b) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
- b.appendBool("active", _active);
+ b.appendBool("active", _sessionId.is_initialized());
+
+ if (_sessionId) {
+ b.append("sessionId", _sessionId->toString());
+ }
b.append("ns", _ns);
b.append("from", _from);
@@ -225,6 +249,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b) {
}
Status MigrationDestinationManager::start(const string& ns,
+ const MigrationSessionId& sessionId,
const string& fromShard,
const BSONObj& min,
const BSONObj& max,
@@ -233,7 +258,7 @@ Status MigrationDestinationManager::start(const string& ns,
const WriteConcernOptions& writeConcern) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_active) {
+ if (_sessionId) {
return Status(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Active migration already in progress "
<< "ns: " << _ns << ", from: " << _from << ", min: " << _min
@@ -254,7 +279,7 @@ Status MigrationDestinationManager::start(const string& ns,
_numCatchup = 0;
_numSteady = 0;
- _active = true;
+ _sessionId = sessionId;
// TODO: If we are here, the migrate thread must have completed, otherwise _active above would
// be false, so this would never block. There is no better place with the current implementation
@@ -263,9 +288,10 @@ Status MigrationDestinationManager::start(const string& ns,
_migrateThreadHandle.join();
}
- _migrateThreadHandle =
- stdx::thread([this, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern]() {
- _migrateThread(ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
+ _migrateThreadHandle = stdx::thread(
+ [this, ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern]() {
+ _migrateThread(
+ ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
});
return Status::OK();
@@ -277,18 +303,31 @@ void MigrationDestinationManager::abort() {
_errmsg = "aborted";
}
-bool MigrationDestinationManager::startCommit() {
+bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionId) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
if (_state != STEADY) {
return false;
}
+ // In STEADY state we must have an active migration
+ invariant(_sessionId);
+
+ // This check guards against the (very unlikely) situation where the current donor shard has
+ // been stalled for some time, during which the recipient shard crashed or timed out and started
+ // serving as a recipient of chunks for another collection (note that it cannot be the same
+ // collection, because the old donor still holds the collection lock).
+ if (!_sessionId->matches(sessionId)) {
+ warning() << "startCommit received commit request from a stale session " << sessionId.toString()
+ << ". Current session is " << _sessionId->toString();
+ return false;
+ }
+
_state = COMMIT_START;
const auto deadline = stdx::chrono::system_clock::now() + Seconds(30);
- while (_active) {
+ while (_sessionId) {
if (stdx::cv_status::timeout == _isActiveCV.wait_until(lock, deadline)) {
_state = FAIL;
log() << "startCommit never finished!" << migrateLog;
@@ -305,6 +344,7 @@ bool MigrationDestinationManager::startCommit() {
}
void MigrationDestinationManager::_migrateThread(std::string ns,
+ MigrationSessionId sessionId,
BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
@@ -321,7 +361,8 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
}
try {
- _migrateDriver(&txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
+ _migrateDriver(
+ &txn, ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
} catch (std::exception& e) {
{
stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -353,19 +394,20 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = false;
+ _sessionId = boost::none;
_isActiveCV.notify_all();
}
void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
const string& ns,
+ const MigrationSessionId& sessionId,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
const std::string& fromShard,
const OID& epoch,
const WriteConcernOptions& writeConcern) {
- invariant(getActive());
+ invariant(isActive());
invariant(getState() == READY);
invariant(!min.isEmpty());
invariant(!max.isEmpty());
@@ -563,10 +605,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
// 3. Initial bulk clone
setState(CLONE);
+ const BSONObj migrateCloneRequest = createMigrateCloneRequest(sessionId);
+
while (true) {
BSONObj res;
if (!conn->runCommand("admin",
- BSON("_migrateClone" << 1),
+ migrateCloneRequest,
res)) { // gets array of objects to copy, in disk order
setState(FAIL);
errmsg = "_migrateClone failed: ";
@@ -645,13 +689,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
// secondaries
repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+ const BSONObj xferModsRequest = createTransferModsRequest(sessionId);
+
{
// 4. Do bulk of mods
setState(CATCHUP);
while (true) {
BSONObj res;
- if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) {
+ if (!conn->runCommand("admin", xferModsRequest, res)) {
setState(FAIL);
errmsg = "_transferMods failed: ";
errmsg += res.toString();
@@ -752,7 +798,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
BSONObj res;
- if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) {
+ if (!conn->runCommand("admin", xferModsRequest, res)) {
log() << "_transferMods failed in STEADY state: " << res << migrateLog;
errmsg = res.toString();
setState(FAIL);
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index b349147f764..03c3d05d665 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -34,6 +34,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/oid.h"
+#include "mongo/db/s/migration_session_id.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
@@ -64,7 +65,7 @@ public:
State getState() const;
void setState(State newState);
- bool getActive() const;
+ bool isActive() const;
/**
* Reports the state of the migration manager as a BSON document.
@@ -75,6 +76,7 @@ public:
* Returns OK if migration started successfully.
*/
Status start(const std::string& ns,
+ const MigrationSessionId& sessionId,
const std::string& fromShard,
const BSONObj& min,
const BSONObj& max,
@@ -84,13 +86,14 @@ public:
void abort();
- bool startCommit();
+ bool startCommit(const MigrationSessionId& sessionId);
private:
/**
* Thread which drives the migration apply process on the recipient side.
*/
void _migrateThread(std::string ns,
+ MigrationSessionId sessionId,
BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
@@ -100,6 +103,7 @@ private:
void _migrateDriver(OperationContext* txn,
const std::string& ns,
+ const MigrationSessionId& sessionId,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
@@ -125,9 +129,10 @@ private:
// Mutex to guard all fields
mutable stdx::mutex _mutex;
- // Whether the prepare method has been called along with a condition variable on which to wait
- // for prepare to be called
- bool _active;
+ // Migration session ID uniquely identifies the migration and indicates whether the prepare
+ // method has been called.
+ boost::optional<MigrationSessionId> _sessionId{boost::none};
+ // A condition variable on which to wait for the prepare method to be called.
stdx::condition_variable _isActiveCV;
stdx::thread _migrateThreadHandle;
@@ -139,12 +144,12 @@ private:
BSONObj _max;
BSONObj _shardKeyPattern;
- long long _numCloned;
- long long _clonedBytes;
- long long _numCatchup;
- long long _numSteady;
+ long long _numCloned{0};
+ long long _clonedBytes{0};
+ long long _numCatchup{0};
+ long long _numSteady{0};
- State _state;
+ State _state{READY};
std::string _errmsg;
};
diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp
index 367bdc9e5c8..d33fc302d9b 100644
--- a/src/mongo/db/s/migration_impl.cpp
+++ b/src/mongo/db/s/migration_impl.cpp
@@ -80,6 +80,13 @@ WriteConcernOptions getDefaultWriteConcernForMigration() {
return WriteConcernOptions(1, WriteConcernOptions::NONE, 0);
}
+BSONObj createRecvChunkCommitRequest(const MigrationSessionId& sessionId) {
+ BSONObjBuilder builder;
+ builder.append("_recvChunkCommit", 1);
+ sessionId.append(&builder);
+ return builder.obj();
+}
+
MONGO_FP_DECLARE(failMigrationCommit);
MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection);
MONGO_FP_DECLARE(failMigrationConfigWritePrepare);
@@ -276,7 +283,7 @@ ChunkMoveOperationState::acquireMoveMetadata() {
return &_distLockStatus->getValue();
}
-Status ChunkMoveOperationState::commitMigration() {
+Status ChunkMoveOperationState::commitMigration(const MigrationSessionId& sessionId) {
invariant(_distLockStatus.is_initialized());
invariant(_distLockStatus->isOK());
@@ -321,7 +328,7 @@ Status ChunkMoveOperationState::commitMigration() {
try {
ScopedDbConnection connTo(_toShardCS, 35.0);
- connTo->runCommand("admin", BSON("_recvChunkCommit" << 1), res);
+ connTo->runCommand("admin", createRecvChunkCommitRequest(sessionId), res);
connTo.done();
recvChunkCommitStatus = getStatusFromCommandResult(res);
} catch (const DBException& e) {
@@ -578,9 +585,11 @@ std::shared_ptr<CollectionMetadata> ChunkMoveOperationState::getCollMetadata() c
return _collMetadata;
}
-Status ChunkMoveOperationState::start(BSONObj shardKeyPattern) {
+Status ChunkMoveOperationState::start(const MigrationSessionId& sessionId,
+ const BSONObj& shardKeyPattern) {
auto migrationSourceManager = ShardingState::get(_txn)->migrationSourceManager();
- if (!migrationSourceManager->start(_txn, _nss.ns(), _minKey, _maxKey, shardKeyPattern)) {
+ if (!migrationSourceManager->start(
+ _txn, sessionId, _nss.ns(), _minKey, _maxKey, shardKeyPattern)) {
return {ErrorCodes::ConflictingOperationInProgress,
"Not starting chunk migration because another migration is already in progress "
"from this shard"};
diff --git a/src/mongo/db/s/migration_impl.h b/src/mongo/db/s/migration_impl.h
index a5d86e8d544..e3d451de2db 100644
--- a/src/mongo/db/s/migration_impl.h
+++ b/src/mongo/db/s/migration_impl.h
@@ -42,6 +42,7 @@
namespace mongo {
class CollectionMetadata;
+class MigrationSessionId;
class OperationContext;
template <typename T>
class StatusWith;
@@ -114,7 +115,7 @@ public:
/**
* Starts the move chunk operation.
*/
- Status start(BSONObj shardKeyPattern);
+ Status start(const MigrationSessionId& sessionId, const BSONObj& shardKeyPattern);
/**
* Implements the migration critical section. Needs to be invoked after all data has been moved
@@ -125,7 +126,7 @@ public:
* Since some migration failures are non-recoverable, it may also shut down the server on
* certain errors.
*/
- Status commitMigration();
+ Status commitMigration(const MigrationSessionId& sessionId);
const NamespaceString& getNss() const {
return _nss;
diff --git a/src/mongo/db/s/migration_session_id.cpp b/src/mongo/db/s/migration_session_id.cpp
new file mode 100644
index 00000000000..25a0bea7aeb
--- /dev/null
+++ b/src/mongo/db/s/migration_session_id.cpp
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/migration_session_id.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/oid.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+using std::string;
+
+namespace {
+
+// Field name, which the extractFromBSON method expects. Use this value if adding a migration
+// session to BSON.
+const char kFieldName[] = "sessionId";
+
+} // namespace
+
+MigrationSessionId MigrationSessionId::generate(StringData donor, StringData recipient) {
+ invariant(!donor.empty());
+ invariant(!recipient.empty());
+
+ return MigrationSessionId(str::stream() << donor << "_" << recipient << "_"
+ << OID::gen().toString());
+}
+
+StatusWith<MigrationSessionId> MigrationSessionId::extractFromBSON(const BSONObj& obj) {
+ string sessionId;
+ Status status = bsonExtractStringField(obj, kFieldName, &sessionId);
+ if (status.isOK()) {
+ return MigrationSessionId(sessionId);
+ } else if (status == ErrorCodes::NoSuchKey) {
+ return MigrationSessionId();
+ }
+
+ return status;
+}
+
+MigrationSessionId::MigrationSessionId() = default;
+
+MigrationSessionId::MigrationSessionId(std::string sessionId) {
+ invariant(!sessionId.empty());
+ _sessionId = std::move(sessionId);
+}
+
+bool MigrationSessionId::matches(const MigrationSessionId& other) const {
+ if (_sessionId && other._sessionId)
+ return *_sessionId == *other._sessionId;
+
+ return !_sessionId && !other._sessionId;
+}
+
+void MigrationSessionId::append(BSONObjBuilder* builder) const {
+ if (_sessionId) {
+ builder->append(kFieldName, *_sessionId);
+ }
+}
+
+std::string MigrationSessionId::toString() const {
+ return (_sessionId ? *_sessionId : "");
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/migration_session_id.h b/src/mongo/db/s/migration_session_id.h
new file mode 100644
index 00000000000..dbfde922c88
--- /dev/null
+++ b/src/mongo/db/s/migration_session_id.h
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <string>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/string_data.h"
+
+namespace mongo {
+
+class BSONObj;
+class BSONObjBuilder;
+template <typename T>
+class StatusWith;
+
+/**
+ * Encapsulates the logic for generating, parsing and comparing migration sessions. The migration
+ * session id is a unique identifier for a particular moveChunk command and is exchanged as part of
+ * all communication between the donor and recipient shards.
+ */
+class MigrationSessionId {
+public:
+ /**
+ * Constructs a new migration session identifier with the following format:
+ * DonorId_RecipientId_UniqueIdentifier
+ */
+ static MigrationSessionId generate(StringData donor, StringData recipient);
+
+ /**
+ * Extracts the session id from BSON. If the session id is missing from the BSON contents,
+ * returns an empty MigrationSessionId.
+ */
+ static StatusWith<MigrationSessionId> extractFromBSON(const BSONObj& obj);
+
+ /**
+ * Compares two session identifiers. Two identifiers match if either both are empty (_sessionId
+ * is not set) or if the session ids match.
+ */
+ bool matches(const MigrationSessionId& other) const;
+
+ /**
+ * Appends the migration session id to the specified builder.
+ */
+ void append(BSONObjBuilder* builder) const;
+
+ std::string toString() const;
+
+private:
+ MigrationSessionId();
+ explicit MigrationSessionId(std::string sessionId);
+
+ boost::optional<std::string> _sessionId{boost::none};
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/migration_session_id_test.cpp b/src/mongo/db/s/migration_session_id_test.cpp
new file mode 100644
index 00000000000..24e26888c53
--- /dev/null
+++ b/src/mongo/db/s/migration_session_id_test.cpp
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2012 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/migration_session_id.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+using unittest::assertGet;
+
+namespace {
+
+TEST(MigrationSessionId, GenerateAndExtract) {
+ MigrationSessionId origSessionId = MigrationSessionId::generate("Source", "Dest");
+
+ BSONObjBuilder builder;
+ origSessionId.append(&builder);
+ BSONObj obj = builder.obj();
+
+ MigrationSessionId sessionIdAfter = assertGet(MigrationSessionId::extractFromBSON(obj));
+ ASSERT(origSessionId.matches(sessionIdAfter));
+ ASSERT_EQ(origSessionId.toString(), sessionIdAfter.toString());
+}
+
+TEST(MigrationSessionId, Comparison) {
+ MigrationSessionId emptySessionId =
+ assertGet(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1)));
+ MigrationSessionId nonEmptySessionId =
+ assertGet(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1 << "sessionId"
+ << "TestSessionID")));
+
+ ASSERT(!emptySessionId.matches(nonEmptySessionId));
+ ASSERT(!nonEmptySessionId.matches(emptySessionId));
+
+ MigrationSessionId sessionIdToCompare =
+ assertGet(MigrationSessionId::extractFromBSON(BSON("SomeOtherField" << 1 << "sessionId"
+ << "TestSessionID")));
+ ASSERT(nonEmptySessionId.matches(sessionIdToCompare));
+ ASSERT(sessionIdToCompare.matches(nonEmptySessionId));
+}
+
+TEST(MigrationSessionId, ErrorWhenTypeIsNotString) {
+ ASSERT_NOT_OK(MigrationSessionId::extractFromBSON(
+ BSON("SomeField" << 1 << "sessionId" << Date_t::now())).getStatus());
+ ASSERT_NOT_OK(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1 << "sessionId" << 2))
+ .getStatus());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 6553e76b56e..642bcc6cf1f 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -162,6 +162,7 @@ MigrationSourceManager::MigrationSourceManager() = default;
MigrationSourceManager::~MigrationSourceManager() = default;
bool MigrationSourceManager::start(OperationContext* txn,
+ const MigrationSessionId& sessionId,
const std::string& ns,
const BSONObj& min,
const BSONObj& max,
@@ -176,7 +177,7 @@ bool MigrationSourceManager::start(OperationContext* txn,
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_active) {
+ if (_sessionId) {
return false;
}
@@ -189,7 +190,7 @@ bool MigrationSourceManager::start(OperationContext* txn,
invariant(_reload.size() == 0);
invariant(_memoryUsed == 0);
- _active = true;
+ _sessionId = sessionId;
stdx::lock_guard<stdx::mutex> tLock(_cloneLocsMutex);
invariant(_cloneLocs.size() == 0);
@@ -206,7 +207,7 @@ void MigrationSourceManager::done(OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = false;
+ _sessionId = boost::none;
_deleteNotifyExec.reset(NULL);
_inCriticalSection = false;
_inCriticalSectionCV.notify_all();
@@ -230,7 +231,7 @@ void MigrationSourceManager::logInsertOp(OperationContext* txn,
dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
- if (!_active || (_nss != ns))
+ if (!_sessionId || (_nss != ns))
return;
BSONElement idElement = obj["_id"];
@@ -259,7 +260,7 @@ void MigrationSourceManager::logUpdateOp(OperationContext* txn,
dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
- if (!_active || (_nss != ns))
+ if (!_sessionId || (_nss != ns))
return;
BSONElement idElement = updatedDoc["_id"];
@@ -300,14 +301,17 @@ void MigrationSourceManager::logDeleteOp(OperationContext* txn,
}
bool MigrationSourceManager::isInMigratingChunk(const NamespaceString& ns, const BSONObj& doc) {
- if (!_active)
+ if (!_sessionId)
return false;
+
if (ns != _nss)
return false;
+
return isInRange(doc, _min, _max, _shardKeyPattern);
}
bool MigrationSourceManager::transferMods(OperationContext* txn,
+ const MigrationSessionId& sessionId,
string& errmsg,
BSONObjBuilder& b) {
long long size = 0;
@@ -316,11 +320,19 @@ bool MigrationSourceManager::transferMods(OperationContext* txn,
AutoGetCollectionForRead ctx(txn, _getNS());
stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_active) {
+
+ if (!_sessionId) {
errmsg = "no active migration!";
return false;
}
+ if (!_sessionId->matches(sessionId)) {
+ errmsg = str::stream() << "requested migration session id " << sessionId.toString()
+ << " does not match active session id "
+ << _sessionId->toString();
+ return false;
+ }
+
// TODO: fix SERVER-16540 race
_xfer(txn, _nss.ns(), ctx.getDb(), &_deleted, b, "deleted", size, false);
_xfer(txn, _nss.ns(), ctx.getDb(), &_reload, b, "reload", size, true);
@@ -452,7 +464,10 @@ bool MigrationSourceManager::storeCurrentLocs(OperationContext* txn,
return true;
}
-bool MigrationSourceManager::clone(OperationContext* txn, string& errmsg, BSONObjBuilder& result) {
+bool MigrationSourceManager::clone(OperationContext* txn,
+ const MigrationSessionId& sessionId,
+ string& errmsg,
+ BSONObjBuilder& result) {
ElapsedTracker tracker(internalQueryExecYieldIterations, internalQueryExecYieldPeriodMS);
int allocSize = 0;
@@ -461,11 +476,19 @@ bool MigrationSourceManager::clone(OperationContext* txn, string& errmsg, BSONOb
AutoGetCollection autoColl(txn, _getNS(), MODE_IS);
stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_active) {
+
+ if (!_sessionId) {
errmsg = "not active";
return false;
}
+ if (!_sessionId->matches(sessionId)) {
+ errmsg = str::stream() << "requested migration session id " << sessionId.toString()
+ << " does not match active session id "
+ << _sessionId->toString();
+ return false;
+ }
+
Collection* collection = autoColl.getCollection();
if (!collection) {
errmsg = str::stream() << "collection " << _nss.toString() << " does not exist";
@@ -483,11 +506,19 @@ bool MigrationSourceManager::clone(OperationContext* txn, string& errmsg, BSONOb
AutoGetCollection autoColl(txn, _getNS(), MODE_IS);
stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_active) {
+
+ if (!_sessionId) {
errmsg = "not active";
return false;
}
+ if (!_sessionId->matches(sessionId)) {
+ errmsg = str::stream() << "migration session id changed from " << sessionId.toString()
+ << " to " << _sessionId->toString()
+ << " while initial clone was active";
+ return false;
+ }
+
// TODO: fix SERVER-16540 race
Collection* collection = autoColl.getCollection();
if (!collection) {
@@ -579,7 +610,7 @@ bool MigrationSourceManager::waitTillNotInCriticalSection(int maxSecondsToWait)
bool MigrationSourceManager::isActive() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
+ return _sessionId.is_initialized();
}
void MigrationSourceManager::_xfer(OperationContext* txn,
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 3f3c662e981..4d065fbf8d2 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -35,11 +35,11 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/migration_session_id.h"
#include "mongo/stdx/condition_variable.h"
namespace mongo {
-class BSONObj;
class Database;
class OperationContext;
class PlanExecutor;
@@ -57,6 +57,7 @@ public:
* already an existing migration in progress.
*/
bool start(OperationContext* txn,
+ const MigrationSessionId& sessionId,
const std::string& ns,
const BSONObj& min,
const BSONObj& max,
@@ -104,7 +105,10 @@ public:
* Called from the source of a migration process, this method transfers the accummulated local
* mods from source to destination.
*/
- bool transferMods(OperationContext* txn, std::string& errmsg, BSONObjBuilder& b);
+ bool transferMods(OperationContext* txn,
+ const MigrationSessionId& sessionId,
+ std::string& errmsg,
+ BSONObjBuilder& b);
/**
* Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs (to avoid
@@ -121,7 +125,10 @@ public:
std::string& errmsg,
BSONObjBuilder& result);
- bool clone(OperationContext* txn, std::string& errmsg, BSONObjBuilder& result);
+ bool clone(OperationContext* txn,
+ const MigrationSessionId& sessionId,
+ std::string& errmsg,
+ BSONObjBuilder& result);
void aboutToDelete(const RecordId& dl);
@@ -193,8 +200,8 @@ private:
// Bytes in _reload + _deleted
long long _memoryUsed{0}; // (M)
- // If a migration is currently active.
- bool _active{false}; // (MG)
+ // Uniquely identifies a migration and indicates a migration is active when set.
+ boost::optional<MigrationSessionId> _sessionId{boost::none}; // (MG)
NamespaceString _nss; // (MG)
BSONObj _min; // (MG)
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 12e93857473..1e1f14335d8 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -248,7 +248,9 @@ public:
// 3.
- auto moveChunkStartStatus = chunkMoveState.start(shardKeyPattern);
+ const auto migrationSessionId = MigrationSessionId::generate(chunkMoveState.getFromShard(),
+ chunkMoveState.getToShard());
+ auto moveChunkStartStatus = chunkMoveState.start(migrationSessionId, shardKeyPattern);
if (!moveChunkStartStatus.isOK()) {
warning() << moveChunkStartStatus.toString();
@@ -268,6 +270,7 @@ public:
BSONObjBuilder recvChunkStartBuilder;
recvChunkStartBuilder.append("_recvChunkStart", ns);
+ migrationSessionId.append(&recvChunkStartBuilder);
recvChunkStartBuilder.append("from", chunkMoveState.getFromShardCS().toString());
recvChunkStartBuilder.append("fromShardName", chunkMoveState.getFromShard());
recvChunkStartBuilder.append("toShardName", chunkMoveState.getToShard());
@@ -466,7 +469,7 @@ public:
return appendCommandStatus(result, Status(lockStatus.code(), msg));
}
- uassertStatusOK(chunkMoveState.commitMigration());
+ uassertStatusOK(chunkMoveState.commitMigration(migrationSessionId));
timing.done(5);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep5);
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index 89f300067ba..b7647aac7fc 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -96,7 +96,10 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- return ShardingState::get(txn)->migrationSourceManager()->transferMods(txn, errmsg, result);
+ const MigrationSessionId migrationSessionid(
+ uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+ return ShardingState::get(txn)->migrationSourceManager()->transferMods(
+ txn, migrationSessionid, errmsg, result);
}
} transferModsCommand;
@@ -135,7 +138,10 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- return ShardingState::get(txn)->migrationSourceManager()->clone(txn, errmsg, result);
+ const MigrationSessionId migrationSessionid(
+ uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+ return ShardingState::get(txn)->migrationSourceManager()->clone(
+ txn, migrationSessionid, errmsg, result);
}
} initialCloneCommand;
@@ -210,7 +216,7 @@ public:
// Active state of TO-side migrations (MigrateStatus) is serialized by distributed
// collection lock.
- if (shardingState->migrationDestinationManager()->getActive()) {
+ if (shardingState->migrationDestinationManager()->isActive()) {
errmsg = "migrate already in progress";
return false;
}
@@ -275,9 +281,18 @@ public:
const string fromShard(cmdObj["from"].String());
- Status startStatus = shardingState->migrationDestinationManager()->start(
- ns, fromShard, min, max, shardKeyPattern, currentVersion.epoch(), writeConcern);
-
+ const MigrationSessionId migrationSessionId(
+ uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+
+ Status startStatus =
+ shardingState->migrationDestinationManager()->start(ns,
+ migrationSessionId,
+ fromShard,
+ min,
+ max,
+ shardKeyPattern,
+ currentVersion.epoch(),
+ writeConcern);
if (!startStatus.isOK()) {
return appendCommandStatus(result, startStatus);
}
@@ -323,7 +338,7 @@ public:
string& errmsg,
BSONObjBuilder& result) {
ShardingState::get(txn)->migrationDestinationManager()->report(result);
- return 1;
+ return true;
}
} recvChunkStatusCommand;
@@ -362,7 +377,11 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- bool ok = ShardingState::get(txn)->migrationDestinationManager()->startCommit();
+ const MigrationSessionId migrationSessionid(
+ uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+ const bool ok =
+ ShardingState::get(txn)->migrationDestinationManager()->startCommit(migrationSessionid);
+
ShardingState::get(txn)->migrationDestinationManager()->report(result);
return ok;
}