diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-07-13 11:26:52 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-07-14 18:16:03 -0400 |
commit | 00c74d1386d4678ebcdeb37878fbb3c53cb02d83 (patch) | |
tree | 1f14b3adeb997b989d2a10d8f35f84c37f3efa51 /src/mongo/s/d_migrate.cpp | |
parent | d9500825fd369a98e177eeedbe9c64dbcf8d9392 (diff) | |
download | mongo-00c74d1386d4678ebcdeb37878fbb3c53cb02d83.tar.gz |
SERVER-18084 Modularize chunk migration state machine
* Move the sharding migration destination (receiving side) management to a separate source file
* Move the destination thread management to be owned by the destination manager
* Move the sharding migration source (donor side) management to a separate source file
* Remove the medianKey command since it is noop and not used anywhere
No migration logic changes.
Diffstat (limited to 'src/mongo/s/d_migrate.cpp')
-rw-r--r-- | src/mongo/s/d_migrate.cpp | 1765 |
1 files changed, 92 insertions, 1673 deletions
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index b98c02dada8..beebc9b7dc6 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -39,31 +39,23 @@ #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_manager.h" -#include "mongo/db/auth/authorization_manager_global.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/privilege.h" -#include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog/index_create.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" -#include "mongo/db/exec/plan_stage.h" #include "mongo/db/field_parser.h" -#include "mongo/db/hasher.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" -#include "mongo/db/operation_context_impl.h" -#include "mongo/db/ops/delete.h" -#include "mongo/db/query/internal_plans.h" -#include "mongo/db/query/query_knobs.h" #include "mongo/db/range_deleter_service.h" -#include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/migration_destination_manager.h" +#include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/write_concern.h" @@ -78,29 +70,15 @@ #include "mongo/s/config.h" #include "mongo/s/d_state.h" #include "mongo/s/grid.h" -#include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/chrono.h" -#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/memory.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/thread.h" #include "mongo/util/assert_util.h" -#include "mongo/util/elapsed_tracker.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" -#include "mongo/util/processinfo.h" -#include "mongo/util/startup_test.h" - -// Pause while a fail point is enabled. -#define MONGO_FP_PAUSE_WHILE(symbol) \ - while (MONGO_FAIL_POINT(symbol)) { \ - sleepmillis(100); \ - } namespace mongo { -using namespace stdx::chrono; using std::list; using std::set; using std::string; @@ -112,9 +90,14 @@ namespace { const int kDefaultWTimeoutMs = 60 * 1000; const WriteConcernOptions DefaultWriteConcern(2, WriteConcernOptions::NONE, kDefaultWTimeoutMs); +Tee* migrateLog = RamLog::get("migrate"); + +MigrationSourceManager migrateSourceManager; +MigrationDestinationManager migrateDestManager; + /** - * Returns the default write concern for migration cleanup (at donor shard) and - * cloning documents (at recipient shard). + * Returns the default write concern for migration cleanup (at donor shard) and cloning documents + * (on the destination shard). */ WriteConcernOptions getDefaultWriteConcern() { repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator(); @@ -128,98 +111,39 @@ WriteConcernOptions getDefaultWriteConcern() { return WriteConcernOptions(1, WriteConcernOptions::NONE, 0); } -} // namespace - -MONGO_FP_DECLARE(failMigrationCommit); -MONGO_FP_DECLARE(failMigrationConfigWritePrepare); -MONGO_FP_DECLARE(failMigrationApplyOps); - -Tee* migrateLog = RamLog::get("migrate"); - -class MoveTimingHelper { -public: - MoveTimingHelper(OperationContext* txn, - const string& where, - const string& ns, - BSONObj min, - BSONObj max, - int total, - string* cmdErrmsg, - string toShard, - string fromShard) - : _txn(txn), - _where(where), - _ns(ns), - _to(toShard), - _from(fromShard), - _next(0), - _total(total), - _cmdErrmsg(cmdErrmsg) { - _b.append("min", min); - _b.append("max", max); +struct MigrateStatusHolder { + MigrateStatusHolder(OperationContext* txn, + MigrationSourceManager* migrateSourceManager, + const std::string& ns, + const BSONObj& min, + const BSONObj& max, + const BSONObj& shardKeyPattern) + : _txn(txn), _migrateSourceManager(migrateSourceManager) { + _isAnotherMigrationActive = + !_migrateSourceManager->start(txn, ns, min, max, shardKeyPattern); } - - ~MoveTimingHelper() { - // even if logChange doesn't throw, bson does - // sigh - try { - if (!_to.empty()) { - _b.append("to", _to); - } - if (!_from.empty()) { - _b.append("from", _from); - } - if (_next != _total) { - _b.append("note", "aborted"); - } else { - _b.append("note", "success"); - } - if (!_cmdErrmsg->empty()) { - _b.append("errmsg", *_cmdErrmsg); - } - - grid.catalogManager()->logChange(_txn->getClient()->clientAddress(true), - (string) "moveChunk." + _where, - _ns, - _b.obj()); - } catch (const std::exception& e) { - warning() << "couldn't record timing for moveChunk '" << _where << "': " << e.what() - << migrateLog; + ~MigrateStatusHolder() { + if (!_isAnotherMigrationActive) { + _migrateSourceManager->done(_txn); } } - void done(int step) { - invariant(step == ++_next); - invariant(step <= _total); - - const string s = str::stream() << "step " << step << " of " << _total; - - CurOp* op = CurOp::get(_txn); - { - stdx::lock_guard<Client> lk(*_txn->getClient()); - op->setMessage_inlock(s.c_str()); - } - - _b.appendNumber(s, _t.millis()); - _t.reset(); + bool isAnotherMigrationActive() const { + return _isAnotherMigrationActive; } private: OperationContext* const _txn; - Timer _t; + MigrationSourceManager* const _migrateSourceManager; - string _where; - string _ns; - string _to; - string _from; - - int _next; - int _total; // expected # of steps + bool _isAnotherMigrationActive; +}; - const string* _cmdErrmsg; +} // namespace - BSONObjBuilder _b; -}; +MONGO_FP_DECLARE(failMigrationCommit); +MONGO_FP_DECLARE(failMigrationConfigWritePrepare); +MONGO_FP_DECLARE(failMigrationApplyOps); class ChunkCommandHelper : public Command { public: @@ -239,662 +163,6 @@ public: } }; -bool isInRange(const BSONObj& obj, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern) { - ShardKeyPattern shardKey(shardKeyPattern); - BSONObj k = shardKey.extractShardKeyFromDoc(obj); - return k.woCompare(min) >= 0 && k.woCompare(max) < 0; -} - -class MigrateFromStatus { -public: - MigrateFromStatus() : _inCriticalSection(false), _memoryUsed(0), _active(false) {} - - /** - * @return false if cannot start. One of the reason for not being able to - * start is there is already an existing migration in progress. - */ - bool start(OperationContext* txn, - const std::string& ns, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern) { - verify(!min.isEmpty()); - verify(!max.isEmpty()); - verify(!ns.empty()); - - // Get global shared to synchronize with logOp. Also see comments in the class - // members declaration for more details. - Lock::GlobalRead globalShared(txn->lockState()); - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_active) { - return false; - } - - _ns = ns; - _min = min; - _max = max; - _shardKeyPattern = shardKeyPattern; - - verify(_deleted.size() == 0); - verify(_reload.size() == 0); - verify(_memoryUsed == 0); - - _active = true; - - stdx::lock_guard<stdx::mutex> tLock(_cloneLocsMutex); - verify(_cloneLocs.size() == 0); - - return true; - } - - void done(OperationContext* txn) { - log() << "MigrateFromStatus::done About to acquire global lock to exit critical " - "section"; - - // Get global shared to synchronize with logOp. Also see comments in the class - // members declaration for more details. - Lock::GlobalRead globalShared(txn->lockState()); - stdx::lock_guard<stdx::mutex> lk(_mutex); - - _active = false; - _deleteNotifyExec.reset(NULL); - _inCriticalSection = false; - _inCriticalSectionCV.notify_all(); - - _deleted.clear(); - _reload.clear(); - _memoryUsed = 0; - - stdx::lock_guard<stdx::mutex> cloneLock(_cloneLocsMutex); - _cloneLocs.clear(); - } - - void logOp(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* patt, - bool notInActiveChunk) { - ensureShardVersionOKOrThrow(txn->getClient(), ns); - - const char op = opstr[0]; - - if (notInActiveChunk) { - // Ignore writes that came from the migration process like cleanup so they - // won't be transferred to the recipient shard. Also ignore ops from - // _migrateClone and _transferMods since it is impossible to move a chunk - // to self. - return; - } - - dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. - - if (!_active) - return; - - if (_ns != ns) - return; - - // no need to log if this is not an insertion, an update, or an actual deletion - // note: opstr 'db' isn't a deletion but a mention that a database exists - // (for replication machinery mostly). - if (op == 'n' || op == 'c' || (op == 'd' && opstr[1] == 'b')) - return; - - BSONElement ide; - if (patt) - ide = patt->getField("_id"); - else - ide = obj["_id"]; - - if (ide.eoo()) { - warning() << "logOpForSharding got mod with no _id, ignoring obj: " << obj - << migrateLog; - return; - } - - if (op == 'i' && (!isInRange(obj, _min, _max, _shardKeyPattern))) { - return; - } - - BSONObj idObj(ide.wrap()); - - if (op == 'u') { - BSONObj fullDoc; - OldClientContext ctx(txn, _ns, false); - if (!Helpers::findById(txn, ctx.db(), _ns.c_str(), idObj, fullDoc)) { - warning() << "logOpForSharding couldn't find: " << idObj - << " even though should have" << migrateLog; - dassert(false); // TODO: Abort the migration. - return; - } - - if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) { - return; - } - } - - // Note: can't check if delete is in active chunk since the document is gone! - - txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op)); - } - - /** - * Insert items from docIdList to a new array with the given fieldName in the given - * builder. If explode is true, the inserted object will be the full version of the - * document. Note that the whenever an item from the docList is inserted to the array, - * it will also be removed from docList. - * - * Should be holding the collection lock for ns if explode is true. - */ - void xfer(OperationContext* txn, - const string& ns, - Database* db, - list<BSONObj>* docIdList, - BSONObjBuilder& builder, - const char* fieldName, - long long& size, - bool explode) { - const long long maxSize = 1024 * 1024; - - if (docIdList->size() == 0 || size > maxSize) - return; - - BSONArrayBuilder arr(builder.subarrayStart(fieldName)); - - list<BSONObj>::iterator docIdIter = docIdList->begin(); - while (docIdIter != docIdList->end() && size < maxSize) { - BSONObj idDoc = *docIdIter; - if (explode) { - BSONObj fullDoc; - if (Helpers::findById(txn, db, ns.c_str(), idDoc, fullDoc)) { - arr.append(fullDoc); - size += fullDoc.objsize(); - } - } else { - arr.append(idDoc); - size += idDoc.objsize(); - } - - docIdIter = docIdList->erase(docIdIter); - } - - arr.done(); - } - - /** - * called from the dest of a migrate - * transfers mods from src to dest - */ - bool transferMods(OperationContext* txn, string& errmsg, BSONObjBuilder& b) { - long long size = 0; - - { - AutoGetCollectionForRead ctx(txn, getNS()); - - stdx::lock_guard<stdx::mutex> sl(_mutex); - if (!_active) { - errmsg = "no active migration!"; - return false; - } - - // TODO: fix SERVER-16540 race - xfer(txn, _ns, ctx.getDb(), &_deleted, b, "deleted", size, false); - xfer(txn, _ns, ctx.getDb(), &_reload, b, "reload", size, true); - } - - b.append("size", size); - - return true; - } - - /** - * Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs - * (to avoid seeking disk later). - * - * @param maxChunkSize number of bytes beyond which a chunk's base data (no indices) - * is considered too large to move. - * @param errmsg filled with textual description of error if this call return false. - * @return false if approximate chunk size is too big to move or true otherwise. - */ - bool storeCurrentLocs(OperationContext* txn, - long long maxChunkSize, - string& errmsg, - BSONObjBuilder& result) { - AutoGetCollectionForRead ctx(txn, getNS()); - Collection* collection = ctx.getCollection(); - if (!collection) { - errmsg = "ns not found, should be impossible"; - return false; - } - - // Allow multiKey based on the invariant that shard keys must be single-valued. - // Therefore, any multi-key index prefixed by shard key cannot be multikey over - // the shard key fields. - IndexDescriptor* idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, - _shardKeyPattern, - false); // requireSingleKey - - if (idx == NULL) { - errmsg = str::stream() << "can't find index with prefix " << _shardKeyPattern - << " in storeCurrentLocs for " << _ns; - return false; - } - - // Assume both min and max non-empty, append MinKey's to make them fit chosen index - BSONObj min; - BSONObj max; - KeyPattern kp(idx->keyPattern()); - - { - // It's alright not to lock _mutex all the way through based on the assumption - // that this is only called by the main thread that drives the migration and - // only it can start and stop the current migration. - stdx::lock_guard<stdx::mutex> sl(_mutex); - - invariant(_deleteNotifyExec.get() == NULL); - unique_ptr<WorkingSet> ws = stdx::make_unique<WorkingSet>(); - unique_ptr<DeleteNotificationStage> dns = stdx::make_unique<DeleteNotificationStage>(); - // Takes ownership of 'ws' and 'dns'. - auto statusWithPlanExecutor = PlanExecutor::make( - txn, std::move(ws), std::move(dns), collection, PlanExecutor::YIELD_MANUAL); - invariant(statusWithPlanExecutor.isOK()); - _deleteNotifyExec = std::move(statusWithPlanExecutor.getValue()); - _deleteNotifyExec->registerExec(); - - min = Helpers::toKeyFormat(kp.extendRangeBound(_min, false)); - max = Helpers::toKeyFormat(kp.extendRangeBound(_max, false)); - } - - unique_ptr<PlanExecutor> exec( - InternalPlanner::indexScan(txn, collection, idx, min, max, false)); - // We can afford to yield here because any change to the base data that we might - // miss is already being queued and will migrate in the 'transferMods' stage. - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); - - // use the average object size to estimate how many objects a full chunk would carry - // do that while traversing the chunk's range using the sharding index, below - // there's a fair amount of slack before we determine a chunk is too large because object - // sizes will vary - unsigned long long maxRecsWhenFull; - long long avgRecSize; - const long long totalRecs = collection->numRecords(txn); - if (totalRecs > 0) { - avgRecSize = collection->dataSize(txn) / totalRecs; - maxRecsWhenFull = maxChunkSize / avgRecSize; - maxRecsWhenFull = std::min((unsigned long long)(Chunk::MaxObjectPerChunk + 1), - 130 * maxRecsWhenFull / 100 /* slack */); - } else { - avgRecSize = 0; - maxRecsWhenFull = Chunk::MaxObjectPerChunk + 1; - } - - // do a full traversal of the chunk and don't stop even if we think it is a large chunk - // we want the number of records to better report, in that case - bool isLargeChunk = false; - unsigned long long recCount = 0; - ; - RecordId dl; - while (PlanExecutor::ADVANCED == exec->getNext(NULL, &dl)) { - if (!isLargeChunk) { - stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex); - _cloneLocs.insert(dl); - } - - if (++recCount > maxRecsWhenFull) { - isLargeChunk = true; - // continue on despite knowing that it will fail, - // just to get the correct value for recCount. - } - } - exec.reset(); - - if (isLargeChunk) { - stdx::lock_guard<stdx::mutex> sl(_mutex); - warning() << "cannot move chunk: the maximum number of documents for a chunk is " - << maxRecsWhenFull << " , the maximum chunk size is " << maxChunkSize - << " , average document size is " << avgRecSize << ". Found " << recCount - << " documents in chunk " - << " ns: " << _ns << " " << _min << " -> " << _max << migrateLog; - - result.appendBool("chunkTooBig", true); - result.appendNumber("estimatedChunkSize", (long long)(recCount * avgRecSize)); - errmsg = "chunk too big to move"; - return false; - } - - log() << "moveChunk number of documents: " << cloneLocsRemaining() << migrateLog; - - txn->recoveryUnit()->abandonSnapshot(); - return true; - } - - bool clone(OperationContext* txn, string& errmsg, BSONObjBuilder& result) { - ElapsedTracker tracker(internalQueryExecYieldIterations, internalQueryExecYieldPeriodMS); - - int allocSize = 0; - { - AutoGetCollectionForRead ctx(txn, getNS()); - - stdx::lock_guard<stdx::mutex> sl(_mutex); - if (!_active) { - errmsg = "not active"; - return false; - } - - Collection* collection = ctx.getCollection(); - if (!collection) { - errmsg = str::stream() << "collection " << _ns << " does not exist"; - return false; - } - - allocSize = std::min( - BSONObjMaxUserSize, - static_cast<int>((12 + collection->averageObjectSize(txn)) * cloneLocsRemaining())); - } - - bool isBufferFilled = false; - BSONArrayBuilder clonedDocsArrayBuilder(allocSize); - while (!isBufferFilled) { - AutoGetCollectionForRead ctx(txn, getNS()); - - stdx::lock_guard<stdx::mutex> sl(_mutex); - if (!_active) { - errmsg = "not active"; - return false; - } - - // TODO: fix SERVER-16540 race - - Collection* collection = ctx.getCollection(); - - if (!collection) { - errmsg = str::stream() << "collection " << _ns << " does not exist"; - return false; - } - - stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex); - set<RecordId>::iterator cloneLocsIter = _cloneLocs.begin(); - for (; cloneLocsIter != _cloneLocs.end(); ++cloneLocsIter) { - if (tracker.intervalHasElapsed()) // should I yield? - break; - - RecordId dl = *cloneLocsIter; - Snapshotted<BSONObj> doc; - if (!collection->findDoc(txn, dl, &doc)) { - // doc was deleted - continue; - } - - // Use the builder size instead of accumulating 'doc's size so that we take - // into consideration the overhead of BSONArray indices, and *always* - // append one doc. - if (clonedDocsArrayBuilder.arrSize() != 0 && - (clonedDocsArrayBuilder.len() + doc.value().objsize() + 1024) > - BSONObjMaxUserSize) { - isBufferFilled = true; // break out of outer while loop - break; - } - - clonedDocsArrayBuilder.append(doc.value()); - } - - _cloneLocs.erase(_cloneLocs.begin(), cloneLocsIter); - - // Note: must be holding _cloneLocsMutex, don't move this inside while condition! - if (_cloneLocs.empty()) { - break; - } - } - - result.appendArray("objects", clonedDocsArrayBuilder.arr()); - return true; - } - - void aboutToDelete(const RecordId& dl) { - // Even though above we call findDoc to check for existance - // that check only works for non-mmapv1 engines, and this is needed - // for mmapv1. - - stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex); - _cloneLocs.erase(dl); - } - - std::size_t cloneLocsRemaining() { - stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex); - return _cloneLocs.size(); - } - - long long mbUsed() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _memoryUsed / (1024 * 1024); - } - - bool getInCriticalSection() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _inCriticalSection; - } - - void setInCriticalSection(bool b) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _inCriticalSection = b; - _inCriticalSectionCV.notify_all(); - } - - std::string getNS() const { - stdx::lock_guard<stdx::mutex> sl(_mutex); - return _ns; - } - - /** - * @return true if we are NOT in the critical section - */ - bool waitTillNotInCriticalSection(int maxSecondsToWait) { - const auto deadline = system_clock::now() + seconds(maxSecondsToWait); - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (_inCriticalSection) { - if (stdx::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline)) - return false; - } - - return true; - } - - bool isActive() const { - return _getActive(); - } - -private: - bool _getActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; - } - void _setActive(bool b) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = b; - } - - /** - * Used to commit work for LogOpForSharding. Used to keep track of changes in documents - * that are part of a chunk being migrated. - */ - class LogOpForShardingHandler : public RecoveryUnit::Change { - public: - /** - * Invariant: idObj should belong to a document that is part of the active chunk - * being migrated. - */ - LogOpForShardingHandler(MigrateFromStatus* migrateFromStatus, - const BSONObj& idObj, - const char op) - : _migrateFromStatus(migrateFromStatus), _idObj(idObj.getOwned()), _op(op) {} - - virtual void commit() { - switch (_op) { - case 'd': { - stdx::lock_guard<stdx::mutex> sl(_migrateFromStatus->_mutex); - _migrateFromStatus->_deleted.push_back(_idObj); - _migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5; - break; - } - - case 'i': - case 'u': { - stdx::lock_guard<stdx::mutex> sl(_migrateFromStatus->_mutex); - _migrateFromStatus->_reload.push_back(_idObj); - _migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5; - break; - } - - default: - invariant(false); - } - } - - virtual void rollback() {} - - private: - MigrateFromStatus* _migrateFromStatus; - const BSONObj _idObj; - const char _op; - }; - - /** - * Used to receive invalidation notifications. - * - * XXX: move to the exec/ directory. - */ - class DeleteNotificationStage : public PlanStage { - public: - virtual void invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); - - virtual StageState work(WorkingSetID* out) { - invariant(false); - } - virtual bool isEOF() { - invariant(false); - return false; - } - virtual void kill() {} - virtual void saveState() { - invariant(false); - } - virtual void restoreState(OperationContext* opCtx) { - invariant(false); - } - virtual unique_ptr<PlanStageStats> getStats() { - invariant(false); - return NULL; - } - virtual CommonStats* getCommonStats() const { - invariant(false); - return NULL; - } - virtual SpecificStats* getSpecificStats() const { - invariant(false); - return NULL; - } - virtual std::vector<PlanStage*> getChildren() const { - vector<PlanStage*> empty; - return empty; - } - virtual StageType stageType() const { - return STAGE_NOTIFY_DELETE; - } - }; - - // - // All member variables are labeled with one of the following codes indicating the - // synchronization rules for accessing them. - // - // (M) Must hold _mutex for access. - // (MG) For reads, _mutex *OR* Global IX Lock must be held. - // For writes, the _mutex *AND* (Global Shared or Exclusive Lock) must be held. - // (C) Must hold _cloneLocsMutex for access. - // - // Locking order: - // - // Global Lock -> _mutex -> _cloneLocsMutex - - mutable stdx::mutex _mutex; - - stdx::condition_variable _inCriticalSectionCV; // (M) - - // Is migration currently in critical section. This can be used to block new writes. - bool _inCriticalSection; // (M) - - unique_ptr<PlanExecutor> _deleteNotifyExec; // (M) - - // List of _id of documents that were modified that must be re-cloned. - list<BSONObj> _reload; // (M) - - // List of _id of documents that were deleted during clone that should be deleted later. - list<BSONObj> _deleted; // (M) - - // bytes in _reload + _deleted - long long _memoryUsed; // (M) - - // If a migration is currently active. - bool _active; // (MG) - - string _ns; // (MG) - BSONObj _min; // (MG) - BSONObj _max; // (MG) - BSONObj _shardKeyPattern; // (MG) - - mutable stdx::mutex _cloneLocsMutex; - - // List of record id that needs to be transferred from here to the other side. - set<RecordId> _cloneLocs; // (C) - -} migrateFromStatus; - -void MigrateFromStatus::DeleteNotificationStage::invalidate(OperationContext* txn, - const RecordId& dl, - InvalidationType type) { - if (type == INVALIDATION_DELETION) { - migrateFromStatus.aboutToDelete(dl); - } -} - -struct MigrateStatusHolder { - MigrateStatusHolder(OperationContext* txn, - const std::string& ns, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern) - : _txn(txn) { - _isAnotherMigrationActive = !migrateFromStatus.start(txn, ns, min, max, shardKeyPattern); - } - ~MigrateStatusHolder() { - if (!_isAnotherMigrationActive) { - migrateFromStatus.done(_txn); - } - } - - bool isAnotherMigrationActive() const { - return _isAnotherMigrationActive; - } - -private: - OperationContext* _txn; - bool _isAnotherMigrationActive; -}; - -void logOpForSharding(OperationContext* txn, - const char* opstr, - const char* ns, - const BSONObj& obj, - BSONObj* patt, - bool notInActiveChunk) { - migrateFromStatus.logOp(txn, opstr, ns, obj, patt, notInActiveChunk); -} - class TransferModsCommand : public ChunkCommandHelper { public: void help(std::stringstream& h) const { @@ -914,8 +182,9 @@ public: int, string& errmsg, BSONObjBuilder& result) { - return migrateFromStatus.transferMods(txn, errmsg, result); + return migrateSourceManager.transferMods(txn, errmsg, result); } + } transferModsCommand; @@ -938,8 +207,9 @@ public: int, string& errmsg, BSONObjBuilder& result) { - return migrateFromStatus.clone(txn, errmsg, result); + return migrateSourceManager.clone(txn, errmsg, result); } + } initialCloneCommand; // Tests can pause / resume moveChunk's progress at each step by enabling / disabling each fail @@ -1153,11 +423,12 @@ public: log() << "received moveChunk request: " << cmdObj << migrateLog; timing.done(1); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep1); + + MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep1); // 2. - if (migrateFromStatus.isActive()) { + if (migrateSourceManager.isActive()) { errmsg = "migration already in progress"; warning() << errmsg; return false; @@ -1257,10 +528,11 @@ public: log() << "moveChunk request accepted at version " << origShardVersion; timing.done(2); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep2); + + MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep2); // 3. - MigrateStatusHolder statusHolder(txn, ns, min, max, shardKeyPattern); + MigrateStatusHolder statusHolder(txn, &migrateSourceManager, ns, min, max, shardKeyPattern); if (statusHolder.isAnotherMigrationActive()) { errmsg = "moveChunk is already in progress from this shard"; @@ -1293,7 +565,7 @@ public: { // See comment at the top of the function for more information on what // synchronization is used here. - if (!migrateFromStatus.storeCurrentLocs(txn, maxChunkSize, errmsg, result)) { + if (!migrateSourceManager.storeCurrentLocs(txn, maxChunkSize, errmsg, result)) { warning() << errmsg; return false; } @@ -1341,8 +613,10 @@ public: return false; } } + timing.done(3); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep3); + + MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep3); // 4. @@ -1385,7 +659,7 @@ public: } LOG(0) << "moveChunk data transfer progress: " << res - << " my mem used: " << migrateFromStatus.mbUsed() << migrateLog; + << " my mem used: " << migrateSourceManager.mbUsed() << migrateLog; if (!ok || res["state"].String() == "fail") { warning() << "moveChunk error transferring data caused migration abort: " << res @@ -1398,7 +672,7 @@ public: if (res["state"].String() == "steady") break; - if (migrateFromStatus.mbUsed() > (500 * 1024 * 1024)) { + if (migrateSourceManager.mbUsed() > (500 * 1024 * 1024)) { // This is too much memory for us to use for this so we're going to abort // the migrate ScopedDbConnection conn(toShardCS); @@ -1422,7 +696,8 @@ public: } timing.done(4); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep4); + + MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep4); // 5. @@ -1432,7 +707,7 @@ public: log() << "About to check if it is safe to enter critical section"; // Ensure all cloned docs have actually been transferred - std::size_t locsRemaining = migrateFromStatus.cloneLocsRemaining(); + std::size_t locsRemaining = migrateSourceManager.cloneLocsRemaining(); if (locsRemaining != 0) { errmsg = str::stream() << "moveChunk cannot enter critical section before all data is" << " cloned, " << locsRemaining << " locs were not transferred" @@ -1459,7 +734,7 @@ public: // 5.a // we're under the collection lock here, so no other migrate can change maxVersion // or CollectionMetadata state - migrateFromStatus.setInCriticalSection(true); + migrateSourceManager.setInCriticalSection(true); ChunkVersion myVersion = origCollVersion; myVersion.incMajor(); @@ -1600,7 +875,6 @@ public: log() << "moveChunk updating self version to: " << nextVersion << " through " << bumpMin << " -> " << bumpMax << " for collection '" << ns << "'" << migrateLog; - } else { log() << "moveChunk moved last chunk out for collection '" << ns << "'" << migrateLog; @@ -1713,7 +987,7 @@ public: } } - migrateFromStatus.setInCriticalSection(false); + migrateSourceManager.setInCriticalSection(false); // 5.d BSONObjBuilder commitInfo; @@ -1726,9 +1000,10 @@ public: txn->getClient()->clientAddress(true), "moveChunk.commit", ns, commitInfo.obj()); } - migrateFromStatus.done(txn); + migrateSourceManager.done(txn); timing.done(5); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep5); + + MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep5); // 6. // NOTE: It is important that the distributed collection lock be held for this step. @@ -1761,22 +1036,16 @@ public: log() << "could not queue migration cleanup: " << errMsg; } } + timing.done(6); - MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep6); + + MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep6); return true; } } moveChunkCmd; -bool ShardingState::inCriticalMigrateSection() { - return migrateFromStatus.getInCriticalSection(); -} - -bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) { - return migrateFromStatus.waitTillNotInCriticalSection(maxSecondsToWait); -} - /* ----- below this are the "to" side commands @@ -1790,837 +1059,6 @@ bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) { commend to "commit" */ -// Enabling / disabling these fail points pauses / resumes MigrateStatus::_go(), the thread -// that receives a chunk migration from the donor. -MONGO_FP_DECLARE(migrateThreadHangAtStep1); -MONGO_FP_DECLARE(migrateThreadHangAtStep2); -MONGO_FP_DECLARE(migrateThreadHangAtStep3); -MONGO_FP_DECLARE(migrateThreadHangAtStep4); -MONGO_FP_DECLARE(migrateThreadHangAtStep5); - -class MigrateStatus { -public: - enum State { READY, CLONE, CATCHUP, STEADY, COMMIT_START, DONE, FAIL, ABORT }; - - MigrateStatus() - : _active(false), - _numCloned(0), - _clonedBytes(0), - _numCatchup(0), - _numSteady(0), - _state(READY) {} - - void setState(State newState) { - stdx::lock_guard<stdx::mutex> sl(_mutex); - _state = newState; - } - - State getState() const { - stdx::lock_guard<stdx::mutex> sl(_mutex); - return _state; - } - - /** - * Returns OK if preparation was successful. - */ - Status prepare(const std::string& ns, - const std::string& fromShard, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_active) { - return Status(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Active migration already in progress " - << "ns: " << _ns << ", from: " << _from << ", min: " << _min - << ", max: " << _max); - } - - _state = READY; - _errmsg = ""; - - _ns = ns; - _from = fromShard; - _min = min; - _max = max; - _shardKeyPattern = shardKeyPattern; - - _numCloned = 0; - _clonedBytes = 0; - _numCatchup = 0; - _numSteady = 0; - - _active = true; - - return Status::OK(); - } - - void go(OperationContext* txn, - const std::string& ns, - BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - const std::string& fromShard, - const OID& epoch, - const WriteConcernOptions& writeConcern) { - try { - _go(txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern); - } catch (std::exception& e) { - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - _state = FAIL; - _errmsg = e.what(); - } - - error() << "migrate failed: " << e.what() << migrateLog; - } catch (...) { - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - _state = FAIL; - _errmsg = "UNKNOWN ERROR"; - } - - error() << "migrate failed with unknown exception" << migrateLog; - } - - if (getState() != DONE) { - // Unprotect the range if needed/possible on unsuccessful TO migration - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); - - string errMsg; - if (!ShardingState::get(txn)->forgetPending(txn, ns, min, max, epoch, &errMsg)) { - warning() << errMsg; - } - } - - setActive(false); - } - - void _go(OperationContext* txn, - const std::string& ns, - BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - const std::string& fromShard, - const OID& epoch, - const WriteConcernOptions& writeConcern) { - verify(getActive()); - verify(getState() == READY); - verify(!min.isEmpty()); - verify(!max.isEmpty()); - - DisableDocumentValidation validationDisabler(txn); - - log() << "starting receiving-end of migration of chunk " << min << " -> " << max - << " for collection " << ns << " from " << fromShard << " at epoch " - << epoch.toString(); - - string errmsg; - MoveTimingHelper timing(txn, "to", ns, min, max, 5 /* steps */, &errmsg, "", ""); - - ScopedDbConnection conn(fromShard); - conn->getLastError(); // just test connection - - NamespaceString nss(ns); - { - // 0. copy system.namespaces entry if collection doesn't already exist - OldClientWriteContext ctx(txn, ns); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { - errmsg = str::stream() << "Not primary during migration: " << ns - << ": checking if collection exists"; - warning() << errmsg; - setState(FAIL); - return; - } - - // Only copy if ns doesn't already exist - Database* db = ctx.db(); - Collection* collection = db->getCollection(ns); - - if (!collection) { - list<BSONObj> infos = conn->getCollectionInfos( - nsToDatabase(ns), BSON("name" << nsToCollectionSubstring(ns))); - - BSONObj options; - if (infos.size() > 0) { - BSONObj entry = infos.front(); - if (entry["options"].isABSONObj()) { - options = entry["options"].Obj(); - } - } - - WriteUnitOfWork wuow(txn); - Status status = userCreateNS(txn, db, ns, options, false); - if (!status.isOK()) { - warning() << "failed to create collection [" << ns << "] " - << " with options " << options << ": " << status; - } - wuow.commit(); - } - } - - { - // 1. copy indexes - - vector<BSONObj> indexSpecs; - { - const std::list<BSONObj> indexes = conn->getIndexSpecs(ns); - indexSpecs.insert(indexSpecs.begin(), indexes.begin(), indexes.end()); - } - - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_X); - OldClientContext ctx(txn, ns); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { - errmsg = str::stream() << "Not primary during migration: " << ns; - warning() << errmsg; - setState(FAIL); - return; - } - - Database* db = ctx.db(); - Collection* collection = db->getCollection(ns); - if (!collection) { - errmsg = str::stream() << "collection dropped during migration: " << ns; - warning() << errmsg; - setState(FAIL); - return; - } - - MultiIndexBlock indexer(txn, collection); - - indexer.removeExistingIndexes(&indexSpecs); - - if (!indexSpecs.empty()) { - // Only copy indexes if the collection does not have any documents. - if (collection->numRecords(txn) > 0) { - errmsg = str::stream() << "aborting migration, shard is missing " - << indexSpecs.size() << " indexes and " - << "collection is not empty. Non-trivial " - << "index creation should be scheduled manually"; - warning() << errmsg; - setState(FAIL); - return; - } - - Status status = indexer.init(indexSpecs); - if (!status.isOK()) { - errmsg = str::stream() << "failed to create index before migrating data. " - << " error: " << status.toString(); - warning() << errmsg; - setState(FAIL); - return; - } - - status = indexer.insertAllDocumentsInCollection(); - if (!status.isOK()) { - errmsg = str::stream() << "failed to create index before migrating data. " - << " error: " << status.toString(); - warning() << errmsg; - setState(FAIL); - return; - } - - WriteUnitOfWork wunit(txn); - indexer.commit(); - - for (size_t i = 0; i < indexSpecs.size(); i++) { - // make sure to create index on secondaries as well - getGlobalServiceContext()->getOpObserver()->onCreateIndex( - txn, db->getSystemIndexesName(), indexSpecs[i], true /* fromMigrate */); - } - - wunit.commit(); - } - - timing.done(1); - MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep1); - } - - { - // 2. delete any data already in range - RangeDeleterOptions deleterOptions( - KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern)); - deleterOptions.writeConcern = writeConcern; - // No need to wait since all existing cursors will filter out this range when - // returning the results. - deleterOptions.waitForOpenCursors = false; - deleterOptions.fromMigrate = true; - deleterOptions.onlyRemoveOrphanedDocs = true; - deleterOptions.removeSaverReason = "preCleanup"; - - string errMsg; - - if (!getDeleter()->deleteNow(txn, deleterOptions, &errMsg)) { - warning() << "Failed to queue delete for migrate abort: " << errMsg; - setState(FAIL); - return; - } - - { - // Protect the range by noting that we're now starting a migration to it - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); - - if (!ShardingState::get(txn)->notePending(txn, ns, min, max, epoch, &errmsg)) { - warning() << errmsg; - setState(FAIL); - return; - } - } - - timing.done(2); - MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep2); - } - - State currentState = getState(); - if (currentState == FAIL || currentState == ABORT) { - string errMsg; - RangeDeleterOptions deleterOptions( - KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern)); - deleterOptions.writeConcern = writeConcern; - // No need to wait since all existing cursors will filter out this range when - // returning the results. - deleterOptions.waitForOpenCursors = false; - deleterOptions.fromMigrate = true; - deleterOptions.onlyRemoveOrphanedDocs = true; - - if (!getDeleter()->queueDelete(txn, deleterOptions, NULL /* notifier */, &errMsg)) { - warning() << "Failed to queue delete for migrate abort: " << errMsg; - } - } - - { - // 3. initial bulk clone - setState(CLONE); - - while (true) { - BSONObj res; - if (!conn->runCommand("admin", - BSON("_migrateClone" << 1), - res)) { // gets array of objects to copy, in disk order - setState(FAIL); - errmsg = "_migrateClone failed: "; - errmsg += res.toString(); - error() << errmsg << migrateLog; - conn.done(); - return; - } - - BSONObj arr = res["objects"].Obj(); - int thisTime = 0; - - BSONObjIterator i(arr); - while (i.more()) { - txn->checkForInterrupt(); - - if (getState() == ABORT) { - errmsg = str::stream() << "Migration abort requested while " - << "copying documents"; - error() << errmsg << migrateLog; - return; - } - - BSONObj docToClone = i.next().Obj(); - { - OldClientWriteContext cx(txn, ns); - - BSONObj localDoc; - if (willOverrideLocalId(txn, - ns, - min, - max, - shardKeyPattern, - cx.db(), - docToClone, - &localDoc)) { - string errMsg = str::stream() << "cannot migrate chunk, local document " - << localDoc << " has same _id as cloned " - << "remote document " << docToClone; - - warning() << errMsg; - - // Exception will abort migration cleanly - uasserted(16976, errMsg); - } - - Helpers::upsert(txn, ns, docToClone, true); - } - thisTime++; - - { - stdx::lock_guard<stdx::mutex> statsLock(_mutex); - _numCloned++; - _clonedBytes += docToClone.objsize(); - } - - if (writeConcern.shouldWaitForOtherNodes()) { - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::getGlobalReplicationCoordinator()->awaitReplication( - txn, - repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(), - writeConcern); - if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { - warning() << "secondaryThrottle on, but doc insert timed out; " - "continuing"; - } else { - massertStatusOK(replStatus.status); - } - } - } - - if (thisTime == 0) - break; - } - - timing.done(3); - MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep3); - } - - // If running on a replicated system, we'll need to flush the docs we cloned to the - // secondaries - repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); - - { - // 4. do bulk of mods - setState(CATCHUP); - while (true) { - BSONObj res; - if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) { - setState(FAIL); - errmsg = "_transferMods failed: "; - errmsg += res.toString(); - error() << "_transferMods failed: " << res << migrateLog; - conn.done(); - return; - } - if (res["size"].number() == 0) - break; - - apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied); - - const int maxIterations = 3600 * 50; - int i; - for (i = 0; i < maxIterations; i++) { - txn->checkForInterrupt(); - - if (getState() == ABORT) { - errmsg = str::stream() << "Migration abort requested while waiting " - << "for replication at catch up stage"; - error() << errmsg << migrateLog; - - return; - } - - if (opReplicatedEnough(txn, lastOpApplied, writeConcern)) - break; - - if (i > 100) { - warning() << "secondaries having hard time keeping up with migrate" - << migrateLog; - } - - sleepmillis(20); - } - - if (i == maxIterations) { - errmsg = "secondary can't keep up with migrate"; - error() << errmsg << migrateLog; - conn.done(); - setState(FAIL); - return; - } - } - - timing.done(4); - MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep4); - } - - { - // pause to wait for replication - // this will prevent us from going into critical section until we're ready - Timer t; - while (t.minutes() < 600) { - txn->checkForInterrupt(); - - if (getState() == ABORT) { - errmsg = "Migration abort requested while waiting for replication"; - error() << errmsg << migrateLog; - return; - } - - log() << "Waiting for replication to catch up before entering critical section"; - if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern)) - break; - sleepsecs(1); - } - - if (t.minutes() >= 600) { - setState(FAIL); - errmsg = "Cannot go to critical section because secondaries cannot keep up"; - error() << errmsg << migrateLog; - return; - } - } - - { - // 5. wait for commit - - setState(STEADY); - bool transferAfterCommit = false; - while (getState() == STEADY || getState() == COMMIT_START) { - txn->checkForInterrupt(); - - // Make sure we do at least one transfer after recv'ing the commit message - // If we aren't sure that at least one transfer happens *after* our state - // changes to COMMIT_START, there could be mods still on the FROM shard that - // got logged *after* our _transferMods but *before* the critical section. - if (getState() == COMMIT_START) - transferAfterCommit = true; - - BSONObj res; - if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) { - log() << "_transferMods failed in STEADY state: " << res << migrateLog; - errmsg = res.toString(); - setState(FAIL); - conn.done(); - return; - } - - if (res["size"].number() > 0 && - apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied)) { - continue; - } - - if (getState() == ABORT) { - return; - } - - // We know we're finished when: - // 1) The from side has told us that it has locked writes (COMMIT_START) - // 2) We've checked at least one more time for un-transmitted mods - if (getState() == COMMIT_START && transferAfterCommit == true) { - if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern)) - break; - } - - // Only sleep if we aren't committing - if (getState() == STEADY) - sleepmillis(10); - } - - if (getState() == FAIL) { - errmsg = "timed out waiting for commit"; - return; - } - - timing.done(5); - MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep5); - } - - setState(DONE); - conn.done(); - } - - void status(BSONObjBuilder& b) { - stdx::lock_guard<stdx::mutex> sl(_mutex); - - b.appendBool("active", _active); - - b.append("ns", _ns); - b.append("from", _from); - b.append("min", _min); - b.append("max", _max); - b.append("shardKeyPattern", _shardKeyPattern); - - b.append("state", stateToString(_state)); - if (_state == FAIL) { - b.append("errmsg", _errmsg); - } - - BSONObjBuilder bb(b.subobjStart("counts")); - bb.append("cloned", _numCloned); - bb.append("clonedBytes", _clonedBytes); - bb.append("catchup", _numCatchup); - bb.append("steady", _numSteady); - bb.done(); - } - - bool apply(OperationContext* txn, - const string& ns, - BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - const BSONObj& xfer, - repl::OpTime* lastOpApplied) { - repl::OpTime dummy; - if (lastOpApplied == NULL) { - lastOpApplied = &dummy; - } - - bool didAnything = false; - - if (xfer["deleted"].isABSONObj()) { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dlk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Helpers::RemoveSaver rs("moveChunk", ns, "removedDuring"); - - BSONObjIterator i(xfer["deleted"].Obj()); - while (i.more()) { - Lock::CollectionLock clk(txn->lockState(), ns, MODE_X); - OldClientContext ctx(txn, ns); - - BSONObj id = i.next().Obj(); - - // do not apply deletes if they do not belong to the chunk being migrated - BSONObj fullObj; - if (Helpers::findById(txn, ctx.db(), ns.c_str(), id, fullObj)) { - if (!isInRange(fullObj, min, max, shardKeyPattern)) { - log() << "not applying out of range deletion: " << fullObj << migrateLog; - - continue; - } - } - - if (serverGlobalParams.moveParanoia) { - rs.goingToDelete(fullObj); - } - - deleteObjects(txn, - ctx.db(), - ns, - id, - PlanExecutor::YIELD_MANUAL, - true /* justOne */, - false /* god */, - true /* fromMigrate */); - - *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); - didAnything = true; - } - } - - if (xfer["reload"].isABSONObj()) { - BSONObjIterator i(xfer["reload"].Obj()); - while (i.more()) { - OldClientWriteContext cx(txn, ns); - - BSONObj updatedDoc = i.next().Obj(); - - BSONObj localDoc; - if (willOverrideLocalId( - txn, ns, min, max, shardKeyPattern, cx.db(), updatedDoc, &localDoc)) { - string errMsg = str::stream() - << "cannot migrate chunk, local document " << localDoc - << " has same _id as reloaded remote document " << updatedDoc; - - warning() << errMsg; - - // Exception will abort migration cleanly - uasserted(16977, errMsg); - } - - // We are in write lock here, so sure we aren't killing - Helpers::upsert(txn, ns, updatedDoc, true); - - *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); - didAnything = true; - } - } - - return didAnything; - } - - /** - * Checks if an upsert of a remote document will override a local document with the same _id - * but in a different range on this shard. - * Must be in WriteContext to avoid races and DBHelper errors. - * TODO: Could optimize this check out if sharding on _id. - */ - bool willOverrideLocalId(OperationContext* txn, - const string& ns, - BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - Database* db, - BSONObj remoteDoc, - BSONObj* localDoc) { - *localDoc = BSONObj(); - if (Helpers::findById(txn, db, ns.c_str(), remoteDoc, *localDoc)) { - return !isInRange(*localDoc, min, max, shardKeyPattern); - } - - return false; - } - - /** - * Returns true if the majority of the nodes and the nodes corresponding to the given - * writeConcern (if not empty) have applied till the specified lastOp. - */ - bool opReplicatedEnough(OperationContext* txn, - const repl::OpTime& lastOpApplied, - const WriteConcernOptions& writeConcern) { - WriteConcernOptions majorityWriteConcern; - majorityWriteConcern.wTimeout = -1; - majorityWriteConcern.wMode = WriteConcernOptions::kMajority; - Status majorityStatus = repl::getGlobalReplicationCoordinator() - ->awaitReplication(txn, lastOpApplied, majorityWriteConcern) - .status; - - if (!writeConcern.shouldWaitForOtherNodes()) { - return majorityStatus.isOK(); - } - - // Also enforce the user specified write concern after "majority" so it covers - // the union of the 2 write concerns. - - WriteConcernOptions userWriteConcern(writeConcern); - userWriteConcern.wTimeout = -1; - Status userStatus = repl::getGlobalReplicationCoordinator() - ->awaitReplication(txn, lastOpApplied, userWriteConcern) - .status; - - return majorityStatus.isOK() && userStatus.isOK(); - } - - bool flushPendingWrites(OperationContext* txn, - const std::string& ns, - BSONObj min, - BSONObj max, - const repl::OpTime& lastOpApplied, - const WriteConcernOptions& writeConcern) { - if (!opReplicatedEnough(txn, lastOpApplied, writeConcern)) { - repl::OpTime op(lastOpApplied); - OCCASIONALLY warning() << "migrate commit waiting for a majority of slaves for '" << ns - << "' " << min << " -> " << max << " waiting for: " << op - << migrateLog; - return false; - } - - log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min - << " -> " << max << migrateLog; - - { - // Get global lock to wait for write to be commited to journal. - ScopedTransaction transaction(txn, MODE_S); - Lock::GlobalRead lk(txn->lockState()); - - // if durability is on, force a write to journal - if (getDur().commitNow(txn)) { - log() << "migrate commit flushed to journal for '" << ns << "' " << min << " -> " - << max << migrateLog; - } - } - - return true; - } - - static string stateToString(State state) { - switch (state) { - case READY: - return "ready"; - case CLONE: - return "clone"; - case CATCHUP: - return "catchup"; - case STEADY: - return "steady"; - case COMMIT_START: - return "commitStart"; - case DONE: - return "done"; - case FAIL: - return "fail"; - case ABORT: - return "abort"; - } - verify(0); - return ""; - } - - bool startCommit() { - stdx::unique_lock<stdx::mutex> lock(_mutex); - - if (_state != STEADY) { - return false; - } - - const auto deadline = system_clock::now() + seconds(30); - _state = COMMIT_START; - while (_active) { - if (stdx::cv_status::timeout == isActiveCV.wait_until(lock, deadline)) { - _state = FAIL; - log() << "startCommit never finished!" << migrateLog; - return false; - } - } - - if (_state == DONE) { - return true; - } - - log() << "startCommit failed, final data failed to transfer" << migrateLog; - return false; - } - - void abort() { - stdx::lock_guard<stdx::mutex> sl(_mutex); - _state = ABORT; - _errmsg = "aborted"; - } - - bool getActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; - } - void setActive(bool b) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = b; - isActiveCV.notify_all(); - } - - // Guards all fields. - mutable stdx::mutex _mutex; - bool _active; - stdx::condition_variable isActiveCV; - - std::string _ns; - std::string _from; - BSONObj _min; - BSONObj _max; - BSONObj _shardKeyPattern; - - long long _numCloned; - long long _clonedBytes; - long long _numCatchup; - long long _numSteady; - - State _state; - std::string _errmsg; - -} migrateStatus; - -void migrateThread(std::string ns, - BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - std::string fromShard, - OID epoch, - WriteConcernOptions writeConcern) { - Client::initThread("migrateThread"); - OperationContextImpl txn; - if (getGlobalAuthorizationManager()->isAuthEnabled()) { - ShardedConnectionInfo::addHook(); - AuthorizationSession::get(txn.getClient())->grantInternalAuthorization(); - } - - migrateStatus.go(&txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern); -} - /** * Command for initiating the recipient side of the migration to start copying data * from the donor shard. @@ -2645,11 +1083,13 @@ public: void help(std::stringstream& h) const { h << "internal"; } + RecvChunkStartCommand() : ChunkCommandHelper("_recvChunkStart") {} virtual bool isWriteCommandForConfigServer() const { return false; } + virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector<Privilege>* out) { @@ -2657,6 +1097,7 @@ public: actions.addAction(ActionType::internal); out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } + bool run(OperationContext* txn, const string&, BSONObj& cmdObj, @@ -2665,7 +1106,7 @@ public: BSONObjBuilder& result) { // Active state of TO-side migrations (MigrateStatus) is serialized by distributed // collection lock. - if (migrateStatus.getActive()) { + if (migrateDestManager.getActive()) { errmsg = "migrate already in progress"; return false; } @@ -2706,13 +1147,12 @@ public: BSONObj min = cmdObj["min"].Obj().getOwned(); BSONObj max = cmdObj["max"].Obj().getOwned(); - // Refresh our collection manager from the config server, we need a collection manager - // to start registering pending chunks. - // We force the remote refresh here to make the behavior consistent and predictable, - // generally we'd refresh anyway, and to be paranoid. + // Refresh our collection manager from the config server, we need a collection manager to + // start registering pending chunks. We force the remote refresh here to make the behavior + // consistent and predictable, generally we'd refresh anyway, and to be paranoid. ChunkVersion currentVersion; - Status status = ShardingState::get(txn)->refreshMetadataNow(txn, ns, ¤tVersion); + Status status = ShardingState::get(txn)->refreshMetadataNow(txn, ns, ¤tVersion); if (!status.isOK()) { errmsg = str::stream() << "cannot start recv'ing chunk " << "[" << min << "," << max << ")" << causedBy(status.reason()); @@ -2778,23 +1218,13 @@ public: const string fromShard(cmdObj["from"].String()); - // Set the TO-side migration to active - Status prepareStatus = migrateStatus.prepare(ns, fromShard, min, max, shardKeyPattern); + Status startStatus = migrateDestManager.start( + ns, fromShard, min, max, shardKeyPattern, currentVersion.epoch(), writeConcern); - if (!prepareStatus.isOK()) { - return appendCommandStatus(result, prepareStatus); + if (!startStatus.isOK()) { + return appendCommandStatus(result, startStatus); } - stdx::thread m(migrateThread, - ns, - min, - max, - shardKeyPattern, - fromShard, - currentVersion.epoch(), - writeConcern); - - m.detach(); result.appendBool("started", true); return true; } @@ -2820,7 +1250,7 @@ public: int, string& errmsg, BSONObjBuilder& result) { - migrateStatus.status(result); + migrateDestManager.report(result); return 1; } @@ -2845,8 +1275,8 @@ public: int, string& errmsg, BSONObjBuilder& result) { - bool ok = migrateStatus.startCommit(); - migrateStatus.status(result); + bool ok = migrateDestManager.startCommit(); + migrateDestManager.report(result); return ok; } @@ -2871,39 +1301,28 @@ public: int, string& errmsg, BSONObjBuilder& result) { - migrateStatus.abort(); - migrateStatus.status(result); + migrateDestManager.abort(); + migrateDestManager.report(result); return true; } -} recvChunkAboortCommand; +} recvChunkAbortCommand; +bool ShardingState::inCriticalMigrateSection() { + return migrateSourceManager.getInCriticalSection(); +} -class IsInRangeTest : public StartupTest { -public: - void run() { - BSONObj min = BSON("x" << 1); - BSONObj max = BSON("x" << 5); - BSONObj skey = BSON("x" << 1); - - verify(!isInRange(BSON("x" << 0), min, max, skey)); - verify(isInRange(BSON("x" << 1), min, max, skey)); - verify(isInRange(BSON("x" << 3), min, max, skey)); - verify(isInRange(BSON("x" << 4), min, max, skey)); - verify(!isInRange(BSON("x" << 5), min, max, skey)); - verify(!isInRange(BSON("x" << 6), min, max, skey)); - - BSONObj obj = BSON("n" << 3); - BSONObj min2 = BSON("x" << BSONElementHasher::hash64(obj.firstElement(), 0) - 2); - BSONObj max2 = BSON("x" << BSONElementHasher::hash64(obj.firstElement(), 0) + 2); - BSONObj hashedKey = BSON("x" - << "hashed"); - - verify(isInRange(BSON("x" << 3), min2, max2, hashedKey)); - verify(!isInRange(BSON("x" << 3), min, max, hashedKey)); - verify(!isInRange(BSON("x" << 4), min2, max2, hashedKey)); - - LOG(1) << "isInRangeTest passed" << migrateLog; - } -} isInRangeTest; +bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) { + return migrateSourceManager.waitTillNotInCriticalSection(maxSecondsToWait); +} + +void logOpForSharding(OperationContext* txn, + const char* opstr, + const char* ns, + const BSONObj& obj, + BSONObj* patt, + bool notInActiveChunk) { + migrateSourceManager.logOp(txn, opstr, ns, obj, patt, notInActiveChunk); } + +} // namespace mongo |