diff options
author | Dianna Hohensee <dianna.hohensee@10gen.com> | 2016-02-02 18:13:48 -0500 |
---|---|---|
committer | Dianna Hohensee <dianna.hohensee@10gen.com> | 2016-02-02 18:13:48 -0500 |
commit | d0ae5688ea3083d2916c2213a262ed0ec2cf6b4f (patch) | |
tree | c747b7ca687122e0cea9740eecfb46d682cc17ff /src/mongo | |
parent | 031193b825245e1d56d09e41b4c10652ef012579 (diff) | |
download | mongo-d0ae5688ea3083d2916c2213a262ed0ec2cf6b4f.tar.gz |
SERVER-20290 Introduce migration session id
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/SConscript | 22 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 92 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 25 | ||||
-rw-r--r-- | src/mongo/db/s/migration_impl.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/s/migration_impl.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/migration_session_id.cpp | 95 | ||||
-rw-r--r-- | src/mongo/db/s/migration_session_id.h | 83 | ||||
-rw-r--r-- | src/mongo/db/s/migration_session_id_test.cpp | 82 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 53 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 17 | ||||
-rw-r--r-- | src/mongo/db/s/move_chunk_command.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/d_migrate.cpp | 35 |
12 files changed, 468 insertions, 65 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index d752c9fed0a..891a891c4db 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -20,6 +20,17 @@ env.Library( ) env.Library( + target='migration_types', + source=[ + 'migration_session_id.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/bson/util/bson_extract', + ], +) + +env.Library( target='sharding', source=[ 'migration_destination_manager.cpp', @@ -34,6 +45,7 @@ env.Library( '$BUILD_DIR/mongo/bson/util/bson_extract', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/s/sharding_initialization', + 'migration_types', ], LIBDEPS_TAGS=[ # Depends on symbols defined in files in serverOnlyFiles, and has @@ -62,6 +74,16 @@ env.Library( ) env.CppUnitTest( + target='migration_types_test', + source=[ + 'migration_session_id_test.cpp', + ], + LIBDEPS=[ + 'migration_types', + ] +) + +env.CppUnitTest( target='metadata_test', source=[ 'metadata_loader_test.cpp', diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index c354ee5dd41..3fe6919a89a 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -161,6 +161,32 @@ bool opReplicatedEnough(OperationContext* txn, return majorityStatus.isOK() && userStatus.isOK(); } +/** + * Create the migration clone request BSON object to send to the source + * shard. + * + * 'sessionId' unique identifier for this migration. + */ +BSONObj createMigrateCloneRequest(const MigrationSessionId& sessionId) { + BSONObjBuilder builder; + builder.append("_migrateClone", 1); + sessionId.append(&builder); + return builder.obj(); +} + +/** + * Create the migration transfer mods request BSON object to send to the source + * shard. + * + * 'sessionId' unique identifier for this migration. + */ +BSONObj createTransferModsRequest(const MigrationSessionId& sessionId) { + BSONObjBuilder builder; + builder.append("_transferMods", 1); + sessionId.append(&builder); + return builder.obj(); +} + MONGO_FP_DECLARE(failMigrationReceivedOutOfRangeOperation); } // namespace @@ -174,13 +200,7 @@ MONGO_FP_DECLARE(migrateThreadHangAtStep4); MONGO_FP_DECLARE(migrateThreadHangAtStep5); -MigrationDestinationManager::MigrationDestinationManager() - : _active(false), - _numCloned(0), - _clonedBytes(0), - _numCatchup(0), - _numSteady(0), - _state(READY) {} +MigrationDestinationManager::MigrationDestinationManager() = default; MigrationDestinationManager::~MigrationDestinationManager() = default; @@ -194,15 +214,19 @@ void MigrationDestinationManager::setState(State newState) { _state = newState; } -bool MigrationDestinationManager::getActive() const { +bool MigrationDestinationManager::isActive() const { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; + return _sessionId.is_initialized(); } void MigrationDestinationManager::report(BSONObjBuilder& b) { stdx::lock_guard<stdx::mutex> sl(_mutex); - b.appendBool("active", _active); + b.appendBool("active", _sessionId.is_initialized()); + + if (_sessionId) { + b.append("sessionId", _sessionId->toString()); + } b.append("ns", _ns); b.append("from", _from); @@ -225,6 +249,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b) { } Status MigrationDestinationManager::start(const string& ns, + const MigrationSessionId& sessionId, const string& fromShard, const BSONObj& min, const BSONObj& max, @@ -233,7 +258,7 @@ Status MigrationDestinationManager::start(const string& ns, const WriteConcernOptions& writeConcern) { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_active) { + if (_sessionId) { return Status(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Active migration already in progress " << "ns: " << _ns << ", from: " << _from << ", min: " << _min @@ -254,7 +279,7 @@ Status MigrationDestinationManager::start(const string& ns, _numCatchup = 0; _numSteady = 0; - _active = true; + _sessionId = sessionId; // TODO: If we are here, the migrate thread must have completed, otherwise _active above would // be false, so this would never block. There is no better place with the current implementation @@ -263,9 +288,10 @@ Status MigrationDestinationManager::start(const string& ns, _migrateThreadHandle.join(); } - _migrateThreadHandle = - stdx::thread([this, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern]() { - _migrateThread(ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern); + _migrateThreadHandle = stdx::thread( + [this, ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern]() { + _migrateThread( + ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern); }); return Status::OK(); @@ -277,18 +303,31 @@ void MigrationDestinationManager::abort() { _errmsg = "aborted"; } -bool MigrationDestinationManager::startCommit() { +bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionId) { stdx::unique_lock<stdx::mutex> lock(_mutex); if (_state != STEADY) { return false; } + // In STEADY state we must have active migration + invariant(_sessionId); + + // This check guards against the (very unlikely) situation where the current donor shard has + // been stalled for some time, during which the recipient shard crashed or timed out and started + // serving as a recipient of chunks for another collection (note that it cannot be the same + // collection, because the old donor still holds the collection lock). + if (!_sessionId->matches(sessionId)) { + warning() << "startCommit received commit request from a stale session " << sessionId.toString() + << ". Current session is " << _sessionId->toString(); + return false; + } + _state = COMMIT_START; const auto deadline = stdx::chrono::system_clock::now() + Seconds(30); - while (_active) { + while (_sessionId) { if (stdx::cv_status::timeout == _isActiveCV.wait_until(lock, deadline)) { _state = FAIL; log() << "startCommit never finished!" << migrateLog; @@ -305,6 +344,7 @@ bool MigrationDestinationManager::startCommit() { } void MigrationDestinationManager::_migrateThread(std::string ns, + MigrationSessionId sessionId, BSONObj min, BSONObj max, BSONObj shardKeyPattern, @@ -321,7 +361,8 @@ void MigrationDestinationManager::_migrateThread(std::string ns, } try { - _migrateDriver(&txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern); + _migrateDriver( + &txn, ns, sessionId, min, max, shardKeyPattern, fromShard, epoch, writeConcern); } catch (std::exception& e) { { stdx::lock_guard<stdx::mutex> sl(_mutex); @@ -353,19 +394,20 @@ void MigrationDestinationManager::_migrateThread(std::string ns, } stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = false; + _sessionId = boost::none; _isActiveCV.notify_all(); } void MigrationDestinationManager::_migrateDriver(OperationContext* txn, const string& ns, + const MigrationSessionId& sessionId, const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, const std::string& fromShard, const OID& epoch, const WriteConcernOptions& writeConcern) { - invariant(getActive()); + invariant(isActive()); invariant(getState() == READY); invariant(!min.isEmpty()); invariant(!max.isEmpty()); @@ -563,10 +605,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, // 3. Initial bulk clone setState(CLONE); + const BSONObj migrateCloneRequest = createMigrateCloneRequest(sessionId); + while (true) { BSONObj res; if (!conn->runCommand("admin", - BSON("_migrateClone" << 1), + migrateCloneRequest, res)) { // gets array of objects to copy, in disk order setState(FAIL); errmsg = "_migrateClone failed: "; @@ -645,13 +689,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, // secondaries repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); + const BSONObj xferModsRequest = createTransferModsRequest(sessionId); + { // 4. Do bulk of mods setState(CATCHUP); while (true) { BSONObj res; - if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) { + if (!conn->runCommand("admin", xferModsRequest, res)) { setState(FAIL); errmsg = "_transferMods failed: "; errmsg += res.toString(); @@ -752,7 +798,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, } BSONObj res; - if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) { + if (!conn->runCommand("admin", xferModsRequest, res)) { log() << "_transferMods failed in STEADY state: " << res << migrateLog; errmsg = res.toString(); setState(FAIL); diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index b349147f764..03c3d05d665 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -34,6 +34,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/oid.h" +#include "mongo/db/s/migration_session_id.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" @@ -64,7 +65,7 @@ public: State getState() const; void setState(State newState); - bool getActive() const; + bool isActive() const; /** * Reports the state of the migration manager as a BSON document. @@ -75,6 +76,7 @@ public: * Returns OK if migration started successfully. */ Status start(const std::string& ns, + const MigrationSessionId& sessionId, const std::string& fromShard, const BSONObj& min, const BSONObj& max, @@ -84,13 +86,14 @@ public: void abort(); - bool startCommit(); + bool startCommit(const MigrationSessionId& sessionId); private: /** * Thread which drives the migration apply process on the recipient side. */ void _migrateThread(std::string ns, + MigrationSessionId sessionId, BSONObj min, BSONObj max, BSONObj shardKeyPattern, @@ -100,6 +103,7 @@ private: void _migrateDriver(OperationContext* txn, const std::string& ns, + const MigrationSessionId& sessionId, const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, @@ -125,9 +129,10 @@ private: // Mutex to guard all fields mutable stdx::mutex _mutex; - // Whether the prepare method has been called along with a condition variable on which to wait - // for prepare to be called - bool _active; + // Migration session ID uniquely identifies the migration and indicates whether the prepare + // method has been called. + boost::optional<MigrationSessionId> _sessionId{boost::none}; + // A condition variable on which to wait for the prepare method to be called. stdx::condition_variable _isActiveCV; stdx::thread _migrateThreadHandle; @@ -139,12 +144,12 @@ private: BSONObj _max; BSONObj _shardKeyPattern; - long long _numCloned; - long long _clonedBytes; - long long _numCatchup; - long long _numSteady; + long long _numCloned{0}; + long long _clonedBytes{0}; + long long _numCatchup{0}; + long long _numSteady{0}; - State _state; + State _state{READY}; std::string _errmsg; }; diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp index 367bdc9e5c8..d33fc302d9b 100644 --- a/src/mongo/db/s/migration_impl.cpp +++ b/src/mongo/db/s/migration_impl.cpp @@ -80,6 +80,13 @@ WriteConcernOptions getDefaultWriteConcernForMigration() { return WriteConcernOptions(1, WriteConcernOptions::NONE, 0); } +BSONObj createRecvChunkCommitRequest(const MigrationSessionId& sessionId) { + BSONObjBuilder builder; + builder.append("_recvChunkCommit", 1); + sessionId.append(&builder); + return builder.obj(); +} + MONGO_FP_DECLARE(failMigrationCommit); MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection); MONGO_FP_DECLARE(failMigrationConfigWritePrepare); @@ -276,7 +283,7 @@ ChunkMoveOperationState::acquireMoveMetadata() { return &_distLockStatus->getValue(); } -Status ChunkMoveOperationState::commitMigration() { +Status ChunkMoveOperationState::commitMigration(const MigrationSessionId& sessionId) { invariant(_distLockStatus.is_initialized()); invariant(_distLockStatus->isOK()); @@ -321,7 +328,7 @@ Status ChunkMoveOperationState::commitMigration() { try { ScopedDbConnection connTo(_toShardCS, 35.0); - connTo->runCommand("admin", BSON("_recvChunkCommit" << 1), res); + connTo->runCommand("admin", createRecvChunkCommitRequest(sessionId), res); connTo.done(); recvChunkCommitStatus = getStatusFromCommandResult(res); } catch (const DBException& e) { @@ -578,9 +585,11 @@ std::shared_ptr<CollectionMetadata> ChunkMoveOperationState::getCollMetadata() c return _collMetadata; } -Status ChunkMoveOperationState::start(BSONObj shardKeyPattern) { +Status ChunkMoveOperationState::start(const MigrationSessionId& sessionId, + const BSONObj& shardKeyPattern) { auto migrationSourceManager = ShardingState::get(_txn)->migrationSourceManager(); - if (!migrationSourceManager->start(_txn, _nss.ns(), _minKey, _maxKey, shardKeyPattern)) { + if (!migrationSourceManager->start( + _txn, sessionId, _nss.ns(), _minKey, _maxKey, shardKeyPattern)) { return {ErrorCodes::ConflictingOperationInProgress, "Not starting chunk migration because another migration is already in progress " "from this shard"}; diff --git a/src/mongo/db/s/migration_impl.h b/src/mongo/db/s/migration_impl.h index a5d86e8d544..e3d451de2db 100644 --- a/src/mongo/db/s/migration_impl.h +++ b/src/mongo/db/s/migration_impl.h @@ -42,6 +42,7 @@ namespace mongo { class CollectionMetadata; +class MigrationSessionId; class OperationContext; template <typename T> class StatusWith; @@ -114,7 +115,7 @@ public: /** * Starts the move chunk operation. */ - Status start(BSONObj shardKeyPattern); + Status start(const MigrationSessionId& sessionId, const BSONObj& shardKeyPattern); /** * Implements the migration critical section. Needs to be invoked after all data has been moved @@ -125,7 +126,7 @@ public: * Since some migration failures are non-recoverable, it may also shut down the server on * certain errors. */ - Status commitMigration(); + Status commitMigration(const MigrationSessionId& sessionId); const NamespaceString& getNss() const { return _nss; diff --git a/src/mongo/db/s/migration_session_id.cpp b/src/mongo/db/s/migration_session_id.cpp new file mode 100644 index 00000000000..25a0bea7aeb --- /dev/null +++ b/src/mongo/db/s/migration_session_id.cpp @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/migration_session_id.h" + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/oid.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + +using std::string; + +namespace { + +// Field name, which the extractFromBSON method expects. Use this value if adding a migration +// session to BSON. +const char kFieldName[] = "sessionId"; + +} // namespace + +MigrationSessionId MigrationSessionId::generate(StringData donor, StringData recipient) { + invariant(!donor.empty()); + invariant(!recipient.empty()); + + return MigrationSessionId(str::stream() << donor << "_" << recipient << "_" + << OID::gen().toString()); +} + +StatusWith<MigrationSessionId> MigrationSessionId::extractFromBSON(const BSONObj& obj) { + string sessionId; + Status status = bsonExtractStringField(obj, kFieldName, &sessionId); + if (status.isOK()) { + return MigrationSessionId(sessionId); + } else if (status == ErrorCodes::NoSuchKey) { + return MigrationSessionId(); + } + + return status; +} + +MigrationSessionId::MigrationSessionId() = default; + +MigrationSessionId::MigrationSessionId(std::string sessionId) { + invariant(!sessionId.empty()); + _sessionId = std::move(sessionId); +} + +bool MigrationSessionId::matches(const MigrationSessionId& other) const { + if (_sessionId && other._sessionId) + return *_sessionId == *other._sessionId; + + return !_sessionId && !other._sessionId; +} + +void MigrationSessionId::append(BSONObjBuilder* builder) const { + if (_sessionId) { + builder->append(kFieldName, *_sessionId); + } +} + +std::string MigrationSessionId::toString() const { + return (_sessionId ? *_sessionId : ""); +} + +} // namespace mongo diff --git a/src/mongo/db/s/migration_session_id.h b/src/mongo/db/s/migration_session_id.h new file mode 100644 index 00000000000..dbfde922c88 --- /dev/null +++ b/src/mongo/db/s/migration_session_id.h @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <string> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/string_data.h" + +namespace mongo { + +class BSONObj; +class BSONObjBuilder; +template <typename T> +class StatusWith; + +/** + * Encapsulates the logic for generating, parsing and comparing migration sessions. The migration + * session id is a unique identifier for a particular moveChunk command and is exchanged as part of + * all communication between the source and donor shards. + */ +class MigrationSessionId { +public: + /** + * Constructs a new migration session identifier with the following format: + * DonorId_RecipientId_UniqueIdentifier + */ + static MigrationSessionId generate(StringData donor, StringData recipient); + + /** + * Extracts the session id from BSON. If the session id is missing from the BSON contents, + * returns an empty MigrationSessionId. + */ + static StatusWith<MigrationSessionId> extractFromBSON(const BSONObj& obj); + + /** + * Compares two session identifiers. Two idendifiers match if either both are empty (_sessionId + * is not set) or if the session ids match. + */ + bool matches(const MigrationSessionId& other) const; + + /** + * Appends the migration session id to the specified builder. + */ + void append(BSONObjBuilder* builder) const; + + std::string toString() const; + +private: + MigrationSessionId(); + explicit MigrationSessionId(std::string sessionId); + + boost::optional<std::string> _sessionId{boost::none}; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/migration_session_id_test.cpp b/src/mongo/db/s/migration_session_id_test.cpp new file mode 100644 index 00000000000..24e26888c53 --- /dev/null +++ b/src/mongo/db/s/migration_session_id_test.cpp @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2012 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/migration_session_id.h" + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/jsobj.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +using unittest::assertGet; + +namespace { + +TEST(MigrationSessionId, GenerateAndExtract) { + MigrationSessionId origSessionId = MigrationSessionId::generate("Source", "Dest"); + + BSONObjBuilder builder; + origSessionId.append(&builder); + BSONObj obj = builder.obj(); + + MigrationSessionId sessionIdAfter = assertGet(MigrationSessionId::extractFromBSON(obj)); + ASSERT(origSessionId.matches(sessionIdAfter)); + ASSERT_EQ(origSessionId.toString(), sessionIdAfter.toString()); +} + +TEST(MigrationSessionId, Comparison) { + MigrationSessionId emptySessionId = + assertGet(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1))); + MigrationSessionId nonEmptySessionId = + assertGet(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1 << "sessionId" + << "TestSessionID"))); + + ASSERT(!emptySessionId.matches(nonEmptySessionId)); + ASSERT(!nonEmptySessionId.matches(emptySessionId)); + + MigrationSessionId sessionIdToCompare = + assertGet(MigrationSessionId::extractFromBSON(BSON("SomeOtherField" << 1 << "sessionId" + << "TestSessionID"))); + ASSERT(nonEmptySessionId.matches(sessionIdToCompare)); + ASSERT(sessionIdToCompare.matches(nonEmptySessionId)); +} + +TEST(MigrationSessionId, ErrorWhenTypeIsNotString) { + ASSERT_NOT_OK(MigrationSessionId::extractFromBSON( + BSON("SomeField" << 1 << "sessionId" << Date_t::now())).getStatus()); + ASSERT_NOT_OK(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1 << "sessionId" << 2)) + .getStatus()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 6553e76b56e..642bcc6cf1f 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -162,6 +162,7 @@ MigrationSourceManager::MigrationSourceManager() = default; MigrationSourceManager::~MigrationSourceManager() = default; bool MigrationSourceManager::start(OperationContext* txn, + const MigrationSessionId& sessionId, const std::string& ns, const BSONObj& min, const BSONObj& max, @@ -176,7 +177,7 @@ bool MigrationSourceManager::start(OperationContext* txn, stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_active) { + if (_sessionId) { return false; } @@ -189,7 +190,7 @@ bool MigrationSourceManager::start(OperationContext* txn, invariant(_reload.size() == 0); invariant(_memoryUsed == 0); - _active = true; + _sessionId = sessionId; stdx::lock_guard<stdx::mutex> tLock(_cloneLocsMutex); invariant(_cloneLocs.size() == 0); @@ -206,7 +207,7 @@ void MigrationSourceManager::done(OperationContext* txn) { stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = false; + _sessionId = boost::none; _deleteNotifyExec.reset(NULL); _inCriticalSection = false; _inCriticalSectionCV.notify_all(); @@ -230,7 +231,7 @@ void MigrationSourceManager::logInsertOp(OperationContext* txn, dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. - if (!_active || (_nss != ns)) + if (!_sessionId || (_nss != ns)) return; BSONElement idElement = obj["_id"]; @@ -259,7 +260,7 @@ void MigrationSourceManager::logUpdateOp(OperationContext* txn, dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. - if (!_active || (_nss != ns)) + if (!_sessionId || (_nss != ns)) return; BSONElement idElement = updatedDoc["_id"]; @@ -300,14 +301,17 @@ void MigrationSourceManager::logDeleteOp(OperationContext* txn, } bool MigrationSourceManager::isInMigratingChunk(const NamespaceString& ns, const BSONObj& doc) { - if (!_active) + if (!_sessionId) return false; + if (ns != _nss) return false; + return isInRange(doc, _min, _max, _shardKeyPattern); } bool MigrationSourceManager::transferMods(OperationContext* txn, + const MigrationSessionId& sessionId, string& errmsg, BSONObjBuilder& b) { long long size = 0; @@ -316,11 +320,19 @@ bool MigrationSourceManager::transferMods(OperationContext* txn, AutoGetCollectionForRead ctx(txn, _getNS()); stdx::lock_guard<stdx::mutex> sl(_mutex); - if (!_active) { + + if (!_sessionId) { errmsg = "no active migration!"; return false; } + if (!_sessionId->matches(sessionId)) { + errmsg = str::stream() << "requested migration session id " << sessionId.toString() + << " does not match active session id " + << _sessionId->toString(); + return false; + } + // TODO: fix SERVER-16540 race _xfer(txn, _nss.ns(), ctx.getDb(), &_deleted, b, "deleted", size, false); _xfer(txn, _nss.ns(), ctx.getDb(), &_reload, b, "reload", size, true); @@ -452,7 +464,10 @@ bool MigrationSourceManager::storeCurrentLocs(OperationContext* txn, return true; } -bool MigrationSourceManager::clone(OperationContext* txn, string& errmsg, BSONObjBuilder& result) { +bool MigrationSourceManager::clone(OperationContext* txn, + const MigrationSessionId& sessionId, + string& errmsg, + BSONObjBuilder& result) { ElapsedTracker tracker(internalQueryExecYieldIterations, internalQueryExecYieldPeriodMS); int allocSize = 0; @@ -461,11 +476,19 @@ bool MigrationSourceManager::clone(OperationContext* txn, string& errmsg, BSONOb AutoGetCollection autoColl(txn, _getNS(), MODE_IS); stdx::lock_guard<stdx::mutex> sl(_mutex); - if (!_active) { + + if (!_sessionId) { errmsg = "not active"; return false; } + if (!_sessionId->matches(sessionId)) { + errmsg = str::stream() << "requested migration session id " << sessionId.toString() + << " does not match active session id " + << _sessionId->toString(); + return false; + } + Collection* collection = autoColl.getCollection(); if (!collection) { errmsg = str::stream() << "collection " << _nss.toString() << " does not exist"; @@ -483,11 +506,19 @@ bool MigrationSourceManager::clone(OperationContext* txn, string& errmsg, BSONOb AutoGetCollection autoColl(txn, _getNS(), MODE_IS); stdx::lock_guard<stdx::mutex> sl(_mutex); - if (!_active) { + + if (!_sessionId) { errmsg = "not active"; return false; } + if (!_sessionId->matches(sessionId)) { + errmsg = str::stream() << "migration session id changed from " << sessionId.toString() + << " to " << _sessionId->toString() + << " while initial clone was active"; + return false; + } + // TODO: fix SERVER-16540 race Collection* collection = autoColl.getCollection(); if (!collection) { @@ -579,7 +610,7 @@ bool MigrationSourceManager::waitTillNotInCriticalSection(int maxSecondsToWait) bool MigrationSourceManager::isActive() const { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; + return _sessionId.is_initialized(); } void MigrationSourceManager::_xfer(OperationContext* txn, diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index 3f3c662e981..4d065fbf8d2 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -35,11 +35,11 @@ #include "mongo/base/disallow_copying.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/s/migration_session_id.h" #include "mongo/stdx/condition_variable.h" namespace mongo { -class BSONObj; class Database; class OperationContext; class PlanExecutor; @@ -57,6 +57,7 @@ public: * already an existing migration in progress. */ bool start(OperationContext* txn, + const MigrationSessionId& sessionId, const std::string& ns, const BSONObj& min, const BSONObj& max, @@ -104,7 +105,10 @@ public: * Called from the source of a migration process, this method transfers the accummulated local * mods from source to destination. */ - bool transferMods(OperationContext* txn, std::string& errmsg, BSONObjBuilder& b); + bool transferMods(OperationContext* txn, + const MigrationSessionId& sessionId, + std::string& errmsg, + BSONObjBuilder& b); /** * Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs (to avoid @@ -121,7 +125,10 @@ public: std::string& errmsg, BSONObjBuilder& result); - bool clone(OperationContext* txn, std::string& errmsg, BSONObjBuilder& result); + bool clone(OperationContext* txn, + const MigrationSessionId& sessionId, + std::string& errmsg, + BSONObjBuilder& result); void aboutToDelete(const RecordId& dl); @@ -193,8 +200,8 @@ private: // Bytes in _reload + _deleted long long _memoryUsed{0}; // (M) - // If a migration is currently active. - bool _active{false}; // (MG) + // Uniquely identifies a migration and indicates a migration is active when set. + boost::optional<MigrationSessionId> _sessionId{boost::none}; // (MG) NamespaceString _nss; // (MG) BSONObj _min; // (MG) diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index 12e93857473..1e1f14335d8 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -248,7 +248,9 @@ public: // 3. - auto moveChunkStartStatus = chunkMoveState.start(shardKeyPattern); + const auto migrationSessionId = MigrationSessionId::generate(chunkMoveState.getFromShard(), + chunkMoveState.getToShard()); + auto moveChunkStartStatus = chunkMoveState.start(migrationSessionId, shardKeyPattern); if (!moveChunkStartStatus.isOK()) { warning() << moveChunkStartStatus.toString(); @@ -268,6 +270,7 @@ public: BSONObjBuilder recvChunkStartBuilder; recvChunkStartBuilder.append("_recvChunkStart", ns); + migrationSessionId.append(&recvChunkStartBuilder); recvChunkStartBuilder.append("from", chunkMoveState.getFromShardCS().toString()); recvChunkStartBuilder.append("fromShardName", chunkMoveState.getFromShard()); recvChunkStartBuilder.append("toShardName", chunkMoveState.getToShard()); @@ -466,7 +469,7 @@ public: return appendCommandStatus(result, Status(lockStatus.code(), msg)); } - uassertStatusOK(chunkMoveState.commitMigration()); + uassertStatusOK(chunkMoveState.commitMigration(migrationSessionId)); timing.done(5); MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep5); diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index 89f300067ba..b7647aac7fc 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -96,7 +96,10 @@ public: int, string& errmsg, BSONObjBuilder& result) { - return ShardingState::get(txn)->migrationSourceManager()->transferMods(txn, errmsg, result); + const MigrationSessionId migrationSessionid( + uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); + return ShardingState::get(txn)->migrationSourceManager()->transferMods( + txn, migrationSessionid, errmsg, result); } } transferModsCommand; @@ -135,7 +138,10 @@ public: int, string& errmsg, BSONObjBuilder& result) { - return ShardingState::get(txn)->migrationSourceManager()->clone(txn, errmsg, result); + const MigrationSessionId migrationSessionid( + uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); + return ShardingState::get(txn)->migrationSourceManager()->clone( + txn, migrationSessionid, errmsg, result); } } initialCloneCommand; @@ -210,7 +216,7 @@ public: // Active state of TO-side migrations (MigrateStatus) is serialized by distributed // collection lock. - if (shardingState->migrationDestinationManager()->getActive()) { + if (shardingState->migrationDestinationManager()->isActive()) { errmsg = "migrate already in progress"; return false; } @@ -275,9 +281,18 @@ public: const string fromShard(cmdObj["from"].String()); - Status startStatus = shardingState->migrationDestinationManager()->start( - ns, fromShard, min, max, shardKeyPattern, currentVersion.epoch(), writeConcern); - + const MigrationSessionId migrationSessionId( + uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); + + Status startStatus = + shardingState->migrationDestinationManager()->start(ns, + migrationSessionId, + fromShard, + min, + max, + shardKeyPattern, + currentVersion.epoch(), + writeConcern); if (!startStatus.isOK()) { return appendCommandStatus(result, startStatus); } @@ -323,7 +338,7 @@ public: string& errmsg, BSONObjBuilder& result) { ShardingState::get(txn)->migrationDestinationManager()->report(result); - return 1; + return true; } } recvChunkStatusCommand; @@ -362,7 +377,11 @@ public: int, string& errmsg, BSONObjBuilder& result) { - bool ok = ShardingState::get(txn)->migrationDestinationManager()->startCommit(); + const MigrationSessionId migrationSessionid( + uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); + const bool ok = + ShardingState::get(txn)->migrationDestinationManager()->startCommit(migrationSessionid); + ShardingState::get(txn)->migrationDestinationManager()->report(result); return ok; } |