summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author jannaerin <golden.janna@gmail.com> 2018-02-05 13:10:20 -0500
committer jannaerin <golden.janna@gmail.com> 2018-02-13 21:46:09 -0500
commit 2dc87c711cb0bf85fdc8dd1d65b5a83e144509fa (patch)
tree 5b922efe934463855988475f17654f756139a0c7
parent c4e444a62ce32eb37928396699fc7aaa0536226e (diff)
download mongo-2dc87c711cb0bf85fdc8dd1d65b5a83e144509fa.tar.gz
SERVER-32886 Remove unnecessary sleeps during chunk migration
-rw-r--r-- src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp 17
-rw-r--r-- src/mongo/db/s/migration_destination_manager.cpp 24
-rw-r--r-- src/mongo/db/s/migration_destination_manager.h 5
-rw-r--r-- src/mongo/db/s/migration_destination_manager_legacy_commands.cpp 9
4 files changed, 41 insertions, 14 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 78cf1102ef4..1cf3c4e8edc 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -76,9 +76,11 @@ bool isInRange(const BSONObj& obj,
BSONObj createRequestWithSessionId(StringData commandName,
const NamespaceString& nss,
- const MigrationSessionId& sessionId) {
+ const MigrationSessionId& sessionId,
+ bool waitForSteadyOrDone = false) {
BSONObjBuilder builder;
builder.append(commandName, nss.ns());
+ builder.append("waitForSteadyOrDone", waitForSteadyOrDone);
sessionId.append(&builder);
return builder.obj();
}
@@ -258,13 +260,8 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
int iteration = 0;
while ((Date_t::now() - startTime) < maxTimeToWait) {
- // Exponential sleep backoff, up to 1024ms. Don't sleep much on the first few iterations,
- // since we want empty chunk migrations to be fast.
- sleepmillis(1LL << std::min(iteration, 10));
- iteration++;
-
auto responseStatus = _callRecipient(
- createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId));
+ createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true));
if (!responseStatus.isOK()) {
return responseStatus.getStatus().withContext(
"Failed to contact recipient shard to monitor data transfer");
@@ -272,6 +269,11 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
const BSONObj& res = responseStatus.getValue();
+ if (!res["waited"].boolean()) {  // recipient did not block for us, so back off here
+ sleepmillis(1LL << std::min(iteration, 10));
+ }
+ iteration++;
+
stdx::lock_guard<stdx::mutex> sl(_mutex);
const std::size_t cloneLocsRemaining = _cloneLocs.size();
@@ -335,7 +337,6 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
invariant(_state == kCloning);
invariant(!opCtx->lockState()->isLocked());
-
auto responseStatus =
_callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
if (responseStatus.isOK()) {
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index c6f2496bc3d..a29bbc368bf 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -223,6 +223,7 @@ MigrationDestinationManager::State MigrationDestinationManager::getState() const
void MigrationDestinationManager::setState(State newState) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
_state = newState;
+ _stateChangedCV.notify_all();
}
void MigrationDestinationManager::setStateFail(std::string msg) {
@@ -231,6 +232,7 @@ void MigrationDestinationManager::setStateFail(std::string msg) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
_errmsg = std::move(msg);
_state = FAIL;
+ _stateChangedCV.notify_all();
}
_sessionMigration->forceFail(msg);
@@ -242,6 +244,7 @@ void MigrationDestinationManager::setStateFailWarn(std::string msg) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
_errmsg = std::move(msg);
_state = FAIL;
+ _stateChangedCV.notify_all();
}
_sessionMigration->forceFail(msg);
@@ -256,7 +259,20 @@ bool MigrationDestinationManager::_isActive(WithLock) const {
return _sessionId.is_initialized();
}
-void MigrationDestinationManager::report(BSONObjBuilder& b) {
+void MigrationDestinationManager::report(BSONObjBuilder& b,
+ OperationContext* opCtx,
+ bool waitForSteadyOrDone) {
+ if (waitForSteadyOrDone) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ try {
+ opCtx->waitForConditionOrInterruptFor(_stateChangedCV, lock, Seconds(1), [&]() -> bool {
+ return _state != READY && _state != CLONE && _state != CATCHUP;
+ });
+ } catch (...) {
+ // Ignoring this error because this is an optional parameter and we catch timeout
+ // exceptions later.
+ }
+ }
stdx::lock_guard<stdx::mutex> sl(_mutex);
b.appendBool("active", _sessionId.is_initialized());
@@ -264,6 +280,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b) {
if (_sessionId) {
b.append("sessionId", _sessionId->toString());
}
+ b.append("waited", waitForSteadyOrDone);
b.append("ns", _nss.ns());
b.append("from", _fromShardConnString.toString());
@@ -312,6 +329,7 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
invariant(!_scopedRegisterReceiveChunk);
_state = READY;
+ _stateChangedCV.notify_all();
_errmsg = "";
_nss = nss;
@@ -366,6 +384,7 @@ Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
}
_state = ABORT;
+ _stateChangedCV.notify_all();
_errmsg = "aborted";
return Status::OK();
@@ -374,6 +393,7 @@ Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
void MigrationDestinationManager::abortWithoutSessionIdCheck() {
stdx::lock_guard<stdx::mutex> sl(_mutex);
_state = ABORT;
+ _stateChangedCV.notify_all();
_errmsg = "aborted without session id check";
}
@@ -406,6 +426,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
_sessionMigration->finish();
_state = COMMIT_START;
+ _stateChangedCV.notify_all();
auto const deadline = Date_t::now() + Seconds(30);
while (_sessionId) {
@@ -413,6 +434,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
_isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) {
_errmsg = str::stream() << "startCommit timed out waiting, " << _sessionId->toString();
_state = FAIL;
+ _stateChangedCV.notify_all();
return {ErrorCodes::CommandFailed, _errmsg};
}
}
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index bcbfb65facb..1a6304f8010 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -94,7 +94,7 @@ public:
/**
* Reports the state of the migration manager as a BSON document.
*/
- void report(BSONObjBuilder& b);
+ void report(BSONObjBuilder& b, OperationContext* opCtx, bool waitForSteadyOrDone);
/**
* Returns a report on the active migration, if the migration is active. Otherwise return an
@@ -223,6 +223,9 @@ private:
std::string _errmsg;
std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration;
+
+ // Condition variable, which is signalled every time the state of the migration changes.
+ stdx::condition_variable _stateChangedCV;
};
} // namespace mongo
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index 98c2b85cd18..ac7dd9bc0f1 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -180,7 +180,8 @@ public:
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) override {
- MigrationDestinationManager::get(opCtx)->report(result);
+ bool waitForSteadyOrDone = cmdObj["waitForSteadyOrDone"].boolean();
+ MigrationDestinationManager::get(opCtx)->report(result, opCtx, waitForSteadyOrDone);
return true;
}
@@ -222,7 +223,7 @@ public:
auto const sessionId = uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj));
auto const mdm = MigrationDestinationManager::get(opCtx);
Status const status = mdm->startCommit(sessionId);
- mdm->report(result);
+ mdm->report(result, opCtx, false);
if (!status.isOK()) {
log() << status.reason();
return CommandHelpers::appendCommandStatus(result, status);
@@ -270,14 +271,14 @@ public:
if (migrationSessionIdStatus.isOK()) {
Status const status = mdm->abort(migrationSessionIdStatus.getValue());
- mdm->report(result);
+ mdm->report(result, opCtx, false);
if (!status.isOK()) {
log() << status.reason();
return CommandHelpers::appendCommandStatus(result, status);
}
} else if (migrationSessionIdStatus == ErrorCodes::NoSuchKey) {
mdm->abortWithoutSessionIdCheck();
- mdm->report(result);
+ mdm->report(result, opCtx, false);
}
uassertStatusOK(migrationSessionIdStatus.getStatus());