author     Benety Goh <benety@mongodb.com>  2016-05-03 12:03:06 -0400
committer  Benety Goh <benety@mongodb.com>  2016-05-16 13:51:28 -0400
commit     643439ff4fc91c49ffc2b103ba3858de1a165024 (patch)
tree       6cc69534128bf845c5d58e200016c96ac758d3f0 /src
parent     6a0904bd38a2deb2de127865943d57ca7cfa6927 (diff)
download   mongo-643439ff4fc91c49ffc2b103ba3858de1a165024.tar.gz
SERVER-23134 added retry support for scheduling remote commands on a task executor
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/client/SConscript                               |  22
-rw-r--r--  src/mongo/client/fetcher.cpp                              |  52
-rw-r--r--  src/mongo/client/fetcher.h                                |  24
-rw-r--r--  src/mongo/client/fetcher_test.cpp                         | 511
-rw-r--r--  src/mongo/client/remote_command_retry_scheduler.cpp       | 238
-rw-r--r--  src/mongo/client/remote_command_retry_scheduler.h         | 201
-rw-r--r--  src/mongo/client/remote_command_retry_scheduler_test.cpp  | 450
-rw-r--r--  src/mongo/executor/network_interface_mock.cpp             |   8
-rw-r--r--  src/mongo/executor/network_interface_mock.h               |  17
-rw-r--r--  src/mongo/executor/remote_command_request.cpp             |  19
-rw-r--r--  src/mongo/executor/remote_command_request.h               |   7
-rw-r--r--  src/mongo/executor/remote_command_response.cpp            |  16
-rw-r--r--  src/mongo/executor/remote_command_response.h              |   7
-rw-r--r--  src/mongo/executor/task_executor_test_common.cpp          |   8
-rw-r--r--  src/mongo/s/client/SConscript                             |   3
-rw-r--r--  src/mongo/s/client/shard_remote.cpp                       |  38
16 files changed, 1343 insertions, 278 deletions
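
The heart of this change is the new RemoteCommandRetryScheduler in src/mongo/client/. As a rough usage sketch (not part of the patch; "executor" is an assumed executor::TaskExecutor*, and the host and ping command are placeholders), built only from the constructor and factory methods introduced below:

    // Sketch only: assumed executor/host/command; uses names introduced by this patch.
    auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
        3U,                                              // at most 3 attempts
        executor::RemoteCommandRequest::kNoTimeout,      // no elapsed-time cap
        RemoteCommandRetryScheduler::kNotMasterErrors);  // retry on "not master" errors

    executor::RemoteCommandRequest request(HostAndPort("host1:27017"),
                                           "admin",
                                           BSON("ping" << 1),
                                           rpc::makeEmptyMetadata(),
                                           executor::RemoteCommandRequest::kNoTimeout);

    RemoteCommandRetryScheduler scheduler(
        executor,
        request,
        [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
            // Invoked exactly once with the final response (after any retries).
        },
        std::move(policy));

    uassertStatusOK(scheduler.startup());  // schedules the first attempt
    scheduler.join();                      // waits until the scheduler becomes inactive
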
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index 10c78120d79..3145929e5f9 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -252,6 +252,7 @@ env.Library(
'fetcher.cpp',
],
LIBDEPS=[
+ 'remote_command_retry_scheduler',
'$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/namespace_string',
@@ -270,3 +271,24 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
)
+
+env.Library(
+ target='remote_command_retry_scheduler',
+ source=[
+ 'remote_command_retry_scheduler.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
+ '$BUILD_DIR/mongo/base',
+ ],
+)
+
+env.CppUnitTest(
+ target='remote_command_retry_scheduler_test',
+ source='remote_command_retry_scheduler_test.cpp',
+ LIBDEPS=[
+ 'remote_command_retry_scheduler',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
+ '$BUILD_DIR/mongo/unittest/task_executor_proxy',
+ ],
+)
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp
index 843cf02a550..acc446298b5 100644
--- a/src/mongo/client/fetcher.cpp
+++ b/src/mongo/client/fetcher.cpp
@@ -144,20 +144,20 @@ Fetcher::Fetcher(executor::TaskExecutor* executor,
const BSONObj& findCmdObj,
const CallbackFn& work,
const BSONObj& metadata,
- Milliseconds timeout)
+ Milliseconds timeout,
+ std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> firstCommandRetryPolicy)
: _executor(executor),
_source(source),
_dbname(dbname),
_cmdObj(findCmdObj.getOwned()),
_metadata(metadata.getOwned()),
_work(work),
- _active(false),
- _first(true),
- _remoteCommandCallbackHandle(),
- _timeout(timeout) {
- uassert(ErrorCodes::BadValue, "null task executor", executor);
- uassert(ErrorCodes::BadValue, "database name cannot be empty", !dbname.empty());
- uassert(ErrorCodes::BadValue, "command object cannot be empty", !findCmdObj.isEmpty());
+ _timeout(timeout),
+ _firstRemoteCommandScheduler(
+ _executor,
+ RemoteCommandRequest(_source, _dbname, _cmdObj, _metadata, _timeout),
+ stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, kFirstBatchFieldName),
+ std::move(firstCommandRetryPolicy)) {
uassert(ErrorCodes::BadValue, "callback function cannot be null", work);
}
@@ -205,11 +205,18 @@ Status Fetcher::schedule() {
if (_active) {
return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled");
}
- return _schedule_inlock(_cmdObj, kFirstBatchFieldName);
+
+ auto status = _firstRemoteCommandScheduler.startup();
+ if (!status.isOK()) {
+ return status;
+ }
+
+ _active = true;
+ return Status::OK();
}
void Fetcher::cancel() {
- executor::TaskExecutor::CallbackHandle remoteCommandCallbackHandle;
+ executor::TaskExecutor::CallbackHandle handle;
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -217,11 +224,16 @@ void Fetcher::cancel() {
return;
}
- remoteCommandCallbackHandle = _remoteCommandCallbackHandle;
+ _firstRemoteCommandScheduler.shutdown();
+
+ if (!_getMoreCallbackHandle.isValid()) {
+ return;
+ }
+
+ handle = _getMoreCallbackHandle;
}
- invariant(remoteCommandCallbackHandle.isValid());
- _executor->cancel(remoteCommandCallbackHandle);
+ _executor->cancel(handle);
}
void Fetcher::wait() {
@@ -229,18 +241,19 @@ void Fetcher::wait() {
_condition.wait(lk, [this]() { return !_active; });
}
-Status Fetcher::_schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName) {
+Status Fetcher::_scheduleGetMore(const BSONObj& cmdObj) {
StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult =
_executor->scheduleRemoteCommand(
RemoteCommandRequest(_source, _dbname, cmdObj, _metadata, _timeout),
- stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, batchFieldName));
+ stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, kNextBatchFieldName));
if (!scheduleResult.isOK()) {
return scheduleResult.getStatus();
}
- _active = true;
- _remoteCommandCallbackHandle = scheduleResult.getValue();
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _getMoreCallbackHandle = scheduleResult.getValue();
+
return Status::OK();
}
@@ -305,10 +318,7 @@ void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batch
return;
}
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- status = _schedule_inlock(cmdObj, kNextBatchFieldName);
- }
+ status = _scheduleGetMore(cmdObj);
if (!status.isOK()) {
nextAction = NextAction::kNoAction;
_work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr);
diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h
index 2050c3687d5..222e5ebf193 100644
--- a/src/mongo/client/fetcher.h
+++ b/src/mongo/client/fetcher.h
@@ -28,6 +28,7 @@
#pragma once
+#include <memory>
#include <string>
#include <vector>
@@ -35,6 +36,7 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/executor/task_executor.h"
@@ -116,6 +118,9 @@ public:
*
* The callback function 'work' is not allowed to call into the Fetcher instance. This
* behavior is undefined and may result in a deadlock.
+ *
+ * An optional retry policy may be provided for the first remote command request so that
+ * the remote command scheduler will re-send the command in case of transient network errors.
*/
Fetcher(executor::TaskExecutor* executor,
const HostAndPort& source,
@@ -123,7 +128,9 @@ public:
const BSONObj& cmdObj,
const CallbackFn& work,
const BSONObj& metadata = rpc::makeEmptyMetadata(),
- Milliseconds timeout = RemoteCommandRequest::kNoTimeout);
+ Milliseconds timeout = RemoteCommandRequest::kNoTimeout,
+ std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> firstCommandRetryPolicy =
+ RemoteCommandRetryScheduler::makeNoRetryPolicy());
virtual ~Fetcher();
@@ -177,9 +184,9 @@ public:
private:
/**
- * Schedules remote command to be run by the executor
+ * Schedules getMore command to be run by the executor
*/
- Status _schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName);
+ Status _scheduleGetMore(const BSONObj& cmdObj);
/**
* Callback for remote command.
@@ -214,17 +221,20 @@ private:
mutable stdx::condition_variable _condition;
// _active is true when Fetcher is scheduled to be run by the executor.
- bool _active;
+ bool _active = false;
// _first is true for first query response and false for subsequent responses.
// Using boolean instead of a counter to avoid issues with wrap around.
- bool _first;
+ bool _first = true;
- // Callback handle to the scheduled remote command.
- executor::TaskExecutor::CallbackHandle _remoteCommandCallbackHandle;
+ // Callback handle to the scheduled getMore command.
+ executor::TaskExecutor::CallbackHandle _getMoreCallbackHandle;
// Socket timeout
Milliseconds _timeout;
+
+ // First remote command scheduler.
+ RemoteCommandRetryScheduler _firstRemoteCommandScheduler;
};
} // namespace mongo
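
With the optional constructor argument added above, a caller can have only the initial find command retried while getMore requests keep their fail-fast behavior; this is what the new test FetcherAppliesRetryPolicyToFirstCommandButNotToGetMoreRequests exercises. A hedged sketch (the "executor", "source", and "onBatch" names are assumed to exist in the caller):

    // Sketch: retry the first find up to 3 times on any retriable error;
    // getMore requests after the first batch are not retried.
    Fetcher fetcher(executor,
                    source,
                    "db",
                    BSON("find"
                         << "coll"),
                    onBatch,
                    rpc::makeEmptyMetadata(),
                    executor::RemoteCommandRequest::kNoTimeout,
                    RemoteCommandRetryScheduler::makeRetryPolicy(
                        3U,
                        executor::RemoteCommandRequest::kNoTimeout,
                        RemoteCommandRetryScheduler::kAllRetriableErrors));
    uassertStatusOK(fetcher.schedule());
    fetcher.wait();
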
diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp
index 26e7c156c3a..f9398e22547 100644
--- a/src/mongo/client/fetcher_test.cpp
+++ b/src/mongo/client/fetcher_test.cpp
@@ -53,20 +53,35 @@ public:
FetcherTest();
void clear();
void scheduleNetworkResponse(const BSONObj& obj);
- void scheduleNetworkResponse(const BSONObj& obj, Milliseconds millis);
+ void scheduleNetworkResponse(const BSONObj& obj, Milliseconds elapsed);
void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason);
void scheduleNetworkResponseFor(const BSONObj& filter, const BSONObj& obj);
void scheduleNetworkResponseFor(NetworkInterfaceMock::NetworkOperationIterator noi,
const BSONObj& obj);
- // Calls scheduleNetworkResponse + finishProcessingNetworkResponse
- void processNetworkResponse(const BSONObj& obj);
- // Calls scheduleNetworkResponse + finishProcessingNetworkResponse
- void processNetworkResponse(ErrorCodes::Error code, const std::string& reason);
+ enum class ReadyQueueState { kEmpty, kHasReadyRequests };
- void finishProcessingNetworkResponse();
+ enum class FetcherState { kInactive, kActive };
+
+ // Calls scheduleNetworkResponse + finishProcessingNetworkResponse
+ void processNetworkResponse(const BSONObj& obj,
+ ReadyQueueState readyQueueStateAfterProcessing,
+ FetcherState fetcherStateAfterProcessing);
+ void processNetworkResponse(const BSONObj& obj,
+ Milliseconds elapsed,
+ ReadyQueueState readyQueueStateAfterProcessing,
+ FetcherState fetcherStateAfterProcessing);
+ void processNetworkResponse(ErrorCodes::Error code,
+ const std::string& reason,
+ ReadyQueueState readyQueueStateAfterProcessing,
+ FetcherState fetcherStateAfterProcessing);
+
+ void finishProcessingNetworkResponse(ReadyQueueState readyQueueStateAfterProcessing,
+ FetcherState fetcherStateAfterProcessing);
protected:
+ Fetcher::CallbackFn makeCallback();
+
void setUp() override;
void tearDown() override;
@@ -90,18 +105,19 @@ private:
FetcherTest::FetcherTest()
: status(getDetectableErrorStatus()), cursorId(-1), nextAction(Fetcher::NextAction::kInvalid) {}
+Fetcher::CallbackFn FetcherTest::makeCallback() {
+ return stdx::bind(&FetcherTest::_callback,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3);
+}
+
void FetcherTest::setUp() {
executor::ThreadPoolExecutorTest::setUp();
clear();
- fetcher = stdx::make_unique<Fetcher>(&getExecutor(),
- source,
- "db",
- findCmdObj,
- stdx::bind(&FetcherTest::_callback,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- stdx::placeholders::_3));
+ callbackHook = Fetcher::CallbackFn();
+ fetcher = stdx::make_unique<Fetcher>(&getExecutor(), source, "db", findCmdObj, makeCallback());
launchExecutorThread();
}
@@ -119,17 +135,16 @@ void FetcherTest::clear() {
elapsedMillis = Milliseconds(0);
first = false;
nextAction = Fetcher::NextAction::kInvalid;
- callbackHook = Fetcher::CallbackFn();
}
void FetcherTest::scheduleNetworkResponse(const BSONObj& obj) {
scheduleNetworkResponse(obj, Milliseconds(0));
}
-void FetcherTest::scheduleNetworkResponse(const BSONObj& obj, Milliseconds millis) {
+void FetcherTest::scheduleNetworkResponse(const BSONObj& obj, Milliseconds elapsed) {
NetworkInterfaceMock* net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
- executor::RemoteCommandResponse response(obj, BSONObj(), millis);
+ executor::RemoteCommandResponse response(obj, BSONObj(), elapsed);
TaskExecutor::ResponseStatus responseStatus(response);
net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
}
@@ -137,8 +152,8 @@ void FetcherTest::scheduleNetworkResponseFor(const BSONObj& filter, const BSONOb
ASSERT_TRUE(filter[1].eoo()); // The filter should only have one field, to match the cmd name
NetworkInterfaceMock* net = getNet();
ASSERT_TRUE(net->hasReadyRequests());
- Milliseconds millis(0);
- executor::RemoteCommandResponse response(obj, BSONObj(), millis);
+ Milliseconds elapsed(0);
+ executor::RemoteCommandResponse response(obj, BSONObj(), elapsed);
TaskExecutor::ResponseStatus responseStatus(response);
auto req = net->getNextReadyRequest();
ASSERT_EQ(req->getRequest().cmdObj[0], filter[0]);
@@ -148,8 +163,8 @@ void FetcherTest::scheduleNetworkResponseFor(const BSONObj& filter, const BSONOb
void FetcherTest::scheduleNetworkResponseFor(NetworkInterfaceMock::NetworkOperationIterator noi,
const BSONObj& obj) {
NetworkInterfaceMock* net = getNet();
- Milliseconds millis(0);
- executor::RemoteCommandResponse response(obj, BSONObj(), millis);
+ Milliseconds elapsed(0);
+ executor::RemoteCommandResponse response(obj, BSONObj(), elapsed);
TaskExecutor::ResponseStatus responseStatus(response);
net->scheduleResponse(noi, net->now(), responseStatus);
}
@@ -161,28 +176,40 @@ void FetcherTest::scheduleNetworkResponse(ErrorCodes::Error code, const std::str
net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
}
-void FetcherTest::processNetworkResponse(const BSONObj& obj) {
- auto net = getNet();
- net->enterNetwork();
+void FetcherTest::processNetworkResponse(const BSONObj& obj,
+ ReadyQueueState readyQueueStateAfterProcessing,
+ FetcherState fetcherStateAfterProcessing) {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
scheduleNetworkResponse(obj);
- finishProcessingNetworkResponse();
- net->exitNetwork();
+ finishProcessingNetworkResponse(readyQueueStateAfterProcessing, fetcherStateAfterProcessing);
}
-void FetcherTest::processNetworkResponse(ErrorCodes::Error code, const std::string& reason) {
- auto net = getNet();
- net->enterNetwork();
+void FetcherTest::processNetworkResponse(const BSONObj& obj,
+ Milliseconds elapsed,
+ ReadyQueueState readyQueueStateAfterProcessing,
+ FetcherState fetcherStateAfterProcessing) {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ scheduleNetworkResponse(obj, elapsed);
+ finishProcessingNetworkResponse(readyQueueStateAfterProcessing, fetcherStateAfterProcessing);
+}
+
+void FetcherTest::processNetworkResponse(ErrorCodes::Error code,
+ const std::string& reason,
+ ReadyQueueState readyQueueStateAfterProcessing,
+ FetcherState fetcherStateAfterProcessing) {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
scheduleNetworkResponse(code, reason);
- finishProcessingNetworkResponse();
- net->exitNetwork();
+ finishProcessingNetworkResponse(readyQueueStateAfterProcessing, fetcherStateAfterProcessing);
}
-void FetcherTest::finishProcessingNetworkResponse() {
+void FetcherTest::finishProcessingNetworkResponse(ReadyQueueState readyQueueStateAfterProcessing,
+ FetcherState fetcherStateAfterProcessing) {
clear();
ASSERT_TRUE(fetcher->isActive());
getNet()->runReadyNetworkOperations();
- ASSERT_FALSE(getNet()->hasReadyRequests());
- ASSERT_FALSE(fetcher->isActive());
+ ASSERT_EQUALS(readyQueueStateAfterProcessing == ReadyQueueState::kHasReadyRequests,
+ getNet()->hasReadyRequests());
+ ASSERT_EQUALS(fetcherStateAfterProcessing == FetcherState::kActive, fetcher->isActive());
}
void FetcherTest::_callback(const StatusWith<Fetcher::QueryResponse>& result,
@@ -224,25 +251,46 @@ TEST_F(FetcherTest, InvalidConstruction) {
ASSERT_THROWS_CODE_AND_WHAT(Fetcher(nullptr, source, "db", findCmdObj, unreachableCallback),
UserException,
ErrorCodes::BadValue,
- "null task executor");
+ "task executor cannot be null");
+
+ // Empty source.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ Fetcher(&executor, HostAndPort(), "db", findCmdObj, unreachableCallback),
+ UserException,
+ ErrorCodes::BadValue,
+ "source in remote command request cannot be empty");
// Empty database name.
ASSERT_THROWS_CODE_AND_WHAT(Fetcher(&executor, source, "", findCmdObj, unreachableCallback),
UserException,
ErrorCodes::BadValue,
- "database name cannot be empty");
+ "database name in remote command request cannot be empty");
// Empty command object.
ASSERT_THROWS_CODE_AND_WHAT(Fetcher(&executor, source, "db", BSONObj(), unreachableCallback),
UserException,
ErrorCodes::BadValue,
- "command object cannot be empty");
+ "command object in remote command request cannot be empty");
// Callback function cannot be null.
ASSERT_THROWS_CODE_AND_WHAT(Fetcher(&executor, source, "db", findCmdObj, Fetcher::CallbackFn()),
UserException,
ErrorCodes::BadValue,
"callback function cannot be null");
+
+ // Retry policy for first command cannot be null.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ Fetcher(&executor,
+ source,
+ "db",
+ findCmdObj,
+ unreachableCallback,
+ rpc::makeEmptyMetadata(),
+ RemoteCommandRequest::kNoTimeout,
+ std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>()),
+ UserException,
+ ErrorCodes::BadValue,
+ "retry policy cannot be null");
}
// Command object can refer to any command that returns a cursor. This
@@ -275,11 +323,13 @@ TEST_F(FetcherTest, RemoteCommandRequestShouldContainCommandParametersPassedToCo
ASSERT_OK(fetcher->schedule());
auto net = getNet();
- net->enterNetwork();
- ASSERT_TRUE(net->hasReadyRequests());
- auto noi = net->getNextReadyRequest();
- auto request = noi->getRequest();
- net->exitNetwork();
+ executor::RemoteCommandRequest request;
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto noi = net->getNextReadyRequest();
+ request = noi->getRequest();
+ }
ASSERT_EQUALS(source, request.target);
ASSERT_EQUALS(findCmdObj, request.cmdObj);
@@ -300,7 +350,7 @@ TEST_F(FetcherTest, IsActiveAfterSchedule) {
TEST_F(FetcherTest, ScheduleWhenActive) {
ASSERT_OK(fetcher->schedule());
ASSERT_TRUE(fetcher->isActive());
- ASSERT_NOT_OK(fetcher->schedule());
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation, fetcher->schedule());
}
TEST_F(FetcherTest, CancelWithoutSchedule) {
@@ -315,7 +365,7 @@ TEST_F(FetcherTest, WaitWithoutSchedule) {
TEST_F(FetcherTest, ShutdownBeforeSchedule) {
getExecutor().shutdown();
- ASSERT_NOT_OK(fetcher->schedule());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, fetcher->schedule());
ASSERT_FALSE(fetcher->isActive());
}
@@ -323,15 +373,17 @@ TEST_F(FetcherTest, ScheduleAndCancel) {
ASSERT_OK(fetcher->schedule());
auto net = getNet();
- net->enterNetwork();
- scheduleNetworkResponse(BSON("ok" << 1));
- net->exitNetwork();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ scheduleNetworkResponse(BSON("ok" << 1));
+ }
fetcher->cancel();
- net->enterNetwork();
- finishProcessingNetworkResponse();
- net->exitNetwork();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ finishProcessingNetworkResponse(ReadyQueueState::kEmpty, FetcherState::kInactive);
+ }
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
}
@@ -340,18 +392,20 @@ TEST_F(FetcherTest, ScheduleButShutdown) {
ASSERT_OK(fetcher->schedule());
auto net = getNet();
- net->enterNetwork();
- scheduleNetworkResponse(BSON("ok" << 1));
- // Network interface should not deliver mock response to callback
- // until runReadyNetworkOperations() is called.
- net->exitNetwork();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ scheduleNetworkResponse(BSON("ok" << 1));
+ // Network interface should not deliver mock response to callback
+ // until runReadyNetworkOperations() is called.
+ }
ASSERT_TRUE(fetcher->isActive());
getExecutor().shutdown();
- net->enterNetwork();
- net->runReadyNetworkOperations();
- net->exitNetwork();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ net->runReadyNetworkOperations();
+ }
ASSERT_FALSE(fetcher->isActive());
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code());
@@ -359,7 +413,8 @@ TEST_F(FetcherTest, ScheduleButShutdown) {
TEST_F(FetcherTest, FindCommandFailed1) {
ASSERT_OK(fetcher->schedule());
- processNetworkResponse(ErrorCodes::BadValue, "bad hint");
+ processNetworkResponse(
+ ErrorCodes::BadValue, "bad hint", ReadyQueueState::kEmpty, FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
ASSERT_EQUALS("bad hint", status.reason());
}
@@ -368,21 +423,24 @@ TEST_F(FetcherTest, FindCommandFailed2) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("ok" << 0 << "errmsg"
<< "bad hint"
- << "code" << int(ErrorCodes::BadValue)));
+ << "code" << int(ErrorCodes::BadValue)),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
ASSERT_EQUALS("bad hint", status.reason());
}
TEST_F(FetcherTest, CursorFieldMissing) {
ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("ok" << 1));
+ processNetworkResponse(BSON("ok" << 1), ReadyQueueState::kEmpty, FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor' field");
}
TEST_F(FetcherTest, CursorNotAnObject) {
ASSERT_OK(fetcher->schedule());
- processNetworkResponse(BSON("cursor" << 123 << "ok" << 1));
+ processNetworkResponse(
+ BSON("cursor" << 123 << "ok" << 1), ReadyQueueState::kEmpty, FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor' field must be an object");
}
@@ -391,17 +449,20 @@ TEST_F(FetcherTest, CursorIdFieldMissing) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("cursor" << BSON("ns"
<< "db.coll"
- << "firstBatch" << BSONArray()) << "ok" << 1));
+ << "firstBatch" << BSONArray()) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.id' field");
}
TEST_F(FetcherTest, CursorIdNotLongNumber) {
ASSERT_OK(fetcher->schedule());
- processNetworkResponse(
- BSON("cursor" << BSON("id" << 123.1 << "ns"
- << "db.coll"
- << "firstBatch" << BSONArray()) << "ok" << 1));
+ processNetworkResponse(BSON("cursor" << BSON("id" << 123.1 << "ns"
+ << "db.coll"
+ << "firstBatch" << BSONArray()) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor.id' field must be");
ASSERT_EQ((int)Fetcher::NextAction::kInvalid, (int)nextAction);
@@ -410,7 +471,9 @@ TEST_F(FetcherTest, CursorIdNotLongNumber) {
TEST_F(FetcherTest, NamespaceFieldMissing) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(
- BSON("cursor" << BSON("id" << 123LL << "firstBatch" << BSONArray()) << "ok" << 1));
+ BSON("cursor" << BSON("id" << 123LL << "firstBatch" << BSONArray()) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.ns' field");
}
@@ -418,27 +481,31 @@ TEST_F(FetcherTest, NamespaceFieldMissing) {
TEST_F(FetcherTest, NamespaceNotAString) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("cursor" << BSON("id" << 123LL << "ns" << 123 << "firstBatch"
- << BSONArray()) << "ok" << 1));
+ << BSONArray()) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' field must be a string");
}
TEST_F(FetcherTest, NamespaceEmpty) {
ASSERT_OK(fetcher->schedule());
- processNetworkResponse(
- BSON("cursor" << BSON("id" << 123LL << "ns"
- << ""
- << "firstBatch" << BSONArray()) << "ok" << 1));
+ processNetworkResponse(BSON("cursor" << BSON("id" << 123LL << "ns"
+ << ""
+ << "firstBatch" << BSONArray()) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' contains an invalid namespace");
}
TEST_F(FetcherTest, NamespaceMissingCollectionName) {
ASSERT_OK(fetcher->schedule());
- processNetworkResponse(
- BSON("cursor" << BSON("id" << 123LL << "ns"
- << "db."
- << "firstBatch" << BSONArray()) << "ok" << 1));
+ processNetworkResponse(BSON("cursor" << BSON("id" << 123LL << "ns"
+ << "db."
+ << "firstBatch" << BSONArray()) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' contains an invalid namespace");
}
@@ -446,7 +513,9 @@ TEST_F(FetcherTest, NamespaceMissingCollectionName) {
TEST_F(FetcherTest, FirstBatchFieldMissing) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << "ns"
- << "db.coll") << "ok" << 1));
+ << "db.coll") << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.firstBatch' field");
}
@@ -455,7 +524,9 @@ TEST_F(FetcherTest, FirstBatchNotAnArray) {
ASSERT_OK(fetcher->schedule());
processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll"
- << "firstBatch" << 123) << "ok" << 1));
+ << "firstBatch" << 123) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "'cursor.firstBatch' field must be an array");
}
@@ -465,7 +536,9 @@ TEST_F(FetcherTest, FirstBatchArrayContainsNonObject) {
processNetworkResponse(
BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll"
- << "firstBatch" << BSON_ARRAY(8)) << "ok" << 1));
+ << "firstBatch" << BSON_ARRAY(8)) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code());
ASSERT_STRING_CONTAINS(status.reason(), "found non-object");
ASSERT_STRING_CONTAINS(status.reason(), "in 'cursor.firstBatch' field");
@@ -473,10 +546,11 @@ TEST_F(FetcherTest, FirstBatchArrayContainsNonObject) {
TEST_F(FetcherTest, FirstBatchEmptyArray) {
ASSERT_OK(fetcher->schedule());
- processNetworkResponse(
- BSON("cursor" << BSON("id" << 0LL << "ns"
- << "db.coll"
- << "firstBatch" << BSONArray()) << "ok" << 1));
+ processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << "ns"
+ << "db.coll"
+ << "firstBatch" << BSONArray()) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_OK(status);
ASSERT_EQUALS(0, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
@@ -489,7 +563,9 @@ TEST_F(FetcherTest, FetchOneDocument) {
processNetworkResponse(
BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll"
- << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
+ << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_OK(status);
ASSERT_EQUALS(0, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
@@ -513,7 +589,9 @@ TEST_F(FetcherTest, SetNextActionToContinueWhenNextBatchIsNotAvailable) {
processNetworkResponse(
BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll"
- << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
+ << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_OK(status);
ASSERT_EQUALS(0, cursorId);
ASSERT_EQUALS("db.coll", nss.ns());
@@ -539,15 +617,13 @@ TEST_F(FetcherTest, FetchMultipleBatches) {
const BSONObj doc = BSON("_id" << 1);
- auto net = getNet();
- net->enterNetwork();
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1),
- Milliseconds(100));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ Milliseconds(100),
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kActive);
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
@@ -557,19 +633,16 @@ TEST_F(FetcherTest, FetchMultipleBatches) {
ASSERT_EQUALS(elapsedMillis, Milliseconds(100));
ASSERT_TRUE(first);
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
const BSONObj doc2 = BSON("_id" << 2);
- net->enterNetwork();
- ASSERT_TRUE(getNet()->hasReadyRequests());
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
<< "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1),
- Milliseconds(200));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ Milliseconds(200),
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kActive);
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
@@ -579,19 +652,16 @@ TEST_F(FetcherTest, FetchMultipleBatches) {
ASSERT_EQUALS(elapsedMillis, Milliseconds(200));
ASSERT_FALSE(first);
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
const BSONObj doc3 = BSON("_id" << 3);
- net->enterNetwork();
- ASSERT_TRUE(getNet()->hasReadyRequests());
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 0LL << "ns"
<< "db.coll"
<< "nextBatch" << BSON_ARRAY(doc3)) << "ok" << 1),
- Milliseconds(300));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ Milliseconds(300),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
ASSERT_OK(status);
ASSERT_EQUALS(0, cursorId);
@@ -601,11 +671,6 @@ TEST_F(FetcherTest, FetchMultipleBatches) {
ASSERT_EQUALS(elapsedMillis, Milliseconds(300));
ASSERT_FALSE(first);
ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction);
- ASSERT_FALSE(fetcher->isActive());
-
- net->enterNetwork();
- ASSERT_FALSE(getNet()->hasReadyRequests());
- net->exitNetwork();
}
TEST_F(FetcherTest, ScheduleGetMoreAndCancel) {
@@ -615,14 +680,12 @@ TEST_F(FetcherTest, ScheduleGetMoreAndCancel) {
const BSONObj doc = BSON("_id" << 1);
- auto net = getNet();
- net->enterNetwork();
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
- << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1),
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kActive);
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
@@ -630,17 +693,14 @@ TEST_F(FetcherTest, ScheduleGetMoreAndCancel) {
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
const BSONObj doc2 = BSON("_id" << 2);
- net->enterNetwork();
- ASSERT_TRUE(getNet()->hasReadyRequests());
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
- << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1),
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kActive);
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
@@ -648,15 +708,16 @@ TEST_F(FetcherTest, ScheduleGetMoreAndCancel) {
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc2, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
fetcher->cancel();
- net->enterNetwork();
- finishProcessingNetworkResponse();
- net->exitNetwork();
+ {
+ auto net = getNet();
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ finishProcessingNetworkResponse(ReadyQueueState::kEmpty, FetcherState::kInactive);
+ }
- ASSERT_NOT_OK(status);
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status);
}
TEST_F(FetcherTest, ScheduleGetMoreButShutdown) {
@@ -666,14 +727,12 @@ TEST_F(FetcherTest, ScheduleGetMoreButShutdown) {
const BSONObj doc = BSON("_id" << 1);
- auto net = getNet();
- net->enterNetwork();
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
- << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1),
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kActive);
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
@@ -681,18 +740,15 @@ TEST_F(FetcherTest, ScheduleGetMoreButShutdown) {
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
const BSONObj doc2 = BSON("_id" << 2);
- net->enterNetwork();
- ASSERT_TRUE(getNet()->hasReadyRequests());
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
- << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1),
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kActive);
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
@@ -700,15 +756,16 @@ TEST_F(FetcherTest, ScheduleGetMoreButShutdown) {
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc2, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
getExecutor().shutdown();
- net->enterNetwork();
- net->runReadyNetworkOperations();
- net->exitNetwork();
+ {
+ auto net = getNet();
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ net->runReadyNetworkOperations();
+ }
- ASSERT_NOT_OK(status);
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status);
}
@@ -723,14 +780,12 @@ TEST_F(FetcherTest, EmptyGetMoreRequestAfterFirstBatchMakesFetcherInactiveAndKil
const BSONObj doc = BSON("_id" << 1);
- auto net = getNet();
- net->enterNetwork();
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
- << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1),
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kInactive);
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
@@ -738,13 +793,15 @@ TEST_F(FetcherTest, EmptyGetMoreRequestAfterFirstBatchMakesFetcherInactiveAndKil
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_FALSE(fetcher->isActive());
- net->enterNetwork();
- ASSERT_TRUE(getNet()->hasReadyRequests());
- auto noi = getNet()->getNextReadyRequest();
- auto request = noi->getRequest();
- net->exitNetwork();
+ executor::RemoteCommandRequest request;
+ {
+ auto net = getNet();
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ ASSERT_TRUE(getNet()->hasReadyRequests());
+ auto noi = getNet()->getNextReadyRequest();
+ request = noi->getRequest();
+ }
ASSERT_EQUALS(nss.db(), request.dbname);
auto&& cmdObj = request.cmdObj;
@@ -778,14 +835,12 @@ TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) {
const BSONObj doc = BSON("_id" << 1);
- auto net = getNet();
- net->enterNetwork();
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
- << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1),
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kActive);
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
@@ -793,20 +848,17 @@ TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) {
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
const BSONObj doc2 = BSON("_id" << 2);
callbackHook = setNextActionToNoAction;
- net->enterNetwork();
- ASSERT_TRUE(getNet()->hasReadyRequests());
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
- << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1),
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kInactive);
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
@@ -814,29 +866,28 @@ TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) {
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc2, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction);
- ASSERT_FALSE(fetcher->isActive());
-
- net->enterNetwork();
- ASSERT_TRUE(getNet()->hasReadyRequests());
- auto noi = getNet()->getNextReadyRequest();
- auto request = noi->getRequest();
- net->exitNetwork();
-
- ASSERT_EQUALS(nss.db(), request.dbname);
- auto&& cmdObj = request.cmdObj;
- auto firstElement = cmdObj.firstElement();
- ASSERT_EQUALS("killCursors", firstElement.fieldNameStringData());
- ASSERT_EQUALS(nss.coll(), firstElement.String());
- ASSERT_EQUALS(mongo::BSONType::Array, cmdObj["cursors"].type());
- auto cursors = cmdObj["cursors"].Array();
- ASSERT_EQUALS(1U, cursors.size());
- ASSERT_EQUALS(cursorId, cursors.front().numberLong());
- // Failed killCursors command response should be logged.
- net->enterNetwork();
- scheduleNetworkResponseFor(noi, BSON("ok" << false));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ {
+ auto net = getNet();
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto noi = net->getNextReadyRequest();
+ auto request = noi->getRequest();
+
+ ASSERT_EQUALS(nss.db(), request.dbname);
+ auto&& cmdObj = request.cmdObj;
+ auto firstElement = cmdObj.firstElement();
+ ASSERT_EQUALS("killCursors", firstElement.fieldNameStringData());
+ ASSERT_EQUALS(nss.coll(), firstElement.String());
+ ASSERT_EQUALS(mongo::BSONType::Array, cmdObj["cursors"].type());
+ auto cursors = cmdObj["cursors"].Array();
+ ASSERT_EQUALS(1U, cursors.size());
+ ASSERT_EQUALS(cursorId, cursors.front().numberLong());
+
+ // Failed killCursors command response should be logged.
+ scheduleNetworkResponseFor(noi, BSON("ok" << false));
+ getNet()->runReadyNetworkOperations();
+ }
ASSERT_EQUALS(1, countLogLinesContaining("killCursors command failed: UnknownError"));
}
@@ -877,14 +928,12 @@ TEST_F(FetcherTest, ShutdownDuringSecondBatch) {
const BSONObj doc = BSON("_id" << 1);
- auto net = getNet();
- net->enterNetwork();
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
- << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1),
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kActive);
ASSERT_OK(status);
ASSERT_EQUALS(1LL, cursorId);
@@ -892,7 +941,6 @@ TEST_F(FetcherTest, ShutdownDuringSecondBatch) {
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
- ASSERT_TRUE(fetcher->isActive());
const BSONObj doc2 = BSON("_id" << 2);
@@ -905,14 +953,12 @@ TEST_F(FetcherTest, ShutdownDuringSecondBatch) {
&getExecutor(),
&isShutdownCalled);
- net->enterNetwork();
- ASSERT_TRUE(getNet()->hasReadyRequests());
- scheduleNetworkResponse(
+ processNetworkResponse(
BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
- << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1));
- getNet()->runReadyNetworkOperations();
- net->exitNetwork();
+ << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1),
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
// Fetcher should attempt (unsuccessfully) to schedule a killCursors command.
ASSERT_EQUALS(
@@ -921,7 +967,54 @@ TEST_F(FetcherTest, ShutdownDuringSecondBatch) {
"failed to schedule killCursors command: ShutdownInProgress: Shutdown in progress"));
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code());
- ASSERT_FALSE(fetcher->isActive());
+}
+
+TEST_F(FetcherTest, FetcherAppliesRetryPolicyToFirstCommandButNotToGetMoreRequests) {
+ auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
+ 3U,
+ executor::RemoteCommandRequest::kNoTimeout,
+ {ErrorCodes::BadValue, ErrorCodes::InternalError});
+
+ fetcher = stdx::make_unique<Fetcher>(&getExecutor(),
+ source,
+ "db",
+ findCmdObj,
+ makeCallback(),
+ rpc::makeEmptyMetadata(),
+ executor::RemoteCommandRequest::kNoTimeout,
+ std::move(policy));
+
+ callbackHook = appendGetMoreRequest;
+
+ ASSERT_OK(fetcher->schedule());
+
+ // Retry policy is applied to find command.
+ const BSONObj doc = BSON("_id" << 1);
+ processNetworkResponse(
+ ErrorCodes::BadValue, "first", ReadyQueueState::kHasReadyRequests, FetcherState::kActive);
+ processNetworkResponse(ErrorCodes::InternalError,
+ "second",
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kActive);
+ processNetworkResponse(
+ BSON("cursor" << BSON("id" << 1LL << "ns"
+ << "db.coll"
+ << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1),
+ ReadyQueueState::kHasReadyRequests,
+ FetcherState::kActive);
+ ASSERT_OK(status);
+ ASSERT_EQUALS(1LL, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
+ ASSERT_EQUALS(1U, documents.size());
+ ASSERT_EQUALS(doc, documents.front());
+ ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
+
+ // No retry policy for subsequent getMore commands.
+ processNetworkResponse(ErrorCodes::OperationFailed,
+ "getMore failed",
+ ReadyQueueState::kEmpty,
+ FetcherState::kInactive);
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
}
} // namespace
diff --git a/src/mongo/client/remote_command_retry_scheduler.cpp b/src/mongo/client/remote_command_retry_scheduler.cpp
new file mode 100644
index 00000000000..f290a85484f
--- /dev/null
+++ b/src/mongo/client/remote_command_retry_scheduler.cpp
@@ -0,0 +1,238 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor
+
+#include "mongo/platform/basic.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "mongo/client/remote_command_retry_scheduler.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+namespace {
+
+class RetryPolicyImpl : public RemoteCommandRetryScheduler::RetryPolicy {
+public:
+ RetryPolicyImpl(std::size_t maximumAttempts,
+ Milliseconds maximumResponseElapsedTotal,
+ const std::initializer_list<ErrorCodes::Error>& retryableErrors);
+ std::size_t getMaximumAttempts() const override;
+ Milliseconds getMaximumResponseElapsedTotal() const override;
+ bool shouldRetryOnError(ErrorCodes::Error error) const override;
+
+private:
+ std::size_t _maximumAttempts;
+ Milliseconds _maximumResponseElapsedTotal;
+ std::vector<ErrorCodes::Error> _retryableErrors;
+};
+
+RetryPolicyImpl::RetryPolicyImpl(std::size_t maximumAttempts,
+ Milliseconds maximumResponseElapsedTotal,
+ const std::initializer_list<ErrorCodes::Error>& retryableErrors)
+ : _maximumAttempts(maximumAttempts),
+ _maximumResponseElapsedTotal(maximumResponseElapsedTotal),
+ _retryableErrors(retryableErrors) {
+ std::sort(_retryableErrors.begin(), _retryableErrors.end());
+}
+
+std::size_t RetryPolicyImpl::getMaximumAttempts() const {
+ return _maximumAttempts;
+}
+
+Milliseconds RetryPolicyImpl::getMaximumResponseElapsedTotal() const {
+ return _maximumResponseElapsedTotal;
+}
+
+bool RetryPolicyImpl::shouldRetryOnError(ErrorCodes::Error error) const {
+ return std::binary_search(_retryableErrors.cbegin(), _retryableErrors.cend(), error);
+}
+
+} // namespace
+
+const std::initializer_list<ErrorCodes::Error> RemoteCommandRetryScheduler::kNotMasterErrors{
+ ErrorCodes::NotMaster, ErrorCodes::NotMasterNoSlaveOk, ErrorCodes::NotMasterOrSecondary};
+
+const std::initializer_list<ErrorCodes::Error> RemoteCommandRetryScheduler::kAllRetriableErrors{
+ ErrorCodes::NotMaster,
+ ErrorCodes::NotMasterNoSlaveOk,
+ ErrorCodes::NotMasterOrSecondary,
+ // If write concern failed to be satisfied on the remote server, this most probably means that
+ // some of the secondary nodes were unreachable or otherwise unresponsive, so the call is safe
+ // to be retried if idempotency can be guaranteed.
+ ErrorCodes::WriteConcernFailed,
+ ErrorCodes::HostUnreachable,
+ ErrorCodes::HostNotFound,
+ ErrorCodes::NetworkTimeout,
+ ErrorCodes::InterruptedDueToReplStateChange};
+
+std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>
+RemoteCommandRetryScheduler::makeNoRetryPolicy() {
+ return makeRetryPolicy(1U, executor::RemoteCommandRequest::kNoTimeout, {});
+}
+
+std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>
+RemoteCommandRetryScheduler::makeRetryPolicy(
+ std::size_t maxAttempts,
+ Milliseconds maxResponseElapsedTotal,
+ const std::initializer_list<ErrorCodes::Error>& retryableErrors) {
+ std::unique_ptr<RetryPolicy> policy =
+ stdx::make_unique<RetryPolicyImpl>(maxAttempts, maxResponseElapsedTotal, retryableErrors);
+ return policy;
+}
+
+RemoteCommandRetryScheduler::RemoteCommandRetryScheduler(
+ executor::TaskExecutor* executor,
+ const executor::RemoteCommandRequest& request,
+ const executor::TaskExecutor::RemoteCommandCallbackFn& callback,
+ std::unique_ptr<RetryPolicy> retryPolicy)
+ : _executor(executor),
+ _request(request),
+ _callback(callback),
+ _retryPolicy(std::move(retryPolicy)) {
+ uassert(ErrorCodes::BadValue, "task executor cannot be null", executor);
+ uassert(ErrorCodes::BadValue,
+ "source in remote command request cannot be empty",
+ !request.target.empty());
+ uassert(ErrorCodes::BadValue,
+ "database name in remote command request cannot be empty",
+ !request.dbname.empty());
+ uassert(ErrorCodes::BadValue,
+ "command object in remote command request cannot be empty",
+ !request.cmdObj.isEmpty());
+ uassert(ErrorCodes::BadValue, "remote command callback function cannot be null", callback);
+ uassert(ErrorCodes::BadValue, "retry policy cannot be null", _retryPolicy);
+ uassert(ErrorCodes::BadValue,
+ "policy max attempts cannot be zero",
+ _retryPolicy->getMaximumAttempts() != 0);
+ uassert(ErrorCodes::BadValue,
+ "policy max response elapsed total cannot be negative",
+ !(_retryPolicy->getMaximumResponseElapsedTotal() !=
+ executor::RemoteCommandRequest::kNoTimeout &&
+ _retryPolicy->getMaximumResponseElapsedTotal() < Milliseconds(0)));
+}
+
+RemoteCommandRetryScheduler::~RemoteCommandRetryScheduler() {
+ DESTRUCTOR_GUARD(shutdown(); join(););
+}
+
+bool RemoteCommandRetryScheduler::isActive() const {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _active;
+}
+
+Status RemoteCommandRetryScheduler::startup() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ if (_active) {
+        return Status(ErrorCodes::IllegalOperation, "scheduler already started");
+ }
+
+ auto scheduleStatus = _schedule_inlock(0);
+ if (!scheduleStatus.isOK()) {
+ return scheduleStatus;
+ }
+
+ _active = true;
+ return Status::OK();
+}
+
+void RemoteCommandRetryScheduler::shutdown() {
+ executor::TaskExecutor::CallbackHandle remoteCommandCallbackHandle;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ if (!_active) {
+ return;
+ }
+
+ remoteCommandCallbackHandle = _remoteCommandCallbackHandle;
+ }
+
+ invariant(remoteCommandCallbackHandle.isValid());
+ _executor->cancel(remoteCommandCallbackHandle);
+}
+
+void RemoteCommandRetryScheduler::join() {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _condition.wait(lock, [this]() { return !_active; });
+}
+
+Status RemoteCommandRetryScheduler::_schedule_inlock(std::size_t requestCount) {
+ auto scheduleResult = _executor->scheduleRemoteCommand(
+ _request,
+ stdx::bind(&RemoteCommandRetryScheduler::_remoteCommandCallback,
+ this,
+ stdx::placeholders::_1,
+ requestCount + 1));
+
+ if (!scheduleResult.isOK()) {
+ return scheduleResult.getStatus();
+ }
+
+ _remoteCommandCallbackHandle = scheduleResult.getValue();
+ return Status::OK();
+}
+
+void RemoteCommandRetryScheduler::_remoteCommandCallback(
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba, std::size_t requestCount) {
+ auto status = rcba.response.getStatus();
+
+ if (status.isOK() || status == ErrorCodes::CallbackCanceled ||
+ requestCount == _retryPolicy->getMaximumAttempts() ||
+ !_retryPolicy->shouldRetryOnError(status.code())) {
+ _onComplete(rcba);
+ return;
+ }
+
+ // TODO(benety): Check cumulative elapsed time of failed responses received against retry
+ // policy. Requires SERVER-24067.
+
+ auto scheduleStatus = _schedule_inlock(requestCount);
+ if (!scheduleStatus.isOK()) {
+ _onComplete({rcba.executor, rcba.myHandle, rcba.request, scheduleStatus});
+ return;
+ }
+}
+
+void RemoteCommandRetryScheduler::_onComplete(
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) {
+ _callback(rcba);
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_active);
+ _active = false;
+ _condition.notify_all();
+}
+
+} // namespace mongo
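
makeRetryPolicy() sorts the supplied error codes once and answers shouldRetryOnError() with a binary search. Because RetryPolicy is a public abstract interface, a caller could also supply a custom policy; a hypothetical sketch, not part of this commit (the scheduler short-circuits CallbackCanceled before consulting the policy, so the policy only ever sees other errors):

    // Hypothetical policy: treat every error as transient, up to a fixed attempt count.
    class RetryAnyErrorPolicy : public RemoteCommandRetryScheduler::RetryPolicy {
    public:
        explicit RetryAnyErrorPolicy(std::size_t maxAttempts) : _maxAttempts(maxAttempts) {}

        std::size_t getMaximumAttempts() const override {
            return _maxAttempts;
        }

        Milliseconds getMaximumResponseElapsedTotal() const override {
            return executor::RemoteCommandRequest::kNoTimeout;  // no elapsed-time cap
        }

        bool shouldRetryOnError(ErrorCodes::Error error) const override {
            return true;  // every error code is considered retryable
        }

    private:
        const std::size_t _maxAttempts;
    };
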
diff --git a/src/mongo/client/remote_command_retry_scheduler.h b/src/mongo/client/remote_command_retry_scheduler.h
new file mode 100644
index 00000000000..e8bba705edf
--- /dev/null
+++ b/src/mongo/client/remote_command_retry_scheduler.h
@@ -0,0 +1,201 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <cstdlib>
+#include <initializer_list>
+#include <memory>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/error_codes.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+/**
+ * Schedules a remote command request. On receiving a response from task executor (or remote
+ * server), decides if the response should be forwarded to the "_callback" provided in the
+ * constructor based on the retry policy.
+ *
+ * If the command is successful or has been canceled (either by calling shutdown() or by the
+ * task executor shutting down), the response is forwarded immediately to "_callback" and the
+ * scheduler becomes inactive.
+ *
+ * Otherwise, the retry policy (specified at construction) is used to decide if we should
+ * resubmit the remote command request. The retry policy is defined by:
+ * - maximum number of times to run the remote command;
+ * - maximum elapsed time of all failed remote command responses (requires SERVER-24067);
+ * - list of retryable error codes; a response error outside this list stops the scheduler.
+ */
+class RemoteCommandRetryScheduler {
+ MONGO_DISALLOW_COPYING(RemoteCommandRetryScheduler);
+
+public:
+ class RetryPolicy;
+
+ /**
+ * List of not master error codes.
+ */
+ static const std::initializer_list<ErrorCodes::Error> kNotMasterErrors;
+
+ /**
+ * List of retriable error codes.
+ */
+ static const std::initializer_list<ErrorCodes::Error> kAllRetriableErrors;
+
+ /**
+ * Generates a retry policy that will send the remote command request to the source at most
+ * once.
+ */
+ static std::unique_ptr<RetryPolicy> makeNoRetryPolicy();
+
+ /**
+     * Creates a retry policy that will send the remote command request at most "maxAttempts"
+     * times. This policy will also direct the scheduler to retry only when the response
+     * contains one of the errors in "retryableErrors".
+ * (Requires SERVER-24067) The scheduler will also stop retrying if the total elapsed time
+ * of all failed requests exceeds "maxResponseElapsedTotal".
+ */
+ static std::unique_ptr<RetryPolicy> makeRetryPolicy(
+ std::size_t maxAttempts,
+ Milliseconds maxResponseElapsedTotal,
+ const std::initializer_list<ErrorCodes::Error>& retryableErrors);
+
+ /**
+ * Creates scheduler but does not schedule any remote command request.
+ */
+ RemoteCommandRetryScheduler(executor::TaskExecutor* executor,
+ const executor::RemoteCommandRequest& request,
+ const executor::TaskExecutor::RemoteCommandCallbackFn& callback,
+ std::unique_ptr<RetryPolicy> retryPolicy);
+
+ virtual ~RemoteCommandRetryScheduler();
+
+ /**
+ * Returns true if we have scheduled a remote command and are waiting for the response.
+ */
+ bool isActive() const;
+
+ /**
+ * Schedules remote command request.
+ */
+ Status startup();
+
+ /**
+ * Cancels scheduled remote command requests.
+ * Returns immediately if the scheduler is not active.
+ * It is fine to call this multiple times.
+ */
+ void shutdown();
+
+ /**
+ * Waits until the scheduler is inactive.
+ * It is fine to call this multiple times.
+ */
+ void join();
+
+private:
+ /**
+ * Schedules remote command to be run by the executor.
+ * "requestCount" is number of requests scheduled before calling this function.
+ * When this function is called for the first time by startup(), "requestCount" will be 0.
+ * The executor will invoke _remoteCommandCallback() with the remote command response and
+ * ("requestCount" + 1).
+ * By passing "requestCount" between tasks, we avoid having to synchronize access to this count
+ * if it were a field.
+ */
+ Status _schedule_inlock(std::size_t requestCount);
+
+ /**
+ * Callback for remote command.
+ * "requestCount" is number of requests scheduled prior to this response.
+ */
+ void _remoteCommandCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba,
+ std::size_t requestCount);
+
+ /**
+ * Notifies caller that the scheduler has completed processing responses.
+ */
+ void _onComplete(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba);
+
+ // Not owned by us.
+ executor::TaskExecutor* _executor;
+
+ const executor::RemoteCommandRequest _request;
+ const executor::TaskExecutor::RemoteCommandCallbackFn _callback;
+ std::unique_ptr<RetryPolicy> _retryPolicy;
+
+ // Protects member data of this scheduler declared after mutex.
+ mutable stdx::mutex _mutex;
+
+ mutable stdx::condition_variable _condition;
+
+ // _active is true when remote command is scheduled to be run by the executor.
+ bool _active = false;
+
+ // Callback handle to the scheduled remote command.
+ executor::TaskExecutor::CallbackHandle _remoteCommandCallbackHandle;
+};
+
+/**
+ * Policy used by RemoteCommandRetryScheduler to determine if it is necessary to schedule another
+ * remote command request.
+ */
+class RemoteCommandRetryScheduler::RetryPolicy {
+public:
+ virtual ~RetryPolicy() = default;
+
+ /**
+     * Retry scheduler should not send the remote command request more times than this limit.
+ */
+ virtual std::size_t getMaximumAttempts() const = 0;
+
+ /**
+ * The retry scheduler should not re-send the remote command request if the total elapsed time
+ * of prior responses exceeds this limit.
+ * Assumes that re-sending the command will not exceed the limit returned by
+ * "getMaximumAttempts()".
+ * Returns executor::RemoteCommandRequest::kNoTimeout if this limit should be ignored.
+ */
+ virtual Milliseconds getMaximumResponseElapsedTotal() const = 0;
+
+ /**
+ * Checks the error code in the most recent remote command response and returns true if the
+ * scheduler should retry the remote command request.
+ * Assumes that re-sending the command will not exceed the limit returned by
+ * "getMaximumAttempts()" and that the total response elapsed time has not been exceeded (see
+ * "getMaximumResponseElapsedTotal()").
+ */
+ virtual bool shouldRetryOnError(ErrorCodes::Error error) const = 0;
+};
+
+} // namespace mongo
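For reference, here is a minimal usage sketch of the scheduler API declared above. It is not part of this patch; the enclosing function name, target host, and ping command are illustrative assumptions. The sketch builds a policy with makeRetryPolicy(), starts the scheduler, and blocks in join() until the callback has been invoked with the final response or error:

    // Illustrative sketch only; assumes a running TaskExecutor.
    void runPingWithRetries(executor::TaskExecutor* executor) {
        executor::RemoteCommandRequest request(
            HostAndPort("h1:12345"), "admin", BSON("ping" << 1));

        // Send the command at most 3 times, stop once the accumulated response time of
        // failed attempts exceeds 5000ms, and retry only on the "not master" errors.
        auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
            3U, Milliseconds(5000), RemoteCommandRetryScheduler::kNotMasterErrors);

        Status result = Status::OK();
        RemoteCommandRetryScheduler scheduler(
            executor,
            request,
            [&result](const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) {
                // Invoked exactly once with the last response or scheduling error.
                result = rcba.response.getStatus();
            },
            std::move(policy));

        uassertStatusOK(scheduler.startup());
        scheduler.join();  // Returns once the scheduler is inactive and "result" is set.
    }

shutdown() may be called from another thread to cancel an in-flight attempt, in which case the callback fires with CallbackCanceled, as exercised by the tests below.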
diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp
new file mode 100644
index 00000000000..3e3b3d3c513
--- /dev/null
+++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp
@@ -0,0 +1,450 @@
+/**
+ * Copyright 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/client/remote_command_retry_scheduler.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/executor/remote_command_response.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/task_executor_proxy.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/net/hostandport.h"
+
+namespace {
+
+using namespace mongo;
+
+class CallbackResponseSaver;
+
+class RemoteCommandRetrySchedulerTest : public executor::ThreadPoolExecutorTest {
+public:
+ void start(RemoteCommandRetryScheduler* scheduler);
+ void checkCompletionStatus(RemoteCommandRetryScheduler* scheduler,
+ const CallbackResponseSaver& callbackResponseSaver,
+ const executor::TaskExecutor::ResponseStatus& response);
+ void processNetworkResponse(const executor::TaskExecutor::ResponseStatus& response);
+ void runReadyNetworkOperations();
+
+protected:
+ void setUp() override;
+ void tearDown() override;
+};
+
+class CallbackResponseSaver {
+ MONGO_DISALLOW_COPYING(CallbackResponseSaver);
+
+public:
+ CallbackResponseSaver();
+
+ /**
+ * Use this for scheduler callback.
+ */
+ void operator()(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba);
+
+ std::vector<StatusWith<executor::RemoteCommandResponse>> getResponses() const;
+
+private:
+ std::vector<StatusWith<executor::RemoteCommandResponse>> _responses;
+};
+
+/**
+ * Task executor proxy with fail point for scheduleRemoteCommand().
+ */
+class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy {
+public:
+ TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor)
+ : unittest::TaskExecutorProxy(executor) {}
+ virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand(
+ const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override {
+ if (scheduleRemoteCommandFailPoint) {
+ return Status(ErrorCodes::ShutdownInProgress,
+ "failed to send remote command - shutdown in progress");
+ }
+ return getExecutor()->scheduleRemoteCommand(request, cb);
+ }
+
+ bool scheduleRemoteCommandFailPoint = false;
+};
+
+void RemoteCommandRetrySchedulerTest::start(RemoteCommandRetryScheduler* scheduler) {
+ ASSERT_FALSE(scheduler->isActive());
+
+ ASSERT_OK(scheduler->startup());
+ ASSERT_TRUE(scheduler->isActive());
+
+ // Starting an already active scheduler should fail.
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation, scheduler->startup());
+ ASSERT_TRUE(scheduler->isActive());
+
+ auto net = getNet();
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ ASSERT_TRUE(net->hasReadyRequests());
+}
+
+void RemoteCommandRetrySchedulerTest::checkCompletionStatus(
+ RemoteCommandRetryScheduler* scheduler,
+ const CallbackResponseSaver& callbackResponseSaver,
+ const executor::TaskExecutor::ResponseStatus& response) {
+ ASSERT_FALSE(scheduler->isActive());
+ auto responses = callbackResponseSaver.getResponses();
+ ASSERT_EQUALS(1U, responses.size());
+ if (response.isOK()) {
+ ASSERT_OK(responses.front().getStatus());
+ ASSERT_EQUALS(response.getValue(), responses.front().getValue());
+ } else {
+ ASSERT_EQUALS(response.getStatus(), responses.front().getStatus());
+ }
+}
+
+void RemoteCommandRetrySchedulerTest::processNetworkResponse(
+ const executor::TaskExecutor::ResponseStatus& response) {
+ auto net = getNet();
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto noi = net->getNextReadyRequest();
+ net->scheduleResponse(noi, net->now(), response);
+ net->runReadyNetworkOperations();
+}
+
+void RemoteCommandRetrySchedulerTest::runReadyNetworkOperations() {
+ auto net = getNet();
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ net->runReadyNetworkOperations();
+}
+
+void RemoteCommandRetrySchedulerTest::setUp() {
+ executor::ThreadPoolExecutorTest::setUp();
+ launchExecutorThread();
+}
+
+void RemoteCommandRetrySchedulerTest::tearDown() {
+ executor::ThreadPoolExecutorTest::tearDown();
+}
+
+CallbackResponseSaver::CallbackResponseSaver() = default;
+
+void CallbackResponseSaver::operator()(
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) {
+ _responses.push_back(rcba.response);
+}
+
+std::vector<StatusWith<executor::RemoteCommandResponse>> CallbackResponseSaver::getResponses()
+ const {
+ return _responses;
+}
+
+const executor::RemoteCommandRequest request(HostAndPort("h1:12345"), "db1", BSON("ping" << 1));
+
+TEST_F(RemoteCommandRetrySchedulerTest, MakeSingleShotRetryPolicy) {
+ auto policy = RemoteCommandRetryScheduler::makeNoRetryPolicy();
+ ASSERT_TRUE(policy);
+ ASSERT_EQUALS(1U, policy->getMaximumAttempts());
+ ASSERT_EQUALS(executor::RemoteCommandRequest::kNoTimeout,
+ policy->getMaximumResponseElapsedTotal());
+ // Doesn't matter what "shouldRetryOnError()" returns since we won't be retrying the remote
+ // command.
+ for (int i = 0; i < int(ErrorCodes::MaxError); ++i) {
+ auto error = ErrorCodes::fromInt(i);
+ ASSERT_FALSE(policy->shouldRetryOnError(error));
+ }
+}
+
+TEST_F(RemoteCommandRetrySchedulerTest, MakeRetryPolicy) {
+ auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
+ 5U,
+ Milliseconds(100),
+ {ErrorCodes::FailedToParse, ErrorCodes::InvalidNamespace, ErrorCodes::InternalError});
+ ASSERT_EQUALS(5U, policy->getMaximumAttempts());
+ ASSERT_EQUALS(Milliseconds(100), policy->getMaximumResponseElapsedTotal());
+ for (int i = 0; i < int(ErrorCodes::MaxError); ++i) {
+ auto error = ErrorCodes::fromInt(i);
+ if (error == ErrorCodes::InternalError || error == ErrorCodes::FailedToParse ||
+ error == ErrorCodes::InvalidNamespace) {
+ ASSERT_TRUE(policy->shouldRetryOnError(error));
+ continue;
+ }
+ ASSERT_FALSE(policy->shouldRetryOnError(error));
+ }
+}
+
+TEST_F(RemoteCommandRetrySchedulerTest, InvalidConstruction) {
+ auto callback = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {};
+ auto makeRetryPolicy = [] { return RemoteCommandRetryScheduler::makeNoRetryPolicy(); };
+
+ // Null executor.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ RemoteCommandRetryScheduler(nullptr, request, callback, makeRetryPolicy()),
+ UserException,
+ ErrorCodes::BadValue,
+ "task executor cannot be null");
+
+ // Empty source in remote command request.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ RemoteCommandRetryScheduler(
+ &getExecutor(),
+ executor::RemoteCommandRequest(HostAndPort(), request.dbname, request.cmdObj),
+ callback,
+ makeRetryPolicy()),
+ UserException,
+ ErrorCodes::BadValue,
+ "source in remote command request cannot be empty");
+
+    // Empty database name in remote command request.
+ ASSERT_THROWS_CODE_AND_WHAT(RemoteCommandRetryScheduler(&getExecutor(),
+ executor::RemoteCommandRequest(
+ request.target, "", request.cmdObj),
+ callback,
+ makeRetryPolicy()),
+ UserException,
+ ErrorCodes::BadValue,
+ "database name in remote command request cannot be empty");
+
+ // Empty command object in remote command request.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ RemoteCommandRetryScheduler(
+ &getExecutor(),
+ executor::RemoteCommandRequest(request.target, request.dbname, BSONObj()),
+ callback,
+ makeRetryPolicy()),
+ UserException,
+ ErrorCodes::BadValue,
+ "command object in remote command request cannot be empty");
+
+ // Null remote command callback function.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ RemoteCommandRetryScheduler(&getExecutor(),
+ request,
+ executor::TaskExecutor::RemoteCommandCallbackFn(),
+ makeRetryPolicy()),
+ UserException,
+ ErrorCodes::BadValue,
+ "remote command callback function cannot be null");
+
+ // Null retry policy.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ RemoteCommandRetryScheduler(&getExecutor(),
+ request,
+ callback,
+ std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>()),
+ UserException,
+ ErrorCodes::BadValue,
+ "retry policy cannot be null");
+
+ // Policy max attempts should be positive.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ RemoteCommandRetryScheduler(
+ &getExecutor(),
+ request,
+ callback,
+ RemoteCommandRetryScheduler::makeRetryPolicy(0, Milliseconds(100), {})),
+ UserException,
+ ErrorCodes::BadValue,
+ "policy max attempts cannot be zero");
+
+ // Policy max response elapsed total cannot be negative.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ RemoteCommandRetryScheduler(
+ &getExecutor(),
+ request,
+ callback,
+ RemoteCommandRetryScheduler::makeRetryPolicy(1U, Milliseconds(-100), {})),
+ UserException,
+ ErrorCodes::BadValue,
+ "policy max response elapsed total cannot be negative");
+}
+
+TEST_F(RemoteCommandRetrySchedulerTest, StartupFailsWhenExecutorIsShutDown) {
+ auto callback = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {};
+ auto policy = RemoteCommandRetryScheduler::makeNoRetryPolicy();
+
+ RemoteCommandRetryScheduler scheduler(&getExecutor(), request, callback, std::move(policy));
+ ASSERT_FALSE(scheduler.isActive());
+
+ getExecutor().shutdown();
+
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, scheduler.startup());
+ ASSERT_FALSE(scheduler.isActive());
+}
+
+TEST_F(RemoteCommandRetrySchedulerTest,
+ ShuttingDownExecutorAfterSchedulerStartupInvokesCallbackWithCallbackCanceledError) {
+ CallbackResponseSaver callback;
+ auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
+ 10U, Milliseconds(1), {ErrorCodes::HostNotFound});
+ RemoteCommandRetryScheduler scheduler(
+ &getExecutor(), request, stdx::ref(callback), std::move(policy));
+ start(&scheduler);
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ ASSERT_EQUALS(request, net->getNextReadyRequest()->getRequest());
+ }
+
+ getExecutor().shutdown();
+
+ runReadyNetworkOperations();
+ checkCompletionStatus(
+ &scheduler, callback, {ErrorCodes::CallbackCanceled, "executor shutdown"});
+}
+
+
+TEST_F(RemoteCommandRetrySchedulerTest,
+ ShuttingDownSchedulerAfterSchedulerStartupInvokesCallbackWithCallbackCanceledError) {
+ CallbackResponseSaver callback;
+ auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
+ 10U, Milliseconds(1), {ErrorCodes::HostNotFound});
+ RemoteCommandRetryScheduler scheduler(
+ &getExecutor(), request, stdx::ref(callback), std::move(policy));
+ start(&scheduler);
+
+ scheduler.shutdown();
+
+ runReadyNetworkOperations();
+ checkCompletionStatus(
+ &scheduler, callback, {ErrorCodes::CallbackCanceled, "scheduler shutdown"});
+}
+
+TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnNonRetryableErrorInResponse) {
+ CallbackResponseSaver callback;
+ auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
+ 10U, Milliseconds(1), RemoteCommandRetryScheduler::kNotMasterErrors);
+ RemoteCommandRetryScheduler scheduler(
+ &getExecutor(), request, stdx::ref(callback), std::move(policy));
+ start(&scheduler);
+
+    // OperationFailed is not in the policy's list of retryable errors, so the scheduler
+    // should complete with this error without retrying.
+ Status response(ErrorCodes::OperationFailed, "injected error");
+
+ processNetworkResponse(response);
+ checkCompletionStatus(&scheduler, callback, response);
+}
+
+TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnFirstSuccessfulResponse) {
+ CallbackResponseSaver callback;
+ auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
+ 10U, Milliseconds(1), {ErrorCodes::HostNotFound});
+ RemoteCommandRetryScheduler scheduler(
+ &getExecutor(), request, stdx::ref(callback), std::move(policy));
+ start(&scheduler);
+
+ // Elapsed time in response is ignored on successful responses.
+ executor::RemoteCommandResponse response(
+ BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100));
+
+ processNetworkResponse(response);
+ checkCompletionStatus(&scheduler, callback, response);
+}
+
+TEST_F(RemoteCommandRetrySchedulerTest, SchedulerIgnoresEmbeddedErrorInSuccessfulResponse) {
+ CallbackResponseSaver callback;
+ auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
+ 10U, Milliseconds(1), {ErrorCodes::HostNotFound});
+ RemoteCommandRetryScheduler scheduler(
+ &getExecutor(), request, stdx::ref(callback), std::move(policy));
+ start(&scheduler);
+
+    // The scheduler does not parse the document in a successful response for embedded errors.
+    // This matters for commands (e.g. find) that do not always report errors through the wire
+    // protocol and instead embed them in the response document.
+ executor::RemoteCommandResponse response(BSON("ok" << 0 << "code"
+ << int(ErrorCodes::FailedToParse) << "errmsg"
+ << "injected error"),
+ BSON("z" << 456),
+ Milliseconds(100));
+
+ processNetworkResponse(response);
+ checkCompletionStatus(&scheduler, callback, response);
+}
+
+TEST_F(RemoteCommandRetrySchedulerTest,
+ SchedulerInvokesCallbackWithErrorFromExecutorIfScheduleRemoteCommandFailsOnRetry) {
+ CallbackResponseSaver callback;
+ auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
+ 3U, executor::RemoteCommandRequest::kNoTimeout, {ErrorCodes::HostNotFound});
+ TaskExecutorWithFailureInScheduleRemoteCommand badExecutor(&getExecutor());
+ RemoteCommandRetryScheduler scheduler(
+ &badExecutor, request, stdx::ref(callback), std::move(policy));
+ start(&scheduler);
+
+ processNetworkResponse({ErrorCodes::HostNotFound, "first"});
+
+    // scheduleRemoteCommand() will fail with ErrorCodes::ShutdownInProgress when trying to send
+    // the third remote command request after processing the second failed response.
+ badExecutor.scheduleRemoteCommandFailPoint = true;
+ processNetworkResponse({ErrorCodes::HostNotFound, "second"});
+
+ checkCompletionStatus(&scheduler, callback, {ErrorCodes::ShutdownInProgress, ""});
+}
+
+TEST_F(RemoteCommandRetrySchedulerTest,
+ SchedulerEnforcesPolicyMaximumAttemptsAndReturnsErrorOfLastFailedRequest) {
+ CallbackResponseSaver callback;
+ auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
+ 3U,
+ executor::RemoteCommandRequest::kNoTimeout,
+ RemoteCommandRetryScheduler::kAllRetriableErrors);
+ RemoteCommandRetryScheduler scheduler(
+ &getExecutor(), request, stdx::ref(callback), std::move(policy));
+ start(&scheduler);
+
+ processNetworkResponse({ErrorCodes::HostNotFound, "first"});
+ processNetworkResponse({ErrorCodes::HostUnreachable, "second"});
+
+ Status response(ErrorCodes::NetworkTimeout, "last");
+ processNetworkResponse(response);
+ checkCompletionStatus(&scheduler, callback, response);
+}
+
+TEST_F(RemoteCommandRetrySchedulerTest, SchedulerShouldRetryUntilSuccessfulResponseIsReceived) {
+ CallbackResponseSaver callback;
+ auto policy = RemoteCommandRetryScheduler::makeRetryPolicy(
+ 3U, executor::RemoteCommandRequest::kNoTimeout, {ErrorCodes::HostNotFound});
+ RemoteCommandRetryScheduler scheduler(
+ &getExecutor(), request, stdx::ref(callback), std::move(policy));
+ start(&scheduler);
+
+ processNetworkResponse({ErrorCodes::HostNotFound, "first"});
+
+ executor::RemoteCommandResponse response(
+ BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100));
+ processNetworkResponse(response);
+ checkCompletionStatus(&scheduler, callback, response);
+}
+
+} // namespace
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 054693178d2..1c822c9c48b 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -559,5 +559,13 @@ void NetworkInterfaceMock::NetworkOperation::finishResponse() {
_onFinish = RemoteCommandCompletionFn();
}
+NetworkInterfaceMock::InNetworkGuard::InNetworkGuard(NetworkInterfaceMock* net) : _net(net) {
+ _net->enterNetwork();
+}
+
+NetworkInterfaceMock::InNetworkGuard::~InNetworkGuard() {
+ _net->exitNetwork();
+}
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 3d69bad78a5..ac516ba2f5a 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -35,6 +35,7 @@
#include <utility>
#include <vector>
+#include "mongo/base/disallow_copying.h"
#include "mongo/executor/network_interface.h"
#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/stdx/condition_variable.h"
@@ -118,6 +119,11 @@ public:
////////////////////////////////////////////////////////////////////////////////
/**
+ * RAII-style class that enters the network on construction and exits it on destruction.
+ */
+ class InNetworkGuard;
+
+ /**
* Causes the currently running (non-executor) thread to assume the mantle of the network
* simulation thread.
*
@@ -421,5 +427,16 @@ private:
RemoteCommandCompletionFn _onFinish;
};
+class NetworkInterfaceMock::InNetworkGuard {
+ MONGO_DISALLOW_COPYING(InNetworkGuard);
+
+public:
+ explicit InNetworkGuard(NetworkInterfaceMock* net);
+ ~InNetworkGuard();
+
+private:
+ NetworkInterfaceMock* _net;
+};
+
} // namespace executor
} // namespace mongo
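InNetworkGuard simply pairs enterNetwork() and exitNetwork() around a scope, so test code no longer has to match the two calls manually and the network is exited even if an assertion throws. A typical use, mirroring the helpers in remote_command_retry_scheduler_test.cpp above (illustrative fragment; "net" and "response" come from the surrounding test fixture):

    {
        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
        ASSERT_TRUE(net->hasReadyRequests());
        auto noi = net->getNextReadyRequest();
        net->scheduleResponse(noi, net->now(), response);
        net->runReadyNetworkOperations();
    }  // exitNetwork() runs here, even if one of the assertions above threw.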
diff --git a/src/mongo/executor/remote_command_request.cpp b/src/mongo/executor/remote_command_request.cpp
index d1c10cdd1f9..1f6486afd70 100644
--- a/src/mongo/executor/remote_command_request.cpp
+++ b/src/mongo/executor/remote_command_request.cpp
@@ -30,6 +30,8 @@
#include "mongo/executor/remote_command_request.h"
+#include <ostream>
+
#include "mongo/platform/atomic_word.h"
#include "mongo/util/mongoutils/str.h"
@@ -85,5 +87,22 @@ std::string RemoteCommandRequest::toString() const {
return out;
}
+bool RemoteCommandRequest::operator==(const RemoteCommandRequest& rhs) const {
+ if (this == &rhs) {
+ return true;
+ }
+ return target == rhs.target && dbname == rhs.dbname && cmdObj == rhs.cmdObj &&
+ metadata == rhs.metadata && timeout == rhs.timeout;
+}
+
+bool RemoteCommandRequest::operator!=(const RemoteCommandRequest& rhs) const {
+ return !(*this == rhs);
+}
+
} // namespace executor
+
+std::ostream& operator<<(std::ostream& os, const executor::RemoteCommandRequest& request) {
+ return os << request.toString();
+}
+
} // namespace mongo
diff --git a/src/mongo/executor/remote_command_request.h b/src/mongo/executor/remote_command_request.h
index 7e5b96618af..975f7498a71 100644
--- a/src/mongo/executor/remote_command_request.h
+++ b/src/mongo/executor/remote_command_request.h
@@ -28,6 +28,7 @@
#pragma once
+#include <iosfwd>
#include <string>
#include "mongo/db/jsobj.h"
@@ -85,6 +86,9 @@ struct RemoteCommandRequest {
std::string toString() const;
+ bool operator==(const RemoteCommandRequest& rhs) const;
+ bool operator!=(const RemoteCommandRequest& rhs) const;
+
    // Internal id of this request. Not interpreted; used for tracing purposes only.
RequestId id;
@@ -99,4 +103,7 @@ struct RemoteCommandRequest {
};
} // namespace executor
+
+std::ostream& operator<<(std::ostream& os, const executor::RemoteCommandRequest& request);
+
} // namespace mongo
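These operators exist mainly so unit tests can compare and print requests directly; the scheduler test above asserts on the request with ASSERT_EQUALS, and the test-local operators in task_executor_test_common.cpp (removed below) are no longer needed. A small illustrative sketch of what they enable:

    executor::RemoteCommandRequest a(HostAndPort("h1:12345"), "db1", BSON("ping" << 1));
    executor::RemoteCommandRequest b(HostAndPort("h1:12345"), "db1", BSON("ping" << 1));

    // operator== compares target, dbname, cmdObj, metadata and timeout, but not the
    // auto-assigned request id, so the two requests above compare equal. operator<<
    // lets ASSERT_EQUALS print both operands via toString() when a comparison fails.
    ASSERT_EQUALS(a, b);
    ASSERT_FALSE(a != b);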
diff --git a/src/mongo/executor/remote_command_response.cpp b/src/mongo/executor/remote_command_response.cpp
index a0b1baeeb79..ab67febd598 100644
--- a/src/mongo/executor/remote_command_response.cpp
+++ b/src/mongo/executor/remote_command_response.cpp
@@ -48,5 +48,21 @@ std::string RemoteCommandResponse::toString() const {
<< " cmd:" << data.toString();
}
+bool RemoteCommandResponse::operator==(const RemoteCommandResponse& rhs) const {
+ if (this == &rhs) {
+ return true;
+ }
+ return data == rhs.data && metadata == rhs.metadata && elapsedMillis == rhs.elapsedMillis;
+}
+
+bool RemoteCommandResponse::operator!=(const RemoteCommandResponse& rhs) const {
+ return !(*this == rhs);
+}
+
} // namespace executor
+
+std::ostream& operator<<(std::ostream& os, const executor::RemoteCommandResponse& response) {
+ return os << response.toString();
+}
+
} // namespace mongo
diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h
index d72dcf28329..b6cedb8f976 100644
--- a/src/mongo/executor/remote_command_response.h
+++ b/src/mongo/executor/remote_command_response.h
@@ -28,6 +28,7 @@
#pragma once
+#include <iosfwd>
#include <string>
#include <memory>
@@ -66,6 +67,9 @@ struct RemoteCommandResponse {
std::string toString() const;
+ bool operator==(const RemoteCommandResponse& rhs) const;
+ bool operator!=(const RemoteCommandResponse& rhs) const;
+
std::shared_ptr<const Message> message; // May be null.
BSONObj data; // Either owned or points into message.
BSONObj metadata; // Either owned or points into message.
@@ -73,4 +77,7 @@ struct RemoteCommandResponse {
};
} // namespace executor
+
+std::ostream& operator<<(std::ostream& os, const executor::RemoteCommandResponse& response);
+
} // namespace mongo
diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp
index f427bdd40b4..d6c479a1a32 100644
--- a/src/mongo/executor/task_executor_test_common.cpp
+++ b/src/mongo/executor/task_executor_test_common.cpp
@@ -106,14 +106,6 @@ public:
}); \
void CET_##TEST_NAME::_doTest()
-bool operator==(const RemoteCommandRequest lhs, const RemoteCommandRequest rhs) {
- return lhs.target == rhs.target && lhs.dbname == rhs.dbname && lhs.cmdObj == rhs.cmdObj;
-}
-
-bool operator!=(const RemoteCommandRequest lhs, const RemoteCommandRequest rhs) {
- return !(lhs == rhs);
-}
-
void setStatus(const TaskExecutor::CallbackArgs& cbData, Status* target) {
*target = cbData.status;
}
diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript
index 3ea184d27b6..62ea2fa0fb7 100644
--- a/src/mongo/s/client/SConscript
+++ b/src/mongo/s/client/SConscript
@@ -50,10 +50,11 @@ env.CppUnitTest(
LIBDEPS=[
'sharding_client',
'sharding_connection_hook',
- '$BUILD_DIR/mongo/s/coreshard',
+ '$BUILD_DIR/mongo/client/remote_command_retry_scheduler',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/service_context_noop_init',
'$BUILD_DIR/mongo/dbtests/mocklib',
+ '$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/s/mongoscore',
'$BUILD_DIR/mongo/util/net/network',
]
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 5024c50451b..8f7322ef307 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -32,10 +32,12 @@
#include "mongo/s/client/shard_remote.h"
+#include <algorithm>
#include <string>
#include "mongo/client/fetcher.h"
#include "mongo/client/read_preference.h"
+#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/jsobj.h"
@@ -43,7 +45,6 @@
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/query/lite_parsed_query.h"
#include "mongo/executor/task_executor_pool.h"
-#include "mongo/platform/unordered_set.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
@@ -85,31 +86,6 @@ const BSONObj kReplSecondaryOkMetadata{[] {
return o.obj();
}()};
-class ErrorCodesHash {
-public:
- size_t operator()(ErrorCodes::Error e) const {
- return std::hash<typename std::underlying_type<ErrorCodes::Error>::type>()(e);
- }
-};
-
-using ErrorCodesSet = unordered_set<ErrorCodes::Error, ErrorCodesHash>;
-
-const ErrorCodesSet kNotMasterErrors{
- ErrorCodes::NotMaster, ErrorCodes::NotMasterNoSlaveOk, ErrorCodes::NotMasterOrSecondary};
-
-const ErrorCodesSet kAllRetriableErrors{
- ErrorCodes::NotMaster,
- ErrorCodes::NotMasterNoSlaveOk,
- ErrorCodes::NotMasterOrSecondary,
- // If write concern failed to be satisfied on the remote server, this most probably means that
- // some of the secondary nodes were unreachable or otherwise unresponsive, so the call is safe
- // to be retried if idempotency can be guaranteed.
- ErrorCodes::WriteConcernFailed,
- ErrorCodes::HostUnreachable,
- ErrorCodes::HostNotFound,
- ErrorCodes::NetworkTimeout,
- ErrorCodes::InterruptedDueToReplStateChange};
-
/**
* Returns a new BSONObj describing the same command and arguments as 'cmdObj', but with a maxTimeMS
* set on it that is the minimum of the maxTimeMS in 'cmdObj' (if present), 'maxTimeMicros', and
@@ -172,12 +148,10 @@ bool ShardRemote::isRetriableError(ErrorCodes::Error code, RetryPolicy options)
return false;
}
- if (options == RetryPolicy::kIdempotent) {
- return kAllRetriableErrors.count(code);
- } else {
- invariant(options == RetryPolicy::kNotIdempotent);
- return kNotMasterErrors.count(code);
- }
+ const auto& retriableErrors = options == RetryPolicy::kIdempotent
+ ? RemoteCommandRetryScheduler::kAllRetriableErrors
+ : RemoteCommandRetryScheduler::kNotMasterErrors;
+ return std::find(retriableErrors.begin(), retriableErrors.end(), code) != retriableErrors.end();
}
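The retriable-error sets now live on RemoteCommandRetryScheduler as plain initializer lists, so membership here is a linear std::find rather than the hash set removed above; the lists are small enough that this is negligible. An illustrative consequence (the Shard::RetryPolicy scoping and the "shard" pointer are assumptions for this sketch):

    // HostUnreachable is in kAllRetriableErrors but not in kNotMasterErrors.
    invariant(shard->isRetriableError(ErrorCodes::HostUnreachable, Shard::RetryPolicy::kIdempotent));
    invariant(!shard->isRetriableError(ErrorCodes::HostUnreachable, Shard::RetryPolicy::kNotIdempotent));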
const ConnectionString ShardRemote::getConnString() const {