diff options
author | jannaerin <golden.janna@gmail.com> | 2018-02-05 13:10:20 -0500 |
---|---|---|
committer | jannaerin <golden.janna@gmail.com> | 2018-02-13 21:46:09 -0500 |
commit | 2dc87c711cb0bf85fdc8dd1d65b5a83e144509fa (patch) | |
tree | 5b922efe934463855988475f17654f756139a0c7 | |
parent | c4e444a62ce32eb37928396699fc7aaa0536226e (diff) | |
download | mongo-2dc87c711cb0bf85fdc8dd1d65b5a83e144509fa.tar.gz |
SERVER-32886 Remove unnecessary sleeps during chunk migration
4 files changed, 41 insertions, 14 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 78cf1102ef4..1cf3c4e8edc 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -76,9 +76,11 @@ bool isInRange(const BSONObj& obj, BSONObj createRequestWithSessionId(StringData commandName, const NamespaceString& nss, - const MigrationSessionId& sessionId) { + const MigrationSessionId& sessionId, + bool waitForSteadyOrDone = false) { BSONObjBuilder builder; builder.append(commandName, nss.ns()); + builder.append("waitForSteadyOrDone", waitForSteadyOrDone); sessionId.append(&builder); return builder.obj(); } @@ -258,13 +260,8 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( int iteration = 0; while ((Date_t::now() - startTime) < maxTimeToWait) { - // Exponential sleep backoff, up to 1024ms. Don't sleep much on the first few iterations, - // since we want empty chunk migrations to be fast. - sleepmillis(1LL << std::min(iteration, 10)); - iteration++; - auto responseStatus = _callRecipient( - createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId)); + createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true)); if (!responseStatus.isOK()) { return responseStatus.getStatus().withContext( "Failed to contact recipient shard to monitor data transfer"); @@ -272,6 +269,11 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( const BSONObj& res = responseStatus.getValue(); + if (res["waited"].boolean()) { + sleepmillis(1LL << std::min(iteration, 10)); + } + iteration++; + stdx::lock_guard<stdx::mutex> sl(_mutex); const std::size_t cloneLocsRemaining = _cloneLocs.size(); @@ -335,7 +337,6 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) { invariant(_state == kCloning); invariant(!opCtx->lockState()->isLocked()); - auto responseStatus = _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId)); if (responseStatus.isOK()) { diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index c6f2496bc3d..a29bbc368bf 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -223,6 +223,7 @@ MigrationDestinationManager::State MigrationDestinationManager::getState() const void MigrationDestinationManager::setState(State newState) { stdx::lock_guard<stdx::mutex> sl(_mutex); _state = newState; + _stateChangedCV.notify_all(); } void MigrationDestinationManager::setStateFail(std::string msg) { @@ -231,6 +232,7 @@ void MigrationDestinationManager::setStateFail(std::string msg) { stdx::lock_guard<stdx::mutex> sl(_mutex); _errmsg = std::move(msg); _state = FAIL; + _stateChangedCV.notify_all(); } _sessionMigration->forceFail(msg); @@ -242,6 +244,7 @@ void MigrationDestinationManager::setStateFailWarn(std::string msg) { stdx::lock_guard<stdx::mutex> sl(_mutex); _errmsg = std::move(msg); _state = FAIL; + _stateChangedCV.notify_all(); } _sessionMigration->forceFail(msg); @@ -256,7 +259,20 @@ bool MigrationDestinationManager::_isActive(WithLock) const { return _sessionId.is_initialized(); } -void MigrationDestinationManager::report(BSONObjBuilder& b) { +void MigrationDestinationManager::report(BSONObjBuilder& b, + OperationContext* opCtx, + bool waitForSteadyOrDone) { + if (waitForSteadyOrDone) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + try { + opCtx->waitForConditionOrInterruptFor(_stateChangedCV, lock, Seconds(1), [&]() -> bool { + return _state != READY && _state != CLONE && _state != CATCHUP; + }); + } catch (...) { + // Ignoring this error because this is an optional parameter and we catch timeout + // exceptions later. + } + } stdx::lock_guard<stdx::mutex> sl(_mutex); b.appendBool("active", _sessionId.is_initialized()); @@ -264,6 +280,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b) { if (_sessionId) { b.append("sessionId", _sessionId->toString()); } + b.append("waited", waitForSteadyOrDone); b.append("ns", _nss.ns()); b.append("from", _fromShardConnString.toString()); @@ -312,6 +329,7 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, invariant(!_scopedRegisterReceiveChunk); _state = READY; + _stateChangedCV.notify_all(); _errmsg = ""; _nss = nss; @@ -366,6 +384,7 @@ Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) { } _state = ABORT; + _stateChangedCV.notify_all(); _errmsg = "aborted"; return Status::OK(); @@ -374,6 +393,7 @@ Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) { void MigrationDestinationManager::abortWithoutSessionIdCheck() { stdx::lock_guard<stdx::mutex> sl(_mutex); _state = ABORT; + _stateChangedCV.notify_all(); _errmsg = "aborted without session id check"; } @@ -406,6 +426,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio _sessionMigration->finish(); _state = COMMIT_START; + _stateChangedCV.notify_all(); auto const deadline = Date_t::now() + Seconds(30); while (_sessionId) { @@ -413,6 +434,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio _isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) { _errmsg = str::stream() << "startCommit timed out waiting, " << _sessionId->toString(); _state = FAIL; + _stateChangedCV.notify_all(); return {ErrorCodes::CommandFailed, _errmsg}; } } diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index bcbfb65facb..1a6304f8010 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -94,7 +94,7 @@ public: /** * Reports the state of the migration manager as a BSON document. */ - void report(BSONObjBuilder& b); + void report(BSONObjBuilder& b, OperationContext* opCtx, bool waitForSteadyOrDone); /** * Returns a report on the active migration, if the migration is active. Otherwise return an @@ -223,6 +223,9 @@ private: std::string _errmsg; std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration; + + // Condition variable, which is signalled every time the state of the migration changes. + stdx::condition_variable _stateChangedCV; }; } // namespace mongo diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp index 98c2b85cd18..ac7dd9bc0f1 100644 --- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp +++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp @@ -180,7 +180,8 @@ public: const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder& result) override { - MigrationDestinationManager::get(opCtx)->report(result); + bool waitForSteadyOrDone = cmdObj["waitForSteadyOrDone"].boolean(); + MigrationDestinationManager::get(opCtx)->report(result, opCtx, waitForSteadyOrDone); return true; } @@ -222,7 +223,7 @@ public: auto const sessionId = uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)); auto const mdm = MigrationDestinationManager::get(opCtx); Status const status = mdm->startCommit(sessionId); - mdm->report(result); + mdm->report(result, opCtx, false); if (!status.isOK()) { log() << status.reason(); return CommandHelpers::appendCommandStatus(result, status); @@ -270,14 +271,14 @@ public: if (migrationSessionIdStatus.isOK()) { Status const status = mdm->abort(migrationSessionIdStatus.getValue()); - mdm->report(result); + mdm->report(result, opCtx, false); if (!status.isOK()) { log() << status.reason(); return CommandHelpers::appendCommandStatus(result, status); } } else if (migrationSessionIdStatus == ErrorCodes::NoSuchKey) { mdm->abortWithoutSessionIdCheck(); - mdm->report(result); + mdm->report(result, opCtx, false); } uassertStatusOK(migrationSessionIdStatus.getStatus()); |