summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2018-02-05 13:10:20 -0500
committerjannaerin <golden.janna@gmail.com>2018-03-12 11:08:24 -0400
commit4d816381bef06ed82019089bf91cf2003d59d16f (patch)
treec263351f74b24a649b7d388d58b8eb3f3f21f3ab
parente54d2ce84bb9f1285228e75b63b7ef4e555601e6 (diff)
downloadmongo-4d816381bef06ed82019089bf91cf2003d59d16f.tar.gz
SERVER-32886 Remove unnecessary sleeps during chunk migration
-rw-r--r--src/mongo/db/operation_context.h34
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp16
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp23
-rw-r--r--src/mongo/db/s/migration_destination_manager.h5
-rw-r--r--src/mongo/db/s/migration_destination_manager_legacy_commands.cpp11
5 files changed, 76 insertions, 13 deletions
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 2c8be360a77..79455e47309 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -202,6 +202,40 @@ public:
}
/**
+ * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline"
+ * expires, or this operation is interrupted, or this operation's own deadline expires.
+ *
+ *
+ * If the operation deadline expires or the operation is interrupted, throws a DBException. If
+ * the given "deadline" expires, returns pred. Otherwise, returns true.
+ */
+ template <typename Pred>
+ bool waitForConditionOrInterruptUntilPred(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline,
+ Pred pred) {
+ while (!pred()) {
+ if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
+ return pred();
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Same as the predicate form of waitForConditionOrInterruptUntilPred, but takes a relative
+ * amount of time to wait instead of an absolute time point.
+ */
+ template <typename Pred>
+ bool waitForConditionOrInterruptFor(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Milliseconds duration,
+ Pred pred) {
+ return waitForConditionOrInterruptUntilPred(
+ cv, m, getServiceContext()->getPreciseClockSource()->now() + duration, pred);
+ }
+
+ /**
* Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and
* non-ok status indicates the error instead of a DBException.
*/
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 94798cb509d..ea83fa83d0d 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -75,9 +75,11 @@ bool isInRange(const BSONObj& obj,
BSONObj createRequestWithSessionId(StringData commandName,
const NamespaceString& nss,
- const MigrationSessionId& sessionId) {
+ const MigrationSessionId& sessionId,
+ bool waitForSteadyOrDone = false) {
BSONObjBuilder builder;
builder.append(commandName, nss.ns());
+ builder.append("waitForSteadyOrDone", waitForSteadyOrDone);
sessionId.append(&builder);
return builder.obj();
}
@@ -231,13 +233,8 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
int iteration = 0;
while ((Date_t::now() - startTime) < maxTimeToWait) {
- // Exponential sleep backoff, up to 1024ms. Don't sleep much on the first few iterations,
- // since we want empty chunk migrations to be fast.
- sleepmillis(1LL << std::min(iteration, 10));
- iteration++;
-
auto responseStatus = _callRecipient(
- createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId));
+ createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true));
if (!responseStatus.isOK()) {
return {responseStatus.getStatus().code(),
str::stream()
@@ -247,6 +244,11 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
const BSONObj& res = responseStatus.getValue();
+ if (!res["waited"].boolean()) {
+ sleepmillis(1LL << std::min(iteration, 10));
+ }
+ iteration++;
+
stdx::lock_guard<stdx::mutex> sl(_mutex);
const std::size_t cloneLocsRemaining = _cloneLocs.size();
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 8e24e04f171..1c02208ee27 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -227,6 +227,7 @@ MigrationDestinationManager::State MigrationDestinationManager::getState() const
void MigrationDestinationManager::setState(State newState) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
_state = newState;
+ _stateChangedCV.notify_all();
}
bool MigrationDestinationManager::isActive() const {
@@ -238,7 +239,21 @@ bool MigrationDestinationManager::_isActive_inlock() const {
return _sessionId.is_initialized();
}
-void MigrationDestinationManager::report(BSONObjBuilder& b) {
+void MigrationDestinationManager::report(BSONObjBuilder& b,
+ OperationContext* opCtx,
+ bool waitForSteadyOrDone) {
+ if (waitForSteadyOrDone) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ try {
+ opCtx->waitForConditionOrInterruptFor(_stateChangedCV, lock, Seconds(1), [&]() -> bool {
+ return _state != READY && _state != CLONE && _state != CATCHUP;
+ });
+ } catch (...) {
+ // Ignoring this error because this is an optional parameter and we catch timeout
+ // exceptions later.
+ }
+ b.append("waited", true);
+ }
stdx::lock_guard<stdx::mutex> sl(_mutex);
b.appendBool("active", _sessionId.is_initialized());
@@ -293,6 +308,7 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
invariant(!_scopedRegisterReceiveChunk);
_state = READY;
+ _stateChangedCV.notify_all();
_errmsg = "";
_nss = nss;
@@ -343,6 +359,7 @@ bool MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
}
_state = ABORT;
+ _stateChangedCV.notify_all();
_errmsg = "aborted";
return true;
@@ -351,6 +368,7 @@ bool MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
void MigrationDestinationManager::abortWithoutSessionIdCheck() {
stdx::lock_guard<stdx::mutex> sl(_mutex);
_state = ABORT;
+ _stateChangedCV.notify_all();
_errmsg = "aborted without session id check";
}
@@ -375,6 +393,7 @@ bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionI
}
_state = COMMIT_START;
+ _stateChangedCV.notify_all();
const auto deadline = Date_t::now() + Seconds(30);
@@ -382,6 +401,8 @@ bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionI
if (stdx::cv_status::timeout ==
_isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) {
_state = FAIL;
+ _stateChangedCV.notify_all();
+
log() << "startCommit never finished!" << migrateLog;
return false;
}
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 0b16202d55e..4bd269f37fd 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -77,7 +77,7 @@ public:
/**
* Reports the state of the migration manager as a BSON document.
*/
- void report(BSONObjBuilder& b);
+ void report(BSONObjBuilder& b, OperationContext* opCtx, bool waitForSteadyOrDone);
/**
* Returns a report on the active migration, if the migration is active. Otherwise return an
@@ -222,6 +222,9 @@ private:
State _state{READY};
std::string _errmsg;
+
+ // Condition variable, which is signalled every time the state of the migration changes.
+ stdx::condition_variable _stateChangedCV;
};
} // namespace mongo
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index a20db670259..ac9752fd96d 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -220,7 +220,9 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- ShardingState::get(txn)->migrationDestinationManager()->report(result);
+ bool waitForSteadyOrDone = cmdObj["waitForSteadyOrDone"].boolean();
+ ShardingState::get(txn)->migrationDestinationManager()->report(
+ result, txn, waitForSteadyOrDone);
return true;
}
@@ -261,12 +263,13 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
+
const MigrationSessionId migrationSessionid(
uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
const bool ok =
ShardingState::get(txn)->migrationDestinationManager()->startCommit(migrationSessionid);
- ShardingState::get(txn)->migrationDestinationManager()->report(result);
+ ShardingState::get(txn)->migrationDestinationManager()->report(result, txn, false);
return ok;
}
@@ -313,11 +316,11 @@ public:
if (migrationSessionIdStatus.isOK()) {
const bool ok = mdm->abort(migrationSessionIdStatus.getValue());
- mdm->report(result);
+ mdm->report(result, txn, false);
return ok;
} else if (migrationSessionIdStatus == ErrorCodes::NoSuchKey) {
mdm->abortWithoutSessionIdCheck();
- mdm->report(result);
+ mdm->report(result, txn, false);
return true;
}