From 4d816381bef06ed82019089bf91cf2003d59d16f Mon Sep 17 00:00:00 2001 From: jannaerin Date: Mon, 5 Feb 2018 13:10:20 -0500 Subject: SERVER-32886 Remove unnecessary sleeps during chunk migration --- src/mongo/db/operation_context.h | 34 ++++++++++++++++++++++ .../db/s/migration_chunk_cloner_source_legacy.cpp | 16 +++++----- src/mongo/db/s/migration_destination_manager.cpp | 23 ++++++++++++++- src/mongo/db/s/migration_destination_manager.h | 5 +++- ...gration_destination_manager_legacy_commands.cpp | 11 ++++--- 5 files changed, 76 insertions(+), 13 deletions(-) diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 2c8be360a77..79455e47309 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -201,6 +201,40 @@ public: return stdx::cv_status::no_timeout; } + /** + * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline" + * expires, or this operation is interrupted, or this operation's own deadline expires. + * + * + * If the operation deadline expires or the operation is interrupted, throws a DBException. If + * the given "deadline" expires, returns pred. Otherwise, returns true. + */ + template + bool waitForConditionOrInterruptUntilPred(stdx::condition_variable& cv, + stdx::unique_lock& m, + Date_t deadline, + Pred pred) { + while (!pred()) { + if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) { + return pred(); + } + } + return true; + } + + /** + * Same as the predicate form of waitForConditionOrInterruptUntilPred, but takes a relative + * amount of time to wait instead of an absolute time point. + */ + template + bool waitForConditionOrInterruptFor(stdx::condition_variable& cv, + stdx::unique_lock& m, + Milliseconds duration, + Pred pred) { + return waitForConditionOrInterruptUntilPred( + cv, m, getServiceContext()->getPreciseClockSource()->now() + duration, pred); + } + /** * Same as waitForConditionOrInterruptUntil, except returns StatusWith and * non-ok status indicates the error instead of a DBException. diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 94798cb509d..ea83fa83d0d 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -75,9 +75,11 @@ bool isInRange(const BSONObj& obj, BSONObj createRequestWithSessionId(StringData commandName, const NamespaceString& nss, - const MigrationSessionId& sessionId) { + const MigrationSessionId& sessionId, + bool waitForSteadyOrDone = false) { BSONObjBuilder builder; builder.append(commandName, nss.ns()); + builder.append("waitForSteadyOrDone", waitForSteadyOrDone); sessionId.append(&builder); return builder.obj(); } @@ -231,13 +233,8 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( int iteration = 0; while ((Date_t::now() - startTime) < maxTimeToWait) { - // Exponential sleep backoff, up to 1024ms. Don't sleep much on the first few iterations, - // since we want empty chunk migrations to be fast. - sleepmillis(1LL << std::min(iteration, 10)); - iteration++; - auto responseStatus = _callRecipient( - createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId)); + createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true)); if (!responseStatus.isOK()) { return {responseStatus.getStatus().code(), str::stream() @@ -247,6 +244,11 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( const BSONObj& res = responseStatus.getValue(); + if (!res["waited"].boolean()) { + sleepmillis(1LL << std::min(iteration, 10)); + } + iteration++; + stdx::lock_guard sl(_mutex); const std::size_t cloneLocsRemaining = _cloneLocs.size(); diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 8e24e04f171..1c02208ee27 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -227,6 +227,7 @@ MigrationDestinationManager::State MigrationDestinationManager::getState() const void MigrationDestinationManager::setState(State newState) { stdx::lock_guard sl(_mutex); _state = newState; + _stateChangedCV.notify_all(); } bool MigrationDestinationManager::isActive() const { @@ -238,7 +239,21 @@ bool MigrationDestinationManager::_isActive_inlock() const { return _sessionId.is_initialized(); } -void MigrationDestinationManager::report(BSONObjBuilder& b) { +void MigrationDestinationManager::report(BSONObjBuilder& b, + OperationContext* opCtx, + bool waitForSteadyOrDone) { + if (waitForSteadyOrDone) { + stdx::unique_lock lock(_mutex); + try { + opCtx->waitForConditionOrInterruptFor(_stateChangedCV, lock, Seconds(1), [&]() -> bool { + return _state != READY && _state != CLONE && _state != CATCHUP; + }); + } catch (...) { + // Ignoring this error because this is an optional parameter and we catch timeout + // exceptions later. + } + b.append("waited", true); + } stdx::lock_guard sl(_mutex); b.appendBool("active", _sessionId.is_initialized()); @@ -293,6 +308,7 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, invariant(!_scopedRegisterReceiveChunk); _state = READY; + _stateChangedCV.notify_all(); _errmsg = ""; _nss = nss; @@ -343,6 +359,7 @@ bool MigrationDestinationManager::abort(const MigrationSessionId& sessionId) { } _state = ABORT; + _stateChangedCV.notify_all(); _errmsg = "aborted"; return true; @@ -351,6 +368,7 @@ bool MigrationDestinationManager::abort(const MigrationSessionId& sessionId) { void MigrationDestinationManager::abortWithoutSessionIdCheck() { stdx::lock_guard sl(_mutex); _state = ABORT; + _stateChangedCV.notify_all(); _errmsg = "aborted without session id check"; } @@ -375,6 +393,7 @@ bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionI } _state = COMMIT_START; + _stateChangedCV.notify_all(); const auto deadline = Date_t::now() + Seconds(30); @@ -382,6 +401,8 @@ bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionI if (stdx::cv_status::timeout == _isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) { _state = FAIL; + _stateChangedCV.notify_all(); + log() << "startCommit never finished!" << migrateLog; return false; } diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 0b16202d55e..4bd269f37fd 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -77,7 +77,7 @@ public: /** * Reports the state of the migration manager as a BSON document. */ - void report(BSONObjBuilder& b); + void report(BSONObjBuilder& b, OperationContext* opCtx, bool waitForSteadyOrDone); /** * Returns a report on the active migration, if the migration is active. Otherwise return an @@ -222,6 +222,9 @@ private: State _state{READY}; std::string _errmsg; + + // Condition variable, which is signalled every time the state of the migration changes. + stdx::condition_variable _stateChangedCV; }; } // namespace mongo diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp index a20db670259..ac9752fd96d 100644 --- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp +++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp @@ -220,7 +220,9 @@ public: int, string& errmsg, BSONObjBuilder& result) { - ShardingState::get(txn)->migrationDestinationManager()->report(result); + bool waitForSteadyOrDone = cmdObj["waitForSteadyOrDone"].boolean(); + ShardingState::get(txn)->migrationDestinationManager()->report( + result, txn, waitForSteadyOrDone); return true; } @@ -261,12 +263,13 @@ public: int, string& errmsg, BSONObjBuilder& result) { + const MigrationSessionId migrationSessionid( uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); const bool ok = ShardingState::get(txn)->migrationDestinationManager()->startCommit(migrationSessionid); - ShardingState::get(txn)->migrationDestinationManager()->report(result); + ShardingState::get(txn)->migrationDestinationManager()->report(result, txn, false); return ok; } @@ -313,11 +316,11 @@ public: if (migrationSessionIdStatus.isOK()) { const bool ok = mdm->abort(migrationSessionIdStatus.getValue()); - mdm->report(result); + mdm->report(result, txn, false); return ok; } else if (migrationSessionIdStatus == ErrorCodes::NoSuchKey) { mdm->abortWithoutSessionIdCheck(); - mdm->report(result); + mdm->report(result, txn, false); return true; } -- cgit v1.2.1