author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 00:22:50 -0400
---|---|---
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 10:56:02 -0400
commit | 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch) |
tree | 3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/s/d_migrate.cpp |
parent | 01965cf52bce6976637ecb8f4a622aeb05ab256a (diff) |
download | mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz |
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/s/d_migrate.cpp')
-rw-r--r-- | src/mongo/s/d_migrate.cpp | 4627 |
1 file changed, 2290 insertions(+), 2337 deletions(-)
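The reformatted output in the diff below is consistent with a clang-format configuration along the following lines. This is only an illustrative sketch inferred from the visible changes (4-space indentation, a roughly 100-column limit, un-indented namespace bodies, and no comment reflow, per the commit message); the repository's actual .clang-format file is not part of this change, and exact option availability depends on the clang-format version in use.

# Illustrative .clang-format sketch (assumed; not the file shipped with this commit).
BasedOnStyle: Google            # assumed baseline; only the deviations below are inferred from the diff
IndentWidth: 4                  # function and class bodies are indented by four spaces
ColumnLimit: 100                # long argument lists wrap near column 100
NamespaceIndentation: None      # contents of 'namespace mongo {' are flush left after the reformat
AllowShortFunctionsOnASingleLine: Empty   # empty bodies such as 'virtual void rollback() {}' stay on one line
ReflowComments: false           # matches "no comment reflow"; this option is version-dependent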
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index 8ed12b4d082..a0893be2524 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -91,653 +91,645 @@ #include "mongo/util/startup_test.h" // Pause while a fail point is enabled. -#define MONGO_FP_PAUSE_WHILE(symbol) while (MONGO_FAIL_POINT(symbol)) { sleepmillis(100); } +#define MONGO_FP_PAUSE_WHILE(symbol) \ + while (MONGO_FAIL_POINT(symbol)) { \ + sleepmillis(100); \ + } namespace mongo { - using namespace std::chrono; - using std::list; - using std::set; - using std::string; - using std::vector; +using namespace std::chrono; +using std::list; +using std::set; +using std::string; +using std::vector; namespace { - const int kDefaultWTimeoutMs = 60 * 1000; - const WriteConcernOptions DefaultWriteConcern(2, WriteConcernOptions::NONE, kDefaultWTimeoutMs); - - /** - * Returns the default write concern for migration cleanup (at donor shard) and - * cloning documents (at recipient shard). - */ - WriteConcernOptions getDefaultWriteConcern() { - repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator(); - if (replCoordinator->getReplicationMode() == - mongo::repl::ReplicationCoordinator::modeReplSet) { +const int kDefaultWTimeoutMs = 60 * 1000; +const WriteConcernOptions DefaultWriteConcern(2, WriteConcernOptions::NONE, kDefaultWTimeoutMs); - Status status = - replCoordinator->checkIfWriteConcernCanBeSatisfied(DefaultWriteConcern); - if (status.isOK()) { - return DefaultWriteConcern; - } +/** + * Returns the default write concern for migration cleanup (at donor shard) and + * cloning documents (at recipient shard). + */ +WriteConcernOptions getDefaultWriteConcern() { + repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator(); + if (replCoordinator->getReplicationMode() == mongo::repl::ReplicationCoordinator::modeReplSet) { + Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(DefaultWriteConcern); + if (status.isOK()) { + return DefaultWriteConcern; } - - return WriteConcernOptions(1, WriteConcernOptions::NONE, 0); } -} // namespace - - MONGO_FP_DECLARE(failMigrationCommit); - MONGO_FP_DECLARE(failMigrationConfigWritePrepare); - MONGO_FP_DECLARE(failMigrationApplyOps); - - Tee* migrateLog = RamLog::get("migrate"); + return WriteConcernOptions(1, WriteConcernOptions::NONE, 0); +} - class MoveTimingHelper { - public: - MoveTimingHelper(OperationContext* txn, - const string& where, - const string& ns, - BSONObj min, - BSONObj max , - int total, - string* cmdErrmsg, - string toShard, - string fromShard) - : _txn(txn), - _where(where), - _ns(ns), - _to(toShard), - _from(fromShard), - _next(0), - _total(total), - _cmdErrmsg(cmdErrmsg) { - - _b.append( "min" , min ); - _b.append( "max" , max ); - } - - ~MoveTimingHelper() { - // even if logChange doesn't throw, bson does - // sigh - try { - if ( !_to.empty() ){ - _b.append( "to", _to ); - } - if ( !_from.empty() ){ - _b.append( "from", _from ); - } - if ( _next != _total ) { - _b.append( "note" , "aborted" ); - } - else { - _b.append( "note" , "success" ); - } - if ( !_cmdErrmsg->empty() ) { - _b.append( "errmsg" , *_cmdErrmsg ); - } +} // namespace + +MONGO_FP_DECLARE(failMigrationCommit); +MONGO_FP_DECLARE(failMigrationConfigWritePrepare); +MONGO_FP_DECLARE(failMigrationApplyOps); + +Tee* migrateLog = RamLog::get("migrate"); + +class MoveTimingHelper { +public: + MoveTimingHelper(OperationContext* txn, + const string& where, + const string& ns, + BSONObj min, + BSONObj max, + int total, + 
string* cmdErrmsg, + string toShard, + string fromShard) + : _txn(txn), + _where(where), + _ns(ns), + _to(toShard), + _from(fromShard), + _next(0), + _total(total), + _cmdErrmsg(cmdErrmsg) { + _b.append("min", min); + _b.append("max", max); + } - grid.catalogManager()->logChange(_txn, - (string)"moveChunk." + _where, - _ns, - _b.obj()); + ~MoveTimingHelper() { + // even if logChange doesn't throw, bson does + // sigh + try { + if (!_to.empty()) { + _b.append("to", _to); } - catch ( const std::exception& e ) { - warning() << "couldn't record timing for moveChunk '" << _where << "': " << e.what() << migrateLog; + if (!_from.empty()) { + _b.append("from", _from); + } + if (_next != _total) { + _b.append("note", "aborted"); + } else { + _b.append("note", "success"); + } + if (!_cmdErrmsg->empty()) { + _b.append("errmsg", *_cmdErrmsg); } - } - void done(int step) { - invariant(step == ++_next); - invariant(step <= _total); + grid.catalogManager()->logChange(_txn, (string) "moveChunk." + _where, _ns, _b.obj()); + } catch (const std::exception& e) { + warning() << "couldn't record timing for moveChunk '" << _where << "': " << e.what() + << migrateLog; + } + } - const string s = str::stream() << "step " << step << " of " << _total; + void done(int step) { + invariant(step == ++_next); + invariant(step <= _total); - CurOp * op = CurOp::get(_txn); - { - std::lock_guard<Client> lk(*_txn->getClient()); - op->setMessage_inlock(s.c_str()); - } + const string s = str::stream() << "step " << step << " of " << _total; - _b.appendNumber(s, _t.millis()); - _t.reset(); + CurOp* op = CurOp::get(_txn); + { + std::lock_guard<Client> lk(*_txn->getClient()); + op->setMessage_inlock(s.c_str()); } - private: - OperationContext* const _txn; - Timer _t; + _b.appendNumber(s, _t.millis()); + _t.reset(); + } - string _where; - string _ns; - string _to; - string _from; +private: + OperationContext* const _txn; + Timer _t; - int _next; - int _total; // expected # of steps + string _where; + string _ns; + string _to; + string _from; - const string* _cmdErrmsg; + int _next; + int _total; // expected # of steps - BSONObjBuilder _b; - }; + const string* _cmdErrmsg; - class ChunkCommandHelper : public Command { - public: - ChunkCommandHelper( const char * name ) - : Command( name ) { - } + BSONObjBuilder _b; +}; - virtual void help( std::stringstream& help ) const { - help << "internal - should not be called directly"; - } - virtual bool slaveOk() const { return false; } - virtual bool adminOnly() const { return true; } - virtual bool isWriteCommandForConfigServer() const { return false; } +class ChunkCommandHelper : public Command { +public: + ChunkCommandHelper(const char* name) : Command(name) {} - }; - - bool isInRange( const BSONObj& obj , - const BSONObj& min , - const BSONObj& max , - const BSONObj& shardKeyPattern ) { - ShardKeyPattern shardKey( shardKeyPattern ); - BSONObj k = shardKey.extractShardKeyFromDoc( obj ); - return k.woCompare( min ) >= 0 && k.woCompare( max ) < 0; + virtual void help(std::stringstream& help) const { + help << "internal - should not be called directly"; } + virtual bool slaveOk() const { + return false; + } + virtual bool adminOnly() const { + return true; + } + virtual bool isWriteCommandForConfigServer() const { + return false; + } +}; + +bool isInRange(const BSONObj& obj, + const BSONObj& min, + const BSONObj& max, + const BSONObj& shardKeyPattern) { + ShardKeyPattern shardKey(shardKeyPattern); + BSONObj k = shardKey.extractShardKeyFromDoc(obj); + return k.woCompare(min) >= 0 && 
k.woCompare(max) < 0; +} - class MigrateFromStatus { - public: - MigrateFromStatus(): - _inCriticalSection(false), - _memoryUsed(0), - _active(false) { - } - - /** - * @return false if cannot start. One of the reason for not being able to - * start is there is already an existing migration in progress. - */ - bool start(OperationContext* txn, - const std::string& ns, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern) { - verify(!min.isEmpty()); - verify(!max.isEmpty()); - verify(!ns.empty()); +class MigrateFromStatus { +public: + MigrateFromStatus() : _inCriticalSection(false), _memoryUsed(0), _active(false) {} - // Get global shared to synchronize with logOp. Also see comments in the class - // members declaration for more details. - Lock::GlobalRead globalShared(txn->lockState()); - std::lock_guard<std::mutex> lk(_mutex); + /** + * @return false if cannot start. One of the reason for not being able to + * start is there is already an existing migration in progress. + */ + bool start(OperationContext* txn, + const std::string& ns, + const BSONObj& min, + const BSONObj& max, + const BSONObj& shardKeyPattern) { + verify(!min.isEmpty()); + verify(!max.isEmpty()); + verify(!ns.empty()); + + // Get global shared to synchronize with logOp. Also see comments in the class + // members declaration for more details. + Lock::GlobalRead globalShared(txn->lockState()); + std::lock_guard<std::mutex> lk(_mutex); + + if (_active) { + return false; + } - if (_active) { - return false; - } + _ns = ns; + _min = min; + _max = max; + _shardKeyPattern = shardKeyPattern; - _ns = ns; - _min = min; - _max = max; - _shardKeyPattern = shardKeyPattern; + verify(_deleted.size() == 0); + verify(_reload.size() == 0); + verify(_memoryUsed == 0); - verify(_deleted.size() == 0); - verify(_reload.size() == 0); - verify(_memoryUsed == 0); + _active = true; - _active = true; + std::lock_guard<std::mutex> tLock(_cloneLocsMutex); + verify(_cloneLocs.size() == 0); - std::lock_guard<std::mutex> tLock(_cloneLocsMutex); - verify(_cloneLocs.size() == 0); + return true; + } - return true; - } + void done(OperationContext* txn) { + log() << "MigrateFromStatus::done About to acquire global lock to exit critical " + "section"; - void done(OperationContext* txn) { - log() << "MigrateFromStatus::done About to acquire global lock to exit critical " - "section"; + // Get global shared to synchronize with logOp. Also see comments in the class + // members declaration for more details. + Lock::GlobalRead globalShared(txn->lockState()); + std::lock_guard<std::mutex> lk(_mutex); - // Get global shared to synchronize with logOp. Also see comments in the class - // members declaration for more details. 
- Lock::GlobalRead globalShared(txn->lockState()); - std::lock_guard<std::mutex> lk(_mutex); + _active = false; + _deleteNotifyExec.reset(NULL); + _inCriticalSection = false; + _inCriticalSectionCV.notify_all(); - _active = false; - _deleteNotifyExec.reset( NULL ); - _inCriticalSection = false; - _inCriticalSectionCV.notify_all(); + _deleted.clear(); + _reload.clear(); + _memoryUsed = 0; - _deleted.clear(); - _reload.clear(); - _memoryUsed = 0; + std::lock_guard<std::mutex> cloneLock(_cloneLocsMutex); + _cloneLocs.clear(); + } - std::lock_guard<std::mutex> cloneLock(_cloneLocsMutex); - _cloneLocs.clear(); + void logOp(OperationContext* txn, + const char* opstr, + const char* ns, + const BSONObj& obj, + BSONObj* patt, + bool notInActiveChunk) { + ensureShardVersionOKOrThrow(txn->getClient(), ns); + + const char op = opstr[0]; + + if (notInActiveChunk) { + // Ignore writes that came from the migration process like cleanup so they + // won't be transferred to the recipient shard. Also ignore ops from + // _migrateClone and _transferMods since it is impossible to move a chunk + // to self. + return; } - void logOp(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* patt, - bool notInActiveChunk) { - ensureShardVersionOKOrThrow(txn->getClient(), ns); + dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. - const char op = opstr[0]; + if (!_active) + return; - if (notInActiveChunk) { - // Ignore writes that came from the migration process like cleanup so they - // won't be transferred to the recipient shard. Also ignore ops from - // _migrateClone and _transferMods since it is impossible to move a chunk - // to self. - return; - } + if (_ns != ns) + return; - dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. + // no need to log if this is not an insertion, an update, or an actual deletion + // note: opstr 'db' isn't a deletion but a mention that a database exists + // (for replication machinery mostly). + if (op == 'n' || op == 'c' || (op == 'd' && opstr[1] == 'b')) + return; - if (!_active) - return; + BSONElement ide; + if (patt) + ide = patt->getField("_id"); + else + ide = obj["_id"]; - if (_ns != ns) - return; + if (ide.eoo()) { + warning() << "logOpForSharding got mod with no _id, ignoring obj: " << obj + << migrateLog; + return; + } - // no need to log if this is not an insertion, an update, or an actual deletion - // note: opstr 'db' isn't a deletion but a mention that a database exists - // (for replication machinery mostly). - if (op == 'n' || op == 'c' || (op == 'd' && opstr[1] == 'b')) - return; + if (op == 'i' && (!isInRange(obj, _min, _max, _shardKeyPattern))) { + return; + } - BSONElement ide; - if (patt) - ide = patt->getField("_id"); - else - ide = obj["_id"]; + BSONObj idObj(ide.wrap()); - if (ide.eoo()) { - warning() << "logOpForSharding got mod with no _id, ignoring obj: " - << obj << migrateLog; + if (op == 'u') { + BSONObj fullDoc; + OldClientContext ctx(txn, _ns, false); + if (!Helpers::findById(txn, ctx.db(), _ns.c_str(), idObj, fullDoc)) { + warning() << "logOpForSharding couldn't find: " << idObj + << " even though should have" << migrateLog; + dassert(false); // TODO: Abort the migration. return; } - if (op == 'i' && (!isInRange(obj, _min, _max, _shardKeyPattern))) { + if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) { return; } + } - BSONObj idObj(ide.wrap()); + // Note: can't check if delete is in active chunk since the document is gone! 
- if (op == 'u') { - BSONObj fullDoc; - OldClientContext ctx(txn, _ns, false); - if (!Helpers::findById(txn, ctx.db(), _ns.c_str(), idObj, fullDoc)) { - warning() << "logOpForSharding couldn't find: " << idObj - << " even though should have" << migrateLog; - dassert(false); // TODO: Abort the migration. - return; - } + txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op)); + } - if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) { - return; + /** + * Insert items from docIdList to a new array with the given fieldName in the given + * builder. If explode is true, the inserted object will be the full version of the + * document. Note that the whenever an item from the docList is inserted to the array, + * it will also be removed from docList. + * + * Should be holding the collection lock for ns if explode is true. + */ + void xfer(OperationContext* txn, + const string& ns, + Database* db, + list<BSONObj>* docIdList, + BSONObjBuilder& builder, + const char* fieldName, + long long& size, + bool explode) { + const long long maxSize = 1024 * 1024; + + if (docIdList->size() == 0 || size > maxSize) + return; + + BSONArrayBuilder arr(builder.subarrayStart(fieldName)); + + list<BSONObj>::iterator docIdIter = docIdList->begin(); + while (docIdIter != docIdList->end() && size < maxSize) { + BSONObj idDoc = *docIdIter; + if (explode) { + BSONObj fullDoc; + if (Helpers::findById(txn, db, ns.c_str(), idDoc, fullDoc)) { + arr.append(fullDoc); + size += fullDoc.objsize(); } + } else { + arr.append(idDoc); + size += idDoc.objsize(); } - // Note: can't check if delete is in active chunk since the document is gone! - - txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op)); + docIdIter = docIdList->erase(docIdIter); } - /** - * Insert items from docIdList to a new array with the given fieldName in the given - * builder. If explode is true, the inserted object will be the full version of the - * document. Note that the whenever an item from the docList is inserted to the array, - * it will also be removed from docList. - * - * Should be holding the collection lock for ns if explode is true. 
- */ - void xfer(OperationContext* txn, - const string& ns, - Database* db, - list<BSONObj> *docIdList, - BSONObjBuilder& builder, - const char* fieldName, - long long& size, - bool explode) { - const long long maxSize = 1024 * 1024; - - if (docIdList->size() == 0 || size > maxSize) - return; + arr.done(); + } - BSONArrayBuilder arr(builder.subarrayStart(fieldName)); + /** + * called from the dest of a migrate + * transfers mods from src to dest + */ + bool transferMods(OperationContext* txn, string& errmsg, BSONObjBuilder& b) { + long long size = 0; - list<BSONObj>::iterator docIdIter = docIdList->begin(); - while (docIdIter != docIdList->end() && size < maxSize) { - BSONObj idDoc = *docIdIter; - if (explode) { - BSONObj fullDoc; - if (Helpers::findById(txn, db, ns.c_str(), idDoc, fullDoc)) { - arr.append( fullDoc ); - size += fullDoc.objsize(); - } - } - else { - arr.append(idDoc); - size += idDoc.objsize(); - } + { + AutoGetCollectionForRead ctx(txn, getNS()); - docIdIter = docIdList->erase(docIdIter); + std::lock_guard<std::mutex> sl(_mutex); + if (!_active) { + errmsg = "no active migration!"; + return false; } - arr.done(); + // TODO: fix SERVER-16540 race + xfer(txn, _ns, ctx.getDb(), &_deleted, b, "deleted", size, false); + xfer(txn, _ns, ctx.getDb(), &_reload, b, "reload", size, true); } - /** - * called from the dest of a migrate - * transfers mods from src to dest - */ - bool transferMods(OperationContext* txn, string& errmsg, BSONObjBuilder& b) { - long long size = 0; - - { - AutoGetCollectionForRead ctx(txn, getNS()); - - std::lock_guard<std::mutex> sl(_mutex); - if (!_active) { - errmsg = "no active migration!"; - return false; - } + b.append("size", size); - // TODO: fix SERVER-16540 race - xfer(txn, _ns, ctx.getDb(), &_deleted, b, "deleted", size, false); - xfer(txn, _ns, ctx.getDb(), &_reload, b, "reload", size, true); - } - - b.append( "size" , size ); + return true; + } - return true; + /** + * Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs + * (to avoid seeking disk later). + * + * @param maxChunkSize number of bytes beyond which a chunk's base data (no indices) + * is considered too large to move. + * @param errmsg filled with textual description of error if this call return false. + * @return false if approximate chunk size is too big to move or true otherwise. + */ + bool storeCurrentLocs(OperationContext* txn, + long long maxChunkSize, + string& errmsg, + BSONObjBuilder& result) { + AutoGetCollectionForRead ctx(txn, getNS()); + Collection* collection = ctx.getCollection(); + if (!collection) { + errmsg = "ns not found, should be impossible"; + return false; } - /** - * Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs - * (to avoid seeking disk later). - * - * @param maxChunkSize number of bytes beyond which a chunk's base data (no indices) - * is considered too large to move. - * @param errmsg filled with textual description of error if this call return false. - * @return false if approximate chunk size is too big to move or true otherwise. - */ - bool storeCurrentLocs(OperationContext* txn, - long long maxChunkSize, - string& errmsg, - BSONObjBuilder& result ) { - AutoGetCollectionForRead ctx(txn, getNS()); - Collection* collection = ctx.getCollection(); - if ( !collection ) { - errmsg = "ns not found, should be impossible"; - return false; - } - - // Allow multiKey based on the invariant that shard keys must be single-valued. 
- // Therefore, any multi-key index prefixed by shard key cannot be multikey over - // the shard key fields. - IndexDescriptor *idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, - _shardKeyPattern , - false); // requireSingleKey + // Allow multiKey based on the invariant that shard keys must be single-valued. + // Therefore, any multi-key index prefixed by shard key cannot be multikey over + // the shard key fields. + IndexDescriptor* idx = + collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, + _shardKeyPattern, + false); // requireSingleKey + + if (idx == NULL) { + errmsg = str::stream() << "can't find index with prefix " << _shardKeyPattern + << " in storeCurrentLocs for " << _ns; + return false; + } - if (idx == NULL) { - errmsg = str::stream() << "can't find index with prefix " << _shardKeyPattern - << " in storeCurrentLocs for " << _ns; - return false; - } + // Assume both min and max non-empty, append MinKey's to make them fit chosen index + BSONObj min; + BSONObj max; + KeyPattern kp(idx->keyPattern()); - // Assume both min and max non-empty, append MinKey's to make them fit chosen index - BSONObj min; - BSONObj max; - KeyPattern kp(idx->keyPattern()); + { + // It's alright not to lock _mutex all the way through based on the assumption + // that this is only called by the main thread that drives the migration and + // only it can start and stop the current migration. + std::lock_guard<std::mutex> sl(_mutex); - { - // It's alright not to lock _mutex all the way through based on the assumption - // that this is only called by the main thread that drives the migration and - // only it can start and stop the current migration. - std::lock_guard<std::mutex> sl(_mutex); + invariant(_deleteNotifyExec.get() == NULL); + WorkingSet* ws = new WorkingSet(); + DeleteNotificationStage* dns = new DeleteNotificationStage(); + PlanExecutor* deleteNotifyExec; + // Takes ownership of 'ws' and 'dns'. + Status execStatus = PlanExecutor::make( + txn, ws, dns, collection, PlanExecutor::YIELD_MANUAL, &deleteNotifyExec); + invariant(execStatus.isOK()); + deleteNotifyExec->registerExec(); + _deleteNotifyExec.reset(deleteNotifyExec); + + min = Helpers::toKeyFormat(kp.extendRangeBound(_min, false)); + max = Helpers::toKeyFormat(kp.extendRangeBound(_max, false)); + } - invariant( _deleteNotifyExec.get() == NULL ); - WorkingSet* ws = new WorkingSet(); - DeleteNotificationStage* dns = new DeleteNotificationStage(); - PlanExecutor* deleteNotifyExec; - // Takes ownership of 'ws' and 'dns'. - Status execStatus = PlanExecutor::make(txn, - ws, - dns, - collection, - PlanExecutor::YIELD_MANUAL, - &deleteNotifyExec); - invariant(execStatus.isOK()); - deleteNotifyExec->registerExec(); - _deleteNotifyExec.reset(deleteNotifyExec); - - min = Helpers::toKeyFormat(kp.extendRangeBound(_min, false)); - max = Helpers::toKeyFormat(kp.extendRangeBound(_max, false)); - } - - std::unique_ptr<PlanExecutor> exec( - InternalPlanner::indexScan(txn, collection, idx, min, max, false)); - // We can afford to yield here because any change to the base data that we might - // miss is already being queued and will migrate in the 'transferMods' stage. 
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); - - // use the average object size to estimate how many objects a full chunk would carry - // do that while traversing the chunk's range using the sharding index, below - // there's a fair amount of slack before we determine a chunk is too large because object sizes will vary - unsigned long long maxRecsWhenFull; - long long avgRecSize; - const long long totalRecs = collection->numRecords(txn); - if ( totalRecs > 0 ) { - avgRecSize = collection->dataSize(txn) / totalRecs; - maxRecsWhenFull = maxChunkSize / avgRecSize; - maxRecsWhenFull = std::min((unsigned long long)(Chunk::MaxObjectPerChunk + 1) , 130 * maxRecsWhenFull / 100 /* slack */ ); - } - else { - avgRecSize = 0; - maxRecsWhenFull = Chunk::MaxObjectPerChunk + 1; - } - - // do a full traversal of the chunk and don't stop even if we think it is a large chunk - // we want the number of records to better report, in that case - bool isLargeChunk = false; - unsigned long long recCount = 0;; - RecordId dl; - while (PlanExecutor::ADVANCED == exec->getNext(NULL, &dl)) { - if ( ! isLargeChunk ) { - std::lock_guard<std::mutex> lk(_cloneLocsMutex); - _cloneLocs.insert( dl ); - } + std::unique_ptr<PlanExecutor> exec( + InternalPlanner::indexScan(txn, collection, idx, min, max, false)); + // We can afford to yield here because any change to the base data that we might + // miss is already being queued and will migrate in the 'transferMods' stage. + exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); + + // use the average object size to estimate how many objects a full chunk would carry + // do that while traversing the chunk's range using the sharding index, below + // there's a fair amount of slack before we determine a chunk is too large because object sizes will vary + unsigned long long maxRecsWhenFull; + long long avgRecSize; + const long long totalRecs = collection->numRecords(txn); + if (totalRecs > 0) { + avgRecSize = collection->dataSize(txn) / totalRecs; + maxRecsWhenFull = maxChunkSize / avgRecSize; + maxRecsWhenFull = std::min((unsigned long long)(Chunk::MaxObjectPerChunk + 1), + 130 * maxRecsWhenFull / 100 /* slack */); + } else { + avgRecSize = 0; + maxRecsWhenFull = Chunk::MaxObjectPerChunk + 1; + } - if ( ++recCount > maxRecsWhenFull ) { - isLargeChunk = true; - // continue on despite knowing that it will fail, - // just to get the correct value for recCount. - } + // do a full traversal of the chunk and don't stop even if we think it is a large chunk + // we want the number of records to better report, in that case + bool isLargeChunk = false; + unsigned long long recCount = 0; + ; + RecordId dl; + while (PlanExecutor::ADVANCED == exec->getNext(NULL, &dl)) { + if (!isLargeChunk) { + std::lock_guard<std::mutex> lk(_cloneLocsMutex); + _cloneLocs.insert(dl); } - exec.reset(); - if ( isLargeChunk ) { - std::lock_guard<std::mutex> sl(_mutex); - warning() << "cannot move chunk: the maximum number of documents for a chunk is " - << maxRecsWhenFull << " , the maximum chunk size is " << maxChunkSize - << " , average document size is " << avgRecSize - << ". 
Found " << recCount << " documents in chunk " - << " ns: " << _ns << " " - << _min << " -> " << _max << migrateLog; - - result.appendBool( "chunkTooBig" , true ); - result.appendNumber( "estimatedChunkSize" , (long long)(recCount * avgRecSize) ); - errmsg = "chunk too big to move"; - return false; + if (++recCount > maxRecsWhenFull) { + isLargeChunk = true; + // continue on despite knowing that it will fail, + // just to get the correct value for recCount. } + } + exec.reset(); - log() << "moveChunk number of documents: " << cloneLocsRemaining() << migrateLog; - - txn->recoveryUnit()->abandonSnapshot(); - return true; + if (isLargeChunk) { + std::lock_guard<std::mutex> sl(_mutex); + warning() << "cannot move chunk: the maximum number of documents for a chunk is " + << maxRecsWhenFull << " , the maximum chunk size is " << maxChunkSize + << " , average document size is " << avgRecSize << ". Found " << recCount + << " documents in chunk " + << " ns: " << _ns << " " << _min << " -> " << _max << migrateLog; + + result.appendBool("chunkTooBig", true); + result.appendNumber("estimatedChunkSize", (long long)(recCount * avgRecSize)); + errmsg = "chunk too big to move"; + return false; } - bool clone(OperationContext* txn, string& errmsg , BSONObjBuilder& result ) { - ElapsedTracker tracker(internalQueryExecYieldIterations, - internalQueryExecYieldPeriodMS); + log() << "moveChunk number of documents: " << cloneLocsRemaining() << migrateLog; - int allocSize = 0; - { - AutoGetCollectionForRead ctx(txn, getNS()); + txn->recoveryUnit()->abandonSnapshot(); + return true; + } - std::lock_guard<std::mutex> sl(_mutex); - if (!_active) { - errmsg = "not active"; - return false; - } + bool clone(OperationContext* txn, string& errmsg, BSONObjBuilder& result) { + ElapsedTracker tracker(internalQueryExecYieldIterations, internalQueryExecYieldPeriodMS); - Collection* collection = ctx.getCollection(); - if (!collection) { - errmsg = str::stream() << "collection " << _ns << " does not exist"; - return false; - } + int allocSize = 0; + { + AutoGetCollectionForRead ctx(txn, getNS()); - allocSize = - std::min(BSONObjMaxUserSize, - static_cast<int>((12 + collection->averageObjectSize(txn)) * - cloneLocsRemaining())); + std::lock_guard<std::mutex> sl(_mutex); + if (!_active) { + errmsg = "not active"; + return false; } - bool isBufferFilled = false; - BSONArrayBuilder clonedDocsArrayBuilder(allocSize); - while (!isBufferFilled) { - AutoGetCollectionForRead ctx(txn, getNS()); + Collection* collection = ctx.getCollection(); + if (!collection) { + errmsg = str::stream() << "collection " << _ns << " does not exist"; + return false; + } - std::lock_guard<std::mutex> sl(_mutex); - if (!_active) { - errmsg = "not active"; - return false; - } + allocSize = std::min( + BSONObjMaxUserSize, + static_cast<int>((12 + collection->averageObjectSize(txn)) * cloneLocsRemaining())); + } - // TODO: fix SERVER-16540 race + bool isBufferFilled = false; + BSONArrayBuilder clonedDocsArrayBuilder(allocSize); + while (!isBufferFilled) { + AutoGetCollectionForRead ctx(txn, getNS()); - Collection* collection = ctx.getCollection(); + std::lock_guard<std::mutex> sl(_mutex); + if (!_active) { + errmsg = "not active"; + return false; + } - if (!collection) { - errmsg = str::stream() << "collection " << _ns << " does not exist"; - return false; - } + // TODO: fix SERVER-16540 race - std::lock_guard<std::mutex> lk(_cloneLocsMutex); - set<RecordId>::iterator cloneLocsIter = _cloneLocs.begin(); - for ( ; cloneLocsIter != _cloneLocs.end(); 
++cloneLocsIter) { - if (tracker.intervalHasElapsed()) // should I yield? - break; + Collection* collection = ctx.getCollection(); - RecordId dl = *cloneLocsIter; - Snapshotted<BSONObj> doc; - if (!collection->findDoc(txn, dl, &doc)) { - // doc was deleted - continue; - } + if (!collection) { + errmsg = str::stream() << "collection " << _ns << " does not exist"; + return false; + } - // Use the builder size instead of accumulating 'doc's size so that we take - // into consideration the overhead of BSONArray indices, and *always* - // append one doc. - if (clonedDocsArrayBuilder.arrSize() != 0 && - (clonedDocsArrayBuilder.len() + doc.value().objsize() + 1024) - > BSONObjMaxUserSize) { - isBufferFilled = true; // break out of outer while loop - break; - } + std::lock_guard<std::mutex> lk(_cloneLocsMutex); + set<RecordId>::iterator cloneLocsIter = _cloneLocs.begin(); + for (; cloneLocsIter != _cloneLocs.end(); ++cloneLocsIter) { + if (tracker.intervalHasElapsed()) // should I yield? + break; - clonedDocsArrayBuilder.append(doc.value()); + RecordId dl = *cloneLocsIter; + Snapshotted<BSONObj> doc; + if (!collection->findDoc(txn, dl, &doc)) { + // doc was deleted + continue; } - _cloneLocs.erase(_cloneLocs.begin(), cloneLocsIter); - - // Note: must be holding _cloneLocsMutex, don't move this inside while condition! - if (_cloneLocs.empty()) { + // Use the builder size instead of accumulating 'doc's size so that we take + // into consideration the overhead of BSONArray indices, and *always* + // append one doc. + if (clonedDocsArrayBuilder.arrSize() != 0 && + (clonedDocsArrayBuilder.len() + doc.value().objsize() + 1024) > + BSONObjMaxUserSize) { + isBufferFilled = true; // break out of outer while loop break; } + + clonedDocsArrayBuilder.append(doc.value()); } - result.appendArray("objects", clonedDocsArrayBuilder.arr()); - return true; + _cloneLocs.erase(_cloneLocs.begin(), cloneLocsIter); + + // Note: must be holding _cloneLocsMutex, don't move this inside while condition! + if (_cloneLocs.empty()) { + break; + } } - void aboutToDelete( const RecordId& dl ) { - // Even though above we call findDoc to check for existance - // that check only works for non-mmapv1 engines, and this is needed - // for mmapv1. + result.appendArray("objects", clonedDocsArrayBuilder.arr()); + return true; + } - std::lock_guard<std::mutex> lk(_cloneLocsMutex); - _cloneLocs.erase( dl ); - } + void aboutToDelete(const RecordId& dl) { + // Even though above we call findDoc to check for existance + // that check only works for non-mmapv1 engines, and this is needed + // for mmapv1. 
- std::size_t cloneLocsRemaining() { - std::lock_guard<std::mutex> lk(_cloneLocsMutex); - return _cloneLocs.size(); - } + std::lock_guard<std::mutex> lk(_cloneLocsMutex); + _cloneLocs.erase(dl); + } - long long mbUsed() const { - std::lock_guard<std::mutex> lk(_mutex); - return _memoryUsed / ( 1024 * 1024 ); - } + std::size_t cloneLocsRemaining() { + std::lock_guard<std::mutex> lk(_cloneLocsMutex); + return _cloneLocs.size(); + } - bool getInCriticalSection() const { - std::lock_guard<std::mutex> lk(_mutex); - return _inCriticalSection; - } + long long mbUsed() const { + std::lock_guard<std::mutex> lk(_mutex); + return _memoryUsed / (1024 * 1024); + } - void setInCriticalSection( bool b ) { - std::lock_guard<std::mutex> lk(_mutex); - _inCriticalSection = b; - _inCriticalSectionCV.notify_all(); - } + bool getInCriticalSection() const { + std::lock_guard<std::mutex> lk(_mutex); + return _inCriticalSection; + } - std::string getNS() const { - std::lock_guard<std::mutex> sl(_mutex); - return _ns; - } + void setInCriticalSection(bool b) { + std::lock_guard<std::mutex> lk(_mutex); + _inCriticalSection = b; + _inCriticalSectionCV.notify_all(); + } - /** - * @return true if we are NOT in the critical section - */ - bool waitTillNotInCriticalSection( int maxSecondsToWait ) { - const auto deadline = system_clock::now() + seconds(maxSecondsToWait); - std::unique_lock<std::mutex> lk(_mutex); - while (_inCriticalSection) { - if (std::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline)) - return false; - } + std::string getNS() const { + std::lock_guard<std::mutex> sl(_mutex); + return _ns; + } - return true; + /** + * @return true if we are NOT in the critical section + */ + bool waitTillNotInCriticalSection(int maxSecondsToWait) { + const auto deadline = system_clock::now() + seconds(maxSecondsToWait); + std::unique_lock<std::mutex> lk(_mutex); + while (_inCriticalSection) { + if (std::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline)) + return false; } - bool isActive() const { return _getActive(); } + return true; + } - private: - bool _getActive() const { std::lock_guard<std::mutex> lk(_mutex); return _active; } - void _setActive( bool b ) { std::lock_guard<std::mutex> lk(_mutex); _active = b; } + bool isActive() const { + return _getActive(); + } + +private: + bool _getActive() const { + std::lock_guard<std::mutex> lk(_mutex); + return _active; + } + void _setActive(bool b) { + std::lock_guard<std::mutex> lk(_mutex); + _active = b; + } + /** + * Used to commit work for LogOpForSharding. Used to keep track of changes in documents + * that are part of a chunk being migrated. + */ + class LogOpForShardingHandler : public RecoveryUnit::Change { + public: /** - * Used to commit work for LogOpForSharding. Used to keep track of changes in documents - * that are part of a chunk being migrated. + * Invariant: idObj should belong to a document that is part of the active chunk + * being migrated. */ - class LogOpForShardingHandler : public RecoveryUnit::Change { - public: - /** - * Invariant: idObj should belong to a document that is part of the active chunk - * being migrated. 
- */ - LogOpForShardingHandler(MigrateFromStatus* migrateFromStatus, - const BSONObj& idObj, - const char op): - _migrateFromStatus(migrateFromStatus), - _idObj(idObj.getOwned()), - _op(op) { - } - - virtual void commit() { - switch (_op) { + LogOpForShardingHandler(MigrateFromStatus* migrateFromStatus, + const BSONObj& idObj, + const char op) + : _migrateFromStatus(migrateFromStatus), _idObj(idObj.getOwned()), _op(op) {} + + virtual void commit() { + switch (_op) { case 'd': { std::lock_guard<std::mutex> sl(_migrateFromStatus->_mutex); _migrateFromStatus->_deleted.push_back(_idObj); @@ -746,8 +738,7 @@ namespace { } case 'i': - case 'u': - { + case 'u': { std::lock_guard<std::mutex> sl(_migrateFromStatus->_mutex); _migrateFromStatus->_reload.push_back(_idObj); _migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5; @@ -756,2184 +747,2146 @@ namespace { default: invariant(false); - - } } + } - virtual void rollback() { } + virtual void rollback() {} - private: - MigrateFromStatus* _migrateFromStatus; - const BSONObj _idObj; - const char _op; - }; + private: + MigrateFromStatus* _migrateFromStatus; + const BSONObj _idObj; + const char _op; + }; - /** - * Used to receive invalidation notifications. - * - * XXX: move to the exec/ directory. - */ - class DeleteNotificationStage : public PlanStage { - public: - virtual void invalidate(OperationContext* txn, - const RecordId& dl, - InvalidationType type); + /** + * Used to receive invalidation notifications. + * + * XXX: move to the exec/ directory. + */ + class DeleteNotificationStage : public PlanStage { + public: + virtual void invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); - virtual StageState work(WorkingSetID* out) { - invariant( false ); - } - virtual bool isEOF() { - invariant( false ); - return false; - } - virtual void kill() { - } - virtual void saveState() { - invariant( false ); - } - virtual void restoreState(OperationContext* opCtx) { - invariant( false ); - } - virtual PlanStageStats* getStats() { - invariant( false ); - return NULL; - } - virtual CommonStats* getCommonStats() const { - invariant( false ); - return NULL; - } - virtual SpecificStats* getSpecificStats() const { - invariant( false ); - return NULL; - } - virtual std::vector<PlanStage*> getChildren() const { - vector<PlanStage*> empty; - return empty; - } - virtual StageType stageType() const { - return STAGE_NOTIFY_DELETE; - } - }; + virtual StageState work(WorkingSetID* out) { + invariant(false); + } + virtual bool isEOF() { + invariant(false); + return false; + } + virtual void kill() {} + virtual void saveState() { + invariant(false); + } + virtual void restoreState(OperationContext* opCtx) { + invariant(false); + } + virtual PlanStageStats* getStats() { + invariant(false); + return NULL; + } + virtual CommonStats* getCommonStats() const { + invariant(false); + return NULL; + } + virtual SpecificStats* getSpecificStats() const { + invariant(false); + return NULL; + } + virtual std::vector<PlanStage*> getChildren() const { + vector<PlanStage*> empty; + return empty; + } + virtual StageType stageType() const { + return STAGE_NOTIFY_DELETE; + } + }; - // - // All member variables are labeled with one of the following codes indicating the - // synchronization rules for accessing them. - // - // (M) Must hold _mutex for access. - // (MG) For reads, _mutex *OR* Global IX Lock must be held. - // For writes, the _mutex *AND* (Global Shared or Exclusive Lock) must be held. - // (C) Must hold _cloneLocsMutex for access. 
- // - // Locking order: - // - // Global Lock -> _mutex -> _cloneLocsMutex + // + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (M) Must hold _mutex for access. + // (MG) For reads, _mutex *OR* Global IX Lock must be held. + // For writes, the _mutex *AND* (Global Shared or Exclusive Lock) must be held. + // (C) Must hold _cloneLocsMutex for access. + // + // Locking order: + // + // Global Lock -> _mutex -> _cloneLocsMutex - mutable std::mutex _mutex; + mutable std::mutex _mutex; - std::condition_variable _inCriticalSectionCV; // (M) + std::condition_variable _inCriticalSectionCV; // (M) - // Is migration currently in critical section. This can be used to block new writes. - bool _inCriticalSection; // (M) + // Is migration currently in critical section. This can be used to block new writes. + bool _inCriticalSection; // (M) - std::unique_ptr<PlanExecutor> _deleteNotifyExec; // (M) + std::unique_ptr<PlanExecutor> _deleteNotifyExec; // (M) - // List of _id of documents that were modified that must be re-cloned. - list<BSONObj> _reload; // (M) + // List of _id of documents that were modified that must be re-cloned. + list<BSONObj> _reload; // (M) - // List of _id of documents that were deleted during clone that should be deleted later. - list<BSONObj> _deleted; // (M) + // List of _id of documents that were deleted during clone that should be deleted later. + list<BSONObj> _deleted; // (M) - // bytes in _reload + _deleted - long long _memoryUsed; // (M) + // bytes in _reload + _deleted + long long _memoryUsed; // (M) - // If a migration is currently active. - bool _active; // (MG) + // If a migration is currently active. + bool _active; // (MG) - string _ns; // (MG) - BSONObj _min; // (MG) - BSONObj _max; // (MG) - BSONObj _shardKeyPattern; // (MG) + string _ns; // (MG) + BSONObj _min; // (MG) + BSONObj _max; // (MG) + BSONObj _shardKeyPattern; // (MG) - mutable std::mutex _cloneLocsMutex; + mutable std::mutex _cloneLocsMutex; - // List of record id that needs to be transferred from here to the other side. - set<RecordId> _cloneLocs; // (C) + // List of record id that needs to be transferred from here to the other side. 
+ set<RecordId> _cloneLocs; // (C) - } migrateFromStatus; +} migrateFromStatus; - void MigrateFromStatus::DeleteNotificationStage::invalidate(OperationContext *txn, - const RecordId& dl, - InvalidationType type) { - if ( type == INVALIDATION_DELETION ) { - migrateFromStatus.aboutToDelete( dl ); - } +void MigrateFromStatus::DeleteNotificationStage::invalidate(OperationContext* txn, + const RecordId& dl, + InvalidationType type) { + if (type == INVALIDATION_DELETION) { + migrateFromStatus.aboutToDelete(dl); } +} - struct MigrateStatusHolder { - MigrateStatusHolder( OperationContext* txn, - const std::string& ns , - const BSONObj& min , - const BSONObj& max , - const BSONObj& shardKeyPattern ) - : _txn(txn) { - _isAnotherMigrationActive = - !migrateFromStatus.start(txn, ns, min, max, shardKeyPattern); - } - ~MigrateStatusHolder() { - if (!_isAnotherMigrationActive) { - migrateFromStatus.done(_txn); - } +struct MigrateStatusHolder { + MigrateStatusHolder(OperationContext* txn, + const std::string& ns, + const BSONObj& min, + const BSONObj& max, + const BSONObj& shardKeyPattern) + : _txn(txn) { + _isAnotherMigrationActive = !migrateFromStatus.start(txn, ns, min, max, shardKeyPattern); + } + ~MigrateStatusHolder() { + if (!_isAnotherMigrationActive) { + migrateFromStatus.done(_txn); } + } - bool isAnotherMigrationActive() const { - return _isAnotherMigrationActive; - } + bool isAnotherMigrationActive() const { + return _isAnotherMigrationActive; + } - private: - OperationContext* _txn; - bool _isAnotherMigrationActive; - }; +private: + OperationContext* _txn; + bool _isAnotherMigrationActive; +}; + +void logOpForSharding(OperationContext* txn, + const char* opstr, + const char* ns, + const BSONObj& obj, + BSONObj* patt, + bool notInActiveChunk) { + migrateFromStatus.logOp(txn, opstr, ns, obj, patt, notInActiveChunk); +} - void logOpForSharding(OperationContext* txn, - const char * opstr, - const char * ns, - const BSONObj& obj, - BSONObj * patt, - bool notInActiveChunk) { - migrateFromStatus.logOp(txn, opstr, ns, obj, patt, notInActiveChunk); +class TransferModsCommand : public ChunkCommandHelper { +public: + void help(std::stringstream& h) const { + h << "internal"; } + TransferModsCommand() : ChunkCommandHelper("_transferMods") {} + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + bool run(OperationContext* txn, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + return migrateFromStatus.transferMods(txn, errmsg, result); + } +} transferModsCommand; - class TransferModsCommand : public ChunkCommandHelper { - public: - void help(std::stringstream& h) const { h << "internal"; } - TransferModsCommand() : ChunkCommandHelper( "_transferMods" ) {} - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - bool run(OperationContext* txn, - const string&, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - return migrateFromStatus.transferMods(txn, errmsg, result); - } - } transferModsCommand; - - - class InitialCloneCommand : public ChunkCommandHelper { - public: - void help(std::stringstream& h) const { h << "internal"; 
} - InitialCloneCommand() : ChunkCommandHelper( "_migrateClone" ) {} - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - bool run(OperationContext* txn, - const string&, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - return migrateFromStatus.clone(txn, errmsg, result); - } - } initialCloneCommand; - - // Tests can pause / resume moveChunk's progress at each step by enabling / disabling each fail point. - MONGO_FP_DECLARE(moveChunkHangAtStep1); - MONGO_FP_DECLARE(moveChunkHangAtStep2); - MONGO_FP_DECLARE(moveChunkHangAtStep3); - MONGO_FP_DECLARE(moveChunkHangAtStep4); - MONGO_FP_DECLARE(moveChunkHangAtStep5); - MONGO_FP_DECLARE(moveChunkHangAtStep6); - /** - * this is the main entry for moveChunk - * called to initial a move - * usually by a mongos - * this is called on the "from" side - * - * Format: - * { - * moveChunk: "namespace", - * from: "hostAndPort", - * fromShard: "shardName", - * to: "hostAndPort", - * toShard: "shardName", - * min: {}, - * max: {}, - * maxChunkBytes: numeric, - * configdb: "hostAndPort", - * - * // optional - * secondaryThrottle: bool, //defaults to true. - * writeConcern: {} // applies to individual writes. - * } - */ - class MoveChunkCommand : public Command { - public: - MoveChunkCommand() : Command( "moveChunk" ) {} - virtual void help( std::stringstream& help ) const { - help << "should not be calling this directly"; - } - - virtual bool slaveOk() const { return false; } - virtual bool adminOnly() const { return true; } - virtual bool isWriteCommandForConfigServer() const { return false; } - virtual Status checkAuthForCommand(ClientBasic* client, - const std::string& dbname, - const BSONObj& cmdObj) { - if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( - ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), - ActionType::moveChunk)) { - return Status(ErrorCodes::Unauthorized, "Unauthorized"); - } - return Status::OK(); - } - virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { - return parseNsFullyQualified(dbname, cmdObj); - } - - bool run(OperationContext* txn, - const string& dbname, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - // 1. Parse options - // 2. Make sure my view is complete and lock the distributed lock to ensure shard - // metadata stability. - // 3. Migration - // Retrieve all RecordIds, which need to be migrated in order to do as little seeking - // as possible during transfer. Retrieval of the RecordIds happens under a collection - // lock, but then the collection lock is dropped. This opens up an opportunity for - // repair or compact to invalidate these RecordIds, because these commands do not - // synchronized with migration. Note that data modifications are not a problem, - // because we are registered for change notifications. - // - // 4. pause till migrate caught up - // 5. LOCK - // a) update my config, essentially locking - // b) finish migrate - // c) update config server - // d) logChange to config server - // 6. wait for all current cursors to expire - // 7. remove data locally - - // ------------------------------- - - // 1. 
- string ns = parseNs(dbname, cmdObj); - - // The shard addresses, redundant, but allows for validation - string toShardHost = cmdObj["to"].str(); - string fromShardHost = cmdObj["from"].str(); - - // The shard names - string toShardName = cmdObj["toShard"].str(); - string fromShardName = cmdObj["fromShard"].str(); - - // Process secondary throttle settings and assign defaults if necessary. - BSONObj secThrottleObj; - WriteConcernOptions writeConcern; - Status status = writeConcern.parseSecondaryThrottle(cmdObj, &secThrottleObj); - - if (!status.isOK()){ - if (status.code() != ErrorCodes::WriteConcernNotDefined) { - warning() << status.toString(); - return appendCommandStatus(result, status); - } +class InitialCloneCommand : public ChunkCommandHelper { +public: + void help(std::stringstream& h) const { + h << "internal"; + } + InitialCloneCommand() : ChunkCommandHelper("_migrateClone") {} + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + bool run(OperationContext* txn, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + return migrateFromStatus.clone(txn, errmsg, result); + } +} initialCloneCommand; - writeConcern = getDefaultWriteConcern(); - } - else { - repl::ReplicationCoordinator* replCoordinator = - repl::getGlobalReplicationCoordinator(); +// Tests can pause / resume moveChunk's progress at each step by enabling / disabling each fail point. +MONGO_FP_DECLARE(moveChunkHangAtStep1); +MONGO_FP_DECLARE(moveChunkHangAtStep2); +MONGO_FP_DECLARE(moveChunkHangAtStep3); +MONGO_FP_DECLARE(moveChunkHangAtStep4); +MONGO_FP_DECLARE(moveChunkHangAtStep5); +MONGO_FP_DECLARE(moveChunkHangAtStep6); - if (replCoordinator->getReplicationMode() == - repl::ReplicationCoordinator::modeMasterSlave && - writeConcern.shouldWaitForOtherNodes()) { - warning() << "moveChunk cannot check if secondary throttle setting " - << writeConcern.toBSON() - << " can be enforced in a master slave configuration"; - } +/** + * this is the main entry for moveChunk + * called to initial a move + * usually by a mongos + * this is called on the "from" side + * + * Format: + * { + * moveChunk: "namespace", + * from: "hostAndPort", + * fromShard: "shardName", + * to: "hostAndPort", + * toShard: "shardName", + * min: {}, + * max: {}, + * maxChunkBytes: numeric, + * configdb: "hostAndPort", + * + * // optional + * secondaryThrottle: bool, //defaults to true. + * writeConcern: {} // applies to individual writes. 
+ * } + */ +class MoveChunkCommand : public Command { +public: + MoveChunkCommand() : Command("moveChunk") {} + virtual void help(std::stringstream& help) const { + help << "should not be calling this directly"; + } - Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern); - if (!status.isOK() && status != ErrorCodes::NoReplicationEnabled) { - warning() << status.toString(); - return appendCommandStatus(result, status); - } - } + virtual bool slaveOk() const { + return false; + } + virtual bool adminOnly() const { + return true; + } + virtual bool isWriteCommandForConfigServer() const { + return false; + } + virtual Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) { + if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( + ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))), + ActionType::moveChunk)) { + return Status(ErrorCodes::Unauthorized, "Unauthorized"); + } + return Status::OK(); + } + virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const { + return parseNsFullyQualified(dbname, cmdObj); + } - if (writeConcern.shouldWaitForOtherNodes() && - writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) { - // Don't allow no timeout. - writeConcern.wTimeout = kDefaultWTimeoutMs; - } + bool run(OperationContext* txn, + const string& dbname, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + // 1. Parse options + // 2. Make sure my view is complete and lock the distributed lock to ensure shard + // metadata stability. + // 3. Migration + // Retrieve all RecordIds, which need to be migrated in order to do as little seeking + // as possible during transfer. Retrieval of the RecordIds happens under a collection + // lock, but then the collection lock is dropped. This opens up an opportunity for + // repair or compact to invalidate these RecordIds, because these commands do not + // synchronized with migration. Note that data modifications are not a problem, + // because we are registered for change notifications. + // + // 4. pause till migrate caught up + // 5. LOCK + // a) update my config, essentially locking + // b) finish migrate + // c) update config server + // d) logChange to config server + // 6. wait for all current cursors to expire + // 7. remove data locally - // Do inline deletion - bool waitForDelete = cmdObj["waitForDelete"].trueValue(); - if (waitForDelete) { - log() << "moveChunk waiting for full cleanup after move"; - } + // ------------------------------- - BSONObj min = cmdObj["min"].Obj(); - BSONObj max = cmdObj["max"].Obj(); - BSONElement maxSizeElem = cmdObj["maxChunkSizeBytes"]; + // 1. + string ns = parseNs(dbname, cmdObj); - if ( ns.empty() ) { - errmsg = "need to specify namespace in command"; - return false; - } + // The shard addresses, redundant, but allows for validation + string toShardHost = cmdObj["to"].str(); + string fromShardHost = cmdObj["from"].str(); - if ( toShardName.empty() ) { - errmsg = "need to specify shard to move chunk to"; - return false; - } - if ( fromShardName.empty() ) { - errmsg = "need to specify shard to move chunk from"; - return false; - } + // The shard names + string toShardName = cmdObj["toShard"].str(); + string fromShardName = cmdObj["fromShard"].str(); - if ( min.isEmpty() ) { - errmsg = "need to specify a min"; - return false; - } + // Process secondary throttle settings and assign defaults if necessary. 
+ BSONObj secThrottleObj; + WriteConcernOptions writeConcern; + Status status = writeConcern.parseSecondaryThrottle(cmdObj, &secThrottleObj); - if ( max.isEmpty() ) { - errmsg = "need to specify a max"; - return false; + if (!status.isOK()) { + if (status.code() != ErrorCodes::WriteConcernNotDefined) { + warning() << status.toString(); + return appendCommandStatus(result, status); } - if ( maxSizeElem.eoo() || ! maxSizeElem.isNumber() ) { - errmsg = "need to specify maxChunkSizeBytes"; - return false; - } - const long long maxChunkSize = maxSizeElem.numberLong(); // in bytes + writeConcern = getDefaultWriteConcern(); + } else { + repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator(); - // This could be the first call that enables sharding - make sure we initialize the - // sharding state for this shard. - if ( ! shardingState.enabled() ) { - if ( cmdObj["configdb"].type() != String ) { - errmsg = "sharding not enabled"; - warning() << errmsg; - return false; - } - string configdb = cmdObj["configdb"].String(); - ShardingState::initialize(configdb); + if (replCoordinator->getReplicationMode() == + repl::ReplicationCoordinator::modeMasterSlave && + writeConcern.shouldWaitForOtherNodes()) { + warning() << "moveChunk cannot check if secondary throttle setting " + << writeConcern.toBSON() + << " can be enforced in a master slave configuration"; } - // Initialize our current shard name in the shard state if needed - shardingState.gotShardName(fromShardName); - - // Make sure we're as up-to-date as possible with shard information - // This catches the case where we had to previously changed a shard's host by - // removing/adding a shard with the same name - Shard::reloadShardInfo(); - - ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(), - errmsg); - if (!configLoc.isValid()) { - warning() << errmsg; - return false; + Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern); + if (!status.isOK() && status != ErrorCodes::NoReplicationEnabled) { + warning() << status.toString(); + return appendCommandStatus(result, status); } + } - MoveTimingHelper timing(txn, "from" , ns , min , max , 6 /* steps */ , &errmsg, - toShardName, fromShardName ); + if (writeConcern.shouldWaitForOtherNodes() && + writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) { + // Don't allow no timeout. + writeConcern.wTimeout = kDefaultWTimeoutMs; + } - log() << "received moveChunk request: " << cmdObj << migrateLog; + // Do inline deletion + bool waitForDelete = cmdObj["waitForDelete"].trueValue(); + if (waitForDelete) { + log() << "moveChunk waiting for full cleanup after move"; + } - timing.done(1); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep1); + BSONObj min = cmdObj["min"].Obj(); + BSONObj max = cmdObj["max"].Obj(); + BSONElement maxSizeElem = cmdObj["maxChunkSizeBytes"]; - // 2. 
+ if (ns.empty()) { + errmsg = "need to specify namespace in command"; + return false; + } - if ( migrateFromStatus.isActive() ) { - errmsg = "migration already in progress"; - warning() << errmsg; - return false; - } + if (toShardName.empty()) { + errmsg = "need to specify shard to move chunk to"; + return false; + } + if (fromShardName.empty()) { + errmsg = "need to specify shard to move chunk from"; + return false; + } - // - // Get the distributed lock - // + if (min.isEmpty()) { + errmsg = "need to specify a min"; + return false; + } - string whyMessage(str::stream() << "migrating chunk [" << minKey << ", " << maxKey - << ") in " << ns); - auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock( - ns, whyMessage); + if (max.isEmpty()) { + errmsg = "need to specify a max"; + return false; + } - if (!scopedDistLock.isOK()) { - errmsg = stream() << "could not acquire collection lock for " << ns - << " to migrate chunk [" << minKey << "," << maxKey << ")" - << causedBy(scopedDistLock.getStatus()); + if (maxSizeElem.eoo() || !maxSizeElem.isNumber()) { + errmsg = "need to specify maxChunkSizeBytes"; + return false; + } + const long long maxChunkSize = maxSizeElem.numberLong(); // in bytes + // This could be the first call that enables sharding - make sure we initialize the + // sharding state for this shard. + if (!shardingState.enabled()) { + if (cmdObj["configdb"].type() != String) { + errmsg = "sharding not enabled"; warning() << errmsg; return false; } + string configdb = cmdObj["configdb"].String(); + ShardingState::initialize(configdb); + } - BSONObj chunkInfo = BSON("min" << min << "max" << max << - "from" << fromShardName << "to" << toShardName); + // Initialize our current shard name in the shard state if needed + shardingState.gotShardName(fromShardName); - grid.catalogManager()->logChange(txn, "moveChunk.start", ns, chunkInfo); + // Make sure we're as up-to-date as possible with shard information + // This catches the case where we had to previously changed a shard's host by + // removing/adding a shard with the same name + Shard::reloadShardInfo(); - // Always refresh our metadata remotely - ChunkVersion origShardVersion; - Status refreshStatus = shardingState.refreshMetadataNow(txn, ns, &origShardVersion); + ConnectionString configLoc = + ConnectionString::parse(shardingState.getConfigServer(), errmsg); + if (!configLoc.isValid()) { + warning() << errmsg; + return false; + } - if (!refreshStatus.isOK()) { - errmsg = str::stream() << "moveChunk cannot start migrate of chunk " - << "[" << minKey << "," << maxKey << ")" - << causedBy(refreshStatus.reason()); + MoveTimingHelper timing( + txn, "from", ns, min, max, 6 /* steps */, &errmsg, toShardName, fromShardName); - warning() << errmsg; - return false; - } - - if (origShardVersion.majorVersion() == 0) { - // It makes no sense to migrate if our version is zero and we have no chunks - errmsg = str::stream() << "moveChunk cannot start migrate of chunk " - << "[" << minKey << "," << maxKey << ")" - << " with zero shard version"; + log() << "received moveChunk request: " << cmdObj << migrateLog; - warning() << errmsg; - return false; - } + timing.done(1); + MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep1); - // From mongos >= v3.0. - BSONElement epochElem(cmdObj["epoch"]); - if (epochElem.type() == jstOID) { - OID cmdEpoch = epochElem.OID(); + // 2. 
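Step 2 below and the config update in step 5.c both manipulate per-chunk metadata through the ChunkType accessors. A rough sketch of such a chunk entry, with invented values and keys written as the accessors used in this file rather than literal field names, may help when reading those sections:

    // Hypothetical config.chunks entry for the chunk being moved; keys are shown
    // via the ChunkType accessors used below, values are invented.
    //   ChunkType::name()                -> "test.users-_id_0"   (Chunk::genID(ns, min))
    //   ChunkType::ns()                  -> "test.users"
    //   ChunkType::min()                 -> { _id: 0 }
    //   ChunkType::max()                 -> { _id: 100 }
    //   ChunkType::shard()               -> "shard0000"
    //   ChunkType::DEPRECATED_lastmod()  -> the chunk's version (major, minor, epoch)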
- if (cmdEpoch != origShardVersion.epoch()) { - errmsg = str::stream() << "moveChunk cannot move chunk " - << "[" << minKey << "," - << maxKey << "), " - << "collection may have been dropped. " - << "current epoch: " << origShardVersion.epoch() - << ", cmd epoch: " << cmdEpoch; - warning() << errmsg; - return false; - } - } + if (migrateFromStatus.isActive()) { + errmsg = "migration already in progress"; + warning() << errmsg; + return false; + } - // Get collection metadata - const CollectionMetadataPtr origCollMetadata(shardingState.getCollectionMetadata(ns)); + // + // Get the distributed lock + // - // With nonzero shard version, we must have metadata - invariant(NULL != origCollMetadata); + string whyMessage(str::stream() << "migrating chunk [" << minKey << ", " << maxKey + << ") in " << ns); + auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(ns, whyMessage); - ChunkVersion origCollVersion = origCollMetadata->getCollVersion(); - BSONObj shardKeyPattern = origCollMetadata->getKeyPattern(); + if (!scopedDistLock.isOK()) { + errmsg = stream() << "could not acquire collection lock for " << ns + << " to migrate chunk [" << minKey << "," << maxKey << ")" + << causedBy(scopedDistLock.getStatus()); - // With nonzero shard version, we must have a coll version >= our shard version - invariant(origCollVersion >= origShardVersion); + warning() << errmsg; + return false; + } - // With nonzero shard version, we must have a shard key - invariant(!shardKeyPattern.isEmpty()); + BSONObj chunkInfo = + BSON("min" << min << "max" << max << "from" << fromShardName << "to" << toShardName); - ChunkType origChunk; - if (!origCollMetadata->getNextChunk(min, &origChunk) - || origChunk.getMin().woCompare(min) - || origChunk.getMax().woCompare(max)) { + grid.catalogManager()->logChange(txn, "moveChunk.start", ns, chunkInfo); - // Our boundaries are different from those passed in - errmsg = str::stream() << "moveChunk cannot find chunk " - << "[" << minKey << "," << maxKey << ")" - << " to migrate, the chunk boundaries may be stale"; + // Always refresh our metadata remotely + ChunkVersion origShardVersion; + Status refreshStatus = shardingState.refreshMetadataNow(txn, ns, &origShardVersion); - warning() << errmsg; - return false; - } + if (!refreshStatus.isOK()) { + errmsg = str::stream() << "moveChunk cannot start migrate of chunk " + << "[" << minKey << "," << maxKey << ")" + << causedBy(refreshStatus.reason()); - log() << "moveChunk request accepted at version " << origShardVersion; + warning() << errmsg; + return false; + } - timing.done(2); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep2); + if (origShardVersion.majorVersion() == 0) { + // It makes no sense to migrate if our version is zero and we have no chunks + errmsg = str::stream() << "moveChunk cannot start migrate of chunk " + << "[" << minKey << "," << maxKey << ")" + << " with zero shard version"; - // 3. - MigrateStatusHolder statusHolder(txn, ns, min, max, shardKeyPattern); + warning() << errmsg; + return false; + } - if (statusHolder.isAnotherMigrationActive()) { - errmsg = "moveChunk is already in progress from this shard"; + // From mongos >= v3.0. + BSONElement epochElem(cmdObj["epoch"]); + if (epochElem.type() == jstOID) { + OID cmdEpoch = epochElem.OID(); + + if (cmdEpoch != origShardVersion.epoch()) { + errmsg = str::stream() << "moveChunk cannot move chunk " + << "[" << minKey << "," << maxKey << "), " + << "collection may have been dropped. 
" + << "current epoch: " << origShardVersion.epoch() + << ", cmd epoch: " << cmdEpoch; warning() << errmsg; return false; } + } - ConnectionString fromShardCS; - ConnectionString toShardCS; + // Get collection metadata + const CollectionMetadataPtr origCollMetadata(shardingState.getCollectionMetadata(ns)); - // Resolve the shard connection strings. - { - std::shared_ptr<Shard> fromShard = - grid.shardRegistry()->getShard(fromShardName); - uassert(28674, - str::stream() << "Source shard " << fromShardName - << " is missing. This indicates metadata corruption.", - fromShard); + // With nonzero shard version, we must have metadata + invariant(NULL != origCollMetadata); - fromShardCS = fromShard->getConnString(); + ChunkVersion origCollVersion = origCollMetadata->getCollVersion(); + BSONObj shardKeyPattern = origCollMetadata->getKeyPattern(); - std::shared_ptr<Shard> toShard = grid.shardRegistry()->getShard(toShardName); - uassert(28675, - str::stream() << "Destination shard " << toShardName - << " is missing. This indicates metadata corruption.", - toShard); + // With nonzero shard version, we must have a coll version >= our shard version + invariant(origCollVersion >= origShardVersion); - toShardCS = toShard->getConnString(); - } + // With nonzero shard version, we must have a shard key + invariant(!shardKeyPattern.isEmpty()); - { - // See comment at the top of the function for more information on what - // synchronization is used here. - if (!migrateFromStatus.storeCurrentLocs(txn, maxChunkSize, errmsg, result)) { - warning() << errmsg; - return false; - } + ChunkType origChunk; + if (!origCollMetadata->getNextChunk(min, &origChunk) || origChunk.getMin().woCompare(min) || + origChunk.getMax().woCompare(max)) { + // Our boundaries are different from those passed in + errmsg = str::stream() << "moveChunk cannot find chunk " + << "[" << minKey << "," << maxKey << ")" + << " to migrate, the chunk boundaries may be stale"; - ScopedDbConnection connTo(toShardCS); - BSONObj res; - bool ok; - - const bool isSecondaryThrottle(writeConcern.shouldWaitForOtherNodes()); - - BSONObjBuilder recvChunkStartBuilder; - recvChunkStartBuilder.append("_recvChunkStart", ns); - recvChunkStartBuilder.append("from", fromShardCS.toString()); - recvChunkStartBuilder.append("fromShardName", fromShardName); - recvChunkStartBuilder.append("toShardName", toShardName); - recvChunkStartBuilder.append("min", min); - recvChunkStartBuilder.append("max", max); - recvChunkStartBuilder.append("shardKeyPattern", shardKeyPattern); - recvChunkStartBuilder.append("configServer", shardingState.getConfigServer()); - recvChunkStartBuilder.append("secondaryThrottle", isSecondaryThrottle); - - // Follow the same convention in moveChunk. - if (isSecondaryThrottle && !secThrottleObj.isEmpty()) { - recvChunkStartBuilder.append("writeConcern", secThrottleObj); - } + warning() << errmsg; + return false; + } - try{ - ok = connTo->runCommand("admin", recvChunkStartBuilder.done(), res); - } - catch( DBException& e ){ - errmsg = str::stream() << "moveChunk could not contact to: shard " - << toShardName << " to start transfer" << causedBy( e ); - warning() << errmsg; - return false; - } + log() << "moveChunk request accepted at version " << origShardVersion; - connTo.done(); + timing.done(2); + MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep2); - if ( ! 
ok ) { - errmsg = "moveChunk failed to engage TO-shard in the data transfer: "; - verify( res["errmsg"].type() ); - errmsg += res["errmsg"].String(); - result.append( "cause" , res ); - warning() << errmsg; - return false; - } + // 3. + MigrateStatusHolder statusHolder(txn, ns, min, max, shardKeyPattern); - } - timing.done( 3 ); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep3); + if (statusHolder.isAnotherMigrationActive()) { + errmsg = "moveChunk is already in progress from this shard"; + warning() << errmsg; + return false; + } - // 4. + ConnectionString fromShardCS; + ConnectionString toShardCS; - // Track last result from TO shard for sanity check - BSONObj res; - for ( int i=0; i<86400; i++ ) { // don't want a single chunk move to take more than a day - invariant(!txn->lockState()->isLocked()); + // Resolve the shard connection strings. + { + std::shared_ptr<Shard> fromShard = grid.shardRegistry()->getShard(fromShardName); + uassert(28674, + str::stream() << "Source shard " << fromShardName + << " is missing. This indicates metadata corruption.", + fromShard); - // Exponential sleep backoff, up to 1024ms. Don't sleep much on the first few - // iterations, since we want empty chunk migrations to be fast. - sleepmillis(1 << std::min(i, 10)); + fromShardCS = fromShard->getConnString(); - ScopedDbConnection conn(toShardCS); - bool ok; - res = BSONObj(); - try { - ok = conn->runCommand( "admin" , BSON( "_recvChunkStatus" << 1 ) , res ); - res = res.getOwned(); - } - catch( DBException& e ){ - errmsg = str::stream() << "moveChunk could not contact to: shard " << toShardName << " to monitor transfer" << causedBy( e ); - warning() << errmsg; - return false; - } - - conn.done(); - - if ( res["ns"].str() != ns || - res["from"].str() != fromShardCS.toString() || - !res["min"].isABSONObj() || - res["min"].Obj().woCompare(min) != 0 || - !res["max"].isABSONObj() || - res["max"].Obj().woCompare(max) != 0 ) { - // This can happen when the destination aborted the migration and - // received another recvChunk before this thread sees the transition - // to the abort state. This is currently possible only if multiple migrations - // are happening at once. This is an unfortunate consequence of the shards not - // being able to keep track of multiple incoming and outgoing migrations. - errmsg = str::stream() << "Destination shard aborted migration, " - "now running a new one: " << res; - warning() << errmsg; - return false; - } + std::shared_ptr<Shard> toShard = grid.shardRegistry()->getShard(toShardName); + uassert(28675, + str::stream() << "Destination shard " << toShardName + << " is missing. This indicates metadata corruption.", + toShard); - LOG(0) << "moveChunk data transfer progress: " << res << " my mem used: " << migrateFromStatus.mbUsed() << migrateLog; + toShardCS = toShard->getConnString(); + } - if ( ! ok || res["state"].String() == "fail" ) { - warning() << "moveChunk error transferring data caused migration abort: " << res << migrateLog; - errmsg = "data transfer error"; - result.append( "cause" , res ); - return false; - } + { + // See comment at the top of the function for more information on what + // synchronization is used here. 
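The recvChunkStartBuilder assembled in the lines just below produces the kickoff command sent to the recipient shard; its rough shape, with invented values, is:

    // Rough shape of the _recvChunkStart command built below; every value is invented.
    // {
    //     _recvChunkStart: "test.users",
    //     from: "donorHost:27017",
    //     fromShardName: "shard0000",
    //     toShardName: "shard0001",
    //     min: { _id: 0 },
    //     max: { _id: 100 },
    //     shardKeyPattern: { _id: 1 },
    //     configServer: "cfgA:29000,cfgB:29000,cfgC:29000",
    //     secondaryThrottle: true,
    //     writeConcern: { ... }   // only when an explicit secondary throttle doc was supplied
    // }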
+ if (!migrateFromStatus.storeCurrentLocs(txn, maxChunkSize, errmsg, result)) { + warning() << errmsg; + return false; + } - if ( res["state"].String() == "steady" ) - break; + ScopedDbConnection connTo(toShardCS); + BSONObj res; + bool ok; - if ( migrateFromStatus.mbUsed() > (500 * 1024 * 1024) ) { - // This is too much memory for us to use for this so we're going to abort - // the migrate - ScopedDbConnection conn(toShardCS); + const bool isSecondaryThrottle(writeConcern.shouldWaitForOtherNodes()); - BSONObj res; - if (!conn->runCommand("admin", BSON("_recvChunkAbort" << 1), res)) { - warning() << "Error encountered while trying to abort migration on " - << "destination shard" << toShardCS; - } + BSONObjBuilder recvChunkStartBuilder; + recvChunkStartBuilder.append("_recvChunkStart", ns); + recvChunkStartBuilder.append("from", fromShardCS.toString()); + recvChunkStartBuilder.append("fromShardName", fromShardName); + recvChunkStartBuilder.append("toShardName", toShardName); + recvChunkStartBuilder.append("min", min); + recvChunkStartBuilder.append("max", max); + recvChunkStartBuilder.append("shardKeyPattern", shardKeyPattern); + recvChunkStartBuilder.append("configServer", shardingState.getConfigServer()); + recvChunkStartBuilder.append("secondaryThrottle", isSecondaryThrottle); - res = res.getOwned(); - conn.done(); - error() << "aborting migrate because too much memory used res: " << res << migrateLog; - errmsg = "aborting migrate because too much memory used"; - result.appendBool( "split" , true ); - return false; - } + // Follow the same convention in moveChunk. + if (isSecondaryThrottle && !secThrottleObj.isEmpty()) { + recvChunkStartBuilder.append("writeConcern", secThrottleObj); + } - txn->checkForInterrupt(); + try { + ok = connTo->runCommand("admin", recvChunkStartBuilder.done(), res); + } catch (DBException& e) { + errmsg = str::stream() << "moveChunk could not contact to: shard " << toShardName + << " to start transfer" << causedBy(e); + warning() << errmsg; + return false; } - timing.done(4); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep4); + connTo.done(); - // 5. + if (!ok) { + errmsg = "moveChunk failed to engage TO-shard in the data transfer: "; + verify(res["errmsg"].type()); + errmsg += res["errmsg"].String(); + result.append("cause", res); + warning() << errmsg; + return false; + } + } + timing.done(3); + MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep3); - // Before we get into the critical section of the migration, let's double check - // that the docs have been cloned, the config servers are reachable, - // and the lock is in place. - log() << "About to check if it is safe to enter critical section"; + // 4. - // Ensure all cloned docs have actually been transferred - std::size_t locsRemaining = migrateFromStatus.cloneLocsRemaining(); - if ( locsRemaining != 0 ) { + // Track last result from TO shard for sanity check + BSONObj res; + for (int i = 0; i < 86400; i++) { // don't want a single chunk move to take more than a day + invariant(!txn->lockState()->isLocked()); - errmsg = - str::stream() << "moveChunk cannot enter critical section before all data is" - << " cloned, " << locsRemaining << " locs were not transferred" - << " but to-shard reported " << res; + // Exponential sleep backoff, up to 1024ms. Don't sleep much on the first few + // iterations, since we want empty chunk migrations to be fast. 
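A quick sanity check of the backoff used by the polling loop that follows: the sleep doubles from 1 ms and is clamped at 1 << 10 = 1024 ms, so the 86400-iteration bound works out to roughly a day once the cap is reached. A minimal standalone illustration:

    // Illustrative only: prints the per-iteration sleep (ms) of the loop below,
    // i.e. 1 2 4 ... 512 1024 1024 ...
    #include <algorithm>
    #include <iostream>

    int main() {
        for (int i = 0; i < 15; ++i) {
            std::cout << (1 << std::min(i, 10)) << ' ';
        }
        return 0;
    }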
+ sleepmillis(1 << std::min(i, 10)); - // Should never happen, but safe to abort before critical section - error() << errmsg << migrateLog; - dassert( false ); + ScopedDbConnection conn(toShardCS); + bool ok; + res = BSONObj(); + try { + ok = conn->runCommand("admin", BSON("_recvChunkStatus" << 1), res); + res = res.getOwned(); + } catch (DBException& e) { + errmsg = str::stream() << "moveChunk could not contact to: shard " << toShardName + << " to monitor transfer" << causedBy(e); + warning() << errmsg; return false; } - // Ensure distributed lock still held - Status lockStatus = scopedDistLock.getValue().checkStatus(); - if (!lockStatus.isOK()) { - errmsg = str::stream() << "not entering migrate critical section because " - << lockStatus.toString(); + conn.done(); + + if (res["ns"].str() != ns || res["from"].str() != fromShardCS.toString() || + !res["min"].isABSONObj() || res["min"].Obj().woCompare(min) != 0 || + !res["max"].isABSONObj() || res["max"].Obj().woCompare(max) != 0) { + // This can happen when the destination aborted the migration and + // received another recvChunk before this thread sees the transition + // to the abort state. This is currently possible only if multiple migrations + // are happening at once. This is an unfortunate consequence of the shards not + // being able to keep track of multiple incoming and outgoing migrations. + errmsg = str::stream() << "Destination shard aborted migration, " + "now running a new one: " << res; warning() << errmsg; return false; } - log() << "About to enter migrate critical section"; + LOG(0) << "moveChunk data transfer progress: " << res + << " my mem used: " << migrateFromStatus.mbUsed() << migrateLog; - { - // 5.a - // we're under the collection lock here, so no other migrate can change maxVersion - // or CollectionMetadata state - migrateFromStatus.setInCriticalSection( true ); - ChunkVersion myVersion = origCollVersion; - myVersion.incMajor(); + if (!ok || res["state"].String() == "fail") { + warning() << "moveChunk error transferring data caused migration abort: " << res + << migrateLog; + errmsg = "data transfer error"; + result.append("cause", res); + return false; + } - { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); - verify( myVersion > shardingState.getVersion( ns ) ); + if (res["state"].String() == "steady") + break; - // bump the metadata's version up and "forget" about the chunk being moved - // this is not the commit point but in practice the state in this shard won't - // until the commit it done - shardingState.donateChunk(txn, ns, min, max, myVersion); + if (migrateFromStatus.mbUsed() > (500 * 1024 * 1024)) { + // This is too much memory for us to use for this so we're going to abort + // the migrate + ScopedDbConnection conn(toShardCS); + + BSONObj res; + if (!conn->runCommand("admin", BSON("_recvChunkAbort" << 1), res)) { + warning() << "Error encountered while trying to abort migration on " + << "destination shard" << toShardCS; } - log() << "moveChunk setting version to: " << myVersion << migrateLog; + res = res.getOwned(); + conn.done(); + error() << "aborting migrate because too much memory used res: " << res + << migrateLog; + errmsg = "aborting migrate because too much memory used"; + result.appendBool("split", true); + return false; + } - // 5.b - // we're under the collection lock here, too, so we can undo the chunk donation because no other state change - // could be 
ongoing + txn->checkForInterrupt(); + } - BSONObj res; - bool ok; + timing.done(4); + MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep4); - try { - ScopedDbConnection connTo(toShardCS, 35.0); - ok = connTo->runCommand( "admin", BSON( "_recvChunkCommit" << 1 ), res ); - connTo.done(); - } - catch ( DBException& e ) { - errmsg = str::stream() << "moveChunk could not contact to: shard " - << toShardCS.toString() - << " to commit transfer" << causedBy(e); - warning() << errmsg; - ok = false; - } + // 5. - if ( !ok || MONGO_FAIL_POINT(failMigrationCommit) ) { - log() << "moveChunk migrate commit not accepted by TO-shard: " << res - << " resetting shard version to: " << origShardVersion << migrateLog; - { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); + // Before we get into the critical section of the migration, let's double check + // that the docs have been cloned, the config servers are reachable, + // and the lock is in place. + log() << "About to check if it is safe to enter critical section"; - log() << "moveChunk collection lock acquired to reset shard version from " - "failed migration" - ; + // Ensure all cloned docs have actually been transferred + std::size_t locsRemaining = migrateFromStatus.cloneLocsRemaining(); + if (locsRemaining != 0) { + errmsg = str::stream() << "moveChunk cannot enter critical section before all data is" + << " cloned, " << locsRemaining << " locs were not transferred" + << " but to-shard reported " << res; - // revert the chunk manager back to the state before "forgetting" about the - // chunk - shardingState.undoDonateChunk(txn, ns, origCollMetadata); - } - log() << "Shard version successfully reset to clean up failed migration" - ; + // Should never happen, but safe to abort before critical section + error() << errmsg << migrateLog; + dassert(false); + return false; + } - errmsg = "_recvChunkCommit failed!"; - result.append( "cause", res ); - return false; - } + // Ensure distributed lock still held + Status lockStatus = scopedDistLock.getValue().checkStatus(); + if (!lockStatus.isOK()) { + errmsg = str::stream() << "not entering migrate critical section because " + << lockStatus.toString(); + warning() << errmsg; + return false; + } - log() << "moveChunk migrate commit accepted by TO-shard: " << res << migrateLog; + log() << "About to enter migrate critical section"; - // 5.c + { + // 5.a + // we're under the collection lock here, so no other migrate can change maxVersion + // or CollectionMetadata state + migrateFromStatus.setInCriticalSection(true); + ChunkVersion myVersion = origCollVersion; + myVersion.incMajor(); - // version at which the next highest lastmod will be set - // if the chunk being moved is the last in the shard, nextVersion is that chunk's lastmod - // otherwise the highest version is from the chunk being bumped on the FROM-shard - ChunkVersion nextVersion; + { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); + Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); + verify(myVersion > shardingState.getVersion(ns)); - // we want to go only once to the configDB but perhaps change two chunks, the one being migrated and another - // local one (so to bump version for the entire shard) - // we use the 'applyOps' mechanism to group the two updates and make them safer - // TODO pull config update code to a module + // bump the metadata's 
version up and "forget" about the chunk being moved + // this is not the commit point but in practice the state in this shard won't + // until the commit it done + shardingState.donateChunk(txn, ns, min, max, myVersion); + } - BSONArrayBuilder updates; - { - // update for the chunk being moved - BSONObjBuilder op; - op.append( "op" , "u" ); - op.appendBool( "b" , false /* no upserting */ ); - op.append( "ns" , ChunkType::ConfigNS ); - - BSONObjBuilder n( op.subobjStart( "o" ) ); - n.append(ChunkType::name(), Chunk::genID(ns, min)); - myVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod()); - n.append(ChunkType::ns(), ns); - n.append(ChunkType::min(), min); - n.append(ChunkType::max(), max); - n.append(ChunkType::shard(), toShardName); - n.done(); - - BSONObjBuilder q( op.subobjStart( "o2" ) ); - q.append(ChunkType::name(), Chunk::genID(ns, min)); - q.done(); - - updates.append( op.obj() ); - } + log() << "moveChunk setting version to: " << myVersion << migrateLog; - nextVersion = myVersion; + // 5.b + // we're under the collection lock here, too, so we can undo the chunk donation because no other state change + // could be ongoing - // if we have chunks left on the FROM shard, update the version of one of them as - // well. we can figure that out by grabbing the metadata installed on 5.a + BSONObj res; + bool ok; - const CollectionMetadataPtr bumpedCollMetadata( shardingState.getCollectionMetadata( ns ) ); - if( bumpedCollMetadata->getNumChunks() > 0 ) { + try { + ScopedDbConnection connTo(toShardCS, 35.0); + ok = connTo->runCommand("admin", BSON("_recvChunkCommit" << 1), res); + connTo.done(); + } catch (DBException& e) { + errmsg = str::stream() << "moveChunk could not contact to: shard " + << toShardCS.toString() << " to commit transfer" + << causedBy(e); + warning() << errmsg; + ok = false; + } - // get another chunk on that shard - ChunkType bumpChunk; - bool chunkRes = - bumpedCollMetadata->getNextChunk(bumpedCollMetadata->getMinKey(), - &bumpChunk); - BSONObj bumpMin = bumpChunk.getMin(); - BSONObj bumpMax = bumpChunk.getMax(); + if (!ok || MONGO_FAIL_POINT(failMigrationCommit)) { + log() << "moveChunk migrate commit not accepted by TO-shard: " << res + << " resetting shard version to: " << origShardVersion << migrateLog; + { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); + Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); - (void)chunkRes; // for compile warning on non-debug - dassert(chunkRes); - dassert( bumpMin.woCompare( min ) != 0 ); + log() << "moveChunk collection lock acquired to reset shard version from " + "failed migration"; - BSONObjBuilder op; - op.append( "op" , "u" ); - op.appendBool( "b" , false ); - op.append( "ns" , ChunkType::ConfigNS ); + // revert the chunk manager back to the state before "forgetting" about the + // chunk + shardingState.undoDonateChunk(txn, ns, origCollMetadata); + } + log() << "Shard version successfully reset to clean up failed migration"; - nextVersion.incMinor(); // same as used on donateChunk - BSONObjBuilder n( op.subobjStart( "o" ) ); - n.append(ChunkType::name(), Chunk::genID(ns, bumpMin)); - nextVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod()); - n.append(ChunkType::ns(), ns); - n.append(ChunkType::min(), bumpMin); - n.append(ChunkType::max(), bumpMax); - n.append(ChunkType::shard(), fromShardName); - n.done(); + errmsg = "_recvChunkCommit failed!"; + result.append("cause", res); + return false; + } - BSONObjBuilder q( op.subobjStart( "o2" ) 
); - q.append(ChunkType::name(), Chunk::genID(ns, bumpMin)); - q.done(); + log() << "moveChunk migrate commit accepted by TO-shard: " << res << migrateLog; - updates.append( op.obj() ); + // 5.c - log() << "moveChunk updating self version to: " << nextVersion << " through " - << bumpMin << " -> " << bumpMax << " for collection '" << ns << "'" << migrateLog; + // version at which the next highest lastmod will be set + // if the chunk being moved is the last in the shard, nextVersion is that chunk's lastmod + // otherwise the highest version is from the chunk being bumped on the FROM-shard + ChunkVersion nextVersion; - } - else { + // we want to go only once to the configDB but perhaps change two chunks, the one being migrated and another + // local one (so to bump version for the entire shard) + // we use the 'applyOps' mechanism to group the two updates and make them safer + // TODO pull config update code to a module - log() << "moveChunk moved last chunk out for collection '" << ns << "'" << migrateLog; - } + BSONArrayBuilder updates; + { + // update for the chunk being moved + BSONObjBuilder op; + op.append("op", "u"); + op.appendBool("b", false /* no upserting */); + op.append("ns", ChunkType::ConfigNS); + + BSONObjBuilder n(op.subobjStart("o")); + n.append(ChunkType::name(), Chunk::genID(ns, min)); + myVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod()); + n.append(ChunkType::ns(), ns); + n.append(ChunkType::min(), min); + n.append(ChunkType::max(), max); + n.append(ChunkType::shard(), toShardName); + n.done(); + + BSONObjBuilder q(op.subobjStart("o2")); + q.append(ChunkType::name(), Chunk::genID(ns, min)); + q.done(); + + updates.append(op.obj()); + } + + nextVersion = myVersion; + + // if we have chunks left on the FROM shard, update the version of one of them as + // well. 
we can figure that out by grabbing the metadata installed on 5.a + + const CollectionMetadataPtr bumpedCollMetadata(shardingState.getCollectionMetadata(ns)); + if (bumpedCollMetadata->getNumChunks() > 0) { + // get another chunk on that shard + ChunkType bumpChunk; + bool chunkRes = + bumpedCollMetadata->getNextChunk(bumpedCollMetadata->getMinKey(), &bumpChunk); + BSONObj bumpMin = bumpChunk.getMin(); + BSONObj bumpMax = bumpChunk.getMax(); + + (void)chunkRes; // for compile warning on non-debug + dassert(chunkRes); + dassert(bumpMin.woCompare(min) != 0); + + BSONObjBuilder op; + op.append("op", "u"); + op.appendBool("b", false); + op.append("ns", ChunkType::ConfigNS); + + nextVersion.incMinor(); // same as used on donateChunk + BSONObjBuilder n(op.subobjStart("o")); + n.append(ChunkType::name(), Chunk::genID(ns, bumpMin)); + nextVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod()); + n.append(ChunkType::ns(), ns); + n.append(ChunkType::min(), bumpMin); + n.append(ChunkType::max(), bumpMax); + n.append(ChunkType::shard(), fromShardName); + n.done(); + + BSONObjBuilder q(op.subobjStart("o2")); + q.append(ChunkType::name(), Chunk::genID(ns, bumpMin)); + q.done(); + + updates.append(op.obj()); + + log() << "moveChunk updating self version to: " << nextVersion << " through " + << bumpMin << " -> " << bumpMax << " for collection '" << ns << "'" + << migrateLog; + + } else { + log() << "moveChunk moved last chunk out for collection '" << ns << "'" + << migrateLog; + } - BSONArrayBuilder preCond; + BSONArrayBuilder preCond; + { + BSONObjBuilder b; + b.append("ns", ChunkType::ConfigNS); + b.append("q", + BSON("query" << BSON(ChunkType::ns(ns)) << "orderby" + << BSON(ChunkType::DEPRECATED_lastmod() << -1))); { - BSONObjBuilder b; - b.append("ns", ChunkType::ConfigNS); - b.append("q", BSON("query" << BSON(ChunkType::ns(ns)) << - "orderby" << BSON(ChunkType::DEPRECATED_lastmod() << -1))); - { - BSONObjBuilder bb( b.subobjStart( "res" ) ); - // TODO: For backwards compatibility, we can't yet require an epoch here - bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), origCollVersion.toLong()); - bb.done(); - } - preCond.append( b.obj() ); + BSONObjBuilder bb(b.subobjStart("res")); + // TODO: For backwards compatibility, we can't yet require an epoch here + bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), origCollVersion.toLong()); + bb.done(); } + preCond.append(b.obj()); + } - Status applyOpsStatus{Status::OK()}; - try { - - // For testing migration failures - if ( MONGO_FAIL_POINT(failMigrationConfigWritePrepare) ) { - throw DBException( "mock migration failure before config write", - PrepareConfigsFailedCode ); - } + Status applyOpsStatus{Status::OK()}; + try { + // For testing migration failures + if (MONGO_FAIL_POINT(failMigrationConfigWritePrepare)) { + throw DBException("mock migration failure before config write", + PrepareConfigsFailedCode); + } - applyOpsStatus = grid.catalogManager()->applyChunkOpsDeprecated(updates.arr(), - preCond.arr()); + applyOpsStatus = + grid.catalogManager()->applyChunkOpsDeprecated(updates.arr(), preCond.arr()); - if (MONGO_FAIL_POINT(failMigrationApplyOps)) { - throw SocketException(SocketException::RECV_ERROR, - shardingState.getConfigServer()); - } - } - catch (const DBException& ex) { - warning() << ex << migrateLog; - applyOpsStatus = ex.toStatus(); + if (MONGO_FAIL_POINT(failMigrationApplyOps)) { + throw SocketException(SocketException::RECV_ERROR, + shardingState.getConfigServer()); } + } catch (const DBException& ex) { + warning() << ex << migrateLog; + 
applyOpsStatus = ex.toStatus(); + } - if (applyOpsStatus == ErrorCodes::PrepareConfigsFailedCode) { - - // In the process of issuing the migrate commit, the SyncClusterConnection - // checks that the config servers are reachable. If they are not, we are - // sure that the applyOps command was not sent to any of the configs, so we - // can safely back out of the migration here, by resetting the shard - // version that we bumped up to in the donateChunk() call above. + if (applyOpsStatus == ErrorCodes::PrepareConfigsFailedCode) { + // In the process of issuing the migrate commit, the SyncClusterConnection + // checks that the config servers are reachable. If they are not, we are + // sure that the applyOps command was not sent to any of the configs, so we + // can safely back out of the migration here, by resetting the shard + // version that we bumped up to in the donateChunk() call above. - log() << "About to acquire moveChunk coll lock to reset shard version from " - << "failed migration"; + log() << "About to acquire moveChunk coll lock to reset shard version from " + << "failed migration"; - { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); + { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); + Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); - // Revert the metadata back to the state before "forgetting" - // about the chunk. - shardingState.undoDonateChunk(txn, ns, origCollMetadata); - } + // Revert the metadata back to the state before "forgetting" + // about the chunk. + shardingState.undoDonateChunk(txn, ns, origCollMetadata); + } - log() << "Shard version successfully reset to clean up failed migration"; + log() << "Shard version successfully reset to clean up failed migration"; - errmsg = "Failed to send migrate commit to configs because " + errmsg; - return false; + errmsg = "Failed to send migrate commit to configs because " + errmsg; + return false; - } - else if (!applyOpsStatus.isOK()) { - - // this could be a blip in the connectivity - // wait out a few seconds and check if the commit request made it - // - // if the commit made it to the config, we'll see the chunk in the new shard and there's no action - // if the commit did not make it, currently the only way to fix this state is to bounce the mongod so - // that the old state (before migrating) be brought in - - warning() << "moveChunk commit outcome ongoing" << migrateLog; - sleepsecs( 10 ); - - // look for the chunk in this shard whose version got bumped - // we assume that if that mod made it to the config, the applyOps was successful - try { - std::vector<ChunkType> newestChunk; - Status status = grid.catalogManager()->getChunks( - Query(BSON(ChunkType::ns(ns))) - .sort(ChunkType::DEPRECATED_lastmod(), -1), - 1, - &newestChunk); - uassertStatusOK(status); - - ChunkVersion checkVersion; - if (!newestChunk.empty()) { - invariant(newestChunk.size() == 1); - checkVersion = newestChunk[0].getVersion(); - } + } else if (!applyOpsStatus.isOK()) { + // this could be a blip in the connectivity + // wait out a few seconds and check if the commit request made it + // + // if the commit made it to the config, we'll see the chunk in the new shard and there's no action + // if the commit did not make it, currently the only way to fix this state is to bounce the mongod so + // that the old state (before 
migrating) be brought in - if (checkVersion.equals(nextVersion)) { - log() << "moveChunk commit confirmed" << migrateLog; - errmsg.clear(); + warning() << "moveChunk commit outcome ongoing" << migrateLog; + sleepsecs(10); - } - else { - error() << "moveChunk commit failed: version is at " - << checkVersion << " instead of " << nextVersion << migrateLog; - error() << "TERMINATING" << migrateLog; - dbexit( EXIT_SHARDING_ERROR ); - } + // look for the chunk in this shard whose version got bumped + // we assume that if that mod made it to the config, the applyOps was successful + try { + std::vector<ChunkType> newestChunk; + Status status = grid.catalogManager()->getChunks( + Query(BSON(ChunkType::ns(ns))).sort(ChunkType::DEPRECATED_lastmod(), -1), + 1, + &newestChunk); + uassertStatusOK(status); + + ChunkVersion checkVersion; + if (!newestChunk.empty()) { + invariant(newestChunk.size() == 1); + checkVersion = newestChunk[0].getVersion(); } - catch ( ... ) { - error() << "moveChunk failed to get confirmation of commit" << migrateLog; + + if (checkVersion.equals(nextVersion)) { + log() << "moveChunk commit confirmed" << migrateLog; + errmsg.clear(); + + } else { + error() << "moveChunk commit failed: version is at " << checkVersion + << " instead of " << nextVersion << migrateLog; error() << "TERMINATING" << migrateLog; - dbexit( EXIT_SHARDING_ERROR ); + dbexit(EXIT_SHARDING_ERROR); } + } catch (...) { + error() << "moveChunk failed to get confirmation of commit" << migrateLog; + error() << "TERMINATING" << migrateLog; + dbexit(EXIT_SHARDING_ERROR); } + } - migrateFromStatus.setInCriticalSection( false ); - - // 5.d - BSONObjBuilder commitInfo; - commitInfo.appendElements( chunkInfo ); - if (res["counts"].type() == Object) { - commitInfo.appendElements(res["counts"].Obj()); - } + migrateFromStatus.setInCriticalSection(false); - grid.catalogManager()->logChange(txn, "moveChunk.commit", ns, commitInfo.obj()); + // 5.d + BSONObjBuilder commitInfo; + commitInfo.appendElements(chunkInfo); + if (res["counts"].type() == Object) { + commitInfo.appendElements(res["counts"].Obj()); } - migrateFromStatus.done(txn); - timing.done(5); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep5); - - // 6. - // NOTE: It is important that the distributed collection lock be held for this step. - RangeDeleter* deleter = getDeleter(); - RangeDeleterOptions deleterOptions(KeyRange(ns, - min.getOwned(), - max.getOwned(), - shardKeyPattern)); - deleterOptions.writeConcern = writeConcern; - deleterOptions.waitForOpenCursors = true; - deleterOptions.fromMigrate = true; - deleterOptions.onlyRemoveOrphanedDocs = true; - deleterOptions.removeSaverReason = "post-cleanup"; - - if (waitForDelete) { - log() << "doing delete inline for cleanup of chunk data" << migrateLog; - - string errMsg; - // This is an immediate delete, and as a consequence, there could be more - // deletes happening simultaneously than there are deleter worker threads. - if (!deleter->deleteNow(txn, - deleterOptions, - &errMsg)) { - log() << "Error occured while performing cleanup: " << errMsg; - } - } - else { - log() << "forking for cleanup of chunk data" << migrateLog; + grid.catalogManager()->logChange(txn, "moveChunk.commit", ns, commitInfo.obj()); + } - string errMsg; - if (!deleter->queueDelete(txn, - deleterOptions, - NULL, // Don't want to be notified. - &errMsg)) { - log() << "could not queue migration cleanup: " << errMsg; - } + migrateFromStatus.done(txn); + timing.done(5); + MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep5); + + // 6. 
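Before moving on to cleanup, it may help to see what the metadata commit in step 5.c amounts to: applyChunkOpsDeprecated wraps the two update operations and the precondition built above into a single grouped request against the config servers. A rough, invented-value sketch of that payload, with placeholder keys where the literal field names come from ChunkType:

    // Rough shape of the grouped config update issued in step 5.c; structure follows
    // the 'updates' and 'preCond' builders above, all concrete values are invented.
    // updates: [
    //   { op: "u", b: false, ns: "config.chunks",
    //     o:  { <name>: "test.users-_id_0", ns: "test.users", min: {_id: 0}, max: {_id: 100},
    //           shard: "shard0001", <lastmod>: <myVersion, major bumped> },
    //     o2: { <name>: "test.users-_id_0" } },
    //   { op: "u", b: false, ns: "config.chunks",          // only when chunks remain on FROM
    //     o:  { <name>: "test.users-_id_100", ns: "test.users", min: {_id: 100}, max: {_id: 200},
    //           shard: "shard0000", <lastmod>: <nextVersion, minor bumped> },
    //     o2: { <name>: "test.users-_id_100" } }
    // ]
    // preCond: [
    //   { ns: "config.chunks",
    //     q: { query: { ns: "test.users" }, orderby: { <lastmod>: -1 } },
    //     res: { <lastmod>: <origCollVersion> } }
    // ]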
+ // NOTE: It is important that the distributed collection lock be held for this step. + RangeDeleter* deleter = getDeleter(); + RangeDeleterOptions deleterOptions( + KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern)); + deleterOptions.writeConcern = writeConcern; + deleterOptions.waitForOpenCursors = true; + deleterOptions.fromMigrate = true; + deleterOptions.onlyRemoveOrphanedDocs = true; + deleterOptions.removeSaverReason = "post-cleanup"; + + if (waitForDelete) { + log() << "doing delete inline for cleanup of chunk data" << migrateLog; + + string errMsg; + // This is an immediate delete, and as a consequence, there could be more + // deletes happening simultaneously than there are deleter worker threads. + if (!deleter->deleteNow(txn, deleterOptions, &errMsg)) { + log() << "Error occured while performing cleanup: " << errMsg; + } + } else { + log() << "forking for cleanup of chunk data" << migrateLog; + + string errMsg; + if (!deleter->queueDelete(txn, + deleterOptions, + NULL, // Don't want to be notified. + &errMsg)) { + log() << "could not queue migration cleanup: " << errMsg; } - timing.done(6); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep6); + } + timing.done(6); + MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep6); - return true; + return true; + } - } +} moveChunkCmd; + +bool ShardingState::inCriticalMigrateSection() { + return migrateFromStatus.getInCriticalSection(); +} - } moveChunkCmd; +bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) { + return migrateFromStatus.waitTillNotInCriticalSection(maxSecondsToWait); +} - bool ShardingState::inCriticalMigrateSection() { - return migrateFromStatus.getInCriticalSection(); +/* ----- + below this are the "to" side commands + + command to initiate + worker thread + does initial clone + pulls initial change set + keeps pulling + keeps state + command to get state + commend to "commit" +*/ + +// Enabling / disabling these fail points pauses / resumes MigrateStatus::_go(), the thread +// that receives a chunk migration from the donor. +MONGO_FP_DECLARE(migrateThreadHangAtStep1); +MONGO_FP_DECLARE(migrateThreadHangAtStep2); +MONGO_FP_DECLARE(migrateThreadHangAtStep3); +MONGO_FP_DECLARE(migrateThreadHangAtStep4); +MONGO_FP_DECLARE(migrateThreadHangAtStep5); + +class MigrateStatus { +public: + enum State { READY, CLONE, CATCHUP, STEADY, COMMIT_START, DONE, FAIL, ABORT }; + + MigrateStatus() + : _active(false), + _numCloned(0), + _clonedBytes(0), + _numCatchup(0), + _numSteady(0), + _state(READY) {} + + void setState(State newState) { + std::lock_guard<std::mutex> sl(_mutex); + _state = newState; } - bool ShardingState::waitTillNotInCriticalSection( int maxSecondsToWait ) { - return migrateFromStatus.waitTillNotInCriticalSection( maxSecondsToWait ); + State getState() const { + std::lock_guard<std::mutex> sl(_mutex); + return _state; } - /* ----- - below this are the "to" side commands + /** + * Returns OK if preparation was successful. 
+ */ + Status prepare(const std::string& ns, + const std::string& fromShard, + const BSONObj& min, + const BSONObj& max, + const BSONObj& shardKeyPattern) { + std::lock_guard<std::mutex> lk(_mutex); + + if (_active) { + return Status(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Active migration already in progress " + << "ns: " << _ns << ", from: " << _from << ", min: " << _min + << ", max: " << _max); + } - command to initiate - worker thread - does initial clone - pulls initial change set - keeps pulling - keeps state - command to get state - commend to "commit" - */ + _state = READY; + _errmsg = ""; - // Enabling / disabling these fail points pauses / resumes MigrateStatus::_go(), the thread - // that receives a chunk migration from the donor. - MONGO_FP_DECLARE(migrateThreadHangAtStep1); - MONGO_FP_DECLARE(migrateThreadHangAtStep2); - MONGO_FP_DECLARE(migrateThreadHangAtStep3); - MONGO_FP_DECLARE(migrateThreadHangAtStep4); - MONGO_FP_DECLARE(migrateThreadHangAtStep5); + _ns = ns; + _from = fromShard; + _min = min; + _max = max; + _shardKeyPattern = shardKeyPattern; - class MigrateStatus { - public: - enum State { - READY, - CLONE, - CATCHUP, - STEADY, - COMMIT_START, - DONE, - FAIL, - ABORT - }; - - MigrateStatus(): - _active(false), - _numCloned(0), - _clonedBytes(0), - _numCatchup(0), - _numSteady(0), - _state(READY) { - } - - void setState(State newState) { - std::lock_guard<std::mutex> sl(_mutex); - _state = newState; - } + _numCloned = 0; + _clonedBytes = 0; + _numCatchup = 0; + _numSteady = 0; - State getState() const { - std::lock_guard<std::mutex> sl(_mutex); - return _state; - } + _active = true; - /** - * Returns OK if preparation was successful. - */ - Status prepare(const std::string& ns, - const std::string& fromShard, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern) { - std::lock_guard<std::mutex> lk(_mutex); - - if (_active) { - return Status(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Active migration already in progress " - << "ns: " << _ns - << ", from: " << _from - << ", min: " << _min - << ", max: " << _max); - } - - _state = READY; - _errmsg = ""; - - _ns = ns; - _from = fromShard; - _min = min; - _max = max; - _shardKeyPattern = shardKeyPattern; - - _numCloned = 0; - _clonedBytes = 0; - _numCatchup = 0; - _numSteady = 0; - - _active = true; - - return Status::OK(); - } - - void go(OperationContext* txn, - const std::string& ns, - BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - const std::string& fromShard, - const OID& epoch, - const WriteConcernOptions& writeConcern) { - try { - _go(txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern); - } - catch ( std::exception& e ) { - { - std::lock_guard<std::mutex> sl(_mutex); - _state = FAIL; - _errmsg = e.what(); - } + return Status::OK(); + } - error() << "migrate failed: " << e.what() << migrateLog; + void go(OperationContext* txn, + const std::string& ns, + BSONObj min, + BSONObj max, + BSONObj shardKeyPattern, + const std::string& fromShard, + const OID& epoch, + const WriteConcernOptions& writeConcern) { + try { + _go(txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern); + } catch (std::exception& e) { + { + std::lock_guard<std::mutex> sl(_mutex); + _state = FAIL; + _errmsg = e.what(); } - catch ( ... 
) { - { - std::lock_guard<std::mutex> sl(_mutex); - _state = FAIL; - _errmsg = "UNKNOWN ERROR"; - } - error() << "migrate failed with unknown exception" << migrateLog; + error() << "migrate failed: " << e.what() << migrateLog; + } catch (...) { + { + std::lock_guard<std::mutex> sl(_mutex); + _state = FAIL; + _errmsg = "UNKNOWN ERROR"; } - if ( getState() != DONE ) { - // Unprotect the range if needed/possible on unsuccessful TO migration - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); + error() << "migrate failed with unknown exception" << migrateLog; + } - string errMsg; - if (!shardingState.forgetPending(txn, ns, min, max, epoch, &errMsg)) { - warning() << errMsg; - } - } + if (getState() != DONE) { + // Unprotect the range if needed/possible on unsuccessful TO migration + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); + Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); - setActive( false ); + string errMsg; + if (!shardingState.forgetPending(txn, ns, min, max, epoch, &errMsg)) { + warning() << errMsg; + } } - void _go(OperationContext* txn, - const std::string& ns, - BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - const std::string& fromShard, - const OID& epoch, - const WriteConcernOptions& writeConcern) { - verify( getActive() ); - verify( getState() == READY ); - verify( ! min.isEmpty() ); - verify( ! max.isEmpty() ); + setActive(false); + } - DisableDocumentValidation validationDisabler(txn); + void _go(OperationContext* txn, + const std::string& ns, + BSONObj min, + BSONObj max, + BSONObj shardKeyPattern, + const std::string& fromShard, + const OID& epoch, + const WriteConcernOptions& writeConcern) { + verify(getActive()); + verify(getState() == READY); + verify(!min.isEmpty()); + verify(!max.isEmpty()); + + DisableDocumentValidation validationDisabler(txn); + + log() << "starting receiving-end of migration of chunk " << min << " -> " << max + << " for collection " << ns << " from " << fromShard << " at epoch " + << epoch.toString(); + + string errmsg; + MoveTimingHelper timing(txn, "to", ns, min, max, 5 /* steps */, &errmsg, "", ""); + + ScopedDbConnection conn(fromShard); + conn->getLastError(); // just test connection + + NamespaceString nss(ns); + { + // 0. 
copy system.namespaces entry if collection doesn't already exist + OldClientWriteContext ctx(txn, ns); + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { + errmsg = str::stream() << "Not primary during migration: " << ns + << ": checking if collection exists"; + warning() << errmsg; + setState(FAIL); + return; + } - log() << "starting receiving-end of migration of chunk " << min << " -> " << max << - " for collection " << ns << " from " << fromShard - << " at epoch " << epoch.toString(); + // Only copy if ns doesn't already exist + Database* db = ctx.db(); + Collection* collection = db->getCollection(ns); - string errmsg; - MoveTimingHelper timing(txn, "to", ns, min, max, 5 /* steps */, &errmsg, "", ""); + if (!collection) { + list<BSONObj> infos = conn->getCollectionInfos( + nsToDatabase(ns), BSON("name" << nsToCollectionSubstring(ns))); - ScopedDbConnection conn(fromShard); - conn->getLastError(); // just test connection + BSONObj options; + if (infos.size() > 0) { + BSONObj entry = infos.front(); + if (entry["options"].isABSONObj()) { + options = entry["options"].Obj(); + } + } - NamespaceString nss(ns); - { - // 0. copy system.namespaces entry if collection doesn't already exist - OldClientWriteContext ctx(txn, ns ); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { - errmsg = str::stream() << "Not primary during migration: " << ns - << ": checking if collection exists"; - warning() << errmsg; - setState(FAIL); - return; + WriteUnitOfWork wuow(txn); + Status status = userCreateNS(txn, db, ns, options, false); + if (!status.isOK()) { + warning() << "failed to create collection [" << ns << "] " + << " with options " << options << ": " << status; } + wuow.commit(); + } + } - // Only copy if ns doesn't already exist - Database* db = ctx.db(); - Collection* collection = db->getCollection( ns ); + { + // 1. copy indexes - if ( !collection ) { - list<BSONObj> infos = - conn->getCollectionInfos(nsToDatabase(ns), - BSON("name" << nsToCollectionSubstring(ns))); + vector<BSONObj> indexSpecs; + { + const std::list<BSONObj> indexes = conn->getIndexSpecs(ns); + indexSpecs.insert(indexSpecs.begin(), indexes.begin(), indexes.end()); + } - BSONObj options; - if (infos.size() > 0) { - BSONObj entry = infos.front(); - if (entry["options"].isABSONObj()) { - options = entry["options"].Obj(); - } - } + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_X); + OldClientContext ctx(txn, ns); + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { + errmsg = str::stream() << "Not primary during migration: " << ns; + warning() << errmsg; + setState(FAIL); + return; + } - WriteUnitOfWork wuow(txn); - Status status = userCreateNS(txn, db, ns, options, false); - if ( !status.isOK() ) { - warning() << "failed to create collection [" << ns << "] " - << " with options " << options << ": " << status; - } - wuow.commit(); - } + Database* db = ctx.db(); + Collection* collection = db->getCollection(ns); + if (!collection) { + errmsg = str::stream() << "collection dropped during migration: " << ns; + warning() << errmsg; + setState(FAIL); + return; } - { - // 1. 
copy indexes + MultiIndexBlock indexer(txn, collection); - vector<BSONObj> indexSpecs; - { - const std::list<BSONObj> indexes = conn->getIndexSpecs(ns); - indexSpecs.insert(indexSpecs.begin(), indexes.begin(), indexes.end()); - } + indexer.removeExistingIndexes(&indexSpecs); - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_X); - OldClientContext ctx(txn, ns); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { - errmsg = str::stream() << "Not primary during migration: " << ns; + if (!indexSpecs.empty()) { + // Only copy indexes if the collection does not have any documents. + if (collection->numRecords(txn) > 0) { + errmsg = str::stream() << "aborting migration, shard is missing " + << indexSpecs.size() << " indexes and " + << "collection is not empty. Non-trivial " + << "index creation should be scheduled manually"; warning() << errmsg; setState(FAIL); return; } - Database* db = ctx.db(); - Collection* collection = db->getCollection( ns ); - if ( !collection ) { - errmsg = str::stream() << "collection dropped during migration: " << ns; + Status status = indexer.init(indexSpecs); + if (!status.isOK()) { + errmsg = str::stream() << "failed to create index before migrating data. " + << " error: " << status.toString(); warning() << errmsg; setState(FAIL); return; } - MultiIndexBlock indexer(txn, collection); + status = indexer.insertAllDocumentsInCollection(); + if (!status.isOK()) { + errmsg = str::stream() << "failed to create index before migrating data. " + << " error: " << status.toString(); + warning() << errmsg; + setState(FAIL); + return; + } - indexer.removeExistingIndexes(&indexSpecs); + WriteUnitOfWork wunit(txn); + indexer.commit(); - if (!indexSpecs.empty()) { - // Only copy indexes if the collection does not have any documents. - if (collection->numRecords(txn) > 0) { - errmsg = str::stream() << "aborting migration, shard is missing " - << indexSpecs.size() << " indexes and " - << "collection is not empty. Non-trivial " - << "index creation should be scheduled manually"; - warning() << errmsg; - setState(FAIL); - return; - } + for (size_t i = 0; i < indexSpecs.size(); i++) { + // make sure to create index on secondaries as well + getGlobalServiceContext()->getOpObserver()->onCreateIndex( + txn, db->getSystemIndexesName(), indexSpecs[i], true /* fromMigrate */); + } - Status status = indexer.init(indexSpecs); - if ( !status.isOK() ) { - errmsg = str::stream() << "failed to create index before migrating data. " - << " error: " << status.toString(); - warning() << errmsg; - setState(FAIL); - return; - } + wunit.commit(); + } - status = indexer.insertAllDocumentsInCollection(); - if ( !status.isOK() ) { - errmsg = str::stream() << "failed to create index before migrating data. " - << " error: " << status.toString(); - warning() << errmsg; - setState(FAIL); - return; - } + timing.done(1); + MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep1); + } - WriteUnitOfWork wunit(txn); - indexer.commit(); + { + // 2. delete any data already in range + RangeDeleterOptions deleterOptions( + KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern)); + deleterOptions.writeConcern = writeConcern; + // No need to wait since all existing cursors will filter out this range when + // returning the results. 
+ deleterOptions.waitForOpenCursors = false; + deleterOptions.fromMigrate = true; + deleterOptions.onlyRemoveOrphanedDocs = true; + deleterOptions.removeSaverReason = "preCleanup"; - for (size_t i = 0; i < indexSpecs.size(); i++) { - // make sure to create index on secondaries as well - getGlobalServiceContext()->getOpObserver()->onCreateIndex( - txn, - db->getSystemIndexesName(), - indexSpecs[i], - true /* fromMigrate */); - } + string errMsg; - wunit.commit(); - } - - timing.done(1); - MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep1); + if (!getDeleter()->deleteNow(txn, deleterOptions, &errMsg)) { + warning() << "Failed to queue delete for migrate abort: " << errMsg; + setState(FAIL); + return; } { - // 2. delete any data already in range - RangeDeleterOptions deleterOptions(KeyRange(ns, - min.getOwned(), - max.getOwned(), - shardKeyPattern)); - deleterOptions.writeConcern = writeConcern; - // No need to wait since all existing cursors will filter out this range when - // returning the results. - deleterOptions.waitForOpenCursors = false; - deleterOptions.fromMigrate = true; - deleterOptions.onlyRemoveOrphanedDocs = true; - deleterOptions.removeSaverReason = "preCleanup"; - - string errMsg; - - if (!getDeleter()->deleteNow(txn, deleterOptions, &errMsg)) { - warning() << "Failed to queue delete for migrate abort: " << errMsg; + // Protect the range by noting that we're now starting a migration to it + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); + Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); + + if (!shardingState.notePending(txn, ns, min, max, epoch, &errmsg)) { + warning() << errmsg; setState(FAIL); return; } + } - { - // Protect the range by noting that we're now starting a migration to it - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); + timing.done(2); + MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep2); + } - if (!shardingState.notePending(txn, ns, min, max, epoch, &errmsg)) { - warning() << errmsg; - setState(FAIL); - return; - } - } + State currentState = getState(); + if (currentState == FAIL || currentState == ABORT) { + string errMsg; + RangeDeleterOptions deleterOptions( + KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern)); + deleterOptions.writeConcern = writeConcern; + // No need to wait since all existing cursors will filter out this range when + // returning the results. + deleterOptions.waitForOpenCursors = false; + deleterOptions.fromMigrate = true; + deleterOptions.onlyRemoveOrphanedDocs = true; - timing.done(2); - MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep2); - } - - State currentState = getState(); - if (currentState == FAIL || currentState == ABORT) { - string errMsg; - RangeDeleterOptions deleterOptions(KeyRange(ns, - min.getOwned(), - max.getOwned(), - shardKeyPattern)); - deleterOptions.writeConcern = writeConcern; - // No need to wait since all existing cursors will filter out this range when - // returning the results. 
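Looking ahead to the initial bulk clone (step 3 further below): each _migrateClone round trip asks the donor for the next batch of documents in disk order, and the recipient upserts them locally. The reply consumed by that loop is roughly:

    // Rough shape of a _migrateClone reply as consumed by the clone loop below;
    // only the "objects" array is read, and the document contents are invented here.
    // { objects: [ { _id: 0, ... }, { _id: 1, ... } ], ok: 1 }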
- deleterOptions.waitForOpenCursors = false; - deleterOptions.fromMigrate = true; - deleterOptions.onlyRemoveOrphanedDocs = true; - - if (!getDeleter()->queueDelete(txn, deleterOptions, NULL /* notifier */, &errMsg)) { - warning() << "Failed to queue delete for migrate abort: " << errMsg; - } + if (!getDeleter()->queueDelete(txn, deleterOptions, NULL /* notifier */, &errMsg)) { + warning() << "Failed to queue delete for migrate abort: " << errMsg; } + } - { - // 3. initial bulk clone - setState(CLONE); - - while ( true ) { - BSONObj res; - if ( ! conn->runCommand( "admin" , BSON( "_migrateClone" << 1 ) , res ) ) { // gets array of objects to copy, in disk order - setState(FAIL); - errmsg = "_migrateClone failed: "; - errmsg += res.toString(); + { + // 3. initial bulk clone + setState(CLONE); + + while (true) { + BSONObj res; + if (!conn->runCommand("admin", + BSON("_migrateClone" << 1), + res)) { // gets array of objects to copy, in disk order + setState(FAIL); + errmsg = "_migrateClone failed: "; + errmsg += res.toString(); + error() << errmsg << migrateLog; + conn.done(); + return; + } + + BSONObj arr = res["objects"].Obj(); + int thisTime = 0; + + BSONObjIterator i(arr); + while (i.more()) { + txn->checkForInterrupt(); + + if (getState() == ABORT) { + errmsg = str::stream() << "Migration abort requested while " + << "copying documents"; error() << errmsg << migrateLog; - conn.done(); return; } - BSONObj arr = res["objects"].Obj(); - int thisTime = 0; - - BSONObjIterator i( arr ); - while( i.more() ) { - txn->checkForInterrupt(); - - if ( getState() == ABORT ) { - errmsg = str::stream() << "Migration abort requested while " - << "copying documents"; - error() << errmsg << migrateLog; - return; + BSONObj docToClone = i.next().Obj(); + { + OldClientWriteContext cx(txn, ns); + + BSONObj localDoc; + if (willOverrideLocalId(txn, + ns, + min, + max, + shardKeyPattern, + cx.db(), + docToClone, + &localDoc)) { + string errMsg = str::stream() << "cannot migrate chunk, local document " + << localDoc << " has same _id as cloned " + << "remote document " << docToClone; + + warning() << errMsg; + + // Exception will abort migration cleanly + uasserted(16976, errMsg); } - BSONObj docToClone = i.next().Obj(); - { - OldClientWriteContext cx(txn, ns ); - - BSONObj localDoc; - if (willOverrideLocalId(txn, - ns, - min, - max, - shardKeyPattern, - cx.db(), - docToClone, - &localDoc)) { - string errMsg = - str::stream() << "cannot migrate chunk, local document " - << localDoc - << " has same _id as cloned " - << "remote document " << docToClone; - - warning() << errMsg; - - // Exception will abort migration cleanly - uasserted( 16976, errMsg ); - } - - Helpers::upsert( txn, ns, docToClone, true ); - } - thisTime++; + Helpers::upsert(txn, ns, docToClone, true); + } + thisTime++; - { - std::lock_guard<std::mutex> statsLock(_mutex); - _numCloned++; - _clonedBytes += docToClone.objsize(); - } + { + std::lock_guard<std::mutex> statsLock(_mutex); + _numCloned++; + _clonedBytes += docToClone.objsize(); + } - if (writeConcern.shouldWaitForOtherNodes()) { - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::getGlobalReplicationCoordinator()->awaitReplication( - txn, - repl::ReplClientInfo::forClient( - txn->getClient()).getLastOp(), - writeConcern); - if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { - warning() << "secondaryThrottle on, but doc insert timed out; " - "continuing"; - } - else { - massertStatusOK(replStatus.status); - } + if (writeConcern.shouldWaitForOtherNodes()) { + 
repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::getGlobalReplicationCoordinator()->awaitReplication( + txn, + repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(), + writeConcern); + if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { + warning() << "secondaryThrottle on, but doc insert timed out; " + "continuing"; + } else { + massertStatusOK(replStatus.status); } } - - if ( thisTime == 0 ) - break; } - timing.done(3); - MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep3); + if (thisTime == 0) + break; } - // If running on a replicated system, we'll need to flush the docs we cloned to the - // secondaries - repl::OpTime lastOpApplied = - repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); + timing.done(3); + MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep3); + } - { - // 4. do bulk of mods - setState(CATCHUP); - while ( true ) { - BSONObj res; - if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ) { - setState(FAIL); - errmsg = "_transferMods failed: "; - errmsg += res.toString(); - error() << "_transferMods failed: " << res << migrateLog; - conn.done(); - return; - } - if ( res["size"].number() == 0 ) - break; + // If running on a replicated system, we'll need to flush the docs we cloned to the + // secondaries + repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); - apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied); + { + // 4. do bulk of mods + setState(CATCHUP); + while (true) { + BSONObj res; + if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) { + setState(FAIL); + errmsg = "_transferMods failed: "; + errmsg += res.toString(); + error() << "_transferMods failed: " << res << migrateLog; + conn.done(); + return; + } + if (res["size"].number() == 0) + break; - const int maxIterations = 3600*50; - int i; - for ( i=0;i<maxIterations; i++) { - txn->checkForInterrupt(); + apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied); - if ( getState() == ABORT ) { - errmsg = str::stream() << "Migration abort requested while waiting " - << "for replication at catch up stage"; - error() << errmsg << migrateLog; + const int maxIterations = 3600 * 50; + int i; + for (i = 0; i < maxIterations; i++) { + txn->checkForInterrupt(); - return; - } + if (getState() == ABORT) { + errmsg = str::stream() << "Migration abort requested while waiting " + << "for replication at catch up stage"; + error() << errmsg << migrateLog; - if (opReplicatedEnough(txn, lastOpApplied, writeConcern)) - break; + return; + } - if ( i > 100 ) { - warning() << "secondaries having hard time keeping up with migrate" << migrateLog; - } + if (opReplicatedEnough(txn, lastOpApplied, writeConcern)) + break; - sleepmillis( 20 ); + if (i > 100) { + warning() << "secondaries having hard time keeping up with migrate" + << migrateLog; } - if ( i == maxIterations ) { - errmsg = "secondary can't keep up with migrate"; - error() << errmsg << migrateLog; - conn.done(); - setState(FAIL); - return; - } + sleepmillis(20); } - timing.done(4); - MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep4); + if (i == maxIterations) { + errmsg = "secondary can't keep up with migrate"; + error() << errmsg << migrateLog; + conn.done(); + setState(FAIL); + return; + } } - { - // pause to wait for replication - // this will prevent us from going into critical section until we're ready - Timer t; - while ( t.minutes() < 600 ) { - txn->checkForInterrupt(); + timing.done(4); + MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep4); + } - if 
(getState() == ABORT) { - errmsg = "Migration abort requested while waiting for replication"; - error() << errmsg << migrateLog; - return; - } + { + // pause to wait for replication + // this will prevent us from going into critical section until we're ready + Timer t; + while (t.minutes() < 600) { + txn->checkForInterrupt(); - log() << "Waiting for replication to catch up before entering critical section" - ; - if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern)) - break; - sleepsecs(1); + if (getState() == ABORT) { + errmsg = "Migration abort requested while waiting for replication"; + error() << errmsg << migrateLog; + return; } - if (t.minutes() >= 600) { - setState(FAIL); - errmsg = "Cannot go to critical section because secondaries cannot keep up"; - error() << errmsg << migrateLog; - return; - } + log() << "Waiting for replication to catch up before entering critical section"; + if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern)) + break; + sleepsecs(1); } - { - // 5. wait for commit - - setState(STEADY); - bool transferAfterCommit = false; - while ( getState() == STEADY || getState() == COMMIT_START ) { - txn->checkForInterrupt(); + if (t.minutes() >= 600) { + setState(FAIL); + errmsg = "Cannot go to critical section because secondaries cannot keep up"; + error() << errmsg << migrateLog; + return; + } + } - // Make sure we do at least one transfer after recv'ing the commit message - // If we aren't sure that at least one transfer happens *after* our state - // changes to COMMIT_START, there could be mods still on the FROM shard that - // got logged *after* our _transferMods but *before* the critical section. - if ( getState() == COMMIT_START ) transferAfterCommit = true; - - BSONObj res; - if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ) { - log() << "_transferMods failed in STEADY state: " << res << migrateLog; - errmsg = res.toString(); - setState(FAIL); - conn.done(); - return; - } + { + // 5. wait for commit - if (res["size"].number() > 0 && - apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied)) { - continue; - } + setState(STEADY); + bool transferAfterCommit = false; + while (getState() == STEADY || getState() == COMMIT_START) { + txn->checkForInterrupt(); - if ( getState() == ABORT ) { - return; - } + // Make sure we do at least one transfer after recv'ing the commit message + // If we aren't sure that at least one transfer happens *after* our state + // changes to COMMIT_START, there could be mods still on the FROM shard that + // got logged *after* our _transferMods but *before* the critical section. 
+ if (getState() == COMMIT_START) + transferAfterCommit = true; - // We know we're finished when: - // 1) The from side has told us that it has locked writes (COMMIT_START) - // 2) We've checked at least one more time for un-transmitted mods - if ( getState() == COMMIT_START && transferAfterCommit == true ) { - if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern)) - break; - } + BSONObj res; + if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) { + log() << "_transferMods failed in STEADY state: " << res << migrateLog; + errmsg = res.toString(); + setState(FAIL); + conn.done(); + return; + } - // Only sleep if we aren't committing - if ( getState() == STEADY ) sleepmillis( 10 ); + if (res["size"].number() > 0 && + apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied)) { + continue; } - if ( getState() == FAIL ) { - errmsg = "timed out waiting for commit"; + if (getState() == ABORT) { return; } - timing.done(5); - MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep5); + // We know we're finished when: + // 1) The from side has told us that it has locked writes (COMMIT_START) + // 2) We've checked at least one more time for un-transmitted mods + if (getState() == COMMIT_START && transferAfterCommit == true) { + if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern)) + break; + } + + // Only sleep if we aren't committing + if (getState() == STEADY) + sleepmillis(10); } - setState(DONE); - conn.done(); + if (getState() == FAIL) { + errmsg = "timed out waiting for commit"; + return; + } + + timing.done(5); + MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep5); } - void status(BSONObjBuilder& b) { - std::lock_guard<std::mutex> sl(_mutex); + setState(DONE); + conn.done(); + } - b.appendBool("active", _active); + void status(BSONObjBuilder& b) { + std::lock_guard<std::mutex> sl(_mutex); - b.append("ns", _ns); - b.append("from", _from); - b.append("min", _min); - b.append("max", _max); - b.append("shardKeyPattern", _shardKeyPattern); + b.appendBool("active", _active); - b.append("state", stateToString(_state)); - if (_state == FAIL) { - b.append("errmsg", _errmsg); - } + b.append("ns", _ns); + b.append("from", _from); + b.append("min", _min); + b.append("max", _max); + b.append("shardKeyPattern", _shardKeyPattern); - BSONObjBuilder bb(b.subobjStart("counts")); - bb.append("cloned", _numCloned); - bb.append("clonedBytes", _clonedBytes); - bb.append("catchup", _numCatchup); - bb.append("steady", _numSteady); - bb.done(); + b.append("state", stateToString(_state)); + if (_state == FAIL) { + b.append("errmsg", _errmsg); } - bool apply(OperationContext* txn, - const string& ns, - BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - const BSONObj& xfer, - repl::OpTime* lastOpApplied) { - - repl::OpTime dummy; - if ( lastOpApplied == NULL ) { - lastOpApplied = &dummy; - } + BSONObjBuilder bb(b.subobjStart("counts")); + bb.append("cloned", _numCloned); + bb.append("clonedBytes", _clonedBytes); + bb.append("catchup", _numCatchup); + bb.append("steady", _numSteady); + bb.done(); + } - bool didAnything = false; + bool apply(OperationContext* txn, + const string& ns, + BSONObj min, + BSONObj max, + BSONObj shardKeyPattern, + const BSONObj& xfer, + repl::OpTime* lastOpApplied) { + repl::OpTime dummy; + if (lastOpApplied == NULL) { + lastOpApplied = &dummy; + } - if ( xfer["deleted"].isABSONObj() ) { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dlk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Helpers::RemoveSaver rs( "moveChunk" , ns , 
"removedDuring" ); + bool didAnything = false; - BSONObjIterator i( xfer["deleted"].Obj() ); - while ( i.more() ) { - Lock::CollectionLock clk(txn->lockState(), ns, MODE_X); - OldClientContext ctx(txn, ns); + if (xfer["deleted"].isABSONObj()) { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dlk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); + Helpers::RemoveSaver rs("moveChunk", ns, "removedDuring"); - BSONObj id = i.next().Obj(); + BSONObjIterator i(xfer["deleted"].Obj()); + while (i.more()) { + Lock::CollectionLock clk(txn->lockState(), ns, MODE_X); + OldClientContext ctx(txn, ns); - // do not apply deletes if they do not belong to the chunk being migrated - BSONObj fullObj; - if (Helpers::findById(txn, ctx.db(), ns.c_str(), id, fullObj)) { - if (!isInRange(fullObj , min , max , shardKeyPattern)) { - log() << "not applying out of range deletion: " << fullObj << migrateLog; + BSONObj id = i.next().Obj(); - continue; - } - } + // do not apply deletes if they do not belong to the chunk being migrated + BSONObj fullObj; + if (Helpers::findById(txn, ctx.db(), ns.c_str(), id, fullObj)) { + if (!isInRange(fullObj, min, max, shardKeyPattern)) { + log() << "not applying out of range deletion: " << fullObj << migrateLog; - if (serverGlobalParams.moveParanoia) { - rs.goingToDelete(fullObj); + continue; } + } - deleteObjects(txn, - ctx.db(), - ns, - id, - PlanExecutor::YIELD_MANUAL, - true /* justOne */, - false /* god */, - true /* fromMigrate */); - - *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); - didAnything = true; + if (serverGlobalParams.moveParanoia) { + rs.goingToDelete(fullObj); } + + deleteObjects(txn, + ctx.db(), + ns, + id, + PlanExecutor::YIELD_MANUAL, + true /* justOne */, + false /* god */, + true /* fromMigrate */); + + *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); + didAnything = true; } + } - if ( xfer["reload"].isABSONObj() ) { - BSONObjIterator i( xfer["reload"].Obj() ); - while ( i.more() ) { - OldClientWriteContext cx(txn, ns); - - BSONObj updatedDoc = i.next().Obj(); - - BSONObj localDoc; - if (willOverrideLocalId(txn, - ns, - min, - max, - shardKeyPattern, - cx.db(), - updatedDoc, - &localDoc)) { - string errMsg = - str::stream() << "cannot migrate chunk, local document " - << localDoc - << " has same _id as reloaded remote document " - << updatedDoc; - - warning() << errMsg; - - // Exception will abort migration cleanly - uasserted( 16977, errMsg ); - } + if (xfer["reload"].isABSONObj()) { + BSONObjIterator i(xfer["reload"].Obj()); + while (i.more()) { + OldClientWriteContext cx(txn, ns); - // We are in write lock here, so sure we aren't killing - Helpers::upsert( txn, ns , updatedDoc , true ); + BSONObj updatedDoc = i.next().Obj(); - *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); - didAnything = true; - } - } + BSONObj localDoc; + if (willOverrideLocalId( + txn, ns, min, max, shardKeyPattern, cx.db(), updatedDoc, &localDoc)) { + string errMsg = str::stream() + << "cannot migrate chunk, local document " << localDoc + << " has same _id as reloaded remote document " << updatedDoc; - return didAnything; - } + warning() << errMsg; - /** - * Checks if an upsert of a remote document will override a local document with the same _id - * but in a different range on this shard. - * Must be in WriteContext to avoid races and DBHelper errors. - * TODO: Could optimize this check out if sharding on _id. 
- */ - bool willOverrideLocalId(OperationContext* txn, - const string& ns, - BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - Database* db, - BSONObj remoteDoc, - BSONObj* localDoc) { + // Exception will abort migration cleanly + uasserted(16977, errMsg); + } + + // We are in write lock here, so sure we aren't killing + Helpers::upsert(txn, ns, updatedDoc, true); - *localDoc = BSONObj(); - if ( Helpers::findById( txn, db, ns.c_str(), remoteDoc, *localDoc ) ) { - return !isInRange( *localDoc , min , max , shardKeyPattern ); + *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); + didAnything = true; } + } - return false; + return didAnything; + } + + /** + * Checks if an upsert of a remote document will override a local document with the same _id + * but in a different range on this shard. + * Must be in WriteContext to avoid races and DBHelper errors. + * TODO: Could optimize this check out if sharding on _id. + */ + bool willOverrideLocalId(OperationContext* txn, + const string& ns, + BSONObj min, + BSONObj max, + BSONObj shardKeyPattern, + Database* db, + BSONObj remoteDoc, + BSONObj* localDoc) { + *localDoc = BSONObj(); + if (Helpers::findById(txn, db, ns.c_str(), remoteDoc, *localDoc)) { + return !isInRange(*localDoc, min, max, shardKeyPattern); } - /** - * Returns true if the majority of the nodes and the nodes corresponding to the given - * writeConcern (if not empty) have applied till the specified lastOp. - */ - bool opReplicatedEnough(OperationContext* txn, - const repl::OpTime& lastOpApplied, - const WriteConcernOptions& writeConcern) { - WriteConcernOptions majorityWriteConcern; - majorityWriteConcern.wTimeout = -1; - majorityWriteConcern.wMode = WriteConcernOptions::kMajority; - Status majorityStatus = repl::getGlobalReplicationCoordinator()->awaitReplication( - txn, lastOpApplied, majorityWriteConcern).status; - - if (!writeConcern.shouldWaitForOtherNodes()) { - return majorityStatus.isOK(); - } - - // Also enforce the user specified write concern after "majority" so it covers - // the union of the 2 write concerns. - - WriteConcernOptions userWriteConcern(writeConcern); - userWriteConcern.wTimeout = -1; - Status userStatus = repl::getGlobalReplicationCoordinator()->awaitReplication( - txn, lastOpApplied, userWriteConcern).status; - - return majorityStatus.isOK() && userStatus.isOK(); - } - - bool flushPendingWrites(OperationContext* txn, - const std::string& ns, - BSONObj min, - BSONObj max, - const repl::OpTime& lastOpApplied, - const WriteConcernOptions& writeConcern) { - if (!opReplicatedEnough(txn, lastOpApplied, writeConcern)) { - repl::OpTime op(lastOpApplied); - OCCASIONALLY warning() << "migrate commit waiting for a majority of slaves for '" - << ns << "' " << min << " -> " << max - << " waiting for: " << op - << migrateLog; - return false; - } + return false; + } - log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min << " -> " << max << migrateLog; + /** + * Returns true if the majority of the nodes and the nodes corresponding to the given + * writeConcern (if not empty) have applied till the specified lastOp. 
+ */ + bool opReplicatedEnough(OperationContext* txn, + const repl::OpTime& lastOpApplied, + const WriteConcernOptions& writeConcern) { + WriteConcernOptions majorityWriteConcern; + majorityWriteConcern.wTimeout = -1; + majorityWriteConcern.wMode = WriteConcernOptions::kMajority; + Status majorityStatus = repl::getGlobalReplicationCoordinator() + ->awaitReplication(txn, lastOpApplied, majorityWriteConcern) + .status; + + if (!writeConcern.shouldWaitForOtherNodes()) { + return majorityStatus.isOK(); + } - { - // Get global lock to wait for write to be commited to journal. - ScopedTransaction transaction(txn, MODE_S); - Lock::GlobalRead lk(txn->lockState()); + // Also enforce the user specified write concern after "majority" so it covers + // the union of the 2 write concerns. - // if durability is on, force a write to journal - if (getDur().commitNow(txn)) { - log() << "migrate commit flushed to journal for '" << ns << "' " << min << " -> " << max << migrateLog; - } - } + WriteConcernOptions userWriteConcern(writeConcern); + userWriteConcern.wTimeout = -1; + Status userStatus = repl::getGlobalReplicationCoordinator() + ->awaitReplication(txn, lastOpApplied, userWriteConcern) + .status; - return true; - } + return majorityStatus.isOK() && userStatus.isOK(); + } - static string stateToString(State state) { - switch (state) { - case READY: return "ready"; - case CLONE: return "clone"; - case CATCHUP: return "catchup"; - case STEADY: return "steady"; - case COMMIT_START: return "commitStart"; - case DONE: return "done"; - case FAIL: return "fail"; - case ABORT: return "abort"; - } - verify(0); - return ""; + bool flushPendingWrites(OperationContext* txn, + const std::string& ns, + BSONObj min, + BSONObj max, + const repl::OpTime& lastOpApplied, + const WriteConcernOptions& writeConcern) { + if (!opReplicatedEnough(txn, lastOpApplied, writeConcern)) { + repl::OpTime op(lastOpApplied); + OCCASIONALLY warning() << "migrate commit waiting for a majority of slaves for '" << ns + << "' " << min << " -> " << max << " waiting for: " << op + << migrateLog; + return false; } - bool startCommit() { - std::unique_lock<std::mutex> lock(_mutex); + log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min + << " -> " << max << migrateLog; - if (_state != STEADY) { - return false; - } + { + // Get global lock to wait for write to be commited to journal. + ScopedTransaction transaction(txn, MODE_S); + Lock::GlobalRead lk(txn->lockState()); - const auto deadline = system_clock::now() + seconds(30); - _state = COMMIT_START; - while (_active) { - if (std::cv_status::timeout == isActiveCV.wait_until(lock, deadline)) { - _state = FAIL; - log() << "startCommit never finished!" 
<< migrateLog; - return false; - } + // if durability is on, force a write to journal + if (getDur().commitNow(txn)) { + log() << "migrate commit flushed to journal for '" << ns << "' " << min << " -> " + << max << migrateLog; } + } - if (_state == DONE) { - return true; - } + return true; + } - log() << "startCommit failed, final data failed to transfer" << migrateLog; + static string stateToString(State state) { + switch (state) { + case READY: + return "ready"; + case CLONE: + return "clone"; + case CATCHUP: + return "catchup"; + case STEADY: + return "steady"; + case COMMIT_START: + return "commitStart"; + case DONE: + return "done"; + case FAIL: + return "fail"; + case ABORT: + return "abort"; + } + verify(0); + return ""; + } + + bool startCommit() { + std::unique_lock<std::mutex> lock(_mutex); + + if (_state != STEADY) { return false; } - void abort() { - std::lock_guard<std::mutex> sl(_mutex); - _state = ABORT; - _errmsg = "aborted"; + const auto deadline = system_clock::now() + seconds(30); + _state = COMMIT_START; + while (_active) { + if (std::cv_status::timeout == isActiveCV.wait_until(lock, deadline)) { + _state = FAIL; + log() << "startCommit never finished!" << migrateLog; + return false; + } } - bool getActive() const { std::lock_guard<std::mutex> lk(_mutex); return _active; } - void setActive( bool b ) { - std::lock_guard<std::mutex> lk(_mutex); - _active = b; - isActiveCV.notify_all(); + if (_state == DONE) { + return true; } - // Guards all fields. - mutable std::mutex _mutex; - bool _active; - std::condition_variable isActiveCV; + log() << "startCommit failed, final data failed to transfer" << migrateLog; + return false; + } + + void abort() { + std::lock_guard<std::mutex> sl(_mutex); + _state = ABORT; + _errmsg = "aborted"; + } + + bool getActive() const { + std::lock_guard<std::mutex> lk(_mutex); + return _active; + } + void setActive(bool b) { + std::lock_guard<std::mutex> lk(_mutex); + _active = b; + isActiveCV.notify_all(); + } - std::string _ns; - std::string _from; - BSONObj _min; - BSONObj _max; - BSONObj _shardKeyPattern; + // Guards all fields. 
+ mutable std::mutex _mutex; + bool _active; + std::condition_variable isActiveCV; - long long _numCloned; - long long _clonedBytes; - long long _numCatchup; - long long _numSteady; + std::string _ns; + std::string _from; + BSONObj _min; + BSONObj _max; + BSONObj _shardKeyPattern; - State _state; - std::string _errmsg; + long long _numCloned; + long long _clonedBytes; + long long _numCatchup; + long long _numSteady; - } migrateStatus; + State _state; + std::string _errmsg; - void migrateThread(std::string ns, - BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - std::string fromShard, - OID epoch, - WriteConcernOptions writeConcern) { - Client::initThread( "migrateThread" ); - OperationContextImpl txn; - if (getGlobalAuthorizationManager()->isAuthEnabled()) { - ShardedConnectionInfo::addHook(); - AuthorizationSession::get(txn.getClient())->grantInternalAuthorization(); - } +} migrateStatus; - migrateStatus.go(&txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern); +void migrateThread(std::string ns, + BSONObj min, + BSONObj max, + BSONObj shardKeyPattern, + std::string fromShard, + OID epoch, + WriteConcernOptions writeConcern) { + Client::initThread("migrateThread"); + OperationContextImpl txn; + if (getGlobalAuthorizationManager()->isAuthEnabled()) { + ShardedConnectionInfo::addHook(); + AuthorizationSession::get(txn.getClient())->grantInternalAuthorization(); } - /** - * Command for initiating the recipient side of the migration to start copying data - * from the donor shard. - * - * { - * _recvChunkStart: "namespace", - * congfigServer: "hostAndPort", - * from: "hostAndPort", - * fromShardName: "shardName", - * toShardName: "shardName", - * min: {}, - * max: {}, - * shardKeyPattern: {}, - * - * // optional - * secondaryThrottle: bool, // defaults to true - * writeConcern: {} // applies to individual writes. - * } - */ - class RecvChunkStartCommand : public ChunkCommandHelper { - public: - void help(std::stringstream& h) const { h << "internal"; } - RecvChunkStartCommand() : ChunkCommandHelper( "_recvChunkStart" ) {} - - virtual bool isWriteCommandForConfigServer() const { return false; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - bool run(OperationContext* txn, - const string&, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - - // Active state of TO-side migrations (MigrateStatus) is serialized by distributed - // collection lock. - if ( migrateStatus.getActive() ) { - errmsg = "migrate already in progress"; - return false; - } + migrateStatus.go(&txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern); +} - // Pending deletes (for migrations) are serialized by the distributed collection lock, - // we are sure we registered a delete for a range *before* we can migrate-in a - // subrange. - const size_t numDeletes = getDeleter()->getTotalDeletes(); - if (numDeletes > 0) { +/** + * Command for initiating the recipient side of the migration to start copying data + * from the donor shard. 
+ * + * { + * _recvChunkStart: "namespace", + * congfigServer: "hostAndPort", + * from: "hostAndPort", + * fromShardName: "shardName", + * toShardName: "shardName", + * min: {}, + * max: {}, + * shardKeyPattern: {}, + * + * // optional + * secondaryThrottle: bool, // defaults to true + * writeConcern: {} // applies to individual writes. + * } + */ +class RecvChunkStartCommand : public ChunkCommandHelper { +public: + void help(std::stringstream& h) const { + h << "internal"; + } + RecvChunkStartCommand() : ChunkCommandHelper("_recvChunkStart") {} - errmsg = str::stream() << "can't accept new chunks because " - << " there are still " << numDeletes - << " deletes from previous migration"; + virtual bool isWriteCommandForConfigServer() const { + return false; + } + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + bool run(OperationContext* txn, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + // Active state of TO-side migrations (MigrateStatus) is serialized by distributed + // collection lock. + if (migrateStatus.getActive()) { + errmsg = "migrate already in progress"; + return false; + } + + // Pending deletes (for migrations) are serialized by the distributed collection lock, + // we are sure we registered a delete for a range *before* we can migrate-in a + // subrange. + const size_t numDeletes = getDeleter()->getTotalDeletes(); + if (numDeletes > 0) { + errmsg = str::stream() << "can't accept new chunks because " + << " there are still " << numDeletes + << " deletes from previous migration"; + + warning() << errmsg; + return false; + } + + if (!shardingState.enabled()) { + if (!cmdObj["configServer"].eoo()) { + dassert(cmdObj["configServer"].type() == String); + ShardingState::initialize(cmdObj["configServer"].String()); + } else { + errmsg = str::stream() + << "cannot start recv'ing chunk, " + << "sharding is not enabled and no config server was provided"; warning() << errmsg; return false; } + } - if (!shardingState.enabled()) { - if (!cmdObj["configServer"].eoo()) { - dassert(cmdObj["configServer"].type() == String); - ShardingState::initialize(cmdObj["configServer"].String()); - } - else { - - errmsg = str::stream() - << "cannot start recv'ing chunk, " - << "sharding is not enabled and no config server was provided"; + if (!cmdObj["toShardName"].eoo()) { + dassert(cmdObj["toShardName"].type() == String); + shardingState.gotShardName(cmdObj["toShardName"].String()); + } - warning() << errmsg; - return false; - } - } + string ns = cmdObj.firstElement().String(); + BSONObj min = cmdObj["min"].Obj().getOwned(); + BSONObj max = cmdObj["max"].Obj().getOwned(); - if ( !cmdObj["toShardName"].eoo() ) { - dassert( cmdObj["toShardName"].type() == String ); - shardingState.gotShardName( cmdObj["toShardName"].String() ); - } + // Refresh our collection manager from the config server, we need a collection manager + // to start registering pending chunks. + // We force the remote refresh here to make the behavior consistent and predictable, + // generally we'd refresh anyway, and to be paranoid. 
+        ChunkVersion currentVersion;
+        Status status = shardingState.refreshMetadataNow(txn, ns, &currentVersion);
 
-            string ns = cmdObj.firstElement().String();
-            BSONObj min = cmdObj["min"].Obj().getOwned();
-            BSONObj max = cmdObj["max"].Obj().getOwned();
+        if (!status.isOK()) {
+            errmsg = str::stream() << "cannot start recv'ing chunk "
+                                   << "[" << min << "," << max << ")" << causedBy(status.reason());
 
-            // Refresh our collection manager from the config server, we need a collection manager
-            // to start registering pending chunks.
-            // We force the remote refresh here to make the behavior consistent and predictable,
-            // generally we'd refresh anyway, and to be paranoid.
-            ChunkVersion currentVersion;
-            Status status = shardingState.refreshMetadataNow(txn, ns, &currentVersion );
+            warning() << errmsg;
+            return false;
+        }
 
-            if ( !status.isOK() ) {
-                errmsg = str::stream() << "cannot start recv'ing chunk "
-                                       << "[" << min << "," << max << ")"
-                                       << causedBy( status.reason() );
+        // Process secondary throttle settings and assign defaults if necessary.
+        WriteConcernOptions writeConcern;
+        status = writeConcern.parseSecondaryThrottle(cmdObj, NULL);
 
-                warning() << errmsg;
-                return false;
+        if (!status.isOK()) {
+            if (status.code() != ErrorCodes::WriteConcernNotDefined) {
+                warning() << status.toString();
+                return appendCommandStatus(result, status);
             }
 
-            // Process secondary throttle settings and assign defaults if necessary.
-            WriteConcernOptions writeConcern;
-            status = writeConcern.parseSecondaryThrottle(cmdObj, NULL);
+            writeConcern = getDefaultWriteConcern();
+        } else {
+            repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator();
 
-            if (!status.isOK()){
-                if (status.code() != ErrorCodes::WriteConcernNotDefined) {
-                    warning() << status.toString();
-                    return appendCommandStatus(result, status);
-                }
+            if (replCoordinator->getReplicationMode() ==
+                repl::ReplicationCoordinator::modeMasterSlave &&
+                writeConcern.shouldWaitForOtherNodes()) {
+                warning() << "recvChunk cannot check if secondary throttle setting "
+                          << writeConcern.toBSON()
+                          << " can be enforced in a master slave configuration";
+            }
 
-                writeConcern = getDefaultWriteConcern();
+            Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern);
+            if (!status.isOK() && status != ErrorCodes::NoReplicationEnabled) {
+                warning() << status.toString();
+                return appendCommandStatus(result, status);
             }
-            else {
-                repl::ReplicationCoordinator* replCoordinator =
-                        repl::getGlobalReplicationCoordinator();
+        }
 
-                if (replCoordinator->getReplicationMode() ==
-                        repl::ReplicationCoordinator::modeMasterSlave &&
-                    writeConcern.shouldWaitForOtherNodes()) {
-                    warning() << "recvChunk cannot check if secondary throttle setting "
-                              << writeConcern.toBSON()
-                              << " can be enforced in a master slave configuration";
-                }
+        if (writeConcern.shouldWaitForOtherNodes() &&
+            writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
+            // Don't allow no timeout. 
+ writeConcern.wTimeout = kDefaultWTimeoutMs; + } - Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern); - if (!status.isOK() && status != ErrorCodes::NoReplicationEnabled) { - warning() << status.toString(); - return appendCommandStatus(result, status); - } - } + BSONObj shardKeyPattern; + if (cmdObj.hasField("shardKeyPattern")) { + shardKeyPattern = cmdObj["shardKeyPattern"].Obj().getOwned(); + } else { + // shardKeyPattern may not be provided if another shard is from pre 2.2 + // In that case, assume the shard key pattern is the same as the range + // specifiers provided. + BSONObj keya = Helpers::inferKeyPattern(min); + BSONObj keyb = Helpers::inferKeyPattern(max); + verify(keya == keyb); + + warning() + << "No shard key pattern provided by source shard for migration." + " This is likely because the source shard is running a version prior to 2.2." + " Falling back to assuming the shard key matches the pattern of the min and max" + " chunk range specifiers. Inferred shard key: " << keya; + + shardKeyPattern = keya.getOwned(); + } - if (writeConcern.shouldWaitForOtherNodes() && - writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) { - // Don't allow no timeout. - writeConcern.wTimeout = kDefaultWTimeoutMs; - } + const string fromShard(cmdObj["from"].String()); - BSONObj shardKeyPattern; - if (cmdObj.hasField("shardKeyPattern")) { - shardKeyPattern = cmdObj["shardKeyPattern"].Obj().getOwned(); - } else { - // shardKeyPattern may not be provided if another shard is from pre 2.2 - // In that case, assume the shard key pattern is the same as the range - // specifiers provided. - BSONObj keya = Helpers::inferKeyPattern(min); - BSONObj keyb = Helpers::inferKeyPattern(max); - verify( keya == keyb ); + // Set the TO-side migration to active + Status prepareStatus = migrateStatus.prepare(ns, fromShard, min, max, shardKeyPattern); - warning() << "No shard key pattern provided by source shard for migration." - " This is likely because the source shard is running a version prior to 2.2." - " Falling back to assuming the shard key matches the pattern of the min and max" - " chunk range specifiers. 
Inferred shard key: " << keya; + if (!prepareStatus.isOK()) { + return appendCommandStatus(result, prepareStatus); + } - shardKeyPattern = keya.getOwned(); - } + std::thread m(migrateThread, + ns, + min, + max, + shardKeyPattern, + fromShard, + currentVersion.epoch(), + writeConcern); + + m.detach(); + result.appendBool("started", true); + return true; + } - const string fromShard(cmdObj["from"].String()); +} recvChunkStartCmd; - // Set the TO-side migration to active - Status prepareStatus = migrateStatus.prepare(ns, fromShard, min, max, shardKeyPattern); +class RecvChunkStatusCommand : public ChunkCommandHelper { +public: + void help(std::stringstream& h) const { + h << "internal"; + } + RecvChunkStatusCommand() : ChunkCommandHelper("_recvChunkStatus") {} + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + bool run(OperationContext* txn, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + migrateStatus.status(result); + return 1; + } - if (!prepareStatus.isOK()) { - return appendCommandStatus(result, prepareStatus); - } +} recvChunkStatusCommand; - std::thread m(migrateThread, - ns, - min, - max, - shardKeyPattern, - fromShard, - currentVersion.epoch(), - writeConcern); +class RecvChunkCommitCommand : public ChunkCommandHelper { +public: + void help(std::stringstream& h) const { + h << "internal"; + } + RecvChunkCommitCommand() : ChunkCommandHelper("_recvChunkCommit") {} + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + bool run(OperationContext* txn, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + bool ok = migrateStatus.startCommit(); + migrateStatus.status(result); + return ok; + } - m.detach(); - result.appendBool( "started" , true ); - return true; - } +} recvChunkCommitCommand; - } recvChunkStartCmd; +class RecvChunkAbortCommand : public ChunkCommandHelper { +public: + void help(std::stringstream& h) const { + h << "internal"; + } + RecvChunkAbortCommand() : ChunkCommandHelper("_recvChunkAbort") {} + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + bool run(OperationContext* txn, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + migrateStatus.abort(); + migrateStatus.status(result); + return true; + } - class RecvChunkStatusCommand : public ChunkCommandHelper { - public: - void help(std::stringstream& h) const { h << "internal"; } - RecvChunkStatusCommand() : ChunkCommandHelper( "_recvChunkStatus" ) {} - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - bool run(OperationContext* txn, - const string&, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - migrateStatus.status( result ); - 
return 1; - } - - } recvChunkStatusCommand; - - class RecvChunkCommitCommand : public ChunkCommandHelper { - public: - void help(std::stringstream& h) const { h << "internal"; } - RecvChunkCommitCommand() : ChunkCommandHelper( "_recvChunkCommit" ) {} - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - bool run(OperationContext* txn, - const string&, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - bool ok = migrateStatus.startCommit(); - migrateStatus.status( result ); - return ok; - } - - } recvChunkCommitCommand; - - class RecvChunkAbortCommand : public ChunkCommandHelper { - public: - void help(std::stringstream& h) const { h << "internal"; } - RecvChunkAbortCommand() : ChunkCommandHelper( "_recvChunkAbort" ) {} - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - bool run(OperationContext* txn, - const string&, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - migrateStatus.abort(); - migrateStatus.status( result ); - return true; - } +} recvChunkAboortCommand; - } recvChunkAboortCommand; +class IsInRangeTest : public StartupTest { +public: + void run() { + BSONObj min = BSON("x" << 1); + BSONObj max = BSON("x" << 5); + BSONObj skey = BSON("x" << 1); - class IsInRangeTest : public StartupTest { - public: - void run() { - BSONObj min = BSON( "x" << 1 ); - BSONObj max = BSON( "x" << 5 ); - BSONObj skey = BSON( "x" << 1 ); - - verify( ! isInRange( BSON( "x" << 0 ) , min , max , skey ) ); - verify( isInRange( BSON( "x" << 1 ) , min , max , skey ) ); - verify( isInRange( BSON( "x" << 3 ) , min , max , skey ) ); - verify( isInRange( BSON( "x" << 4 ) , min , max , skey ) ); - verify( ! isInRange( BSON( "x" << 5 ) , min , max , skey ) ); - verify( ! isInRange( BSON( "x" << 6 ) , min , max , skey ) ); - - BSONObj obj = BSON( "n" << 3 ); - BSONObj min2 = BSON( "x" << BSONElementHasher::hash64( obj.firstElement() , 0 ) - 2 ); - BSONObj max2 = BSON( "x" << BSONElementHasher::hash64( obj.firstElement() , 0 ) + 2 ); - BSONObj hashedKey = BSON( "x" << "hashed" ); - - verify( isInRange( BSON( "x" << 3 ) , min2 , max2 , hashedKey ) ); - verify( ! isInRange( BSON( "x" << 3 ) , min , max , hashedKey ) ); - verify( ! 
isInRange( BSON( "x" << 4 ) , min2 , max2 , hashedKey ) );
-
-            LOG(1) << "isInRangeTest passed" << migrateLog;
-        }
-    } isInRangeTest;
+        verify(!isInRange(BSON("x" << 0), min, max, skey));
+        verify(isInRange(BSON("x" << 1), min, max, skey));
+        verify(isInRange(BSON("x" << 3), min, max, skey));
+        verify(isInRange(BSON("x" << 4), min, max, skey));
+        verify(!isInRange(BSON("x" << 5), min, max, skey));
+        verify(!isInRange(BSON("x" << 6), min, max, skey));
+
+        BSONObj obj = BSON("n" << 3);
+        BSONObj min2 = BSON("x" << BSONElementHasher::hash64(obj.firstElement(), 0) - 2);
+        BSONObj max2 = BSON("x" << BSONElementHasher::hash64(obj.firstElement(), 0) + 2);
+        BSONObj hashedKey = BSON("x"
+                                 << "hashed");
+
+        verify(isInRange(BSON("x" << 3), min2, max2, hashedKey));
+        verify(!isInRange(BSON("x" << 3), min, max, hashedKey));
+        verify(!isInRange(BSON("x" << 4), min2, max2, hashedKey));
+
+        LOG(1) << "isInRangeTest passed" << migrateLog;
+    }
+} isInRangeTest;
 }
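
For readers unfamiliar with the chunk-range convention that IsInRangeTest exercises: chunk bounds are half-open, so a document belongs to the chunk when its shard-key value is >= min and < max. The standalone sketch below only illustrates that [min, max) rule with plain integer keys; isInChunkRange and its sample values are hypothetical stand-ins, not the actual isInRange(), which extracts the shard-key fields from a BSONObj (hashing them first for hashed shard keys) before comparing against the bounds.

    // Minimal sketch of the half-open [min, max) membership rule verified above.
    // Assumption: plain int keys stand in for extracted shard-key values.
    #include <cassert>

    bool isInChunkRange(int key, int min, int max) {
        return key >= min && key < max;  // lower bound inclusive, upper bound exclusive
    }

    int main() {
        // Mirrors the unhashed assertions in IsInRangeTest (min = 1, max = 5).
        assert(!isInChunkRange(0, 1, 5));  // below the chunk
        assert(isInChunkRange(1, 1, 5));   // min itself is inside
        assert(isInChunkRange(4, 1, 5));
        assert(!isInChunkRange(5, 1, 5));  // max itself is outside
        assert(!isInChunkRange(6, 1, 5));
        return 0;
    }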