summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2017-03-29 18:11:41 -0400
committerEsha Maharishi <esha.maharishi@mongodb.com>2017-03-30 11:06:36 -0400
commit916d62e8a448e516bae38712f01e78f67ef5293f (patch)
treef1bb5a079e6e557d0c305a8ec0bcbd26dbe5e8c6
parent7677a8aa544a5df51c2bd45f6859cd3b97459108 (diff)
downloadmongo-916d62e8a448e516bae38712f01e78f67ef5293f.tar.gz
SERVER-28542 make ARS store interrupt status instead of CallbackCanceled in remotes if interrupted
-rw-r--r--src/mongo/s/async_requests_sender.cpp14
-rw-r--r--src/mongo/s/async_requests_sender.h17
-rw-r--r--src/mongo/s/commands/cluster_get_last_error_cmd.cpp2
-rw-r--r--src/mongo/s/write_ops/batch_write_exec.cpp2
4 files changed, 20 insertions, 15 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index bf1cf1bd144..eb512b1a281 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -53,7 +53,7 @@ const int kMaxNumFailedHostRetryAttempts = 3;
AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
executor::TaskExecutor* executor,
- StringData db,
+ const std::string db,
const std::vector<AsyncRequestsSender::Request>& requests,
const ReadPreferenceSetting& readPreference)
: _opCtx(opCtx), _executor(executor), _db(std::move(db)), _readPreference(readPreference) {
@@ -96,15 +96,14 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
boost::optional<Response> readyResponse;
while (!(readyResponse = _ready())) {
// Otherwise, wait for some response to be received.
- if (_checkForInterrupt) {
+ if (_interruptStatus.isOK()) {
try {
_notification->get(_opCtx);
} catch (const UserException& ex) {
// If the operation is interrupted, we cancel outstanding requests and switch to
// waiting for the (canceled) callbacks to finish without checking for interrupts.
- invariant(!_opCtx->checkForInterruptNoAssert().isOK());
+ _interruptStatus = ex.toStatus();
_cancelPendingRequests();
- _checkForInterrupt = false;
continue;
}
} else {
@@ -157,6 +156,11 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
std::move(remote.swResponse->getValue()),
std::move(*remote.shardHostAndPort));
} else {
+ // If _interruptStatus is set, promote CallbackCanceled errors to it.
+ if (!_interruptStatus.isOK() &&
+ ErrorCodes::CallbackCanceled == remote.swResponse->getStatus().code()) {
+ remote.swResponse = _interruptStatus;
+ }
return Response(std::move(remote.shardId),
std::move(remote.swResponse->getStatus()),
std::move(remote.shardHostAndPort));
@@ -234,7 +238,7 @@ Status AsyncRequestsSender::_scheduleRequest_inlock(size_t remoteIndex) {
}
executor::RemoteCommandRequest request(
- *remote.shardHostAndPort, _db.toString(), remote.cmdObj, _metadataObj, _opCtx);
+ *remote.shardHostAndPort, _db, remote.cmdObj, _metadataObj, _opCtx);
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 0501ad55c8e..75938b89d91 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -123,7 +123,7 @@ public:
*/
AsyncRequestsSender(OperationContext* opCtx,
executor::TaskExecutor* executor,
- StringData db,
+ const std::string db,
const std::vector<AsyncRequestsSender::Request>& requests,
const ReadPreferenceSetting& readPreference);
@@ -249,10 +249,8 @@ private:
void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
size_t remoteIndex);
- // Not owned here.
OperationContext* _opCtx;
- // Not owned here.
executor::TaskExecutor* _executor;
// The metadata obj to pass along with the command remote. Used to indicate that the command is
@@ -260,15 +258,18 @@ private:
BSONObj _metadataObj;
// The database against which the commands are run.
- const StringData _db;
+ const std::string _db;
// The readPreference to use for all requests.
ReadPreferenceSetting _readPreference;
- // Used to determine whether to check for interrupt when waiting for a remote to be ready.
- // Set to false if we are interrupted, so that we can still wait for callbacks to complete.
- // This is only accessed by the thread in next().
- bool _checkForInterrupt = true;
+ // Is set to a non-OK status if the client operation is interrupted.
+ // When waiting for a remote to be ready, we only check for interrupt if the _interruptStatus
+ // has not already been set to an error (so we can wait for callbacks for (canceled) outstanding
+ // requests to complete after interrupt).
+ // When processing responses from remotes, if _interruptStatus is non-OK and the response status
+ // is CallbackCanceled, we promote the response status to the _interruptStatus.
+ Status _interruptStatus = Status::OK();
// Must be acquired before accessing the below data members.
// Must also be held when calling any of the '_inlock()' helper functions.
diff --git a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
index 733e318208b..b5a4bfb8015 100644
--- a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
+++ b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp
@@ -116,7 +116,7 @@ Status enforceLegacyWriteConcern(OperationContext* opCtx,
const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
AsyncRequestsSender ars(opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- dbName,
+ dbName.toString(),
requests,
readPref);
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index 99f71ddf85a..c7c5500dd43 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -204,7 +204,7 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet());
AsyncRequestsSender ars(opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- clientRequest.getTargetingNSS().db(),
+ clientRequest.getTargetingNSS().db().toString(),
requests,
readPref);
numSent += pendingBatches.size();