summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Cheahuychou Mao <mao.cheahuychou@gmail.com> 2021-06-11 15:14:33 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-07-20 01:07:23 +0000
commit 1c8566834570ab5342787eb6f9176771fa710333 (patch)
tree 2d7402a056a467a3ecd7573db28a12c153bbaddc
parent abb6b9c2bf675e9e2aeaecba05f0f8359d99e203 (diff)
download mongo-1c8566834570ab5342787eb6f9176771fa710333.tar.gz
SERVER-57650 Make MigrationChunkClonerSource interruptible when waiting for response to recipient commands
(cherry picked from commit d07f4ee6efe1f4405d2bc6d4063599ab867ef7c8)
-rw-r--r-- src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp 35
-rw-r--r-- src/mongo/db/s/migration_chunk_cloner_source_legacy.h 2
2 files changed, 26 insertions, 11 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 042f6a4269a..d1fb2ca5fcd 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -310,7 +310,7 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx,
}
}
- auto startChunkCloneResponseStatus = _callRecipient(cmdBuilder.obj());
+ auto startChunkCloneResponseStatus = _callRecipient(opCtx, cmdBuilder.obj());
if (!startChunkCloneResponseStatus.isOK()) {
return startChunkCloneResponseStatus.getStatus();
}
@@ -391,7 +391,7 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
}
}
- auto startChunkCloneResponseStatus = _callRecipient(cmdBuilder.obj());
+ auto startChunkCloneResponseStatus = _callRecipient(opCtx, cmdBuilder.obj());
if (!startChunkCloneResponseStatus.isOK()) {
return startChunkCloneResponseStatus.getStatus();
}
@@ -441,8 +441,8 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte
_sessionCatalogSource->onCommitCloneStarted();
}
- auto responseStatus =
- _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
+ auto responseStatus = _callRecipient(
+ opCtx, createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
if (responseStatus.isOK()) {
_cleanup(opCtx);
@@ -471,9 +471,10 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) {
case kDone:
break;
case kCloning: {
- const auto status = _callRecipient(createRequestWithSessionId(
- kRecvChunkAbort, _args.getNss(), _sessionId))
- .getStatus();
+ const auto status =
+ _callRecipient(
+ opCtx, createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId))
+ .getStatus();
if (!status.isOK()) {
LOGV2(21991,
"Failed to cancel migration: {error}",
@@ -848,7 +849,8 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) {
_untransferredDeletesCounter = 0;
}
-StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONObj& cmdObj) {
+StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(OperationContext* opCtx,
+ const BSONObj& cmdObj) {
executor::RemoteCommandResponse responseStatus(
Status{ErrorCodes::InternalError, "Uninitialized value"});
@@ -864,7 +866,20 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO
return scheduleStatus.getStatus();
}
- executor->wait(scheduleStatus.getValue());
+ auto cbHandle = scheduleStatus.getValue();
+
+ try {
+ executor->wait(cbHandle, opCtx);
+ } catch (const DBException& ex) {
+ // If waiting for the response is interrupted, then we still have a callback out and
+ // registered with the TaskExecutor to run when the response finally does come back.
+ // Since the callback references local state, responseStatus, it would be invalid for the
+ // callback to run after leaving this function. Therefore, we cancel the callback
+ // and wait uninterruptibly for the callback to be run.
+ executor->cancel(cbHandle);
+ executor->wait(cbHandle);
+ return ex.toStatus();
+ }
if (!responseStatus.isOK()) {
return responseStatus.status;
@@ -1082,7 +1097,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
int iteration = 0;
while ((Date_t::now() - startTime) < maxTimeToWait) {
auto responseStatus = _callRecipient(
- createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true));
+ opCtx, createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true));
if (!responseStatus.isOK()) {
return responseStatus.getStatus().withContext(
"Failed to contact recipient shard to monitor data transfer");
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 94cf4408d4f..402b31e6529 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -221,7 +221,7 @@ private:
* Synchronously invokes the recipient shard with the specified command and either returns the
* command response (if succeeded) or the status, if the command failed.
*/
- StatusWith<BSONObj> _callRecipient(const BSONObj& cmdObj);
+ StatusWith<BSONObj> _callRecipient(OperationContext* opCtx, const BSONObj& cmdObj);
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getIndexScanExecutor(
OperationContext* opCtx,