summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNathan Myers <ncm@asperasoft.com>2016-12-23 23:24:10 -0500
committerNathan Myers <nathan.myers@10gen.com>2016-12-27 15:48:15 -0500
commit1c43b90c0483bc7574fecd0eaeee610480cc9217 (patch)
tree4741711a9ca349c0379cb8847808cd077f2ef18b
parent9969d473ffcf43d1014c3db13f5c639e83ee7df1 (diff)
downloadmongo-1c43b90c0483bc7574fecd0eaeee610480cc9217.tar.gz
SERVER-26987 Release migration manager lock during find ops in clone
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp100
1 files changed, 67 insertions, 33 deletions
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 96dd8de9b68..d6ea4110beb 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -469,6 +469,29 @@ bool MigrationSourceManager::storeCurrentLocs(OperationContext* txn,
return true;
}
+namespace {
+
+static bool stillSameSession(boost::optional<MigrationSessionId> const& memberSessionId,
+ MigrationSessionId const& argSessionId,
+ std::string& errmsg) {
+ if (!memberSessionId) {
+ errmsg = "not active";
+ return false;
+ }
+
+ // A mongod version < v3.2 will not have sessionId, in which case it is empty and
+ // ignored.
+ if (!argSessionId.isEmpty() && !memberSessionId->matches(argSessionId)) {
+ errmsg = str::stream() << "migration session id changed from " << argSessionId.toString()
+ << " to " << memberSessionId->toString()
+ << " while initial clone was active";
+ return false;
+ }
+ return true;
+}
+
+} // namespace
+
bool MigrationSourceManager::clone(OperationContext* txn,
const MigrationSessionId& sessionId,
string& errmsg,
@@ -483,16 +506,7 @@ bool MigrationSourceManager::clone(OperationContext* txn,
stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_sessionId) {
- errmsg = "not active";
- return false;
- }
-
- // A mongod version < v3.2 will not have sessionId, in which case it is empty and ignored.
- if (!sessionId.isEmpty() && !_sessionId->matches(sessionId)) {
- errmsg = str::stream() << "requested migration session id " << sessionId.toString()
- << " does not match active session id "
- << _sessionId->toString();
+ if (!stillSameSession(_sessionId, sessionId, errmsg)) {
return false;
}
@@ -507,37 +521,46 @@ bool MigrationSourceManager::clone(OperationContext* txn,
static_cast<int>((12 + collection->averageObjectSize(txn)) * cloneLocsRemaining()));
}
- bool isBufferFilled = false;
BSONArrayBuilder clonedDocsArrayBuilder(allocSize);
+ std::vector<RecordId> cloneLocsTemp;
+
+ // We carve off a limited number of records to look up per externally-visible iteration
+ // so that, for very big collections, progress meters register activity.
+ cloneLocsTemp.reserve(1000);
+
+ bool isBufferFilled = false;
while (!isBufferFilled) {
ScopedTransaction scopedXact(txn, MODE_IS);
AutoGetCollection autoColl(txn, _getNS(), MODE_IS);
+ Collection* collection = autoColl.getCollection();
- stdx::lock_guard<stdx::mutex> sl(_mutex);
-
- if (!_sessionId) {
- errmsg = "not active";
+ if (!collection) {
+ errmsg = str::stream() << "collection " << _getNS().toString() << " does not exist";
return false;
}
- // A mongod version < v3.2 will not have sessionId, in which case it is empty and ignored.
- if (!sessionId.isEmpty() && !_sessionId->matches(sessionId)) {
- errmsg = str::stream() << "migration session id changed from " << sessionId.toString()
- << " to " << _sessionId->toString()
- << " while initial clone was active";
- return false;
- }
+ {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
- Collection* collection = autoColl.getCollection();
- if (!collection) {
- errmsg = str::stream() << "collection " << _nss.toString() << " does not exist";
- return false;
- }
+ if (!stillSameSession(_sessionId, sessionId, errmsg)) {
+ return false;
+ }
- stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex);
+ {
+ stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex);
+
+ for (RecordId const& cloneLoc : _cloneLocs) {
+ cloneLocsTemp.push_back(cloneLoc);
+ if (cloneLocsTemp.size() == cloneLocsTemp.capacity()) {
+ break; // enough for now
+ }
+ }
+ }
+ }
- std::set<RecordId>::iterator cloneLocsIter = _cloneLocs.begin();
- for (; cloneLocsIter != _cloneLocs.end(); ++cloneLocsIter) {
+ // release locks during find ops
+ auto cloneLocsIter = cloneLocsTemp.begin();
+ for (; cloneLocsIter != cloneLocsTemp.end(); ++cloneLocsIter) {
if (tracker.intervalHasElapsed()) // should I yield?
break;
@@ -561,12 +584,23 @@ bool MigrationSourceManager::clone(OperationContext* txn,
clonedDocsArrayBuilder.append(doc.value());
}
- _cloneLocs.erase(_cloneLocs.begin(), cloneLocsIter);
+ // reclaim locks and record progress
+
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+
+ if (!stillSameSession(_sessionId, sessionId, errmsg)) {
+ return false;
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_cloneLocsMutex);
- // Note: must be holding _cloneLocsMutex, don't move this inside while condition!
+ std::for_each(cloneLocsTemp.begin(),
+ cloneLocsIter,
+ [&](RecordId const& loc) { _cloneLocs.erase(loc); });
if (_cloneLocs.empty()) {
- break;
+ break; // and return
}
+ cloneLocsTemp.clear();
}
result.appendArray("objects", clonedDocsArrayBuilder.arr());