diff options
author | Nathan Myers <ncm@asperasoft.com> | 2016-12-23 23:24:10 -0500 |
---|---|---|
committer | Nathan Myers <nathan.myers@10gen.com> | 2016-12-27 15:48:15 -0500 |
commit | 1c43b90c0483bc7574fecd0eaeee610480cc9217 (patch) | |
tree | 4741711a9ca349c0379cb8847808cd077f2ef18b | |
parent | 9969d473ffcf43d1014c3db13f5c639e83ee7df1 (diff) | |
download | mongo-1c43b90c0483bc7574fecd0eaeee610480cc9217.tar.gz |
SERVER-26987 Release migration manager lock during find ops in clone
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 100 |
1 files changed, 67 insertions, 33 deletions
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 96dd8de9b68..d6ea4110beb 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -469,6 +469,29 @@ bool MigrationSourceManager::storeCurrentLocs(OperationContext* txn, return true; } +namespace { + +static bool stillSameSession(boost::optional<MigrationSessionId> const& memberSessionId, + MigrationSessionId const& argSessionId, + std::string& errmsg) { + if (!memberSessionId) { + errmsg = "not active"; + return false; + } + + // A mongod version < v3.2 will not have sessionId, in which case it is empty and + // ignored. + if (!argSessionId.isEmpty() && !memberSessionId->matches(argSessionId)) { + errmsg = str::stream() << "migration session id changed from " << argSessionId.toString() + << " to " << memberSessionId->toString() + << " while initial clone was active"; + return false; + } + return true; +} + +} // namespace + bool MigrationSourceManager::clone(OperationContext* txn, const MigrationSessionId& sessionId, string& errmsg, @@ -483,16 +506,7 @@ bool MigrationSourceManager::clone(OperationContext* txn, stdx::lock_guard<stdx::mutex> sl(_mutex); - if (!_sessionId) { - errmsg = "not active"; - return false; - } - - // A mongod version < v3.2 will not have sessionId, in which case it is empty and ignored. - if (!sessionId.isEmpty() && !_sessionId->matches(sessionId)) { - errmsg = str::stream() << "requested migration session id " << sessionId.toString() - << " does not match active session id " - << _sessionId->toString(); + if (!stillSameSession(_sessionId, sessionId, errmsg)) { return false; } @@ -507,37 +521,46 @@ bool MigrationSourceManager::clone(OperationContext* txn, static_cast<int>((12 + collection->averageObjectSize(txn)) * cloneLocsRemaining())); } - bool isBufferFilled = false; BSONArrayBuilder clonedDocsArrayBuilder(allocSize); + std::vector<RecordId> cloneLocsTemp; + + // We carve off a limited number of records to look up per externally-visible iteration + // so that, for very big collections, progress meters register activity. + cloneLocsTemp.reserve(1000); + + bool isBufferFilled = false; while (!isBufferFilled) { ScopedTransaction scopedXact(txn, MODE_IS); AutoGetCollection autoColl(txn, _getNS(), MODE_IS); + Collection* collection = autoColl.getCollection(); - stdx::lock_guard<stdx::mutex> sl(_mutex); - - if (!_sessionId) { - errmsg = "not active"; + if (!collection) { + errmsg = str::stream() << "collection " << _getNS().toString() << " does not exist"; return false; } - // A mongod version < v3.2 will not have sessionId, in which case it is empty and ignored. - if (!sessionId.isEmpty() && !_sessionId->matches(sessionId)) { - errmsg = str::stream() << "migration session id changed from " << sessionId.toString() - << " to " << _sessionId->toString() - << " while initial clone was active"; - return false; - } + { + stdx::lock_guard<stdx::mutex> sl(_mutex); - Collection* collection = autoColl.getCollection(); - if (!collection) { - errmsg = str::stream() << "collection " << _nss.toString() << " does not exist"; - return false; - } + if (!stillSameSession(_sessionId, sessionId, errmsg)) { + return false; + } - stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex); + { + stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex); + + for (RecordId const& cloneLoc : _cloneLocs) { + cloneLocsTemp.push_back(cloneLoc); + if (cloneLocsTemp.size() == cloneLocsTemp.capacity()) { + break; // enough for now + } + } + } + } - std::set<RecordId>::iterator cloneLocsIter = _cloneLocs.begin(); - for (; cloneLocsIter != _cloneLocs.end(); ++cloneLocsIter) { + // release locks during find ops + auto cloneLocsIter = cloneLocsTemp.begin(); + for (; cloneLocsIter != cloneLocsTemp.end(); ++cloneLocsIter) { if (tracker.intervalHasElapsed()) // should I yield? break; @@ -561,12 +584,23 @@ bool MigrationSourceManager::clone(OperationContext* txn, clonedDocsArrayBuilder.append(doc.value()); } - _cloneLocs.erase(_cloneLocs.begin(), cloneLocsIter); + // reclaim locks and record progress + + stdx::lock_guard<stdx::mutex> sl(_mutex); + + if (!stillSameSession(_sessionId, sessionId, errmsg)) { + return false; + } + + stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex); - // Note: must be holding _cloneLocsMutex, don't move this inside while condition! + std::for_each(cloneLocsTemp.begin(), + cloneLocsIter, + [&](RecordId const& loc) { _cloneLocs.erase(loc); }); if (_cloneLocs.empty()) { - break; + break; // and return } + cloneLocsTemp.clear(); } result.appendArray("objects", clonedDocsArrayBuilder.arr()); |