summaryrefslogtreecommitdiff
path: root/src/mongo/s/d_migrate.cpp
diff options
context:
space:
mode:
author: Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2015-07-13 11:26:52 -0400
committer: Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2015-07-14 18:16:03 -0400
commit: 00c74d1386d4678ebcdeb37878fbb3c53cb02d83 (patch)
tree: 1f14b3adeb997b989d2a10d8f35f84c37f3efa51 /src/mongo/s/d_migrate.cpp
parent: d9500825fd369a98e177eeedbe9c64dbcf8d9392 (diff)
download: mongo-00c74d1386d4678ebcdeb37878fbb3c53cb02d83.tar.gz
SERVER-18084 Modularize chunk migration state machine
* Move the sharding migration destination (receiving side) management to a separate source file
* Move the destination thread management to be owned by the destination manager
* Move the sharding migration source (donor side) management to a separate source file
* Remove the medianKey command since it is a no-op and not used anywhere

No migration logic changes.
Diffstat (limited to 'src/mongo/s/d_migrate.cpp')
-rw-r--r--src/mongo/s/d_migrate.cpp1765
1 files changed, 92 insertions, 1673 deletions
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index b98c02dada8..beebc9b7dc6 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -39,31 +39,23 @@
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager.h"
-#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/privilege.h"
-#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/index_create.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
-#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/field_parser.h"
-#include "mongo/db/hasher.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
-#include "mongo/db/operation_context_impl.h"
-#include "mongo/db/ops/delete.h"
-#include "mongo/db/query/internal_plans.h"
-#include "mongo/db/query/query_knobs.h"
#include "mongo/db/range_deleter_service.h"
-#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
-#include "mongo/db/storage/mmap_v1/dur.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/migration_destination_manager.h"
+#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/write_concern.h"
@@ -78,29 +70,15 @@
#include "mongo/s/config.h"
#include "mongo/s/d_state.h"
#include "mongo/s/grid.h"
-#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/chrono.h"
-#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/memory.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/thread.h"
#include "mongo/util/assert_util.h"
-#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
-#include "mongo/util/processinfo.h"
-#include "mongo/util/startup_test.h"
-
-// Pause while a fail point is enabled.
-#define MONGO_FP_PAUSE_WHILE(symbol) \
- while (MONGO_FAIL_POINT(symbol)) { \
- sleepmillis(100); \
- }
namespace mongo {
-using namespace stdx::chrono;
using std::list;
using std::set;
using std::string;
@@ -112,9 +90,14 @@ namespace {
const int kDefaultWTimeoutMs = 60 * 1000;
const WriteConcernOptions DefaultWriteConcern(2, WriteConcernOptions::NONE, kDefaultWTimeoutMs);
+Tee* migrateLog = RamLog::get("migrate");
+
+MigrationSourceManager migrateSourceManager;
+MigrationDestinationManager migrateDestManager;
+
/**
- * Returns the default write concern for migration cleanup (at donor shard) and
- * cloning documents (at recipient shard).
+ * Returns the default write concern for migration cleanup (at donor shard) and cloning documents
+ * (on the destination shard).
*/
WriteConcernOptions getDefaultWriteConcern() {
repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator();
@@ -128,98 +111,39 @@ WriteConcernOptions getDefaultWriteConcern() {
return WriteConcernOptions(1, WriteConcernOptions::NONE, 0);
}
-} // namespace
-
-MONGO_FP_DECLARE(failMigrationCommit);
-MONGO_FP_DECLARE(failMigrationConfigWritePrepare);
-MONGO_FP_DECLARE(failMigrationApplyOps);
-
-Tee* migrateLog = RamLog::get("migrate");
-
-class MoveTimingHelper {
-public:
- MoveTimingHelper(OperationContext* txn,
- const string& where,
- const string& ns,
- BSONObj min,
- BSONObj max,
- int total,
- string* cmdErrmsg,
- string toShard,
- string fromShard)
- : _txn(txn),
- _where(where),
- _ns(ns),
- _to(toShard),
- _from(fromShard),
- _next(0),
- _total(total),
- _cmdErrmsg(cmdErrmsg) {
- _b.append("min", min);
- _b.append("max", max);
+struct MigrateStatusHolder {
+ MigrateStatusHolder(OperationContext* txn,
+ MigrationSourceManager* migrateSourceManager,
+ const std::string& ns,
+ const BSONObj& min,
+ const BSONObj& max,
+ const BSONObj& shardKeyPattern)
+ : _txn(txn), _migrateSourceManager(migrateSourceManager) {
+ _isAnotherMigrationActive =
+ !_migrateSourceManager->start(txn, ns, min, max, shardKeyPattern);
}
-
- ~MoveTimingHelper() {
- // even if logChange doesn't throw, bson does
- // sigh
- try {
- if (!_to.empty()) {
- _b.append("to", _to);
- }
- if (!_from.empty()) {
- _b.append("from", _from);
- }
- if (_next != _total) {
- _b.append("note", "aborted");
- } else {
- _b.append("note", "success");
- }
- if (!_cmdErrmsg->empty()) {
- _b.append("errmsg", *_cmdErrmsg);
- }
-
- grid.catalogManager()->logChange(_txn->getClient()->clientAddress(true),
- (string) "moveChunk." + _where,
- _ns,
- _b.obj());
- } catch (const std::exception& e) {
- warning() << "couldn't record timing for moveChunk '" << _where << "': " << e.what()
- << migrateLog;
+ ~MigrateStatusHolder() {
+ if (!_isAnotherMigrationActive) {
+ _migrateSourceManager->done(_txn);
}
}
- void done(int step) {
- invariant(step == ++_next);
- invariant(step <= _total);
-
- const string s = str::stream() << "step " << step << " of " << _total;
-
- CurOp* op = CurOp::get(_txn);
- {
- stdx::lock_guard<Client> lk(*_txn->getClient());
- op->setMessage_inlock(s.c_str());
- }
-
- _b.appendNumber(s, _t.millis());
- _t.reset();
+ bool isAnotherMigrationActive() const {
+ return _isAnotherMigrationActive;
}
private:
OperationContext* const _txn;
- Timer _t;
+ MigrationSourceManager* const _migrateSourceManager;
- string _where;
- string _ns;
- string _to;
- string _from;
-
- int _next;
- int _total; // expected # of steps
+ bool _isAnotherMigrationActive;
+};
- const string* _cmdErrmsg;
+} // namespace
- BSONObjBuilder _b;
-};
+MONGO_FP_DECLARE(failMigrationCommit);
+MONGO_FP_DECLARE(failMigrationConfigWritePrepare);
+MONGO_FP_DECLARE(failMigrationApplyOps);
class ChunkCommandHelper : public Command {
public:
@@ -239,662 +163,6 @@ public:
}
};
-bool isInRange(const BSONObj& obj,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern) {
- ShardKeyPattern shardKey(shardKeyPattern);
- BSONObj k = shardKey.extractShardKeyFromDoc(obj);
- return k.woCompare(min) >= 0 && k.woCompare(max) < 0;
-}
-
-class MigrateFromStatus {
-public:
- MigrateFromStatus() : _inCriticalSection(false), _memoryUsed(0), _active(false) {}
-
- /**
- * @return false if cannot start. One of the reason for not being able to
- * start is there is already an existing migration in progress.
- */
- bool start(OperationContext* txn,
- const std::string& ns,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern) {
- verify(!min.isEmpty());
- verify(!max.isEmpty());
- verify(!ns.empty());
-
- // Get global shared to synchronize with logOp. Also see comments in the class
- // members declaration for more details.
- Lock::GlobalRead globalShared(txn->lockState());
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (_active) {
- return false;
- }
-
- _ns = ns;
- _min = min;
- _max = max;
- _shardKeyPattern = shardKeyPattern;
-
- verify(_deleted.size() == 0);
- verify(_reload.size() == 0);
- verify(_memoryUsed == 0);
-
- _active = true;
-
- stdx::lock_guard<stdx::mutex> tLock(_cloneLocsMutex);
- verify(_cloneLocs.size() == 0);
-
- return true;
- }
-
- void done(OperationContext* txn) {
- log() << "MigrateFromStatus::done About to acquire global lock to exit critical "
- "section";
-
- // Get global shared to synchronize with logOp. Also see comments in the class
- // members declaration for more details.
- Lock::GlobalRead globalShared(txn->lockState());
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- _active = false;
- _deleteNotifyExec.reset(NULL);
- _inCriticalSection = false;
- _inCriticalSectionCV.notify_all();
-
- _deleted.clear();
- _reload.clear();
- _memoryUsed = 0;
-
- stdx::lock_guard<stdx::mutex> cloneLock(_cloneLocsMutex);
- _cloneLocs.clear();
- }
-
- void logOp(OperationContext* txn,
- const char* opstr,
- const char* ns,
- const BSONObj& obj,
- BSONObj* patt,
- bool notInActiveChunk) {
- ensureShardVersionOKOrThrow(txn->getClient(), ns);
-
- const char op = opstr[0];
-
- if (notInActiveChunk) {
- // Ignore writes that came from the migration process like cleanup so they
- // won't be transferred to the recipient shard. Also ignore ops from
- // _migrateClone and _transferMods since it is impossible to move a chunk
- // to self.
- return;
- }
-
- dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
-
- if (!_active)
- return;
-
- if (_ns != ns)
- return;
-
- // no need to log if this is not an insertion, an update, or an actual deletion
- // note: opstr 'db' isn't a deletion but a mention that a database exists
- // (for replication machinery mostly).
- if (op == 'n' || op == 'c' || (op == 'd' && opstr[1] == 'b'))
- return;
-
- BSONElement ide;
- if (patt)
- ide = patt->getField("_id");
- else
- ide = obj["_id"];
-
- if (ide.eoo()) {
- warning() << "logOpForSharding got mod with no _id, ignoring obj: " << obj
- << migrateLog;
- return;
- }
-
- if (op == 'i' && (!isInRange(obj, _min, _max, _shardKeyPattern))) {
- return;
- }
-
- BSONObj idObj(ide.wrap());
-
- if (op == 'u') {
- BSONObj fullDoc;
- OldClientContext ctx(txn, _ns, false);
- if (!Helpers::findById(txn, ctx.db(), _ns.c_str(), idObj, fullDoc)) {
- warning() << "logOpForSharding couldn't find: " << idObj
- << " even though should have" << migrateLog;
- dassert(false); // TODO: Abort the migration.
- return;
- }
-
- if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) {
- return;
- }
- }
-
- // Note: can't check if delete is in active chunk since the document is gone!
-
- txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op));
- }
-
- /**
- * Insert items from docIdList to a new array with the given fieldName in the given
- * builder. If explode is true, the inserted object will be the full version of the
- * document. Note that the whenever an item from the docList is inserted to the array,
- * it will also be removed from docList.
- *
- * Should be holding the collection lock for ns if explode is true.
- */
- void xfer(OperationContext* txn,
- const string& ns,
- Database* db,
- list<BSONObj>* docIdList,
- BSONObjBuilder& builder,
- const char* fieldName,
- long long& size,
- bool explode) {
- const long long maxSize = 1024 * 1024;
-
- if (docIdList->size() == 0 || size > maxSize)
- return;
-
- BSONArrayBuilder arr(builder.subarrayStart(fieldName));
-
- list<BSONObj>::iterator docIdIter = docIdList->begin();
- while (docIdIter != docIdList->end() && size < maxSize) {
- BSONObj idDoc = *docIdIter;
- if (explode) {
- BSONObj fullDoc;
- if (Helpers::findById(txn, db, ns.c_str(), idDoc, fullDoc)) {
- arr.append(fullDoc);
- size += fullDoc.objsize();
- }
- } else {
- arr.append(idDoc);
- size += idDoc.objsize();
- }
-
- docIdIter = docIdList->erase(docIdIter);
- }
-
- arr.done();
- }
-
- /**
- * called from the dest of a migrate
- * transfers mods from src to dest
- */
- bool transferMods(OperationContext* txn, string& errmsg, BSONObjBuilder& b) {
- long long size = 0;
-
- {
- AutoGetCollectionForRead ctx(txn, getNS());
-
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_active) {
- errmsg = "no active migration!";
- return false;
- }
-
- // TODO: fix SERVER-16540 race
- xfer(txn, _ns, ctx.getDb(), &_deleted, b, "deleted", size, false);
- xfer(txn, _ns, ctx.getDb(), &_reload, b, "reload", size, true);
- }
-
- b.append("size", size);
-
- return true;
- }
-
- /**
- * Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs
- * (to avoid seeking disk later).
- *
- * @param maxChunkSize number of bytes beyond which a chunk's base data (no indices)
- * is considered too large to move.
- * @param errmsg filled with textual description of error if this call return false.
- * @return false if approximate chunk size is too big to move or true otherwise.
- */
- bool storeCurrentLocs(OperationContext* txn,
- long long maxChunkSize,
- string& errmsg,
- BSONObjBuilder& result) {
- AutoGetCollectionForRead ctx(txn, getNS());
- Collection* collection = ctx.getCollection();
- if (!collection) {
- errmsg = "ns not found, should be impossible";
- return false;
- }
-
- // Allow multiKey based on the invariant that shard keys must be single-valued.
- // Therefore, any multi-key index prefixed by shard key cannot be multikey over
- // the shard key fields.
- IndexDescriptor* idx =
- collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn,
- _shardKeyPattern,
- false); // requireSingleKey
-
- if (idx == NULL) {
- errmsg = str::stream() << "can't find index with prefix " << _shardKeyPattern
- << " in storeCurrentLocs for " << _ns;
- return false;
- }
-
- // Assume both min and max non-empty, append MinKey's to make them fit chosen index
- BSONObj min;
- BSONObj max;
- KeyPattern kp(idx->keyPattern());
-
- {
- // It's alright not to lock _mutex all the way through based on the assumption
- // that this is only called by the main thread that drives the migration and
- // only it can start and stop the current migration.
- stdx::lock_guard<stdx::mutex> sl(_mutex);
-
- invariant(_deleteNotifyExec.get() == NULL);
- unique_ptr<WorkingSet> ws = stdx::make_unique<WorkingSet>();
- unique_ptr<DeleteNotificationStage> dns = stdx::make_unique<DeleteNotificationStage>();
- // Takes ownership of 'ws' and 'dns'.
- auto statusWithPlanExecutor = PlanExecutor::make(
- txn, std::move(ws), std::move(dns), collection, PlanExecutor::YIELD_MANUAL);
- invariant(statusWithPlanExecutor.isOK());
- _deleteNotifyExec = std::move(statusWithPlanExecutor.getValue());
- _deleteNotifyExec->registerExec();
-
- min = Helpers::toKeyFormat(kp.extendRangeBound(_min, false));
- max = Helpers::toKeyFormat(kp.extendRangeBound(_max, false));
- }
-
- unique_ptr<PlanExecutor> exec(
- InternalPlanner::indexScan(txn, collection, idx, min, max, false));
- // We can afford to yield here because any change to the base data that we might
- // miss is already being queued and will migrate in the 'transferMods' stage.
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
- // use the average object size to estimate how many objects a full chunk would carry
- // do that while traversing the chunk's range using the sharding index, below
- // there's a fair amount of slack before we determine a chunk is too large because object
- // sizes will vary
- unsigned long long maxRecsWhenFull;
- long long avgRecSize;
- const long long totalRecs = collection->numRecords(txn);
- if (totalRecs > 0) {
- avgRecSize = collection->dataSize(txn) / totalRecs;
- maxRecsWhenFull = maxChunkSize / avgRecSize;
- maxRecsWhenFull = std::min((unsigned long long)(Chunk::MaxObjectPerChunk + 1),
- 130 * maxRecsWhenFull / 100 /* slack */);
- } else {
- avgRecSize = 0;
- maxRecsWhenFull = Chunk::MaxObjectPerChunk + 1;
- }
-
- // do a full traversal of the chunk and don't stop even if we think it is a large chunk
- // we want the number of records to better report, in that case
- bool isLargeChunk = false;
- unsigned long long recCount = 0;
- ;
- RecordId dl;
- while (PlanExecutor::ADVANCED == exec->getNext(NULL, &dl)) {
- if (!isLargeChunk) {
- stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex);
- _cloneLocs.insert(dl);
- }
-
- if (++recCount > maxRecsWhenFull) {
- isLargeChunk = true;
- // continue on despite knowing that it will fail,
- // just to get the correct value for recCount.
- }
- }
- exec.reset();
-
- if (isLargeChunk) {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- warning() << "cannot move chunk: the maximum number of documents for a chunk is "
- << maxRecsWhenFull << " , the maximum chunk size is " << maxChunkSize
- << " , average document size is " << avgRecSize << ". Found " << recCount
- << " documents in chunk "
- << " ns: " << _ns << " " << _min << " -> " << _max << migrateLog;
-
- result.appendBool("chunkTooBig", true);
- result.appendNumber("estimatedChunkSize", (long long)(recCount * avgRecSize));
- errmsg = "chunk too big to move";
- return false;
- }
-
- log() << "moveChunk number of documents: " << cloneLocsRemaining() << migrateLog;
-
- txn->recoveryUnit()->abandonSnapshot();
- return true;
- }
-
- bool clone(OperationContext* txn, string& errmsg, BSONObjBuilder& result) {
- ElapsedTracker tracker(internalQueryExecYieldIterations, internalQueryExecYieldPeriodMS);
-
- int allocSize = 0;
- {
- AutoGetCollectionForRead ctx(txn, getNS());
-
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_active) {
- errmsg = "not active";
- return false;
- }
-
- Collection* collection = ctx.getCollection();
- if (!collection) {
- errmsg = str::stream() << "collection " << _ns << " does not exist";
- return false;
- }
-
- allocSize = std::min(
- BSONObjMaxUserSize,
- static_cast<int>((12 + collection->averageObjectSize(txn)) * cloneLocsRemaining()));
- }
-
- bool isBufferFilled = false;
- BSONArrayBuilder clonedDocsArrayBuilder(allocSize);
- while (!isBufferFilled) {
- AutoGetCollectionForRead ctx(txn, getNS());
-
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_active) {
- errmsg = "not active";
- return false;
- }
-
- // TODO: fix SERVER-16540 race
-
- Collection* collection = ctx.getCollection();
-
- if (!collection) {
- errmsg = str::stream() << "collection " << _ns << " does not exist";
- return false;
- }
-
- stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex);
- set<RecordId>::iterator cloneLocsIter = _cloneLocs.begin();
- for (; cloneLocsIter != _cloneLocs.end(); ++cloneLocsIter) {
- if (tracker.intervalHasElapsed()) // should I yield?
- break;
-
- RecordId dl = *cloneLocsIter;
- Snapshotted<BSONObj> doc;
- if (!collection->findDoc(txn, dl, &doc)) {
- // doc was deleted
- continue;
- }
-
- // Use the builder size instead of accumulating 'doc's size so that we take
- // into consideration the overhead of BSONArray indices, and *always*
- // append one doc.
- if (clonedDocsArrayBuilder.arrSize() != 0 &&
- (clonedDocsArrayBuilder.len() + doc.value().objsize() + 1024) >
- BSONObjMaxUserSize) {
- isBufferFilled = true; // break out of outer while loop
- break;
- }
-
- clonedDocsArrayBuilder.append(doc.value());
- }
-
- _cloneLocs.erase(_cloneLocs.begin(), cloneLocsIter);
-
- // Note: must be holding _cloneLocsMutex, don't move this inside while condition!
- if (_cloneLocs.empty()) {
- break;
- }
- }
-
- result.appendArray("objects", clonedDocsArrayBuilder.arr());
- return true;
- }
-
- void aboutToDelete(const RecordId& dl) {
- // Even though above we call findDoc to check for existance
- // that check only works for non-mmapv1 engines, and this is needed
- // for mmapv1.
-
- stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex);
- _cloneLocs.erase(dl);
- }
-
- std::size_t cloneLocsRemaining() {
- stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex);
- return _cloneLocs.size();
- }
-
- long long mbUsed() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _memoryUsed / (1024 * 1024);
- }
-
- bool getInCriticalSection() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _inCriticalSection;
- }
-
- void setInCriticalSection(bool b) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inCriticalSection = b;
- _inCriticalSectionCV.notify_all();
- }
-
- std::string getNS() const {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- return _ns;
- }
-
- /**
- * @return true if we are NOT in the critical section
- */
- bool waitTillNotInCriticalSection(int maxSecondsToWait) {
- const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (_inCriticalSection) {
- if (stdx::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline))
- return false;
- }
-
- return true;
- }
-
- bool isActive() const {
- return _getActive();
- }
-
-private:
- bool _getActive() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
- }
- void _setActive(bool b) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = b;
- }
-
- /**
- * Used to commit work for LogOpForSharding. Used to keep track of changes in documents
- * that are part of a chunk being migrated.
- */
- class LogOpForShardingHandler : public RecoveryUnit::Change {
- public:
- /**
- * Invariant: idObj should belong to a document that is part of the active chunk
- * being migrated.
- */
- LogOpForShardingHandler(MigrateFromStatus* migrateFromStatus,
- const BSONObj& idObj,
- const char op)
- : _migrateFromStatus(migrateFromStatus), _idObj(idObj.getOwned()), _op(op) {}
-
- virtual void commit() {
- switch (_op) {
- case 'd': {
- stdx::lock_guard<stdx::mutex> sl(_migrateFromStatus->_mutex);
- _migrateFromStatus->_deleted.push_back(_idObj);
- _migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5;
- break;
- }
-
- case 'i':
- case 'u': {
- stdx::lock_guard<stdx::mutex> sl(_migrateFromStatus->_mutex);
- _migrateFromStatus->_reload.push_back(_idObj);
- _migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5;
- break;
- }
-
- default:
- invariant(false);
- }
- }
-
- virtual void rollback() {}
-
- private:
- MigrateFromStatus* _migrateFromStatus;
- const BSONObj _idObj;
- const char _op;
- };
-
- /**
- * Used to receive invalidation notifications.
- *
- * XXX: move to the exec/ directory.
- */
- class DeleteNotificationStage : public PlanStage {
- public:
- virtual void invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
-
- virtual StageState work(WorkingSetID* out) {
- invariant(false);
- }
- virtual bool isEOF() {
- invariant(false);
- return false;
- }
- virtual void kill() {}
- virtual void saveState() {
- invariant(false);
- }
- virtual void restoreState(OperationContext* opCtx) {
- invariant(false);
- }
- virtual unique_ptr<PlanStageStats> getStats() {
- invariant(false);
- return NULL;
- }
- virtual CommonStats* getCommonStats() const {
- invariant(false);
- return NULL;
- }
- virtual SpecificStats* getSpecificStats() const {
- invariant(false);
- return NULL;
- }
- virtual std::vector<PlanStage*> getChildren() const {
- vector<PlanStage*> empty;
- return empty;
- }
- virtual StageType stageType() const {
- return STAGE_NOTIFY_DELETE;
- }
- };
-
- //
- // All member variables are labeled with one of the following codes indicating the
- // synchronization rules for accessing them.
- //
- // (M) Must hold _mutex for access.
- // (MG) For reads, _mutex *OR* Global IX Lock must be held.
- // For writes, the _mutex *AND* (Global Shared or Exclusive Lock) must be held.
- // (C) Must hold _cloneLocsMutex for access.
- //
- // Locking order:
- //
- // Global Lock -> _mutex -> _cloneLocsMutex
-
- mutable stdx::mutex _mutex;
-
- stdx::condition_variable _inCriticalSectionCV; // (M)
-
- // Is migration currently in critical section. This can be used to block new writes.
- bool _inCriticalSection; // (M)
-
- unique_ptr<PlanExecutor> _deleteNotifyExec; // (M)
-
- // List of _id of documents that were modified that must be re-cloned.
- list<BSONObj> _reload; // (M)
-
- // List of _id of documents that were deleted during clone that should be deleted later.
- list<BSONObj> _deleted; // (M)
-
- // bytes in _reload + _deleted
- long long _memoryUsed; // (M)
-
- // If a migration is currently active.
- bool _active; // (MG)
-
- string _ns; // (MG)
- BSONObj _min; // (MG)
- BSONObj _max; // (MG)
- BSONObj _shardKeyPattern; // (MG)
-
- mutable stdx::mutex _cloneLocsMutex;
-
- // List of record id that needs to be transferred from here to the other side.
- set<RecordId> _cloneLocs; // (C)
-
-} migrateFromStatus;
-
-void MigrateFromStatus::DeleteNotificationStage::invalidate(OperationContext* txn,
- const RecordId& dl,
- InvalidationType type) {
- if (type == INVALIDATION_DELETION) {
- migrateFromStatus.aboutToDelete(dl);
- }
-}
-
-struct MigrateStatusHolder {
- MigrateStatusHolder(OperationContext* txn,
- const std::string& ns,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern)
- : _txn(txn) {
- _isAnotherMigrationActive = !migrateFromStatus.start(txn, ns, min, max, shardKeyPattern);
- }
- ~MigrateStatusHolder() {
- if (!_isAnotherMigrationActive) {
- migrateFromStatus.done(_txn);
- }
- }
-
- bool isAnotherMigrationActive() const {
- return _isAnotherMigrationActive;
- }
-
-private:
- OperationContext* _txn;
- bool _isAnotherMigrationActive;
-};
-
-void logOpForSharding(OperationContext* txn,
- const char* opstr,
- const char* ns,
- const BSONObj& obj,
- BSONObj* patt,
- bool notInActiveChunk) {
- migrateFromStatus.logOp(txn, opstr, ns, obj, patt, notInActiveChunk);
-}
-
class TransferModsCommand : public ChunkCommandHelper {
public:
void help(std::stringstream& h) const {
@@ -914,8 +182,9 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- return migrateFromStatus.transferMods(txn, errmsg, result);
+ return migrateSourceManager.transferMods(txn, errmsg, result);
}
+
} transferModsCommand;
@@ -938,8 +207,9 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- return migrateFromStatus.clone(txn, errmsg, result);
+ return migrateSourceManager.clone(txn, errmsg, result);
}
+
} initialCloneCommand;
// Tests can pause / resume moveChunk's progress at each step by enabling / disabling each fail
@@ -1153,11 +423,12 @@ public:
log() << "received moveChunk request: " << cmdObj << migrateLog;
timing.done(1);
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep1);
+
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep1);
// 2.
- if (migrateFromStatus.isActive()) {
+ if (migrateSourceManager.isActive()) {
errmsg = "migration already in progress";
warning() << errmsg;
return false;
@@ -1257,10 +528,11 @@ public:
log() << "moveChunk request accepted at version " << origShardVersion;
timing.done(2);
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep2);
+
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep2);
// 3.
- MigrateStatusHolder statusHolder(txn, ns, min, max, shardKeyPattern);
+ MigrateStatusHolder statusHolder(txn, &migrateSourceManager, ns, min, max, shardKeyPattern);
if (statusHolder.isAnotherMigrationActive()) {
errmsg = "moveChunk is already in progress from this shard";
@@ -1293,7 +565,7 @@ public:
{
// See comment at the top of the function for more information on what
// synchronization is used here.
- if (!migrateFromStatus.storeCurrentLocs(txn, maxChunkSize, errmsg, result)) {
+ if (!migrateSourceManager.storeCurrentLocs(txn, maxChunkSize, errmsg, result)) {
warning() << errmsg;
return false;
}
@@ -1341,8 +613,10 @@ public:
return false;
}
}
+
timing.done(3);
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep3);
+
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep3);
// 4.
@@ -1385,7 +659,7 @@ public:
}
LOG(0) << "moveChunk data transfer progress: " << res
- << " my mem used: " << migrateFromStatus.mbUsed() << migrateLog;
+ << " my mem used: " << migrateSourceManager.mbUsed() << migrateLog;
if (!ok || res["state"].String() == "fail") {
warning() << "moveChunk error transferring data caused migration abort: " << res
@@ -1398,7 +672,7 @@ public:
if (res["state"].String() == "steady")
break;
- if (migrateFromStatus.mbUsed() > (500 * 1024 * 1024)) {
+ if (migrateSourceManager.mbUsed() > (500 * 1024 * 1024)) {
// This is too much memory for us to use for this so we're going to abort
// the migrate
ScopedDbConnection conn(toShardCS);
@@ -1422,7 +696,8 @@ public:
}
timing.done(4);
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep4);
+
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep4);
// 5.
@@ -1432,7 +707,7 @@ public:
log() << "About to check if it is safe to enter critical section";
// Ensure all cloned docs have actually been transferred
- std::size_t locsRemaining = migrateFromStatus.cloneLocsRemaining();
+ std::size_t locsRemaining = migrateSourceManager.cloneLocsRemaining();
if (locsRemaining != 0) {
errmsg = str::stream() << "moveChunk cannot enter critical section before all data is"
<< " cloned, " << locsRemaining << " locs were not transferred"
@@ -1459,7 +734,7 @@ public:
// 5.a
// we're under the collection lock here, so no other migrate can change maxVersion
// or CollectionMetadata state
- migrateFromStatus.setInCriticalSection(true);
+ migrateSourceManager.setInCriticalSection(true);
ChunkVersion myVersion = origCollVersion;
myVersion.incMajor();
@@ -1600,7 +875,6 @@ public:
log() << "moveChunk updating self version to: " << nextVersion << " through "
<< bumpMin << " -> " << bumpMax << " for collection '" << ns << "'"
<< migrateLog;
-
} else {
log() << "moveChunk moved last chunk out for collection '" << ns << "'"
<< migrateLog;
@@ -1713,7 +987,7 @@ public:
}
}
- migrateFromStatus.setInCriticalSection(false);
+ migrateSourceManager.setInCriticalSection(false);
// 5.d
BSONObjBuilder commitInfo;
@@ -1726,9 +1000,10 @@ public:
txn->getClient()->clientAddress(true), "moveChunk.commit", ns, commitInfo.obj());
}
- migrateFromStatus.done(txn);
+ migrateSourceManager.done(txn);
timing.done(5);
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep5);
+
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep5);
// 6.
// NOTE: It is important that the distributed collection lock be held for this step.
@@ -1761,22 +1036,16 @@ public:
log() << "could not queue migration cleanup: " << errMsg;
}
}
+
timing.done(6);
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep6);
+
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep6);
return true;
}
} moveChunkCmd;
-bool ShardingState::inCriticalMigrateSection() {
- return migrateFromStatus.getInCriticalSection();
-}
-
-bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) {
- return migrateFromStatus.waitTillNotInCriticalSection(maxSecondsToWait);
-}
-
/* -----
below this are the "to" side commands
@@ -1790,837 +1059,6 @@ bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) {
commend to "commit"
*/
-// Enabling / disabling these fail points pauses / resumes MigrateStatus::_go(), the thread
-// that receives a chunk migration from the donor.
-MONGO_FP_DECLARE(migrateThreadHangAtStep1);
-MONGO_FP_DECLARE(migrateThreadHangAtStep2);
-MONGO_FP_DECLARE(migrateThreadHangAtStep3);
-MONGO_FP_DECLARE(migrateThreadHangAtStep4);
-MONGO_FP_DECLARE(migrateThreadHangAtStep5);
-
-class MigrateStatus {
-public:
- enum State { READY, CLONE, CATCHUP, STEADY, COMMIT_START, DONE, FAIL, ABORT };
-
- MigrateStatus()
- : _active(false),
- _numCloned(0),
- _clonedBytes(0),
- _numCatchup(0),
- _numSteady(0),
- _state(READY) {}
-
- void setState(State newState) {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _state = newState;
- }
-
- State getState() const {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- return _state;
- }
-
- /**
- * Returns OK if preparation was successful.
- */
- Status prepare(const std::string& ns,
- const std::string& fromShard,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (_active) {
- return Status(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Active migration already in progress "
- << "ns: " << _ns << ", from: " << _from << ", min: " << _min
- << ", max: " << _max);
- }
-
- _state = READY;
- _errmsg = "";
-
- _ns = ns;
- _from = fromShard;
- _min = min;
- _max = max;
- _shardKeyPattern = shardKeyPattern;
-
- _numCloned = 0;
- _clonedBytes = 0;
- _numCatchup = 0;
- _numSteady = 0;
-
- _active = true;
-
- return Status::OK();
- }
-
- void go(OperationContext* txn,
- const std::string& ns,
- BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- const std::string& fromShard,
- const OID& epoch,
- const WriteConcernOptions& writeConcern) {
- try {
- _go(txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
- } catch (std::exception& e) {
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _state = FAIL;
- _errmsg = e.what();
- }
-
- error() << "migrate failed: " << e.what() << migrateLog;
- } catch (...) {
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _state = FAIL;
- _errmsg = "UNKNOWN ERROR";
- }
-
- error() << "migrate failed with unknown exception" << migrateLog;
- }
-
- if (getState() != DONE) {
- // Unprotect the range if needed/possible on unsuccessful TO migration
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
-
- string errMsg;
- if (!ShardingState::get(txn)->forgetPending(txn, ns, min, max, epoch, &errMsg)) {
- warning() << errMsg;
- }
- }
-
- setActive(false);
- }
-
- void _go(OperationContext* txn,
- const std::string& ns,
- BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- const std::string& fromShard,
- const OID& epoch,
- const WriteConcernOptions& writeConcern) {
- verify(getActive());
- verify(getState() == READY);
- verify(!min.isEmpty());
- verify(!max.isEmpty());
-
- DisableDocumentValidation validationDisabler(txn);
-
- log() << "starting receiving-end of migration of chunk " << min << " -> " << max
- << " for collection " << ns << " from " << fromShard << " at epoch "
- << epoch.toString();
-
- string errmsg;
- MoveTimingHelper timing(txn, "to", ns, min, max, 5 /* steps */, &errmsg, "", "");
-
- ScopedDbConnection conn(fromShard);
- conn->getLastError(); // just test connection
-
- NamespaceString nss(ns);
- {
- // 0. copy system.namespaces entry if collection doesn't already exist
- OldClientWriteContext ctx(txn, ns);
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
- errmsg = str::stream() << "Not primary during migration: " << ns
- << ": checking if collection exists";
- warning() << errmsg;
- setState(FAIL);
- return;
- }
-
- // Only copy if ns doesn't already exist
- Database* db = ctx.db();
- Collection* collection = db->getCollection(ns);
-
- if (!collection) {
- list<BSONObj> infos = conn->getCollectionInfos(
- nsToDatabase(ns), BSON("name" << nsToCollectionSubstring(ns)));
-
- BSONObj options;
- if (infos.size() > 0) {
- BSONObj entry = infos.front();
- if (entry["options"].isABSONObj()) {
- options = entry["options"].Obj();
- }
- }
-
- WriteUnitOfWork wuow(txn);
- Status status = userCreateNS(txn, db, ns, options, false);
- if (!status.isOK()) {
- warning() << "failed to create collection [" << ns << "] "
- << " with options " << options << ": " << status;
- }
- wuow.commit();
- }
- }
-
- {
- // 1. copy indexes
-
- vector<BSONObj> indexSpecs;
- {
- const std::list<BSONObj> indexes = conn->getIndexSpecs(ns);
- indexSpecs.insert(indexSpecs.begin(), indexes.begin(), indexes.end());
- }
-
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_X);
- OldClientContext ctx(txn, ns);
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
- errmsg = str::stream() << "Not primary during migration: " << ns;
- warning() << errmsg;
- setState(FAIL);
- return;
- }
-
- Database* db = ctx.db();
- Collection* collection = db->getCollection(ns);
- if (!collection) {
- errmsg = str::stream() << "collection dropped during migration: " << ns;
- warning() << errmsg;
- setState(FAIL);
- return;
- }
-
- MultiIndexBlock indexer(txn, collection);
-
- indexer.removeExistingIndexes(&indexSpecs);
-
- if (!indexSpecs.empty()) {
- // Only copy indexes if the collection does not have any documents.
- if (collection->numRecords(txn) > 0) {
- errmsg = str::stream() << "aborting migration, shard is missing "
- << indexSpecs.size() << " indexes and "
- << "collection is not empty. Non-trivial "
- << "index creation should be scheduled manually";
- warning() << errmsg;
- setState(FAIL);
- return;
- }
-
- Status status = indexer.init(indexSpecs);
- if (!status.isOK()) {
- errmsg = str::stream() << "failed to create index before migrating data. "
- << " error: " << status.toString();
- warning() << errmsg;
- setState(FAIL);
- return;
- }
-
- status = indexer.insertAllDocumentsInCollection();
- if (!status.isOK()) {
- errmsg = str::stream() << "failed to create index before migrating data. "
- << " error: " << status.toString();
- warning() << errmsg;
- setState(FAIL);
- return;
- }
-
- WriteUnitOfWork wunit(txn);
- indexer.commit();
-
- for (size_t i = 0; i < indexSpecs.size(); i++) {
- // make sure to create index on secondaries as well
- getGlobalServiceContext()->getOpObserver()->onCreateIndex(
- txn, db->getSystemIndexesName(), indexSpecs[i], true /* fromMigrate */);
- }
-
- wunit.commit();
- }
-
- timing.done(1);
- MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep1);
- }
-
- {
- // 2. delete any data already in range
- RangeDeleterOptions deleterOptions(
- KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern));
- deleterOptions.writeConcern = writeConcern;
- // No need to wait since all existing cursors will filter out this range when
- // returning the results.
- deleterOptions.waitForOpenCursors = false;
- deleterOptions.fromMigrate = true;
- deleterOptions.onlyRemoveOrphanedDocs = true;
- deleterOptions.removeSaverReason = "preCleanup";
-
- string errMsg;
-
- if (!getDeleter()->deleteNow(txn, deleterOptions, &errMsg)) {
- warning() << "Failed to queue delete for migrate abort: " << errMsg;
- setState(FAIL);
- return;
- }
-
- {
- // Protect the range by noting that we're now starting a migration to it
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
-
- if (!ShardingState::get(txn)->notePending(txn, ns, min, max, epoch, &errmsg)) {
- warning() << errmsg;
- setState(FAIL);
- return;
- }
- }
-
- timing.done(2);
- MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep2);
- }
-
- State currentState = getState();
- if (currentState == FAIL || currentState == ABORT) {
- string errMsg;
- RangeDeleterOptions deleterOptions(
- KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern));
- deleterOptions.writeConcern = writeConcern;
- // No need to wait since all existing cursors will filter out this range when
- // returning the results.
- deleterOptions.waitForOpenCursors = false;
- deleterOptions.fromMigrate = true;
- deleterOptions.onlyRemoveOrphanedDocs = true;
-
- if (!getDeleter()->queueDelete(txn, deleterOptions, NULL /* notifier */, &errMsg)) {
- warning() << "Failed to queue delete for migrate abort: " << errMsg;
- }
- }
-
- {
- // 3. initial bulk clone
- setState(CLONE);
-
- while (true) {
- BSONObj res;
- if (!conn->runCommand("admin",
- BSON("_migrateClone" << 1),
- res)) { // gets array of objects to copy, in disk order
- setState(FAIL);
- errmsg = "_migrateClone failed: ";
- errmsg += res.toString();
- error() << errmsg << migrateLog;
- conn.done();
- return;
- }
-
- BSONObj arr = res["objects"].Obj();
- int thisTime = 0;
-
- BSONObjIterator i(arr);
- while (i.more()) {
- txn->checkForInterrupt();
-
- if (getState() == ABORT) {
- errmsg = str::stream() << "Migration abort requested while "
- << "copying documents";
- error() << errmsg << migrateLog;
- return;
- }
-
- BSONObj docToClone = i.next().Obj();
- {
- OldClientWriteContext cx(txn, ns);
-
- BSONObj localDoc;
- if (willOverrideLocalId(txn,
- ns,
- min,
- max,
- shardKeyPattern,
- cx.db(),
- docToClone,
- &localDoc)) {
- string errMsg = str::stream() << "cannot migrate chunk, local document "
- << localDoc << " has same _id as cloned "
- << "remote document " << docToClone;
-
- warning() << errMsg;
-
- // Exception will abort migration cleanly
- uasserted(16976, errMsg);
- }
-
- Helpers::upsert(txn, ns, docToClone, true);
- }
- thisTime++;
-
- {
- stdx::lock_guard<stdx::mutex> statsLock(_mutex);
- _numCloned++;
- _clonedBytes += docToClone.objsize();
- }
-
- if (writeConcern.shouldWaitForOtherNodes()) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::getGlobalReplicationCoordinator()->awaitReplication(
- txn,
- repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(),
- writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- warning() << "secondaryThrottle on, but doc insert timed out; "
- "continuing";
- } else {
- massertStatusOK(replStatus.status);
- }
- }
- }
-
- if (thisTime == 0)
- break;
- }
-
- timing.done(3);
- MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep3);
- }
-
- // If running on a replicated system, we'll need to flush the docs we cloned to the
- // secondaries
- repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
-
- {
- // 4. do bulk of mods
- setState(CATCHUP);
- while (true) {
- BSONObj res;
- if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) {
- setState(FAIL);
- errmsg = "_transferMods failed: ";
- errmsg += res.toString();
- error() << "_transferMods failed: " << res << migrateLog;
- conn.done();
- return;
- }
- if (res["size"].number() == 0)
- break;
-
- apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied);
-
- const int maxIterations = 3600 * 50;
- int i;
- for (i = 0; i < maxIterations; i++) {
- txn->checkForInterrupt();
-
- if (getState() == ABORT) {
- errmsg = str::stream() << "Migration abort requested while waiting "
- << "for replication at catch up stage";
- error() << errmsg << migrateLog;
-
- return;
- }
-
- if (opReplicatedEnough(txn, lastOpApplied, writeConcern))
- break;
-
- if (i > 100) {
- warning() << "secondaries having hard time keeping up with migrate"
- << migrateLog;
- }
-
- sleepmillis(20);
- }
-
- if (i == maxIterations) {
- errmsg = "secondary can't keep up with migrate";
- error() << errmsg << migrateLog;
- conn.done();
- setState(FAIL);
- return;
- }
- }
-
- timing.done(4);
- MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep4);
- }
-
- {
- // pause to wait for replication
- // this will prevent us from going into critical section until we're ready
- Timer t;
- while (t.minutes() < 600) {
- txn->checkForInterrupt();
-
- if (getState() == ABORT) {
- errmsg = "Migration abort requested while waiting for replication";
- error() << errmsg << migrateLog;
- return;
- }
-
- log() << "Waiting for replication to catch up before entering critical section";
- if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern))
- break;
- sleepsecs(1);
- }
-
- if (t.minutes() >= 600) {
- setState(FAIL);
- errmsg = "Cannot go to critical section because secondaries cannot keep up";
- error() << errmsg << migrateLog;
- return;
- }
- }
-
- {
- // 5. wait for commit
-
- setState(STEADY);
- bool transferAfterCommit = false;
- while (getState() == STEADY || getState() == COMMIT_START) {
- txn->checkForInterrupt();
-
- // Make sure we do at least one transfer after recv'ing the commit message
- // If we aren't sure that at least one transfer happens *after* our state
- // changes to COMMIT_START, there could be mods still on the FROM shard that
- // got logged *after* our _transferMods but *before* the critical section.
- if (getState() == COMMIT_START)
- transferAfterCommit = true;
-
- BSONObj res;
- if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) {
- log() << "_transferMods failed in STEADY state: " << res << migrateLog;
- errmsg = res.toString();
- setState(FAIL);
- conn.done();
- return;
- }
-
- if (res["size"].number() > 0 &&
- apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied)) {
- continue;
- }
-
- if (getState() == ABORT) {
- return;
- }
-
- // We know we're finished when:
- // 1) The from side has told us that it has locked writes (COMMIT_START)
- // 2) We've checked at least one more time for un-transmitted mods
- if (getState() == COMMIT_START && transferAfterCommit == true) {
- if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern))
- break;
- }
-
- // Only sleep if we aren't committing
- if (getState() == STEADY)
- sleepmillis(10);
- }
-
- if (getState() == FAIL) {
- errmsg = "timed out waiting for commit";
- return;
- }
-
- timing.done(5);
- MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep5);
- }
-
- setState(DONE);
- conn.done();
- }
-
- void status(BSONObjBuilder& b) {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
-
- b.appendBool("active", _active);
-
- b.append("ns", _ns);
- b.append("from", _from);
- b.append("min", _min);
- b.append("max", _max);
- b.append("shardKeyPattern", _shardKeyPattern);
-
- b.append("state", stateToString(_state));
- if (_state == FAIL) {
- b.append("errmsg", _errmsg);
- }
-
- BSONObjBuilder bb(b.subobjStart("counts"));
- bb.append("cloned", _numCloned);
- bb.append("clonedBytes", _clonedBytes);
- bb.append("catchup", _numCatchup);
- bb.append("steady", _numSteady);
- bb.done();
- }
-
- bool apply(OperationContext* txn,
- const string& ns,
- BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- const BSONObj& xfer,
- repl::OpTime* lastOpApplied) {
- repl::OpTime dummy;
- if (lastOpApplied == NULL) {
- lastOpApplied = &dummy;
- }
-
- bool didAnything = false;
-
- if (xfer["deleted"].isABSONObj()) {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dlk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
- Helpers::RemoveSaver rs("moveChunk", ns, "removedDuring");
-
- BSONObjIterator i(xfer["deleted"].Obj());
- while (i.more()) {
- Lock::CollectionLock clk(txn->lockState(), ns, MODE_X);
- OldClientContext ctx(txn, ns);
-
- BSONObj id = i.next().Obj();
-
- // do not apply deletes if they do not belong to the chunk being migrated
- BSONObj fullObj;
- if (Helpers::findById(txn, ctx.db(), ns.c_str(), id, fullObj)) {
- if (!isInRange(fullObj, min, max, shardKeyPattern)) {
- log() << "not applying out of range deletion: " << fullObj << migrateLog;
-
- continue;
- }
- }
-
- if (serverGlobalParams.moveParanoia) {
- rs.goingToDelete(fullObj);
- }
-
- deleteObjects(txn,
- ctx.db(),
- ns,
- id,
- PlanExecutor::YIELD_MANUAL,
- true /* justOne */,
- false /* god */,
- true /* fromMigrate */);
-
- *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
- didAnything = true;
- }
- }
-
- if (xfer["reload"].isABSONObj()) {
- BSONObjIterator i(xfer["reload"].Obj());
- while (i.more()) {
- OldClientWriteContext cx(txn, ns);
-
- BSONObj updatedDoc = i.next().Obj();
-
- BSONObj localDoc;
- if (willOverrideLocalId(
- txn, ns, min, max, shardKeyPattern, cx.db(), updatedDoc, &localDoc)) {
- string errMsg = str::stream()
- << "cannot migrate chunk, local document " << localDoc
- << " has same _id as reloaded remote document " << updatedDoc;
-
- warning() << errMsg;
-
- // Exception will abort migration cleanly
- uasserted(16977, errMsg);
- }
-
- // We are in write lock here, so sure we aren't killing
- Helpers::upsert(txn, ns, updatedDoc, true);
-
- *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
- didAnything = true;
- }
- }
-
- return didAnything;
- }
-
- /**
- * Checks if an upsert of a remote document will override a local document with the same _id
- * but in a different range on this shard.
- * Must be in WriteContext to avoid races and DBHelper errors.
- * TODO: Could optimize this check out if sharding on _id.
- */
- bool willOverrideLocalId(OperationContext* txn,
- const string& ns,
- BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- Database* db,
- BSONObj remoteDoc,
- BSONObj* localDoc) {
- *localDoc = BSONObj();
- if (Helpers::findById(txn, db, ns.c_str(), remoteDoc, *localDoc)) {
- return !isInRange(*localDoc, min, max, shardKeyPattern);
- }
-
- return false;
- }
-
- /**
- * Returns true if the majority of the nodes and the nodes corresponding to the given
- * writeConcern (if not empty) have applied till the specified lastOp.
- */
- bool opReplicatedEnough(OperationContext* txn,
- const repl::OpTime& lastOpApplied,
- const WriteConcernOptions& writeConcern) {
- WriteConcernOptions majorityWriteConcern;
- majorityWriteConcern.wTimeout = -1;
- majorityWriteConcern.wMode = WriteConcernOptions::kMajority;
- Status majorityStatus = repl::getGlobalReplicationCoordinator()
- ->awaitReplication(txn, lastOpApplied, majorityWriteConcern)
- .status;
-
- if (!writeConcern.shouldWaitForOtherNodes()) {
- return majorityStatus.isOK();
- }
-
- // Also enforce the user specified write concern after "majority" so it covers
- // the union of the 2 write concerns.
-
- WriteConcernOptions userWriteConcern(writeConcern);
- userWriteConcern.wTimeout = -1;
- Status userStatus = repl::getGlobalReplicationCoordinator()
- ->awaitReplication(txn, lastOpApplied, userWriteConcern)
- .status;
-
- return majorityStatus.isOK() && userStatus.isOK();
- }
-
- bool flushPendingWrites(OperationContext* txn,
- const std::string& ns,
- BSONObj min,
- BSONObj max,
- const repl::OpTime& lastOpApplied,
- const WriteConcernOptions& writeConcern) {
- if (!opReplicatedEnough(txn, lastOpApplied, writeConcern)) {
- repl::OpTime op(lastOpApplied);
- OCCASIONALLY warning() << "migrate commit waiting for a majority of slaves for '" << ns
- << "' " << min << " -> " << max << " waiting for: " << op
- << migrateLog;
- return false;
- }
-
- log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min
- << " -> " << max << migrateLog;
-
- {
- // Get global lock to wait for write to be commited to journal.
- ScopedTransaction transaction(txn, MODE_S);
- Lock::GlobalRead lk(txn->lockState());
-
- // if durability is on, force a write to journal
- if (getDur().commitNow(txn)) {
- log() << "migrate commit flushed to journal for '" << ns << "' " << min << " -> "
- << max << migrateLog;
- }
- }
-
- return true;
- }
-
- static string stateToString(State state) {
- switch (state) {
- case READY:
- return "ready";
- case CLONE:
- return "clone";
- case CATCHUP:
- return "catchup";
- case STEADY:
- return "steady";
- case COMMIT_START:
- return "commitStart";
- case DONE:
- return "done";
- case FAIL:
- return "fail";
- case ABORT:
- return "abort";
- }
- verify(0);
- return "";
- }
-
- bool startCommit() {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
-
- if (_state != STEADY) {
- return false;
- }
-
- const auto deadline = system_clock::now() + seconds(30);
- _state = COMMIT_START;
- while (_active) {
- if (stdx::cv_status::timeout == isActiveCV.wait_until(lock, deadline)) {
- _state = FAIL;
- log() << "startCommit never finished!" << migrateLog;
- return false;
- }
- }
-
- if (_state == DONE) {
- return true;
- }
-
- log() << "startCommit failed, final data failed to transfer" << migrateLog;
- return false;
- }
-
- void abort() {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _state = ABORT;
- _errmsg = "aborted";
- }
-
- bool getActive() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
- }
- void setActive(bool b) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = b;
- isActiveCV.notify_all();
- }
-
- // Guards all fields.
- mutable stdx::mutex _mutex;
- bool _active;
- stdx::condition_variable isActiveCV;
-
- std::string _ns;
- std::string _from;
- BSONObj _min;
- BSONObj _max;
- BSONObj _shardKeyPattern;
-
- long long _numCloned;
- long long _clonedBytes;
- long long _numCatchup;
- long long _numSteady;
-
- State _state;
- std::string _errmsg;
-
-} migrateStatus;
-
-void migrateThread(std::string ns,
- BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- std::string fromShard,
- OID epoch,
- WriteConcernOptions writeConcern) {
- Client::initThread("migrateThread");
- OperationContextImpl txn;
- if (getGlobalAuthorizationManager()->isAuthEnabled()) {
- ShardedConnectionInfo::addHook();
- AuthorizationSession::get(txn.getClient())->grantInternalAuthorization();
- }
-
- migrateStatus.go(&txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
-}
-
/**
* Command for initiating the recipient side of the migration to start copying data
* from the donor shard.
@@ -2645,11 +1083,13 @@ public:
void help(std::stringstream& h) const {
h << "internal";
}
+
RecvChunkStartCommand() : ChunkCommandHelper("_recvChunkStart") {}
virtual bool isWriteCommandForConfigServer() const {
return false;
}
+
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) {
@@ -2657,6 +1097,7 @@ public:
actions.addAction(ActionType::internal);
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
+
bool run(OperationContext* txn,
const string&,
BSONObj& cmdObj,
@@ -2665,7 +1106,7 @@ public:
BSONObjBuilder& result) {
// Active state of TO-side migrations (MigrateStatus) is serialized by distributed
// collection lock.
- if (migrateStatus.getActive()) {
+ if (migrateDestManager.getActive()) {
errmsg = "migrate already in progress";
return false;
}
@@ -2706,13 +1147,12 @@ public:
BSONObj min = cmdObj["min"].Obj().getOwned();
BSONObj max = cmdObj["max"].Obj().getOwned();
- // Refresh our collection manager from the config server, we need a collection manager
- // to start registering pending chunks.
- // We force the remote refresh here to make the behavior consistent and predictable,
- // generally we'd refresh anyway, and to be paranoid.
+ // Refresh our collection manager from the config server, we need a collection manager to
+ // start registering pending chunks. We force the remote refresh here to make the behavior
+ // consistent and predictable, generally we'd refresh anyway, and to be paranoid.
ChunkVersion currentVersion;
- Status status = ShardingState::get(txn)->refreshMetadataNow(txn, ns, &currentVersion);
+ Status status = ShardingState::get(txn)->refreshMetadataNow(txn, ns, &currentVersion);
if (!status.isOK()) {
errmsg = str::stream() << "cannot start recv'ing chunk "
<< "[" << min << "," << max << ")" << causedBy(status.reason());
@@ -2778,23 +1218,13 @@ public:
const string fromShard(cmdObj["from"].String());
- // Set the TO-side migration to active
- Status prepareStatus = migrateStatus.prepare(ns, fromShard, min, max, shardKeyPattern);
+ Status startStatus = migrateDestManager.start(
+ ns, fromShard, min, max, shardKeyPattern, currentVersion.epoch(), writeConcern);
- if (!prepareStatus.isOK()) {
- return appendCommandStatus(result, prepareStatus);
+ if (!startStatus.isOK()) {
+ return appendCommandStatus(result, startStatus);
}
- stdx::thread m(migrateThread,
- ns,
- min,
- max,
- shardKeyPattern,
- fromShard,
- currentVersion.epoch(),
- writeConcern);
-
- m.detach();
result.appendBool("started", true);
return true;
}
@@ -2820,7 +1250,7 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- migrateStatus.status(result);
+ migrateDestManager.report(result);
return 1;
}
@@ -2845,8 +1275,8 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- bool ok = migrateStatus.startCommit();
- migrateStatus.status(result);
+ bool ok = migrateDestManager.startCommit();
+ migrateDestManager.report(result);
return ok;
}
@@ -2871,39 +1301,28 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- migrateStatus.abort();
- migrateStatus.status(result);
+ migrateDestManager.abort();
+ migrateDestManager.report(result);
return true;
}
-} recvChunkAboortCommand;
+} recvChunkAbortCommand;
+bool ShardingState::inCriticalMigrateSection() {
+ return migrateSourceManager.getInCriticalSection();
+}
-class IsInRangeTest : public StartupTest {
-public:
- void run() {
- BSONObj min = BSON("x" << 1);
- BSONObj max = BSON("x" << 5);
- BSONObj skey = BSON("x" << 1);
-
- verify(!isInRange(BSON("x" << 0), min, max, skey));
- verify(isInRange(BSON("x" << 1), min, max, skey));
- verify(isInRange(BSON("x" << 3), min, max, skey));
- verify(isInRange(BSON("x" << 4), min, max, skey));
- verify(!isInRange(BSON("x" << 5), min, max, skey));
- verify(!isInRange(BSON("x" << 6), min, max, skey));
-
- BSONObj obj = BSON("n" << 3);
- BSONObj min2 = BSON("x" << BSONElementHasher::hash64(obj.firstElement(), 0) - 2);
- BSONObj max2 = BSON("x" << BSONElementHasher::hash64(obj.firstElement(), 0) + 2);
- BSONObj hashedKey = BSON("x"
- << "hashed");
-
- verify(isInRange(BSON("x" << 3), min2, max2, hashedKey));
- verify(!isInRange(BSON("x" << 3), min, max, hashedKey));
- verify(!isInRange(BSON("x" << 4), min2, max2, hashedKey));
-
- LOG(1) << "isInRangeTest passed" << migrateLog;
- }
-} isInRangeTest;
+bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) {
+ return migrateSourceManager.waitTillNotInCriticalSection(maxSecondsToWait);
+}
+
+void logOpForSharding(OperationContext* txn,
+ const char* opstr,
+ const char* ns,
+ const BSONObj& obj,
+ BSONObj* patt,
+ bool notInActiveChunk) {
+ migrateSourceManager.logOp(txn, opstr, ns, obj, patt, notInActiveChunk);
}
+
+} // namespace mongo