From b29905578c6a537a2e94c9c934601aff1c02fd9b Mon Sep 17 00:00:00 2001
From: Kaloian Manassiev
Date: Tue, 8 Jan 2019 18:32:49 -0500
Subject: SERVER-37880 Implement an AsyncWorkScheduler without cancellation

---
 .../db/transaction_coordinator_futures_util.cpp   |  74 ++++++
 .../db/transaction_coordinator_futures_util.h     | 101 ++++++++
 .../transaction_coordinator_futures_util_test.cpp | 271 +++++++++++++++++++++
 3 files changed, 446 insertions(+)

diff --git a/src/mongo/db/transaction_coordinator_futures_util.cpp b/src/mongo/db/transaction_coordinator_futures_util.cpp
index ae150724f1c..f072d8b330b 100644
--- a/src/mongo/db/transaction_coordinator_futures_util.cpp
+++ b/src/mongo/db/transaction_coordinator_futures_util.cpp
@@ -28,12 +28,86 @@
  * it in the license file.
  */
 
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction
+
 #include "mongo/platform/basic.h"
 
 #include "mongo/db/transaction_coordinator_futures_util.h"
 
+#include "mongo/client/remote_command_retry_scheduler.h"
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/util/log.h"
+
 namespace mongo {
 namespace txn {
+namespace {
+
+using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
+using ResponseStatus = executor::TaskExecutor::ResponseStatus;
+
+}  // namespace
+
+AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext)
+    : _serviceContext(serviceContext),
+      _executor(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) {}
+
+AsyncWorkScheduler::~AsyncWorkScheduler() = default;
+
+Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand(
+    const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) {
+    auto promiseAndFuture = makePromiseFuture<ResponseStatus>();
+    auto sharedPromise =
+        std::make_shared<Promise<ResponseStatus>>(std::move(promiseAndFuture.promise));
+
+    _targetHostAsync(shardId, readPref)
+        .then([ this, shardId, sharedPromise, commandObj = commandObj.getOwned(), readPref ](
+            HostAndPort shardHostAndPort) mutable {
+            LOG(3) << "Coordinator sending command " << commandObj << " to shard " << shardId;
+
+            executor::RemoteCommandRequest request(shardHostAndPort,
+                                                   NamespaceString::kAdminDb.toString(),
+                                                   commandObj,
+                                                   readPref.toContainingBSON(),
+                                                   nullptr);
+
+            uassertStatusOK(_executor->scheduleRemoteCommand(
+                request, [ commandObj = commandObj.getOwned(),
+                           shardId,
+                           sharedPromise ](const RemoteCommandCallbackArgs& args) mutable {
+                    LOG(3) << "Coordinator shard got response " << args.response.data << " for "
+                           << commandObj << " to " << shardId;
+                    auto status = args.response.status;
+                    // Only consider actual failures to send the command as errors.
+                    if (status.isOK()) {
+                        sharedPromise->emplaceValue(args.response);
+                    } else {
+                        sharedPromise->setError(status);
+                    }
+                }));
+        })
+        .onError([ shardId, commandObj = commandObj.getOwned(), sharedPromise ](Status s) {
+            LOG(3) << "Coordinator shard failed to target command " << commandObj << " to shard "
+                   << shardId << causedBy(s);
+
+            sharedPromise->setError(s);
+        })
+        .getAsync([](Status) {});
+
+    // Do not wait for the callback to run. The callback will reschedule the remote request on
+    // the same executor if necessary.
+    return std::move(promiseAndFuture.future);
+}
+
+Future<HostAndPort> AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId,
+                                                         const ReadPreferenceSetting& readPref) {
+    return scheduleWork([shardId, readPref](OperationContext* opCtx) {
+        const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+        const auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
+
+        // TODO (SERVER-35678): Return a SemiFuture<HostAndPort> rather than using a blocking call
+        return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20)).get(opCtx);
+    });
+}
 
 Future<void> whenAll(std::vector<Future<void>>& futures) {
     std::vector<Future<int>> dummyFutures;
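For context, here is a minimal sketch of how a coordinator-side caller might consume
scheduleRemoteCommand. The command name, surrounding function, and error handling are
illustrative assumptions, not part of this patch:

    // Hypothetical caller: sends one command to a participant shard and logs the
    // outcome. Assumes a live txn::AsyncWorkScheduler is available.
    void sendToParticipant(txn::AsyncWorkScheduler& scheduler, const ShardId& shardId) {
        scheduler
            .scheduleRemoteCommand(shardId,
                                   ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                   BSON("prepareTransaction" << 1))
            .getAsync([shardId](StatusWith<executor::TaskExecutor::ResponseStatus> swResponse) {
                // A not-OK wrapper status means the command could not be sent at all;
                // command-level failures still arrive as an OK response whose data
                // carries ok:0 (see the "Only consider actual failures" comment above).
                if (!swResponse.isOK()) {
                    LOG(0) << "Failed to contact shard " << shardId
                           << causedBy(swResponse.getStatus());
                    return;
                }
                LOG(0) << "Shard " << shardId << " responded with "
                       << swResponse.getValue().data;
            });
    }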
diff --git a/src/mongo/db/transaction_coordinator_futures_util.h b/src/mongo/db/transaction_coordinator_futures_util.h
index 8b941e4bdcb..c64071d53a4 100644
--- a/src/mongo/db/transaction_coordinator_futures_util.h
+++ b/src/mongo/db/transaction_coordinator_futures_util.h
@@ -32,12 +32,80 @@
 
 #include <vector>
 
+#include "mongo/client/read_preference.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/grid.h"
 #include "mongo/util/concurrency/thread_pool.h"
 #include "mongo/util/future.h"
+#include "mongo/util/time_support.h"
 
 namespace mongo {
 namespace txn {
 
+/**
+ * This class groups all the asynchronous work scheduled by a given TransactionCoordinatorDriver.
+ */
+class AsyncWorkScheduler {
+public:
+    AsyncWorkScheduler(ServiceContext* serviceContext);
+    ~AsyncWorkScheduler();
+
+    template <class Callable>
+    Future<FutureContinuationResult<Callable, OperationContext*>> scheduleWork(
+        Callable&& task) noexcept {
+        return scheduleWorkIn(Milliseconds(0), std::forward<Callable>(task));
+    }
+
+    template <class Callable>
+    Future<FutureContinuationResult<Callable, OperationContext*>> scheduleWorkIn(
+        Milliseconds millis, Callable&& task) noexcept {
+        return scheduleWorkAt(_executor->now() + millis, std::forward<Callable>(task));
+    }
+
+    template <class Callable>
+    Future<FutureContinuationResult<Callable, OperationContext*>> scheduleWorkAt(
+        Date_t when, Callable&& task) noexcept {
+        using ReturnType = FutureContinuationResult<Callable, OperationContext*>;
+        auto pf = makePromiseFuture<ReturnType>();
+        auto taskCompletionPromise = std::make_shared<Promise<ReturnType>>(std::move(pf.promise));
+        try {
+            uassertStatusOK(_executor->scheduleWorkAt(
+                when,
+                [ this, task = std::forward<Callable>(task), taskCompletionPromise ](
+                    const executor::TaskExecutor::CallbackArgs&) mutable noexcept {
+                    ThreadClient tc(_serviceContext);
+                    auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
+                    taskCompletionPromise->setWith([&] { return task(uniqueOpCtx.get()); });
+                }));
+        } catch (const DBException& ex) {
+            taskCompletionPromise->setError(ex.toStatus());
+        }
+
+        return std::move(pf.future);
+    }
+
+    /**
+     * Sends a command asynchronously to the given shard and returns a Future which will be set
+     * when the request completes (whether with success or error).
+     */
+    Future<executor::TaskExecutor::ResponseStatus> scheduleRemoteCommand(
+        const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj);
+
+private:
+    /**
+     * Finds the host and port for a shard.
+     */
+    Future<HostAndPort> _targetHostAsync(const ShardId& shardId,
+                                         const ReadPreferenceSetting& readPref);
+
+    // Service context under which this executor runs
+    ServiceContext* const _serviceContext;
+
+    // Cached reference to the executor to use
+    executor::TaskExecutor* const _executor;
+};
+
 enum class ShouldStopIteration { kYes, kNo };
 
 /**
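As a usage sketch of the scheduling surface above (the service context accessor, the task
bodies, and the delay are illustrative assumptions):

    txn::AsyncWorkScheduler scheduler(getGlobalServiceContext());

    // Runs on the fixed executor as soon as a thread picks it up. The callable is
    // handed an OperationContext constructed on the executor thread by scheduleWorkAt.
    Future<int> immediate =
        scheduler.scheduleWork([](OperationContext* opCtx) { return 42; });

    // Identical, except the task will not run earlier than 100ms from now, measured
    // by the executor's clock (which the tests below advance via NetworkInterfaceMock).
    Future<void> delayed = scheduler.scheduleWorkIn(
        Milliseconds{100}, [](OperationContext* opCtx) { /* e.g. a retry step */ });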
@@ -199,6 +267,39 @@ Future<FutureContinuationResult<Callable>> async(ThreadPool* pool, Callable&& ta
  */
 Future<void> whenAll(std::vector<Future<void>>& futures);
 
+/**
+ * Executes the given loop body (f) and, for as long as shouldRetryFn returns true for the result
+ * of the most recent iteration, schedules another iteration after the next backoff interval.
+ */
+template <class LoopBodyFn, class ShouldRetryFn>
+Future<FutureContinuationResult<LoopBodyFn>> doWhile(AsyncWorkScheduler& scheduler,
+                                                     boost::optional<Backoff> backoff,
+                                                     ShouldRetryFn&& shouldRetryFn,
+                                                     LoopBodyFn&& f) {
+    using ReturnType = typename decltype(f())::value_type;
+    auto future = f();
+    return std::move(future).onCompletion([
+        &scheduler,
+        backoff = std::move(backoff),
+        shouldRetryFn = std::forward<ShouldRetryFn>(shouldRetryFn),
+        f = std::forward<LoopBodyFn>(f)
+    ](StatusOrStatusWith<ReturnType> s) mutable {
+        if (!shouldRetryFn(s))
+            return Future<ReturnType>(std::move(s));
+
+        // Retry after a delay.
+        const auto delayMillis = (backoff ? backoff->nextSleep() : Milliseconds(0));
+        return scheduler.scheduleWorkIn(delayMillis, [](OperationContext* opCtx) {}).then([
+            &scheduler,
+            backoff = std::move(backoff),
+            shouldRetryFn = std::move(shouldRetryFn),
+            f = std::move(f)
+        ]() mutable {
+            return doWhile(scheduler, std::move(backoff), std::move(shouldRetryFn), std::move(f));
+        });
+    });
+}
+
 /**
  * Executes a function returning a Future until the function does not return an error status or
  * until one of the provided error codes is returned.
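A sketch of how doWhile composes with AsyncWorkScheduler follows; scheduler, shardId, and
commandObj are assumed to be in scope (and to outlive the loop, since the loop body captures
them by reference), and the retry predicate is illustrative. The DoWhileTest cases below
exercise the same pattern against mocked time:

    // Keep resending a command, with exponential backoff, until the send itself
    // succeeds. A not-OK StatusWith here means the command never reached the shard.
    auto responseFuture = txn::doWhile(
        scheduler,
        Backoff(Seconds(1), Milliseconds::max()),
        [](const StatusWith<executor::TaskExecutor::ResponseStatus>& s) {
            return !s.isOK();  // true -> run another iteration after the backoff
        },
        [&] {
            return scheduler.scheduleRemoteCommand(
                shardId, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, commandObj);
        });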
diff --git a/src/mongo/db/transaction_coordinator_futures_util_test.cpp b/src/mongo/db/transaction_coordinator_futures_util_test.cpp
index 198f7af34a9..86754e7ef60 100644
--- a/src/mongo/db/transaction_coordinator_futures_util_test.cpp
+++ b/src/mongo/db/transaction_coordinator_futures_util_test.cpp
@@ -29,10 +29,16 @@
 
 #include "mongo/platform/basic.h"
 
+#include "mongo/client/remote_command_targeter_mock.h"
 #include "mongo/db/transaction_coordinator_futures_util.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/shard_server_test_fixture.h"
+#include "mongo/unittest/barrier.h"
 #include "mongo/unittest/unittest.h"
 
 namespace mongo {
+namespace txn {
 namespace {
 
 TEST(TransactionCoordinatorFuturesUtilTest, CollectReturnsInitValueWhenInputIsEmptyVector) {
@@ -83,5 +89,270 @@ TEST(TransactionCoordinatorFuturesUtilTest, CollectReturnsCombinedResultWithSeve
     ASSERT_EQ(resultFuture.get(), std::accumulate(futureValues.begin(), futureValues.end(), 0));
 }
 
+
+class AsyncWorkSchedulerTest : public ShardServerTestFixture {
+protected:
+    void setUp() override {
+        ShardServerTestFixture::setUp();
+
+        for (const auto& shardId : kShardIds) {
+            auto shardTargeter = RemoteCommandTargeterMock::get(
+                uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))
+                    ->getTargeter());
+            shardTargeter->setFindHostReturnValue(HostAndPort(str::stream() << shardId << ":123"));
+        }
+    }
+
+    void assertCommandSentAndRespondWith(const StringData& commandName,
+                                         const StatusWith<BSONObj>& response,
+                                         boost::optional<BSONObj> expectedWriteConcern) {
+        onCommand([&](const executor::RemoteCommandRequest& request) {
+            ASSERT_EQ(request.cmdObj.firstElement().fieldNameStringData(), commandName);
+            if (expectedWriteConcern) {
+                ASSERT_BSONOBJ_EQ(
+                    *expectedWriteConcern,
+                    request.cmdObj.getObjectField(WriteConcernOptions::kWriteConcernField));
+            }
+            return response;
+        });
+    }
+
+    // Override the CatalogClient to make CatalogClient::getAllShards automatically return the
+    // expected shards. We cannot mock the network responses for the ShardRegistry reload, since
+    // the ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no
+    // DBClientMock analogous to the NetworkInterfaceMock.
+    std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+        std::unique_ptr<DistLockManager> distLockManager) override {
+
+        class StaticCatalogClient final : public ShardingCatalogClientMock {
+        public:
+            StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {}
+
+            StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+                OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
+                std::vector<ShardType> shardTypes;
+                for (const auto& shardId : makeThreeShardIdsList()) {
+                    const ConnectionString cs = ConnectionString::forReplicaSet(
+                        shardId.toString(), {HostAndPort(str::stream() << shardId << ":123")});
+                    ShardType sType;
+                    sType.setName(cs.getSetName());
+                    sType.setHost(cs.toString());
+                    shardTypes.push_back(std::move(sType));
+                };
+                return repl::OpTimeWith<std::vector<ShardType>>(shardTypes);
+            }
+        };
+
+        return stdx::make_unique<StaticCatalogClient>();
+    }
+
+    static std::vector<ShardId> makeThreeShardIdsList() {
+        return std::vector<ShardId>{{"s1"}, {"s2"}, {"s3"}};
+    }
+    const std::vector<ShardId> kShardIds = makeThreeShardIdsList();
+};
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkSucceeds) {
+    AsyncWorkScheduler async(getServiceContext());
+
+    unittest::Barrier barrier(2);
+    auto pf = makePromiseFuture<int>();
+    auto future =
+        async.scheduleWork([&barrier, future = std::move(pf.future) ](OperationContext * opCtx) {
+            barrier.countDownAndWait();
+            return future.get(opCtx);
+        });
+
+    barrier.countDownAndWait();
+    ASSERT(!future.isReady());
+
+    pf.promise.emplaceValue(5);
+    ASSERT_EQ(5, future.get(operationContext()));
+}
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkThrowsException) {
+    AsyncWorkScheduler async(getServiceContext());
+
+    unittest::Barrier barrier(2);
+    auto pf = makePromiseFuture<int>();
+    auto future =
+        async.scheduleWork([&barrier, future = std::move(pf.future) ](OperationContext * opCtx) {
+            barrier.countDownAndWait();
+            future.get(opCtx);
+            uasserted(ErrorCodes::InternalError, "Test error");
+        });
+
+    barrier.countDownAndWait();
+    ASSERT(!future.isReady());
+
+    pf.promise.emplaceValue(5);
+    ASSERT_THROWS_CODE(
+        future.get(operationContext()), AssertionException, ErrorCodes::InternalError);
+}
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkInSucceeds) {
+    AsyncWorkScheduler async(getServiceContext());
+
+    auto pf = makePromiseFuture<int>();
+    auto future = async.scheduleWorkIn(
+        Milliseconds{10},
+        [future = std::move(pf.future)](OperationContext * opCtx) { return future.get(opCtx); });
+
+    pf.promise.emplaceValue(5);
+    ASSERT(!future.isReady());
+
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+        network()->runUntil(network()->now() + Milliseconds{5});
+        ASSERT(!future.isReady());
+    }
+
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+        network()->runUntil(network()->now() + Milliseconds{5});
+        ASSERT(future.isReady());
+    }
+
+    ASSERT_EQ(5, future.get(operationContext()));
+}
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandRespondsOK) {
+    AsyncWorkScheduler async(getServiceContext());
+
+    auto future = async.scheduleRemoteCommand(
+        kShardIds[1], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 1));
+    ASSERT(!future.isReady());
+
+    const auto objResponse = BSON("ok" << 1 << "responseData" << 2);
+    onCommand([&](const executor::RemoteCommandRequest& request) {
+        ASSERT_BSONOBJ_EQ(BSON("TestCommand" << 1), request.cmdObj);
+        return objResponse;
+    });
+
+    const auto& response = future.get(operationContext());
+    ASSERT(response.isOK());
+    ASSERT_BSONOBJ_EQ(objResponse, response.data);
+}
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandRespondsNotOK) {
+    AsyncWorkScheduler async(getServiceContext());
+
+    auto future = async.scheduleRemoteCommand(
+        kShardIds[1], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 2));
+    ASSERT(!future.isReady());
+
+    const auto objResponse = BSON("ok" << 0 << "responseData" << 3);
+    onCommand([&](const executor::RemoteCommandRequest& request) {
+        ASSERT_BSONOBJ_EQ(BSON("TestCommand" << 2), request.cmdObj);
+        return objResponse;
+    });
+
+    const auto& response = future.get(operationContext());
+    ASSERT(response.isOK());
+    ASSERT_BSONOBJ_EQ(objResponse, response.data);
+}
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandsOneOKAndOneError) {
+    AsyncWorkScheduler async(getServiceContext());
+
+    auto future1 = async.scheduleRemoteCommand(
+        kShardIds[1], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 2));
+    auto future2 = async.scheduleRemoteCommand(
+        kShardIds[2], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 3));
+
+    ASSERT(!future1.isReady());
+    ASSERT(!future2.isReady());
+
+    onCommand([](const executor::RemoteCommandRequest& request) {
+        return BSON("ok" << 1 << "responseData" << 3);
+    });
+    onCommand([](const executor::RemoteCommandRequest& request) {
+        return BSON("ok" << 0 << "responseData" << 3);
+    });
+
+    const auto& response2 = future2.get(operationContext());
+    ASSERT(response2.isOK());
+
+    const auto& response1 = future1.get(operationContext());
+    ASSERT(response1.isOK());
+}
+
+
+using DoWhileTest = AsyncWorkSchedulerTest;
+
+TEST_F(DoWhileTest, LoopBodyExecutesAtLeastOnceWithBackoff) {
+    AsyncWorkScheduler async(getServiceContext());
+
+    int numLoops = 0;
+    auto future = doWhile(async,
+                          Backoff(Seconds(1), Milliseconds::max()),
+                          [](const StatusWith<int>& status) {
+                              uassertStatusOK(status);
+                              return false;
+                          },
+                          [&numLoops] { return Future<int>::makeReady(++numLoops); });
+
+    ASSERT(future.isReady());
+    ASSERT_EQ(1, numLoops);
+    ASSERT_EQ(1, future.get(operationContext()));
+}
+
+TEST_F(DoWhileTest, LoopBodyExecutesManyIterationsWithoutBackoff) {
+    AsyncWorkScheduler async(getServiceContext());
+
+    int remainingLoops = 100'000;
+    auto future = doWhile(async,
+                          boost::none,
+                          [&remainingLoops](const StatusWith<int>& status) {
+                              uassertStatusOK(status);
+                              return remainingLoops > 0;
+                          },
+                          [&remainingLoops] { return Future<int>::makeReady(--remainingLoops); });
+
+    ASSERT_EQ(0, future.get(operationContext()));
+    ASSERT_EQ(0, remainingLoops);
+}
+
+TEST_F(DoWhileTest, LoopObeysBackoff) {
+    AsyncWorkScheduler async(getServiceContext());
+
+    int numLoops = 0;
+    auto future = doWhile(async,
+                          Backoff(Seconds(1), Milliseconds::max()),
+                          [](const StatusWith<int>& status) { return uassertStatusOK(status) < 3; },
+                          [&numLoops] { return Future<int>::makeReady(++numLoops); });
+
+    // The loop body needs to execute at least once
+    ASSERT(!future.isReady());
+    ASSERT_EQ(1, numLoops);
+
+    // Back-off is 1 millisecond now
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+        network()->runUntil(network()->now() + Milliseconds{1});
+        ASSERT(!future.isReady());
+        ASSERT_EQ(2, numLoops);
+    }
+
+    // Back-off is 2 milliseconds now, so advancing the time by 1 millisecond will not cause the
+    // loop body to run
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+        network()->runUntil(network()->now() + Milliseconds{1});
+        ASSERT(!future.isReady());
+        ASSERT_EQ(2, numLoops);
+    }
+
+    {
+        executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+        network()->runUntil(network()->now() + Seconds{1});
+        ASSERT(future.isReady());
+        ASSERT_EQ(3, numLoops);
+    }
+
+    ASSERT_EQ(3, future.get(operationContext()));
+}
+
 }  // namespace
+}  // namespace txn
 }  // namespace mongo
-- 
cgit v1.2.1