diff options
author | Benety Goh <benety@mongodb.com> | 2016-05-03 12:03:06 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-05-16 13:51:28 -0400 |
commit | 643439ff4fc91c49ffc2b103ba3858de1a165024 (patch) | |
tree | 6cc69534128bf845c5d58e200016c96ac758d3f0 /src | |
parent | 6a0904bd38a2deb2de127865943d57ca7cfa6927 (diff) | |
download | mongo-643439ff4fc91c49ffc2b103ba3858de1a165024.tar.gz |
SERVER-23134 added retry support for scheduling remote commands on a task executor
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/client/SConscript | 22 | ||||
-rw-r--r-- | src/mongo/client/fetcher.cpp | 52 | ||||
-rw-r--r-- | src/mongo/client/fetcher.h | 24 | ||||
-rw-r--r-- | src/mongo/client/fetcher_test.cpp | 511 | ||||
-rw-r--r-- | src/mongo/client/remote_command_retry_scheduler.cpp | 238 | ||||
-rw-r--r-- | src/mongo/client/remote_command_retry_scheduler.h | 201 | ||||
-rw-r--r-- | src/mongo/client/remote_command_retry_scheduler_test.cpp | 450 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 8 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 17 | ||||
-rw-r--r-- | src/mongo/executor/remote_command_request.cpp | 19 | ||||
-rw-r--r-- | src/mongo/executor/remote_command_request.h | 7 | ||||
-rw-r--r-- | src/mongo/executor/remote_command_response.cpp | 16 | ||||
-rw-r--r-- | src/mongo/executor/remote_command_response.h | 7 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_common.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/client/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/s/client/shard_remote.cpp | 38 |
16 files changed, 1343 insertions, 278 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 10c78120d79..3145929e5f9 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -252,6 +252,7 @@ env.Library( 'fetcher.cpp', ], LIBDEPS=[ + 'remote_command_retry_scheduler', '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/namespace_string', @@ -270,3 +271,24 @@ env.CppUnitTest( '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', ], ) + +env.Library( + target='remote_command_retry_scheduler', + source=[ + 'remote_command_retry_scheduler.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/executor/task_executor_interface', + '$BUILD_DIR/mongo/base', + ], +) + +env.CppUnitTest( + target='remote_command_retry_scheduler_test', + source='remote_command_retry_scheduler_test.cpp', + LIBDEPS=[ + 'remote_command_retry_scheduler', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', + '$BUILD_DIR/mongo/unittest/task_executor_proxy', + ], +) diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index 843cf02a550..acc446298b5 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -144,20 +144,20 @@ Fetcher::Fetcher(executor::TaskExecutor* executor, const BSONObj& findCmdObj, const CallbackFn& work, const BSONObj& metadata, - Milliseconds timeout) + Milliseconds timeout, + std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> firstCommandRetryPolicy) : _executor(executor), _source(source), _dbname(dbname), _cmdObj(findCmdObj.getOwned()), _metadata(metadata.getOwned()), _work(work), - _active(false), - _first(true), - _remoteCommandCallbackHandle(), - _timeout(timeout) { - uassert(ErrorCodes::BadValue, "null task executor", executor); - uassert(ErrorCodes::BadValue, "database name cannot be empty", !dbname.empty()); - uassert(ErrorCodes::BadValue, "command object cannot be empty", !findCmdObj.isEmpty()); + _timeout(timeout), + _firstRemoteCommandScheduler( + _executor, + RemoteCommandRequest(_source, _dbname, _cmdObj, _metadata, _timeout), + stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, kFirstBatchFieldName), + std::move(firstCommandRetryPolicy)) { uassert(ErrorCodes::BadValue, "callback function cannot be null", work); } @@ -205,11 +205,18 @@ Status Fetcher::schedule() { if (_active) { return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled"); } - return _schedule_inlock(_cmdObj, kFirstBatchFieldName); + + auto status = _firstRemoteCommandScheduler.startup(); + if (!status.isOK()) { + return status; + } + + _active = true; + return Status::OK(); } void Fetcher::cancel() { - executor::TaskExecutor::CallbackHandle remoteCommandCallbackHandle; + executor::TaskExecutor::CallbackHandle handle; { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -217,11 +224,16 @@ void Fetcher::cancel() { return; } - remoteCommandCallbackHandle = _remoteCommandCallbackHandle; + _firstRemoteCommandScheduler.shutdown(); + + if (!_getMoreCallbackHandle.isValid()) { + return; + } + + handle = _getMoreCallbackHandle; } - invariant(remoteCommandCallbackHandle.isValid()); - _executor->cancel(remoteCommandCallbackHandle); + _executor->cancel(handle); } void Fetcher::wait() { @@ -229,18 +241,19 @@ void Fetcher::wait() { _condition.wait(lk, [this]() { return !_active; }); } -Status Fetcher::_schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName) { +Status Fetcher::_scheduleGetMore(const BSONObj& cmdObj) { StatusWith<executor::TaskExecutor::CallbackHandle> scheduleResult = _executor->scheduleRemoteCommand( RemoteCommandRequest(_source, _dbname, cmdObj, _metadata, _timeout), - stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, batchFieldName)); + stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, kNextBatchFieldName)); if (!scheduleResult.isOK()) { return scheduleResult.getStatus(); } - _active = true; - _remoteCommandCallbackHandle = scheduleResult.getValue(); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _getMoreCallbackHandle = scheduleResult.getValue(); + return Status::OK(); } @@ -305,10 +318,7 @@ void Fetcher::_callback(const RemoteCommandCallbackArgs& rcbd, const char* batch return; } - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - status = _schedule_inlock(cmdObj, kNextBatchFieldName); - } + status = _scheduleGetMore(cmdObj); if (!status.isOK()) { nextAction = NextAction::kNoAction; _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h index 2050c3687d5..222e5ebf193 100644 --- a/src/mongo/client/fetcher.h +++ b/src/mongo/client/fetcher.h @@ -28,6 +28,7 @@ #pragma once +#include <memory> #include <string> #include <vector> @@ -35,6 +36,7 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/bson/bsonobj.h" +#include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/clientcursor.h" #include "mongo/db/namespace_string.h" #include "mongo/executor/task_executor.h" @@ -116,6 +118,9 @@ public: * * The callback function 'work' is not allowed to call into the Fetcher instance. This * behavior is undefined and may result in a deadlock. + * + * An optional retry policy may be provided for the first remote command request so that + * the remote command scheduler will re-send the command in case of transient network errors. */ Fetcher(executor::TaskExecutor* executor, const HostAndPort& source, @@ -123,7 +128,9 @@ public: const BSONObj& cmdObj, const CallbackFn& work, const BSONObj& metadata = rpc::makeEmptyMetadata(), - Milliseconds timeout = RemoteCommandRequest::kNoTimeout); + Milliseconds timeout = RemoteCommandRequest::kNoTimeout, + std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> firstCommandRetryPolicy = + RemoteCommandRetryScheduler::makeNoRetryPolicy()); virtual ~Fetcher(); @@ -177,9 +184,9 @@ public: private: /** - * Schedules remote command to be run by the executor + * Schedules getMore command to be run by the executor */ - Status _schedule_inlock(const BSONObj& cmdObj, const char* batchFieldName); + Status _scheduleGetMore(const BSONObj& cmdObj); /** * Callback for remote command. @@ -214,17 +221,20 @@ private: mutable stdx::condition_variable _condition; // _active is true when Fetcher is scheduled to be run by the executor. - bool _active; + bool _active = false; // _first is true for first query response and false for subsequent responses. // Using boolean instead of a counter to avoid issues with wrap around. - bool _first; + bool _first = true; - // Callback handle to the scheduled remote command. - executor::TaskExecutor::CallbackHandle _remoteCommandCallbackHandle; + // Callback handle to the scheduled getMore command. + executor::TaskExecutor::CallbackHandle _getMoreCallbackHandle; // Socket timeout Milliseconds _timeout; + + // First remote command scheduler. + RemoteCommandRetryScheduler _firstRemoteCommandScheduler; }; } // namespace mongo diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp index 26e7c156c3a..f9398e22547 100644 --- a/src/mongo/client/fetcher_test.cpp +++ b/src/mongo/client/fetcher_test.cpp @@ -53,20 +53,35 @@ public: FetcherTest(); void clear(); void scheduleNetworkResponse(const BSONObj& obj); - void scheduleNetworkResponse(const BSONObj& obj, Milliseconds millis); + void scheduleNetworkResponse(const BSONObj& obj, Milliseconds elapsed); void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason); void scheduleNetworkResponseFor(const BSONObj& filter, const BSONObj& obj); void scheduleNetworkResponseFor(NetworkInterfaceMock::NetworkOperationIterator noi, const BSONObj& obj); - // Calls scheduleNetworkResponse + finishProcessingNetworkResponse - void processNetworkResponse(const BSONObj& obj); - // Calls scheduleNetworkResponse + finishProcessingNetworkResponse - void processNetworkResponse(ErrorCodes::Error code, const std::string& reason); + enum class ReadyQueueState { kEmpty, kHasReadyRequests }; - void finishProcessingNetworkResponse(); + enum class FetcherState { kInactive, kActive }; + + // Calls scheduleNetworkResponse + finishProcessingNetworkResponse + void processNetworkResponse(const BSONObj& obj, + ReadyQueueState readyQueueStateAfterProcessing, + FetcherState fetcherStateAfterProcessing); + void processNetworkResponse(const BSONObj& obj, + Milliseconds elapsed, + ReadyQueueState readyQueueStateAfterProcessing, + FetcherState fetcherStateAfterProcessing); + void processNetworkResponse(ErrorCodes::Error code, + const std::string& reason, + ReadyQueueState readyQueueStateAfterProcessing, + FetcherState fetcherStateAfterProcessing); + + void finishProcessingNetworkResponse(ReadyQueueState readyQueueStateAfterProcessing, + FetcherState fetcherStateAfterProcessing); protected: + Fetcher::CallbackFn makeCallback(); + void setUp() override; void tearDown() override; @@ -90,18 +105,19 @@ private: FetcherTest::FetcherTest() : status(getDetectableErrorStatus()), cursorId(-1), nextAction(Fetcher::NextAction::kInvalid) {} +Fetcher::CallbackFn FetcherTest::makeCallback() { + return stdx::bind(&FetcherTest::_callback, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3); +} + void FetcherTest::setUp() { executor::ThreadPoolExecutorTest::setUp(); clear(); - fetcher = stdx::make_unique<Fetcher>(&getExecutor(), - source, - "db", - findCmdObj, - stdx::bind(&FetcherTest::_callback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3)); + callbackHook = Fetcher::CallbackFn(); + fetcher = stdx::make_unique<Fetcher>(&getExecutor(), source, "db", findCmdObj, makeCallback()); launchExecutorThread(); } @@ -119,17 +135,16 @@ void FetcherTest::clear() { elapsedMillis = Milliseconds(0); first = false; nextAction = Fetcher::NextAction::kInvalid; - callbackHook = Fetcher::CallbackFn(); } void FetcherTest::scheduleNetworkResponse(const BSONObj& obj) { scheduleNetworkResponse(obj, Milliseconds(0)); } -void FetcherTest::scheduleNetworkResponse(const BSONObj& obj, Milliseconds millis) { +void FetcherTest::scheduleNetworkResponse(const BSONObj& obj, Milliseconds elapsed) { NetworkInterfaceMock* net = getNet(); ASSERT_TRUE(net->hasReadyRequests()); - executor::RemoteCommandResponse response(obj, BSONObj(), millis); + executor::RemoteCommandResponse response(obj, BSONObj(), elapsed); TaskExecutor::ResponseStatus responseStatus(response); net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); } @@ -137,8 +152,8 @@ void FetcherTest::scheduleNetworkResponseFor(const BSONObj& filter, const BSONOb ASSERT_TRUE(filter[1].eoo()); // The filter should only have one field, to match the cmd name NetworkInterfaceMock* net = getNet(); ASSERT_TRUE(net->hasReadyRequests()); - Milliseconds millis(0); - executor::RemoteCommandResponse response(obj, BSONObj(), millis); + Milliseconds elapsed(0); + executor::RemoteCommandResponse response(obj, BSONObj(), elapsed); TaskExecutor::ResponseStatus responseStatus(response); auto req = net->getNextReadyRequest(); ASSERT_EQ(req->getRequest().cmdObj[0], filter[0]); @@ -148,8 +163,8 @@ void FetcherTest::scheduleNetworkResponseFor(const BSONObj& filter, const BSONOb void FetcherTest::scheduleNetworkResponseFor(NetworkInterfaceMock::NetworkOperationIterator noi, const BSONObj& obj) { NetworkInterfaceMock* net = getNet(); - Milliseconds millis(0); - executor::RemoteCommandResponse response(obj, BSONObj(), millis); + Milliseconds elapsed(0); + executor::RemoteCommandResponse response(obj, BSONObj(), elapsed); TaskExecutor::ResponseStatus responseStatus(response); net->scheduleResponse(noi, net->now(), responseStatus); } @@ -161,28 +176,40 @@ void FetcherTest::scheduleNetworkResponse(ErrorCodes::Error code, const std::str net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); } -void FetcherTest::processNetworkResponse(const BSONObj& obj) { - auto net = getNet(); - net->enterNetwork(); +void FetcherTest::processNetworkResponse(const BSONObj& obj, + ReadyQueueState readyQueueStateAfterProcessing, + FetcherState fetcherStateAfterProcessing) { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); scheduleNetworkResponse(obj); - finishProcessingNetworkResponse(); - net->exitNetwork(); + finishProcessingNetworkResponse(readyQueueStateAfterProcessing, fetcherStateAfterProcessing); } -void FetcherTest::processNetworkResponse(ErrorCodes::Error code, const std::string& reason) { - auto net = getNet(); - net->enterNetwork(); +void FetcherTest::processNetworkResponse(const BSONObj& obj, + Milliseconds elapsed, + ReadyQueueState readyQueueStateAfterProcessing, + FetcherState fetcherStateAfterProcessing) { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + scheduleNetworkResponse(obj, elapsed); + finishProcessingNetworkResponse(readyQueueStateAfterProcessing, fetcherStateAfterProcessing); +} + +void FetcherTest::processNetworkResponse(ErrorCodes::Error code, + const std::string& reason, + ReadyQueueState readyQueueStateAfterProcessing, + FetcherState fetcherStateAfterProcessing) { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); scheduleNetworkResponse(code, reason); - finishProcessingNetworkResponse(); - net->exitNetwork(); + finishProcessingNetworkResponse(readyQueueStateAfterProcessing, fetcherStateAfterProcessing); } -void FetcherTest::finishProcessingNetworkResponse() { +void FetcherTest::finishProcessingNetworkResponse(ReadyQueueState readyQueueStateAfterProcessing, + FetcherState fetcherStateAfterProcessing) { clear(); ASSERT_TRUE(fetcher->isActive()); getNet()->runReadyNetworkOperations(); - ASSERT_FALSE(getNet()->hasReadyRequests()); - ASSERT_FALSE(fetcher->isActive()); + ASSERT_EQUALS(readyQueueStateAfterProcessing == ReadyQueueState::kHasReadyRequests, + getNet()->hasReadyRequests()); + ASSERT_EQUALS(fetcherStateAfterProcessing == FetcherState::kActive, fetcher->isActive()); } void FetcherTest::_callback(const StatusWith<Fetcher::QueryResponse>& result, @@ -224,25 +251,46 @@ TEST_F(FetcherTest, InvalidConstruction) { ASSERT_THROWS_CODE_AND_WHAT(Fetcher(nullptr, source, "db", findCmdObj, unreachableCallback), UserException, ErrorCodes::BadValue, - "null task executor"); + "task executor cannot be null"); + + // Empty source. + ASSERT_THROWS_CODE_AND_WHAT( + Fetcher(&executor, HostAndPort(), "db", findCmdObj, unreachableCallback), + UserException, + ErrorCodes::BadValue, + "source in remote command request cannot be empty"); // Empty database name. ASSERT_THROWS_CODE_AND_WHAT(Fetcher(&executor, source, "", findCmdObj, unreachableCallback), UserException, ErrorCodes::BadValue, - "database name cannot be empty"); + "database name in remote command request cannot be empty"); // Empty command object. ASSERT_THROWS_CODE_AND_WHAT(Fetcher(&executor, source, "db", BSONObj(), unreachableCallback), UserException, ErrorCodes::BadValue, - "command object cannot be empty"); + "command object in remote command request cannot be empty"); // Callback function cannot be null. ASSERT_THROWS_CODE_AND_WHAT(Fetcher(&executor, source, "db", findCmdObj, Fetcher::CallbackFn()), UserException, ErrorCodes::BadValue, "callback function cannot be null"); + + // Retry policy for first command cannot be null. + ASSERT_THROWS_CODE_AND_WHAT( + Fetcher(&executor, + source, + "db", + findCmdObj, + unreachableCallback, + rpc::makeEmptyMetadata(), + RemoteCommandRequest::kNoTimeout, + std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>()), + UserException, + ErrorCodes::BadValue, + "retry policy cannot be null"); } // Command object can refer to any command that returns a cursor. This @@ -275,11 +323,13 @@ TEST_F(FetcherTest, RemoteCommandRequestShouldContainCommandParametersPassedToCo ASSERT_OK(fetcher->schedule()); auto net = getNet(); - net->enterNetwork(); - ASSERT_TRUE(net->hasReadyRequests()); - auto noi = net->getNextReadyRequest(); - auto request = noi->getRequest(); - net->exitNetwork(); + executor::RemoteCommandRequest request; + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + request = noi->getRequest(); + } ASSERT_EQUALS(source, request.target); ASSERT_EQUALS(findCmdObj, request.cmdObj); @@ -300,7 +350,7 @@ TEST_F(FetcherTest, IsActiveAfterSchedule) { TEST_F(FetcherTest, ScheduleWhenActive) { ASSERT_OK(fetcher->schedule()); ASSERT_TRUE(fetcher->isActive()); - ASSERT_NOT_OK(fetcher->schedule()); + ASSERT_EQUALS(ErrorCodes::IllegalOperation, fetcher->schedule()); } TEST_F(FetcherTest, CancelWithoutSchedule) { @@ -315,7 +365,7 @@ TEST_F(FetcherTest, WaitWithoutSchedule) { TEST_F(FetcherTest, ShutdownBeforeSchedule) { getExecutor().shutdown(); - ASSERT_NOT_OK(fetcher->schedule()); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, fetcher->schedule()); ASSERT_FALSE(fetcher->isActive()); } @@ -323,15 +373,17 @@ TEST_F(FetcherTest, ScheduleAndCancel) { ASSERT_OK(fetcher->schedule()); auto net = getNet(); - net->enterNetwork(); - scheduleNetworkResponse(BSON("ok" << 1)); - net->exitNetwork(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + scheduleNetworkResponse(BSON("ok" << 1)); + } fetcher->cancel(); - net->enterNetwork(); - finishProcessingNetworkResponse(); - net->exitNetwork(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + finishProcessingNetworkResponse(ReadyQueueState::kEmpty, FetcherState::kInactive); + } ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); } @@ -340,18 +392,20 @@ TEST_F(FetcherTest, ScheduleButShutdown) { ASSERT_OK(fetcher->schedule()); auto net = getNet(); - net->enterNetwork(); - scheduleNetworkResponse(BSON("ok" << 1)); - // Network interface should not deliver mock response to callback - // until runReadyNetworkOperations() is called. - net->exitNetwork(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + scheduleNetworkResponse(BSON("ok" << 1)); + // Network interface should not deliver mock response to callback + // until runReadyNetworkOperations() is called. + } ASSERT_TRUE(fetcher->isActive()); getExecutor().shutdown(); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + net->runReadyNetworkOperations(); + } ASSERT_FALSE(fetcher->isActive()); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); @@ -359,7 +413,8 @@ TEST_F(FetcherTest, ScheduleButShutdown) { TEST_F(FetcherTest, FindCommandFailed1) { ASSERT_OK(fetcher->schedule()); - processNetworkResponse(ErrorCodes::BadValue, "bad hint"); + processNetworkResponse( + ErrorCodes::BadValue, "bad hint", ReadyQueueState::kEmpty, FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); ASSERT_EQUALS("bad hint", status.reason()); } @@ -368,21 +423,24 @@ TEST_F(FetcherTest, FindCommandFailed2) { ASSERT_OK(fetcher->schedule()); processNetworkResponse(BSON("ok" << 0 << "errmsg" << "bad hint" - << "code" << int(ErrorCodes::BadValue))); + << "code" << int(ErrorCodes::BadValue)), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); ASSERT_EQUALS("bad hint", status.reason()); } TEST_F(FetcherTest, CursorFieldMissing) { ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("ok" << 1)); + processNetworkResponse(BSON("ok" << 1), ReadyQueueState::kEmpty, FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor' field"); } TEST_F(FetcherTest, CursorNotAnObject) { ASSERT_OK(fetcher->schedule()); - processNetworkResponse(BSON("cursor" << 123 << "ok" << 1)); + processNetworkResponse( + BSON("cursor" << 123 << "ok" << 1), ReadyQueueState::kEmpty, FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); ASSERT_STRING_CONTAINS(status.reason(), "'cursor' field must be an object"); } @@ -391,17 +449,20 @@ TEST_F(FetcherTest, CursorIdFieldMissing) { ASSERT_OK(fetcher->schedule()); processNetworkResponse(BSON("cursor" << BSON("ns" << "db.coll" - << "firstBatch" << BSONArray()) << "ok" << 1)); + << "firstBatch" << BSONArray()) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.id' field"); } TEST_F(FetcherTest, CursorIdNotLongNumber) { ASSERT_OK(fetcher->schedule()); - processNetworkResponse( - BSON("cursor" << BSON("id" << 123.1 << "ns" - << "db.coll" - << "firstBatch" << BSONArray()) << "ok" << 1)); + processNetworkResponse(BSON("cursor" << BSON("id" << 123.1 << "ns" + << "db.coll" + << "firstBatch" << BSONArray()) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); ASSERT_STRING_CONTAINS(status.reason(), "'cursor.id' field must be"); ASSERT_EQ((int)Fetcher::NextAction::kInvalid, (int)nextAction); @@ -410,7 +471,9 @@ TEST_F(FetcherTest, CursorIdNotLongNumber) { TEST_F(FetcherTest, NamespaceFieldMissing) { ASSERT_OK(fetcher->schedule()); processNetworkResponse( - BSON("cursor" << BSON("id" << 123LL << "firstBatch" << BSONArray()) << "ok" << 1)); + BSON("cursor" << BSON("id" << 123LL << "firstBatch" << BSONArray()) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.ns' field"); } @@ -418,27 +481,31 @@ TEST_F(FetcherTest, NamespaceFieldMissing) { TEST_F(FetcherTest, NamespaceNotAString) { ASSERT_OK(fetcher->schedule()); processNetworkResponse(BSON("cursor" << BSON("id" << 123LL << "ns" << 123 << "firstBatch" - << BSONArray()) << "ok" << 1)); + << BSONArray()) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' field must be a string"); } TEST_F(FetcherTest, NamespaceEmpty) { ASSERT_OK(fetcher->schedule()); - processNetworkResponse( - BSON("cursor" << BSON("id" << 123LL << "ns" - << "" - << "firstBatch" << BSONArray()) << "ok" << 1)); + processNetworkResponse(BSON("cursor" << BSON("id" << 123LL << "ns" + << "" + << "firstBatch" << BSONArray()) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' contains an invalid namespace"); } TEST_F(FetcherTest, NamespaceMissingCollectionName) { ASSERT_OK(fetcher->schedule()); - processNetworkResponse( - BSON("cursor" << BSON("id" << 123LL << "ns" - << "db." - << "firstBatch" << BSONArray()) << "ok" << 1)); + processNetworkResponse(BSON("cursor" << BSON("id" << 123LL << "ns" + << "db." + << "firstBatch" << BSONArray()) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); ASSERT_STRING_CONTAINS(status.reason(), "'cursor.ns' contains an invalid namespace"); } @@ -446,7 +513,9 @@ TEST_F(FetcherTest, NamespaceMissingCollectionName) { TEST_F(FetcherTest, FirstBatchFieldMissing) { ASSERT_OK(fetcher->schedule()); processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << "ns" - << "db.coll") << "ok" << 1)); + << "db.coll") << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); ASSERT_STRING_CONTAINS(status.reason(), "must contain 'cursor.firstBatch' field"); } @@ -455,7 +524,9 @@ TEST_F(FetcherTest, FirstBatchNotAnArray) { ASSERT_OK(fetcher->schedule()); processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << "ns" << "db.coll" - << "firstBatch" << 123) << "ok" << 1)); + << "firstBatch" << 123) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); ASSERT_STRING_CONTAINS(status.reason(), "'cursor.firstBatch' field must be an array"); } @@ -465,7 +536,9 @@ TEST_F(FetcherTest, FirstBatchArrayContainsNonObject) { processNetworkResponse( BSON("cursor" << BSON("id" << 0LL << "ns" << "db.coll" - << "firstBatch" << BSON_ARRAY(8)) << "ok" << 1)); + << "firstBatch" << BSON_ARRAY(8)) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_EQUALS(ErrorCodes::FailedToParse, status.code()); ASSERT_STRING_CONTAINS(status.reason(), "found non-object"); ASSERT_STRING_CONTAINS(status.reason(), "in 'cursor.firstBatch' field"); @@ -473,10 +546,11 @@ TEST_F(FetcherTest, FirstBatchArrayContainsNonObject) { TEST_F(FetcherTest, FirstBatchEmptyArray) { ASSERT_OK(fetcher->schedule()); - processNetworkResponse( - BSON("cursor" << BSON("id" << 0LL << "ns" - << "db.coll" - << "firstBatch" << BSONArray()) << "ok" << 1)); + processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << "ns" + << "db.coll" + << "firstBatch" << BSONArray()) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_OK(status); ASSERT_EQUALS(0, cursorId); ASSERT_EQUALS("db.coll", nss.ns()); @@ -489,7 +563,9 @@ TEST_F(FetcherTest, FetchOneDocument) { processNetworkResponse( BSON("cursor" << BSON("id" << 0LL << "ns" << "db.coll" - << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1)); + << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_OK(status); ASSERT_EQUALS(0, cursorId); ASSERT_EQUALS("db.coll", nss.ns()); @@ -513,7 +589,9 @@ TEST_F(FetcherTest, SetNextActionToContinueWhenNextBatchIsNotAvailable) { processNetworkResponse( BSON("cursor" << BSON("id" << 0LL << "ns" << "db.coll" - << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1)); + << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_OK(status); ASSERT_EQUALS(0, cursorId); ASSERT_EQUALS("db.coll", nss.ns()); @@ -539,15 +617,13 @@ TEST_F(FetcherTest, FetchMultipleBatches) { const BSONObj doc = BSON("_id" << 1); - auto net = getNet(); - net->enterNetwork(); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1), - Milliseconds(100)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + Milliseconds(100), + ReadyQueueState::kHasReadyRequests, + FetcherState::kActive); ASSERT_OK(status); ASSERT_EQUALS(1LL, cursorId); @@ -557,19 +633,16 @@ TEST_F(FetcherTest, FetchMultipleBatches) { ASSERT_EQUALS(elapsedMillis, Milliseconds(100)); ASSERT_TRUE(first); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); const BSONObj doc2 = BSON("_id" << 2); - net->enterNetwork(); - ASSERT_TRUE(getNet()->hasReadyRequests()); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1), - Milliseconds(200)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + Milliseconds(200), + ReadyQueueState::kHasReadyRequests, + FetcherState::kActive); ASSERT_OK(status); ASSERT_EQUALS(1LL, cursorId); @@ -579,19 +652,16 @@ TEST_F(FetcherTest, FetchMultipleBatches) { ASSERT_EQUALS(elapsedMillis, Milliseconds(200)); ASSERT_FALSE(first); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); const BSONObj doc3 = BSON("_id" << 3); - net->enterNetwork(); - ASSERT_TRUE(getNet()->hasReadyRequests()); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 0LL << "ns" << "db.coll" << "nextBatch" << BSON_ARRAY(doc3)) << "ok" << 1), - Milliseconds(300)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + Milliseconds(300), + ReadyQueueState::kEmpty, + FetcherState::kInactive); ASSERT_OK(status); ASSERT_EQUALS(0, cursorId); @@ -601,11 +671,6 @@ TEST_F(FetcherTest, FetchMultipleBatches) { ASSERT_EQUALS(elapsedMillis, Milliseconds(300)); ASSERT_FALSE(first); ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction); - ASSERT_FALSE(fetcher->isActive()); - - net->enterNetwork(); - ASSERT_FALSE(getNet()->hasReadyRequests()); - net->exitNetwork(); } TEST_F(FetcherTest, ScheduleGetMoreAndCancel) { @@ -615,14 +680,12 @@ TEST_F(FetcherTest, ScheduleGetMoreAndCancel) { const BSONObj doc = BSON("_id" << 1); - auto net = getNet(); - net->enterNetwork(); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" - << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1), + ReadyQueueState::kHasReadyRequests, + FetcherState::kActive); ASSERT_OK(status); ASSERT_EQUALS(1LL, cursorId); @@ -630,17 +693,14 @@ TEST_F(FetcherTest, ScheduleGetMoreAndCancel) { ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); const BSONObj doc2 = BSON("_id" << 2); - net->enterNetwork(); - ASSERT_TRUE(getNet()->hasReadyRequests()); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" - << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1), + ReadyQueueState::kHasReadyRequests, + FetcherState::kActive); ASSERT_OK(status); ASSERT_EQUALS(1LL, cursorId); @@ -648,15 +708,16 @@ TEST_F(FetcherTest, ScheduleGetMoreAndCancel) { ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc2, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); fetcher->cancel(); - net->enterNetwork(); - finishProcessingNetworkResponse(); - net->exitNetwork(); + { + auto net = getNet(); + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + finishProcessingNetworkResponse(ReadyQueueState::kEmpty, FetcherState::kInactive); + } - ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status); } TEST_F(FetcherTest, ScheduleGetMoreButShutdown) { @@ -666,14 +727,12 @@ TEST_F(FetcherTest, ScheduleGetMoreButShutdown) { const BSONObj doc = BSON("_id" << 1); - auto net = getNet(); - net->enterNetwork(); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" - << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1), + ReadyQueueState::kHasReadyRequests, + FetcherState::kActive); ASSERT_OK(status); ASSERT_EQUALS(1LL, cursorId); @@ -681,18 +740,15 @@ TEST_F(FetcherTest, ScheduleGetMoreButShutdown) { ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); const BSONObj doc2 = BSON("_id" << 2); - net->enterNetwork(); - ASSERT_TRUE(getNet()->hasReadyRequests()); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" - << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1), + ReadyQueueState::kHasReadyRequests, + FetcherState::kActive); ASSERT_OK(status); ASSERT_EQUALS(1LL, cursorId); @@ -700,15 +756,16 @@ TEST_F(FetcherTest, ScheduleGetMoreButShutdown) { ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc2, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); getExecutor().shutdown(); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); + { + auto net = getNet(); + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + net->runReadyNetworkOperations(); + } - ASSERT_NOT_OK(status); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status); } @@ -723,14 +780,12 @@ TEST_F(FetcherTest, EmptyGetMoreRequestAfterFirstBatchMakesFetcherInactiveAndKil const BSONObj doc = BSON("_id" << 1); - auto net = getNet(); - net->enterNetwork(); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" - << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1), + ReadyQueueState::kHasReadyRequests, + FetcherState::kInactive); ASSERT_OK(status); ASSERT_EQUALS(1LL, cursorId); @@ -738,13 +793,15 @@ TEST_F(FetcherTest, EmptyGetMoreRequestAfterFirstBatchMakesFetcherInactiveAndKil ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_FALSE(fetcher->isActive()); - net->enterNetwork(); - ASSERT_TRUE(getNet()->hasReadyRequests()); - auto noi = getNet()->getNextReadyRequest(); - auto request = noi->getRequest(); - net->exitNetwork(); + executor::RemoteCommandRequest request; + { + auto net = getNet(); + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + ASSERT_TRUE(getNet()->hasReadyRequests()); + auto noi = getNet()->getNextReadyRequest(); + request = noi->getRequest(); + } ASSERT_EQUALS(nss.db(), request.dbname); auto&& cmdObj = request.cmdObj; @@ -778,14 +835,12 @@ TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) { const BSONObj doc = BSON("_id" << 1); - auto net = getNet(); - net->enterNetwork(); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" - << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1), + ReadyQueueState::kHasReadyRequests, + FetcherState::kActive); ASSERT_OK(status); ASSERT_EQUALS(1LL, cursorId); @@ -793,20 +848,17 @@ TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) { ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); const BSONObj doc2 = BSON("_id" << 2); callbackHook = setNextActionToNoAction; - net->enterNetwork(); - ASSERT_TRUE(getNet()->hasReadyRequests()); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" - << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1), + ReadyQueueState::kHasReadyRequests, + FetcherState::kInactive); ASSERT_OK(status); ASSERT_EQUALS(1LL, cursorId); @@ -814,29 +866,28 @@ TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) { ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc2, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction); - ASSERT_FALSE(fetcher->isActive()); - - net->enterNetwork(); - ASSERT_TRUE(getNet()->hasReadyRequests()); - auto noi = getNet()->getNextReadyRequest(); - auto request = noi->getRequest(); - net->exitNetwork(); - - ASSERT_EQUALS(nss.db(), request.dbname); - auto&& cmdObj = request.cmdObj; - auto firstElement = cmdObj.firstElement(); - ASSERT_EQUALS("killCursors", firstElement.fieldNameStringData()); - ASSERT_EQUALS(nss.coll(), firstElement.String()); - ASSERT_EQUALS(mongo::BSONType::Array, cmdObj["cursors"].type()); - auto cursors = cmdObj["cursors"].Array(); - ASSERT_EQUALS(1U, cursors.size()); - ASSERT_EQUALS(cursorId, cursors.front().numberLong()); - // Failed killCursors command response should be logged. - net->enterNetwork(); - scheduleNetworkResponseFor(noi, BSON("ok" << false)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + { + auto net = getNet(); + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + auto request = noi->getRequest(); + + ASSERT_EQUALS(nss.db(), request.dbname); + auto&& cmdObj = request.cmdObj; + auto firstElement = cmdObj.firstElement(); + ASSERT_EQUALS("killCursors", firstElement.fieldNameStringData()); + ASSERT_EQUALS(nss.coll(), firstElement.String()); + ASSERT_EQUALS(mongo::BSONType::Array, cmdObj["cursors"].type()); + auto cursors = cmdObj["cursors"].Array(); + ASSERT_EQUALS(1U, cursors.size()); + ASSERT_EQUALS(cursorId, cursors.front().numberLong()); + + // Failed killCursors command response should be logged. + scheduleNetworkResponseFor(noi, BSON("ok" << false)); + getNet()->runReadyNetworkOperations(); + } ASSERT_EQUALS(1, countLogLinesContaining("killCursors command failed: UnknownError")); } @@ -877,14 +928,12 @@ TEST_F(FetcherTest, ShutdownDuringSecondBatch) { const BSONObj doc = BSON("_id" << 1); - auto net = getNet(); - net->enterNetwork(); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" - << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1), + ReadyQueueState::kHasReadyRequests, + FetcherState::kActive); ASSERT_OK(status); ASSERT_EQUALS(1LL, cursorId); @@ -892,7 +941,6 @@ TEST_F(FetcherTest, ShutdownDuringSecondBatch) { ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); - ASSERT_TRUE(fetcher->isActive()); const BSONObj doc2 = BSON("_id" << 2); @@ -905,14 +953,12 @@ TEST_F(FetcherTest, ShutdownDuringSecondBatch) { &getExecutor(), &isShutdownCalled); - net->enterNetwork(); - ASSERT_TRUE(getNet()->hasReadyRequests()); - scheduleNetworkResponse( + processNetworkResponse( BSON("cursor" << BSON("id" << 1LL << "ns" << "db.coll" - << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1)); - getNet()->runReadyNetworkOperations(); - net->exitNetwork(); + << "nextBatch" << BSON_ARRAY(doc2)) << "ok" << 1), + ReadyQueueState::kEmpty, + FetcherState::kInactive); // Fetcher should attempt (unsuccessfully) to schedule a killCursors command. ASSERT_EQUALS( @@ -921,7 +967,54 @@ TEST_F(FetcherTest, ShutdownDuringSecondBatch) { "failed to schedule killCursors command: ShutdownInProgress: Shutdown in progress")); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code()); - ASSERT_FALSE(fetcher->isActive()); +} + +TEST_F(FetcherTest, FetcherAppliesRetryPolicyToFirstCommandButNotToGetMoreRequests) { + auto policy = RemoteCommandRetryScheduler::makeRetryPolicy( + 3U, + executor::RemoteCommandRequest::kNoTimeout, + {ErrorCodes::BadValue, ErrorCodes::InternalError}); + + fetcher = stdx::make_unique<Fetcher>(&getExecutor(), + source, + "db", + findCmdObj, + makeCallback(), + rpc::makeEmptyMetadata(), + executor::RemoteCommandRequest::kNoTimeout, + std::move(policy)); + + callbackHook = appendGetMoreRequest; + + ASSERT_OK(fetcher->schedule()); + + // Retry policy is applied to find command. + const BSONObj doc = BSON("_id" << 1); + processNetworkResponse( + ErrorCodes::BadValue, "first", ReadyQueueState::kHasReadyRequests, FetcherState::kActive); + processNetworkResponse(ErrorCodes::InternalError, + "second", + ReadyQueueState::kHasReadyRequests, + FetcherState::kActive); + processNetworkResponse( + BSON("cursor" << BSON("id" << 1LL << "ns" + << "db.coll" + << "firstBatch" << BSON_ARRAY(doc)) << "ok" << 1), + ReadyQueueState::kHasReadyRequests, + FetcherState::kActive); + ASSERT_OK(status); + ASSERT_EQUALS(1LL, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); + ASSERT_EQUALS(1U, documents.size()); + ASSERT_EQUALS(doc, documents.front()); + ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); + + // No retry policy for subsequent getMore commands. + processNetworkResponse(ErrorCodes::OperationFailed, + "getMore failed", + ReadyQueueState::kEmpty, + FetcherState::kInactive); + ASSERT_EQUALS(ErrorCodes::OperationFailed, status); } } // namespace diff --git a/src/mongo/client/remote_command_retry_scheduler.cpp b/src/mongo/client/remote_command_retry_scheduler.cpp new file mode 100644 index 00000000000..f290a85484f --- /dev/null +++ b/src/mongo/client/remote_command_retry_scheduler.cpp @@ -0,0 +1,238 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor + +#include "mongo/platform/basic.h" + +#include <algorithm> +#include <vector> + +#include "mongo/client/remote_command_retry_scheduler.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" + +namespace mongo { + +namespace { + +class RetryPolicyImpl : public RemoteCommandRetryScheduler::RetryPolicy { +public: + RetryPolicyImpl(std::size_t maximumAttempts, + Milliseconds maximumResponseElapsedTotal, + const std::initializer_list<ErrorCodes::Error>& retryableErrors); + std::size_t getMaximumAttempts() const override; + Milliseconds getMaximumResponseElapsedTotal() const override; + bool shouldRetryOnError(ErrorCodes::Error error) const override; + +private: + std::size_t _maximumAttempts; + Milliseconds _maximumResponseElapsedTotal; + std::vector<ErrorCodes::Error> _retryableErrors; +}; + +RetryPolicyImpl::RetryPolicyImpl(std::size_t maximumAttempts, + Milliseconds maximumResponseElapsedTotal, + const std::initializer_list<ErrorCodes::Error>& retryableErrors) + : _maximumAttempts(maximumAttempts), + _maximumResponseElapsedTotal(maximumResponseElapsedTotal), + _retryableErrors(retryableErrors) { + std::sort(_retryableErrors.begin(), _retryableErrors.end()); +} + +std::size_t RetryPolicyImpl::getMaximumAttempts() const { + return _maximumAttempts; +} + +Milliseconds RetryPolicyImpl::getMaximumResponseElapsedTotal() const { + return _maximumResponseElapsedTotal; +} + +bool RetryPolicyImpl::shouldRetryOnError(ErrorCodes::Error error) const { + return std::binary_search(_retryableErrors.cbegin(), _retryableErrors.cend(), error); +} + +} // namespace + +const std::initializer_list<ErrorCodes::Error> RemoteCommandRetryScheduler::kNotMasterErrors{ + ErrorCodes::NotMaster, ErrorCodes::NotMasterNoSlaveOk, ErrorCodes::NotMasterOrSecondary}; + +const std::initializer_list<ErrorCodes::Error> RemoteCommandRetryScheduler::kAllRetriableErrors{ + ErrorCodes::NotMaster, + ErrorCodes::NotMasterNoSlaveOk, + ErrorCodes::NotMasterOrSecondary, + // If write concern failed to be satisfied on the remote server, this most probably means that + // some of the secondary nodes were unreachable or otherwise unresponsive, so the call is safe + // to be retried if idempotency can be guaranteed. + ErrorCodes::WriteConcernFailed, + ErrorCodes::HostUnreachable, + ErrorCodes::HostNotFound, + ErrorCodes::NetworkTimeout, + ErrorCodes::InterruptedDueToReplStateChange}; + +std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> +RemoteCommandRetryScheduler::makeNoRetryPolicy() { + return makeRetryPolicy(1U, executor::RemoteCommandRequest::kNoTimeout, {}); +} + +std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> +RemoteCommandRetryScheduler::makeRetryPolicy( + std::size_t maxAttempts, + Milliseconds maxResponseElapsedTotal, + const std::initializer_list<ErrorCodes::Error>& retryableErrors) { + std::unique_ptr<RetryPolicy> policy = + stdx::make_unique<RetryPolicyImpl>(maxAttempts, maxResponseElapsedTotal, retryableErrors); + return policy; +} + +RemoteCommandRetryScheduler::RemoteCommandRetryScheduler( + executor::TaskExecutor* executor, + const executor::RemoteCommandRequest& request, + const executor::TaskExecutor::RemoteCommandCallbackFn& callback, + std::unique_ptr<RetryPolicy> retryPolicy) + : _executor(executor), + _request(request), + _callback(callback), + _retryPolicy(std::move(retryPolicy)) { + uassert(ErrorCodes::BadValue, "task executor cannot be null", executor); + uassert(ErrorCodes::BadValue, + "source in remote command request cannot be empty", + !request.target.empty()); + uassert(ErrorCodes::BadValue, + "database name in remote command request cannot be empty", + !request.dbname.empty()); + uassert(ErrorCodes::BadValue, + "command object in remote command request cannot be empty", + !request.cmdObj.isEmpty()); + uassert(ErrorCodes::BadValue, "remote command callback function cannot be null", callback); + uassert(ErrorCodes::BadValue, "retry policy cannot be null", _retryPolicy); + uassert(ErrorCodes::BadValue, + "policy max attempts cannot be zero", + _retryPolicy->getMaximumAttempts() != 0); + uassert(ErrorCodes::BadValue, + "policy max response elapsed total cannot be negative", + !(_retryPolicy->getMaximumResponseElapsedTotal() != + executor::RemoteCommandRequest::kNoTimeout && + _retryPolicy->getMaximumResponseElapsedTotal() < Milliseconds(0))); +} + +RemoteCommandRetryScheduler::~RemoteCommandRetryScheduler() { + DESTRUCTOR_GUARD(shutdown(); join();); +} + +bool RemoteCommandRetryScheduler::isActive() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _active; +} + +Status RemoteCommandRetryScheduler::startup() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + if (_active) { + return Status(ErrorCodes::IllegalOperation, "fetcher already scheduled"); + } + + auto scheduleStatus = _schedule_inlock(0); + if (!scheduleStatus.isOK()) { + return scheduleStatus; + } + + _active = true; + return Status::OK(); +} + +void RemoteCommandRetryScheduler::shutdown() { + executor::TaskExecutor::CallbackHandle remoteCommandCallbackHandle; + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + if (!_active) { + return; + } + + remoteCommandCallbackHandle = _remoteCommandCallbackHandle; + } + + invariant(remoteCommandCallbackHandle.isValid()); + _executor->cancel(remoteCommandCallbackHandle); +} + +void RemoteCommandRetryScheduler::join() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _condition.wait(lock, [this]() { return !_active; }); +} + +Status RemoteCommandRetryScheduler::_schedule_inlock(std::size_t requestCount) { + auto scheduleResult = _executor->scheduleRemoteCommand( + _request, + stdx::bind(&RemoteCommandRetryScheduler::_remoteCommandCallback, + this, + stdx::placeholders::_1, + requestCount + 1)); + + if (!scheduleResult.isOK()) { + return scheduleResult.getStatus(); + } + + _remoteCommandCallbackHandle = scheduleResult.getValue(); + return Status::OK(); +} + +void RemoteCommandRetryScheduler::_remoteCommandCallback( + const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba, std::size_t requestCount) { + auto status = rcba.response.getStatus(); + + if (status.isOK() || status == ErrorCodes::CallbackCanceled || + requestCount == _retryPolicy->getMaximumAttempts() || + !_retryPolicy->shouldRetryOnError(status.code())) { + _onComplete(rcba); + return; + } + + // TODO(benety): Check cumulative elapsed time of failed responses received against retry + // policy. Requires SERVER-24067. + + auto scheduleStatus = _schedule_inlock(requestCount); + if (!scheduleStatus.isOK()) { + _onComplete({rcba.executor, rcba.myHandle, rcba.request, scheduleStatus}); + return; + } +} + +void RemoteCommandRetryScheduler::_onComplete( + const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) { + _callback(rcba); + + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_active); + _active = false; + _condition.notify_all(); +} + +} // namespace mongo diff --git a/src/mongo/client/remote_command_retry_scheduler.h b/src/mongo/client/remote_command_retry_scheduler.h new file mode 100644 index 00000000000..e8bba705edf --- /dev/null +++ b/src/mongo/client/remote_command_retry_scheduler.h @@ -0,0 +1,201 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <cstdlib> +#include <initializer_list> +#include <memory> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/error_codes.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +/** + * Schedules a remote command request. On receiving a response from task executor (or remote + * server), decides if the response should be forwarded to the "_callback" provided in the + * constructor based on the retry policy. + * + * If the command is successful or has been canceled (either by calling cancel() or canceled by + * the task executor on shutdown), the response is forwarded immediately to "_callback" and the + * scheduler becomes inactive. + * + * Otherwise, the retry policy (specified at construction) is used to decide if we should + * resubmit the remote command request. The retry policy is defined by: + * - maximum number of times to run the remote command; + * - maximum elapsed time of all failed remote command responses (requires SERVER-24067); + * - list of error codes, if present in the response, should stop the scheduler. + */ +class RemoteCommandRetryScheduler { + MONGO_DISALLOW_COPYING(RemoteCommandRetryScheduler); + +public: + class RetryPolicy; + + /** + * List of not master error codes. + */ + static const std::initializer_list<ErrorCodes::Error> kNotMasterErrors; + + /** + * List of retriable error codes. + */ + static const std::initializer_list<ErrorCodes::Error> kAllRetriableErrors; + + /** + * Generates a retry policy that will send the remote command request to the source at most + * once. + */ + static std::unique_ptr<RetryPolicy> makeNoRetryPolicy(); + + /** + * Creates a retry policy that will send the remote command request at most "maxAttempts". + * This policy will also direct the scheduler to stop retrying if it encounters any of the + * errors in "nonRetryableErrors". + * (Requires SERVER-24067) The scheduler will also stop retrying if the total elapsed time + * of all failed requests exceeds "maxResponseElapsedTotal". + */ + static std::unique_ptr<RetryPolicy> makeRetryPolicy( + std::size_t maxAttempts, + Milliseconds maxResponseElapsedTotal, + const std::initializer_list<ErrorCodes::Error>& retryableErrors); + + /** + * Creates scheduler but does not schedule any remote command request. + */ + RemoteCommandRetryScheduler(executor::TaskExecutor* executor, + const executor::RemoteCommandRequest& request, + const executor::TaskExecutor::RemoteCommandCallbackFn& callback, + std::unique_ptr<RetryPolicy> retryPolicy); + + virtual ~RemoteCommandRetryScheduler(); + + /** + * Returns true if we have scheduled a remote command and are waiting for the response. + */ + bool isActive() const; + + /** + * Schedules remote command request. + */ + Status startup(); + + /** + * Cancels scheduled remote command requests. + * Returns immediately if the scheduler is not active. + * It is fine to call this multiple times. + */ + void shutdown(); + + /** + * Waits until the scheduler is inactive. + * It is fine to call this multiple times. + */ + void join(); + +private: + /** + * Schedules remote command to be run by the executor. + * "requestCount" is number of requests scheduled before calling this function. + * When this function is called for the first time by startup(), "requestCount" will be 0. + * The executor will invoke _remoteCommandCallback() with the remote command response and + * ("requestCount" + 1). + * By passing "requestCount" between tasks, we avoid having to synchronize access to this count + * if it were a field. + */ + Status _schedule_inlock(std::size_t requestCount); + + /** + * Callback for remote command. + * "requestCount" is number of requests scheduled prior to this response. + */ + void _remoteCommandCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba, + std::size_t requestCount); + + /** + * Notifies caller that the scheduler has completed processing responses. + */ + void _onComplete(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba); + + // Not owned by us. + executor::TaskExecutor* _executor; + + const executor::RemoteCommandRequest _request; + const executor::TaskExecutor::RemoteCommandCallbackFn _callback; + std::unique_ptr<RetryPolicy> _retryPolicy; + + // Protects member data of this scheduler declared after mutex. + mutable stdx::mutex _mutex; + + mutable stdx::condition_variable _condition; + + // _active is true when remote command is scheduled to be run by the executor. + bool _active = false; + + // Callback handle to the scheduled remote command. + executor::TaskExecutor::CallbackHandle _remoteCommandCallbackHandle; +}; + +/** + * Policy used by RemoteCommandRetryScheduler to determine if it is necessary to schedule another + * remote command request. + */ +class RemoteCommandRetryScheduler::RetryPolicy { +public: + virtual ~RetryPolicy() = default; + + /** + * Retry scheduler should not send remote command request more than this limit. + */ + virtual std::size_t getMaximumAttempts() const = 0; + + /** + * Retry scheduler should not re-send remote command request if total response elapsed times of + * prior responses exceed this limit. + * Assumes that re-sending the command will not exceed the limit returned by + * "getMaximumAttempts()". + * Returns executor::RemoteCommandRequest::kNoTimeout if this limit should be ignored. + */ + virtual Milliseconds getMaximumResponseElapsedTotal() const = 0; + + /** + * Checks the error code in the most recent remote command response and returns true if + * scheduler should retry the remote command request. + * Assumes that re-sending the command will not exceed the limit returned by + * "getMaximumAttempts()" and total response elapsed time has not been exceeded (see + * "getMaximumResponseElapsedTotal()"). + */ + virtual bool shouldRetryOnError(ErrorCodes::Error error) const = 0; +}; + +} // namespace mongo diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp new file mode 100644 index 00000000000..3e3b3d3c513 --- /dev/null +++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp @@ -0,0 +1,450 @@ +/** + * Copyright 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <memory> +#include <string> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/client/remote_command_retry_scheduler.h" +#include "mongo/db/jsobj.h" +#include "mongo/executor/remote_command_response.h" +#include "mongo/executor/task_executor.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/task_executor_proxy.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/net/hostandport.h" + +namespace { + +using namespace mongo; + +class CallbackResponseSaver; + +class RemoteCommandRetrySchedulerTest : public executor::ThreadPoolExecutorTest { +public: + void start(RemoteCommandRetryScheduler* scheduler); + void checkCompletionStatus(RemoteCommandRetryScheduler* scheduler, + const CallbackResponseSaver& callbackResponseSaver, + const executor::TaskExecutor::ResponseStatus& response); + void processNetworkResponse(const executor::TaskExecutor::ResponseStatus& response); + void runReadyNetworkOperations(); + +protected: + void setUp() override; + void tearDown() override; +}; + +class CallbackResponseSaver { + MONGO_DISALLOW_COPYING(CallbackResponseSaver); + +public: + CallbackResponseSaver(); + + /** + * Use this for scheduler callback. + */ + void operator()(const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba); + + std::vector<StatusWith<executor::RemoteCommandResponse>> getResponses() const; + +private: + std::vector<StatusWith<executor::RemoteCommandResponse>> _responses; +}; + +/** + * Task executor proxy with fail point for scheduleRemoteCommand(). + */ +class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy { +public: + TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor) + : unittest::TaskExecutorProxy(executor) {} + virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand( + const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override { + if (scheduleRemoteCommandFailPoint) { + return Status(ErrorCodes::ShutdownInProgress, + "failed to send remote command - shutdown in progress"); + } + return getExecutor()->scheduleRemoteCommand(request, cb); + } + + bool scheduleRemoteCommandFailPoint = false; +}; + +void RemoteCommandRetrySchedulerTest::start(RemoteCommandRetryScheduler* scheduler) { + ASSERT_FALSE(scheduler->isActive()); + + ASSERT_OK(scheduler->startup()); + ASSERT_TRUE(scheduler->isActive()); + + // Starting an already active scheduler should fail. + ASSERT_EQUALS(ErrorCodes::IllegalOperation, scheduler->startup()); + ASSERT_TRUE(scheduler->isActive()); + + auto net = getNet(); + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + ASSERT_TRUE(net->hasReadyRequests()); +} + +void RemoteCommandRetrySchedulerTest::checkCompletionStatus( + RemoteCommandRetryScheduler* scheduler, + const CallbackResponseSaver& callbackResponseSaver, + const executor::TaskExecutor::ResponseStatus& response) { + ASSERT_FALSE(scheduler->isActive()); + auto responses = callbackResponseSaver.getResponses(); + ASSERT_EQUALS(1U, responses.size()); + if (response.isOK()) { + ASSERT_OK(responses.front().getStatus()); + ASSERT_EQUALS(response.getValue(), responses.front().getValue()); + } else { + ASSERT_EQUALS(response.getStatus(), responses.front().getStatus()); + } +} + +void RemoteCommandRetrySchedulerTest::processNetworkResponse( + const executor::TaskExecutor::ResponseStatus& response) { + auto net = getNet(); + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + net->scheduleResponse(noi, net->now(), response); + net->runReadyNetworkOperations(); +} + +void RemoteCommandRetrySchedulerTest::runReadyNetworkOperations() { + auto net = getNet(); + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + net->runReadyNetworkOperations(); +} + +void RemoteCommandRetrySchedulerTest::setUp() { + executor::ThreadPoolExecutorTest::setUp(); + launchExecutorThread(); +} + +void RemoteCommandRetrySchedulerTest::tearDown() { + executor::ThreadPoolExecutorTest::tearDown(); +} + +CallbackResponseSaver::CallbackResponseSaver() = default; + +void CallbackResponseSaver::operator()( + const executor::TaskExecutor::RemoteCommandCallbackArgs& rcba) { + _responses.push_back(rcba.response); +} + +std::vector<StatusWith<executor::RemoteCommandResponse>> CallbackResponseSaver::getResponses() + const { + return _responses; +} + +const executor::RemoteCommandRequest request(HostAndPort("h1:12345"), "db1", BSON("ping" << 1)); + +TEST_F(RemoteCommandRetrySchedulerTest, MakeSingleShotRetryPolicy) { + auto policy = RemoteCommandRetryScheduler::makeNoRetryPolicy(); + ASSERT_TRUE(policy); + ASSERT_EQUALS(1U, policy->getMaximumAttempts()); + ASSERT_EQUALS(executor::RemoteCommandRequest::kNoTimeout, + policy->getMaximumResponseElapsedTotal()); + // Doesn't matter what "shouldRetryOnError()" returns since we won't be retrying the remote + // command. + for (int i = 0; i < int(ErrorCodes::MaxError); ++i) { + auto error = ErrorCodes::fromInt(i); + ASSERT_FALSE(policy->shouldRetryOnError(error)); + } +} + +TEST_F(RemoteCommandRetrySchedulerTest, MakeRetryPolicy) { + auto policy = RemoteCommandRetryScheduler::makeRetryPolicy( + 5U, + Milliseconds(100), + {ErrorCodes::FailedToParse, ErrorCodes::InvalidNamespace, ErrorCodes::InternalError}); + ASSERT_EQUALS(5U, policy->getMaximumAttempts()); + ASSERT_EQUALS(Milliseconds(100), policy->getMaximumResponseElapsedTotal()); + for (int i = 0; i < int(ErrorCodes::MaxError); ++i) { + auto error = ErrorCodes::fromInt(i); + if (error == ErrorCodes::InternalError || error == ErrorCodes::FailedToParse || + error == ErrorCodes::InvalidNamespace) { + ASSERT_TRUE(policy->shouldRetryOnError(error)); + continue; + } + ASSERT_FALSE(policy->shouldRetryOnError(error)); + } +} + +TEST_F(RemoteCommandRetrySchedulerTest, InvalidConstruction) { + auto callback = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {}; + auto makeRetryPolicy = [] { return RemoteCommandRetryScheduler::makeNoRetryPolicy(); }; + + // Null executor. + ASSERT_THROWS_CODE_AND_WHAT( + RemoteCommandRetryScheduler(nullptr, request, callback, makeRetryPolicy()), + UserException, + ErrorCodes::BadValue, + "task executor cannot be null"); + + // Empty source in remote command request. + ASSERT_THROWS_CODE_AND_WHAT( + RemoteCommandRetryScheduler( + &getExecutor(), + executor::RemoteCommandRequest(HostAndPort(), request.dbname, request.cmdObj), + callback, + makeRetryPolicy()), + UserException, + ErrorCodes::BadValue, + "source in remote command request cannot be empty"); + + // Empty source in remote command request. + ASSERT_THROWS_CODE_AND_WHAT(RemoteCommandRetryScheduler(&getExecutor(), + executor::RemoteCommandRequest( + request.target, "", request.cmdObj), + callback, + makeRetryPolicy()), + UserException, + ErrorCodes::BadValue, + "database name in remote command request cannot be empty"); + + // Empty command object in remote command request. + ASSERT_THROWS_CODE_AND_WHAT( + RemoteCommandRetryScheduler( + &getExecutor(), + executor::RemoteCommandRequest(request.target, request.dbname, BSONObj()), + callback, + makeRetryPolicy()), + UserException, + ErrorCodes::BadValue, + "command object in remote command request cannot be empty"); + + // Null remote command callback function. + ASSERT_THROWS_CODE_AND_WHAT( + RemoteCommandRetryScheduler(&getExecutor(), + request, + executor::TaskExecutor::RemoteCommandCallbackFn(), + makeRetryPolicy()), + UserException, + ErrorCodes::BadValue, + "remote command callback function cannot be null"); + + // Null retry policy. + ASSERT_THROWS_CODE_AND_WHAT( + RemoteCommandRetryScheduler(&getExecutor(), + request, + callback, + std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>()), + UserException, + ErrorCodes::BadValue, + "retry policy cannot be null"); + + // Policy max attempts should be positive. + ASSERT_THROWS_CODE_AND_WHAT( + RemoteCommandRetryScheduler( + &getExecutor(), + request, + callback, + RemoteCommandRetryScheduler::makeRetryPolicy(0, Milliseconds(100), {})), + UserException, + ErrorCodes::BadValue, + "policy max attempts cannot be zero"); + + // Policy max response elapsed total cannot be negative. + ASSERT_THROWS_CODE_AND_WHAT( + RemoteCommandRetryScheduler( + &getExecutor(), + request, + callback, + RemoteCommandRetryScheduler::makeRetryPolicy(1U, Milliseconds(-100), {})), + UserException, + ErrorCodes::BadValue, + "policy max response elapsed total cannot be negative"); +} + +TEST_F(RemoteCommandRetrySchedulerTest, StartupFailsWhenExecutorIsShutDown) { + auto callback = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {}; + auto policy = RemoteCommandRetryScheduler::makeNoRetryPolicy(); + + RemoteCommandRetryScheduler scheduler(&getExecutor(), request, callback, std::move(policy)); + ASSERT_FALSE(scheduler.isActive()); + + getExecutor().shutdown(); + + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, scheduler.startup()); + ASSERT_FALSE(scheduler.isActive()); +} + +TEST_F(RemoteCommandRetrySchedulerTest, + ShuttingDownExecutorAfterSchedulerStartupInvokesCallbackWithCallbackCanceledError) { + CallbackResponseSaver callback; + auto policy = RemoteCommandRetryScheduler::makeRetryPolicy( + 10U, Milliseconds(1), {ErrorCodes::HostNotFound}); + RemoteCommandRetryScheduler scheduler( + &getExecutor(), request, stdx::ref(callback), std::move(policy)); + start(&scheduler); + + auto net = getNet(); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + ASSERT_EQUALS(request, net->getNextReadyRequest()->getRequest()); + } + + getExecutor().shutdown(); + + runReadyNetworkOperations(); + checkCompletionStatus( + &scheduler, callback, {ErrorCodes::CallbackCanceled, "executor shutdown"}); +} + + +TEST_F(RemoteCommandRetrySchedulerTest, + ShuttingDownSchedulerAfterSchedulerStartupInvokesCallbackWithCallbackCanceledError) { + CallbackResponseSaver callback; + auto policy = RemoteCommandRetryScheduler::makeRetryPolicy( + 10U, Milliseconds(1), {ErrorCodes::HostNotFound}); + RemoteCommandRetryScheduler scheduler( + &getExecutor(), request, stdx::ref(callback), std::move(policy)); + start(&scheduler); + + scheduler.shutdown(); + + runReadyNetworkOperations(); + checkCompletionStatus( + &scheduler, callback, {ErrorCodes::CallbackCanceled, "scheduler shutdown"}); +} + +TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnNonRetryableErrorInResponse) { + CallbackResponseSaver callback; + auto policy = RemoteCommandRetryScheduler::makeRetryPolicy( + 10U, Milliseconds(1), RemoteCommandRetryScheduler::kNotMasterErrors); + RemoteCommandRetryScheduler scheduler( + &getExecutor(), request, stdx::ref(callback), std::move(policy)); + start(&scheduler); + + // This should match one of the non-retryable error codes in the policy. + Status response(ErrorCodes::OperationFailed, "injected error"); + + processNetworkResponse(response); + checkCompletionStatus(&scheduler, callback, response); +} + +TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnFirstSuccessfulResponse) { + CallbackResponseSaver callback; + auto policy = RemoteCommandRetryScheduler::makeRetryPolicy( + 10U, Milliseconds(1), {ErrorCodes::HostNotFound}); + RemoteCommandRetryScheduler scheduler( + &getExecutor(), request, stdx::ref(callback), std::move(policy)); + start(&scheduler); + + // Elapsed time in response is ignored on successful responses. + executor::RemoteCommandResponse response( + BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100)); + + processNetworkResponse(response); + checkCompletionStatus(&scheduler, callback, response); +} + +TEST_F(RemoteCommandRetrySchedulerTest, SchedulerIgnoresEmbeddedErrorInSuccessfulResponse) { + CallbackResponseSaver callback; + auto policy = RemoteCommandRetryScheduler::makeRetryPolicy( + 10U, Milliseconds(1), {ErrorCodes::HostNotFound}); + RemoteCommandRetryScheduler scheduler( + &getExecutor(), request, stdx::ref(callback), std::move(policy)); + start(&scheduler); + + // Scheduler does not parse document in a successful response for embedded errors. + // This is the case with some commands (e.g. find) which do not always return errors using the + // wire protocol. + executor::RemoteCommandResponse response(BSON("ok" << 0 << "code" + << int(ErrorCodes::FailedToParse) << "errmsg" + << "injected error"), + BSON("z" << 456), + Milliseconds(100)); + + processNetworkResponse(response); + checkCompletionStatus(&scheduler, callback, response); +} + +TEST_F(RemoteCommandRetrySchedulerTest, + SchedulerInvokesCallbackWithErrorFromExecutorIfScheduleRemoteCommandFailsOnRetry) { + CallbackResponseSaver callback; + auto policy = RemoteCommandRetryScheduler::makeRetryPolicy( + 3U, executor::RemoteCommandRequest::kNoTimeout, {ErrorCodes::HostNotFound}); + TaskExecutorWithFailureInScheduleRemoteCommand badExecutor(&getExecutor()); + RemoteCommandRetryScheduler scheduler( + &badExecutor, request, stdx::ref(callback), std::move(policy)); + start(&scheduler); + + processNetworkResponse({ErrorCodes::HostNotFound, "first"}); + + // scheduleRemoteCommand() will fail with ErrorCodes::ShutdownInProgress when trying to send + // third remote command request after processing second failed response. + badExecutor.scheduleRemoteCommandFailPoint = true; + processNetworkResponse({ErrorCodes::HostNotFound, "second"}); + + checkCompletionStatus(&scheduler, callback, {ErrorCodes::ShutdownInProgress, ""}); +} + +TEST_F(RemoteCommandRetrySchedulerTest, + SchedulerEnforcesPolicyMaximumAttemptsAndReturnsErrorOfLastFailedRequest) { + CallbackResponseSaver callback; + auto policy = RemoteCommandRetryScheduler::makeRetryPolicy( + 3U, + executor::RemoteCommandRequest::kNoTimeout, + RemoteCommandRetryScheduler::kAllRetriableErrors); + RemoteCommandRetryScheduler scheduler( + &getExecutor(), request, stdx::ref(callback), std::move(policy)); + start(&scheduler); + + processNetworkResponse({ErrorCodes::HostNotFound, "first"}); + processNetworkResponse({ErrorCodes::HostUnreachable, "second"}); + + Status response(ErrorCodes::NetworkTimeout, "last"); + processNetworkResponse(response); + checkCompletionStatus(&scheduler, callback, response); +} + +TEST_F(RemoteCommandRetrySchedulerTest, SchedulerShouldRetryUntilSuccessfulResponseIsReceived) { + CallbackResponseSaver callback; + auto policy = RemoteCommandRetryScheduler::makeRetryPolicy( + 3U, executor::RemoteCommandRequest::kNoTimeout, {ErrorCodes::HostNotFound}); + RemoteCommandRetryScheduler scheduler( + &getExecutor(), request, stdx::ref(callback), std::move(policy)); + start(&scheduler); + + processNetworkResponse({ErrorCodes::HostNotFound, "first"}); + + executor::RemoteCommandResponse response( + BSON("ok" << 1 << "x" << 123), BSON("z" << 456), Milliseconds(100)); + processNetworkResponse(response); + checkCompletionStatus(&scheduler, callback, response); +} + +} // namespace diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 054693178d2..1c822c9c48b 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -559,5 +559,13 @@ void NetworkInterfaceMock::NetworkOperation::finishResponse() { _onFinish = RemoteCommandCompletionFn(); } +NetworkInterfaceMock::InNetworkGuard::InNetworkGuard(NetworkInterfaceMock* net) : _net(net) { + _net->enterNetwork(); +} + +NetworkInterfaceMock::InNetworkGuard::~InNetworkGuard() { + _net->exitNetwork(); +} + } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 3d69bad78a5..ac516ba2f5a 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -35,6 +35,7 @@ #include <utility> #include <vector> +#include "mongo/base/disallow_copying.h" #include "mongo/executor/network_interface.h" #include "mongo/rpc/metadata/metadata_hook.h" #include "mongo/stdx/condition_variable.h" @@ -118,6 +119,11 @@ public: //////////////////////////////////////////////////////////////////////////////// /** + * RAII-style class for entering and exiting network. + */ + class InNetworkGuard; + + /** * Causes the currently running (non-executor) thread to assume the mantle of the network * simulation thread. * @@ -421,5 +427,16 @@ private: RemoteCommandCompletionFn _onFinish; }; +class NetworkInterfaceMock::InNetworkGuard { + MONGO_DISALLOW_COPYING(InNetworkGuard); + +public: + explicit InNetworkGuard(NetworkInterfaceMock* net); + ~InNetworkGuard(); + +private: + NetworkInterfaceMock* _net; +}; + } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/remote_command_request.cpp b/src/mongo/executor/remote_command_request.cpp index d1c10cdd1f9..1f6486afd70 100644 --- a/src/mongo/executor/remote_command_request.cpp +++ b/src/mongo/executor/remote_command_request.cpp @@ -30,6 +30,8 @@ #include "mongo/executor/remote_command_request.h" +#include <ostream> + #include "mongo/platform/atomic_word.h" #include "mongo/util/mongoutils/str.h" @@ -85,5 +87,22 @@ std::string RemoteCommandRequest::toString() const { return out; } +bool RemoteCommandRequest::operator==(const RemoteCommandRequest& rhs) const { + if (this == &rhs) { + return true; + } + return target == rhs.target && dbname == rhs.dbname && cmdObj == rhs.cmdObj && + metadata == rhs.metadata && timeout == rhs.timeout; +} + +bool RemoteCommandRequest::operator!=(const RemoteCommandRequest& rhs) const { + return !(*this == rhs); +} + } // namespace executor + +std::ostream& operator<<(std::ostream& os, const executor::RemoteCommandRequest& request) { + return os << request.toString(); +} + } // namespace mongo diff --git a/src/mongo/executor/remote_command_request.h b/src/mongo/executor/remote_command_request.h index 7e5b96618af..975f7498a71 100644 --- a/src/mongo/executor/remote_command_request.h +++ b/src/mongo/executor/remote_command_request.h @@ -28,6 +28,7 @@ #pragma once +#include <iosfwd> #include <string> #include "mongo/db/jsobj.h" @@ -85,6 +86,9 @@ struct RemoteCommandRequest { std::string toString() const; + bool operator==(const RemoteCommandRequest& rhs) const; + bool operator!=(const RemoteCommandRequest& rhs) const; + // Internal id of this request. Not interpereted and used for tracing purposes only. RequestId id; @@ -99,4 +103,7 @@ struct RemoteCommandRequest { }; } // namespace executor + +std::ostream& operator<<(std::ostream& os, const executor::RemoteCommandRequest& response); + } // namespace mongo diff --git a/src/mongo/executor/remote_command_response.cpp b/src/mongo/executor/remote_command_response.cpp index a0b1baeeb79..ab67febd598 100644 --- a/src/mongo/executor/remote_command_response.cpp +++ b/src/mongo/executor/remote_command_response.cpp @@ -48,5 +48,21 @@ std::string RemoteCommandResponse::toString() const { << " cmd:" << data.toString(); } +bool RemoteCommandResponse::operator==(const RemoteCommandResponse& rhs) const { + if (this == &rhs) { + return true; + } + return data == rhs.data && metadata == rhs.metadata && elapsedMillis == rhs.elapsedMillis; +} + +bool RemoteCommandResponse::operator!=(const RemoteCommandResponse& rhs) const { + return !(*this == rhs); +} + } // namespace executor + +std::ostream& operator<<(std::ostream& os, const executor::RemoteCommandResponse& response) { + return os << response.toString(); +} + } // namespace mongo diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h index d72dcf28329..b6cedb8f976 100644 --- a/src/mongo/executor/remote_command_response.h +++ b/src/mongo/executor/remote_command_response.h @@ -28,6 +28,7 @@ #pragma once +#include <iosfwd> #include <string> #include <memory> @@ -66,6 +67,9 @@ struct RemoteCommandResponse { std::string toString() const; + bool operator==(const RemoteCommandResponse& rhs) const; + bool operator!=(const RemoteCommandResponse& rhs) const; + std::shared_ptr<const Message> message; // May be null. BSONObj data; // Either owned or points into message. BSONObj metadata; // Either owned or points into message. @@ -73,4 +77,7 @@ struct RemoteCommandResponse { }; } // namespace executor + +std::ostream& operator<<(std::ostream& os, const executor::RemoteCommandResponse& request); + } // namespace mongo diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index f427bdd40b4..d6c479a1a32 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -106,14 +106,6 @@ public: }); \ void CET_##TEST_NAME::_doTest() -bool operator==(const RemoteCommandRequest lhs, const RemoteCommandRequest rhs) { - return lhs.target == rhs.target && lhs.dbname == rhs.dbname && lhs.cmdObj == rhs.cmdObj; -} - -bool operator!=(const RemoteCommandRequest lhs, const RemoteCommandRequest rhs) { - return !(lhs == rhs); -} - void setStatus(const TaskExecutor::CallbackArgs& cbData, Status* target) { *target = cbData.status; } diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript index 3ea184d27b6..62ea2fa0fb7 100644 --- a/src/mongo/s/client/SConscript +++ b/src/mongo/s/client/SConscript @@ -50,10 +50,11 @@ env.CppUnitTest( LIBDEPS=[ 'sharding_client', 'sharding_connection_hook', - '$BUILD_DIR/mongo/s/coreshard', + '$BUILD_DIR/mongo/client/remote_command_retry_scheduler', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/dbtests/mocklib', + '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/mongoscore', '$BUILD_DIR/mongo/util/net/network', ] diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 5024c50451b..8f7322ef307 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -32,10 +32,12 @@ #include "mongo/s/client/shard_remote.h" +#include <algorithm> #include <string> #include "mongo/client/fetcher.h" #include "mongo/client/read_preference.h" +#include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/jsobj.h" @@ -43,7 +45,6 @@ #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/query/lite_parsed_query.h" #include "mongo/executor/task_executor_pool.h" -#include "mongo/platform/unordered_set.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" @@ -85,31 +86,6 @@ const BSONObj kReplSecondaryOkMetadata{[] { return o.obj(); }()}; -class ErrorCodesHash { -public: - size_t operator()(ErrorCodes::Error e) const { - return std::hash<typename std::underlying_type<ErrorCodes::Error>::type>()(e); - } -}; - -using ErrorCodesSet = unordered_set<ErrorCodes::Error, ErrorCodesHash>; - -const ErrorCodesSet kNotMasterErrors{ - ErrorCodes::NotMaster, ErrorCodes::NotMasterNoSlaveOk, ErrorCodes::NotMasterOrSecondary}; - -const ErrorCodesSet kAllRetriableErrors{ - ErrorCodes::NotMaster, - ErrorCodes::NotMasterNoSlaveOk, - ErrorCodes::NotMasterOrSecondary, - // If write concern failed to be satisfied on the remote server, this most probably means that - // some of the secondary nodes were unreachable or otherwise unresponsive, so the call is safe - // to be retried if idempotency can be guaranteed. - ErrorCodes::WriteConcernFailed, - ErrorCodes::HostUnreachable, - ErrorCodes::HostNotFound, - ErrorCodes::NetworkTimeout, - ErrorCodes::InterruptedDueToReplStateChange}; - /** * Returns a new BSONObj describing the same command and arguments as 'cmdObj', but with a maxTimeMS * set on it that is the minimum of the maxTimeMS in 'cmdObj' (if present), 'maxTimeMicros', and @@ -172,12 +148,10 @@ bool ShardRemote::isRetriableError(ErrorCodes::Error code, RetryPolicy options) return false; } - if (options == RetryPolicy::kIdempotent) { - return kAllRetriableErrors.count(code); - } else { - invariant(options == RetryPolicy::kNotIdempotent); - return kNotMasterErrors.count(code); - } + const auto& retriableErrors = options == RetryPolicy::kIdempotent + ? RemoteCommandRetryScheduler::kAllRetriableErrors + : RemoteCommandRetryScheduler::kNotMasterErrors; + return std::find(retriableErrors.begin(), retriableErrors.end(), code) != retriableErrors.end(); } const ConnectionString ShardRemote::getConnString() const { |