/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#pragma once
#include
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/oid.h"
#include "mongo/client/connection_string.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/db/s/session_catalog_migration_destination.h"
#include "mongo/s/shard_id.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/timer.h"
namespace mongo {
// Forward declarations of types which this header only names in declarations, so their complete
// definitions are not required here.
class OperationContext;
class Status;
struct WriteConcernOptions;
namespace repl {
class OpTime;
}
/**
* Drives the receiving side of the MongoD migration process. One instance exists per shard.
*
* The class is non-copyable. Its fields are guarded by '_mutex' (see the member comments at the
* bottom of the class).
*/
class MigrationDestinationManager {
MONGO_DISALLOW_COPYING(MigrationDestinationManager);
public:
/**
* States of the migration on the recipient side. A newly constructed manager starts in READY
* (see the '_state' member initializer); setStateFail/setStateFailWarn move it to FAIL.
*
* NOTE(review): the remaining values presumably correspond to the clone, catch-up, steady,
* commit, done and abort phases of a migration -- confirm against the implementation.
*/
enum State { READY, CLONE, CATCHUP, STEADY, COMMIT_START, DONE, FAIL, ABORT };
MigrationDestinationManager();
~MigrationDestinationManager();
/**
* Returns the singleton instance of the migration destination manager.
*
* TODO (SERVER-25333): This should become per-collection instance instead of singleton.
*/
static MigrationDestinationManager* get(OperationContext* opCtx);
/**
* Get and set the current migration state.
*
* NOTE(review): presumably both acquire '_mutex' internally -- confirm in the implementation.
*/
State getState() const;
void setState(State newState);
/**
* These log the argument msg; then, under lock, move msg to _errmsg and set the state to FAIL.
* The setStateFailWarn version logs with "warning() << msg".
*
* 'msg' is taken by value so that it can be moved into '_errmsg'.
*/
void setStateFail(std::string msg);
void setStateFailWarn(std::string msg);
/**
* Checks whether the MigrationDestinationManager is currently handling a migration.
*/
bool isActive() const;
/**
* Reports the state of the migration manager as a BSON document.
*
* The method returns void; the report is produced through the caller-supplied builder 'b'.
*
* NOTE(review): 'waitForSteadyOrDone' presumably makes the call wait until the state is STEADY
* or DONE before reporting -- confirm in the implementation.
*/
void report(BSONObjBuilder& b, OperationContext* opCtx, bool waitForSteadyOrDone);
/**
* Returns a report on the active migration, if the migration is active. Otherwise return an
* empty BSONObj.
*/
BSONObj getMigrationStatusReport();
/**
* Returns OK if migration started successfully.
*
* 'scopedReceiveChunk' is taken by value, i.e. ownership of it is handed to this call.
*
* NOTE(review): the remaining arguments presumably describe the migration being received (the
* namespace, the session id, the donor's connection string, the donor and recipient shard ids,
* the ['min', 'max') chunk bounds, the shard key pattern, the collection epoch and the write
* concern to use) -- confirm against the implementation and callers.
*/
Status start(const NamespaceString& nss,
ScopedReceiveChunk scopedReceiveChunk,
const MigrationSessionId& sessionId,
const ConnectionString& fromShardConnString,
const ShardId& fromShard,
const ShardId& toShard,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
const OID& epoch,
const WriteConcernOptions& writeConcern);
/**
* Clones documents from a donor shard.
*
* This is a static method, so it does not operate on a particular manager instance.
* 'insertBatchFn' and 'fetchBatchFn' are caller-supplied callables.
*
* NOTE(review): by their names, 'fetchBatchFn' obtains the next batch from the donor and
* 'insertBatchFn' writes a batch locally -- confirm their exact contracts in the
* implementation.
*/
static void cloneDocumentsFromDonor(
OperationContext* opCtx,
stdx::function insertBatchFn,
stdx::function fetchBatchFn);
/**
* Idempotent method, which causes the current ongoing migration to abort only if it has the
* specified session id. If the migration is already aborted, does nothing.
*/
Status abort(const MigrationSessionId& sessionId);
/**
* Same as 'abort' above, but unconditionally aborts the current migration without checking the
* session id. Only used for backwards compatibility.
*/
void abortWithoutSessionIdCheck();
/**
* Requests the commit of the migration identified by 'sessionId'.
*
* NOTE(review): presumably returns a non-OK Status if 'sessionId' does not match the active
* migration or if the migration is not in a state from which it can commit -- confirm in the
* implementation.
*/
Status startCommit(const MigrationSessionId& sessionId);
private:
/**
* Thread which drives the migration apply process on the recipient side.
*
* Unlike '_migrateDriver' below, every argument is taken by value.
* NOTE(review): presumably so that the thread owns its own copies of the arguments -- confirm.
*/
void _migrateThread(BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
ConnectionString fromShardConnString,
OID epoch,
WriteConcernOptions writeConcern);
/**
* Takes the same set of migration parameters as '_migrateThread', by const reference, plus an
* operation context.
*
* NOTE(review): presumably this performs the actual migration steps and is invoked from
* '_migrateThread' -- confirm in the implementation.
*/
void _migrateDriver(OperationContext* opCtx,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
const ConnectionString& fromShardConnString,
const OID& epoch,
const WriteConcernOptions& writeConcern);
/**
* NOTE(review): presumably applies the modifications described by 'xfer' to the collection
* 'ns' for the ['min', 'max') range, records the optime of the last applied operation through
* the 'lastOpApplied' out-parameter, and returns whether anything was applied -- confirm in
* the implementation.
*/
bool _applyMigrateOp(OperationContext* opCtx,
const NamespaceString& ns,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
const BSONObj& xfer,
repl::OpTime* lastOpApplied);
/**
* NOTE(review): presumably waits for the writes up to 'lastOpApplied' to satisfy
* 'writeConcern' and returns whether that succeeded -- confirm in the implementation. Note
* that, unlike '_applyMigrateOp', the namespace is passed here as a std::string.
*/
bool _flushPendingWrites(OperationContext* opCtx,
const std::string& ns,
BSONObj min,
BSONObj max,
const repl::OpTime& lastOpApplied,
const WriteConcernOptions& writeConcern);
/**
* Remembers a chunk range between 'min' and 'max' as a range which will have data migrated
* into it, to protect it against separate commands to clean up orphaned data. First, though,
* it schedules deletion of any documents in the range, so that process must be seen to be
* complete before migrating any new documents in.
*
* Returns the CleanupNotification associated with that scheduled deletion.
*/
CollectionShardingState::CleanupNotification _notePending(OperationContext*,
NamespaceString const&,
OID const&,
ChunkRange const&);
/**
* Stops tracking a chunk range between 'min' and 'max' that previously was having data
* migrated into it, and schedules deletion of any such documents already migrated in.
*/
void _forgetPending(OperationContext*, NamespaceString const&, OID const&, ChunkRange const&);
/**
* Checks whether the MigrationDestinationManager is currently handling a migration by checking
* that the migration "_sessionId" is initialized.
*
* The WithLock argument indicates that the caller must already hold a lock when calling this.
*/
bool _isActive(WithLock) const;
// Mutex to guard all fields
mutable stdx::mutex _mutex;
// Migration session ID uniquely identifies the migration and indicates whether the prepare
// method has been called.
boost::optional _sessionId;
// NOTE(review): presumably holds the ScopedReceiveChunk handed to 'start' for the duration of the
// migration -- confirm in the implementation.
boost::optional _scopedReceiveChunk;
// A condition variable on which to wait for the prepare method to be called.
stdx::condition_variable _isActiveCV;
// NOTE(review): presumably the handle of the thread which runs '_migrateThread' -- confirm.
stdx::thread _migrateThreadHandle;
// Parameters of the migration. NOTE(review): presumably populated by 'start' from its arguments
// of the same names -- confirm in the implementation.
NamespaceString _nss;
ConnectionString _fromShardConnString;
ShardId _fromShard;
ShardId _toShard;
BSONObj _min;
BSONObj _max;
BSONObj _shardKeyPattern;
// Set to true once we have accepted the chunk as pending into our metadata. Used so that on
// failure we can perform the appropriate cleanup.
bool _chunkMarkedPending{false};
// Progress counters, all starting at zero. NOTE(review): presumably the number of documents and
// bytes cloned, and the number of operations applied in the catch-up and steady phases -- confirm.
long long _numCloned{0};
long long _clonedBytes{0};
long long _numCatchup{0};
long long _numSteady{0};
// Current state of the migration; starts out as READY.
State _state{READY};
// Error message recorded by setStateFail/setStateFailWarn.
std::string _errmsg;
// NOTE(review): presumably the object responsible for migrating session information alongside
// the chunk data -- confirm in the implementation.
std::unique_ptr _sessionMigration;
// Condition variable, which is signalled every time the state of the migration changes.
stdx::condition_variable _stateChangedCV;
};
} // namespace mongo