author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2019-01-08 18:32:49 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2019-01-15 12:30:30 -0500
commit     b29905578c6a537a2e94c9c934601aff1c02fd9b (patch)
tree       295cf4d5a46f9d49728cbda6c4cd0b87e9b0e300
parent     9d78b85155127d5b82a216e634ba7ee0c7c5d87d (diff)
download   mongo-b29905578c6a537a2e94c9c934601aff1c02fd9b.tar.gz

SERVER-37880 Implement an AsyncWorkScheduler without cancellation

-rw-r--r--  src/mongo/db/transaction_coordinator_futures_util.cpp        74
-rw-r--r--  src/mongo/db/transaction_coordinator_futures_util.h          101
-rw-r--r--  src/mongo/db/transaction_coordinator_futures_util_test.cpp   271
3 files changed, 446 insertions(+), 0 deletions(-)

diff --git a/src/mongo/db/transaction_coordinator_futures_util.cpp b/src/mongo/db/transaction_coordinator_futures_util.cpp
index ae150724f1c..f072d8b330b 100644
--- a/src/mongo/db/transaction_coordinator_futures_util.cpp
+++ b/src/mongo/db/transaction_coordinator_futures_util.cpp
@@ -28,12 +28,86 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction
+
#include "mongo/platform/basic.h"
#include "mongo/db/transaction_coordinator_futures_util.h"
+#include "mongo/client/remote_command_retry_scheduler.h"
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/util/log.h"
+
namespace mongo {
namespace txn {
+namespace {
+
+using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
+using ResponseStatus = executor::TaskExecutor::ResponseStatus;
+
+} // namespace
+
+AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext)
+ : _serviceContext(serviceContext),
+ _executor(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) {}
+
+AsyncWorkScheduler::~AsyncWorkScheduler() = default;
+
+Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand(
+ const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) {
+ auto promiseAndFuture = makePromiseFuture<ResponseStatus>();
+ auto sharedPromise =
+ std::make_shared<Promise<ResponseStatus>>(std::move(promiseAndFuture.promise));
+
+ _targetHostAsync(shardId, readPref)
+ .then([ this, shardId, sharedPromise, commandObj = commandObj.getOwned(), readPref ](
+ HostAndPort shardHostAndPort) mutable {
+ LOG(3) << "Coordinator sending command " << commandObj << " to shard " << shardId;
+
+ executor::RemoteCommandRequest request(shardHostAndPort,
+ NamespaceString::kAdminDb.toString(),
+ commandObj,
+ readPref.toContainingBSON(),
+ nullptr);
+
+ uassertStatusOK(_executor->scheduleRemoteCommand(
+ request, [ commandObj = commandObj.getOwned(),
+ shardId,
+ sharedPromise ](const RemoteCommandCallbackArgs& args) mutable {
+ LOG(3) << "Coordinator shard got response " << args.response.data << " for "
+ << commandObj << " to " << shardId;
+ auto status = args.response.status;
+ // Only consider actual failures to send the command as errors.
+ if (status.isOK()) {
+ sharedPromise->emplaceValue(args.response);
+ } else {
+ sharedPromise->setError(status);
+ }
+ }));
+ })
+ .onError([ shardId, commandObj = commandObj.getOwned(), sharedPromise ](Status s) {
+ LOG(3) << "Coordinator shard failed to target command " << commandObj << " to shard "
+ << shardId << causedBy(s);
+
+ sharedPromise->setError(s);
+ })
+ .getAsync([](Status) {});
+
+ // Do not wait for the callback to run. The callback will reschedule the remote request on
+ // the same executor if necessary.
+ return std::move(promiseAndFuture.future);
+}
+
+Future<HostAndPort> AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId,
+ const ReadPreferenceSetting& readPref) {
+ return scheduleWork([shardId, readPref](OperationContext* opCtx) {
+ const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+ const auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
+
+ // TODO (SERVER-35678): Return a SemiFuture<HostAndPort> rather than using a blocking call
+ return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20)).get(opCtx);
+ });
+}
Future<void> whenAll(std::vector<Future<void>>& futures) {
std::vector<Future<int>> dummyFutures;
diff --git a/src/mongo/db/transaction_coordinator_futures_util.h b/src/mongo/db/transaction_coordinator_futures_util.h
index 8b941e4bdcb..c64071d53a4 100644
--- a/src/mongo/db/transaction_coordinator_futures_util.h
+++ b/src/mongo/db/transaction_coordinator_futures_util.h
@@ -32,12 +32,80 @@
#include <vector>
+#include "mongo/client/read_preference.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/grid.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/future.h"
+#include "mongo/util/time_support.h"
namespace mongo {
namespace txn {
+/**
+ * This class groups all the asynchronous work scheduled by a given TransactionCoordinatorDriver.
+ */
+class AsyncWorkScheduler {
+public:
+ AsyncWorkScheduler(ServiceContext* serviceContext);
+ ~AsyncWorkScheduler();
+
+ template <class Callable>
+ Future<FutureContinuationResult<Callable, OperationContext*>> scheduleWork(
+ Callable&& task) noexcept {
+ return scheduleWorkIn(Milliseconds(0), std::forward<Callable>(task));
+ }
+
+ template <class Callable>
+ Future<FutureContinuationResult<Callable, OperationContext*>> scheduleWorkIn(
+ Milliseconds millis, Callable&& task) noexcept {
+ return scheduleWorkAt(_executor->now() + millis, std::forward<Callable>(task));
+ }
+
+ template <class Callable>
+ Future<FutureContinuationResult<Callable, OperationContext*>> scheduleWorkAt(
+ Date_t when, Callable&& task) noexcept {
+ using ReturnType = FutureContinuationResult<Callable, OperationContext*>;
+ auto pf = makePromiseFuture<ReturnType>();
+ auto taskCompletionPromise = std::make_shared<Promise<ReturnType>>(std::move(pf.promise));
+ try {
+ uassertStatusOK(_executor->scheduleWorkAt(
+ when,
+ [ this, task = std::forward<Callable>(task), taskCompletionPromise ](
+ const executor::TaskExecutor::CallbackArgs&) mutable noexcept {
+ ThreadClient tc(_serviceContext);
+ auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
+ taskCompletionPromise->setWith([&] { return task(uniqueOpCtx.get()); });
+ }));
+ } catch (const DBException& ex) {
+ taskCompletionPromise->setError(ex.toStatus());
+ }
+
+ return std::move(pf.future);
+ }
+
+ /**
+ * Sends a command asynchronously to the given shard and returns a Future which will be set
+ * when the request completes (whether successfully or with an error).
+ */
+ Future<executor::TaskExecutor::ResponseStatus> scheduleRemoteCommand(
+ const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj);
+
+private:
+ /**
+ * Finds the host and port for a shard.
+ */
+ Future<HostAndPort> _targetHostAsync(const ShardId& shardId,
+ const ReadPreferenceSetting& readPref);
+
+ // Service context under which this scheduler runs
+ ServiceContext* const _serviceContext;
+
+ // Cached reference to the executor to use
+ executor::TaskExecutor* const _executor;
+};
+
enum class ShouldStopIteration { kYes, kNo };
/**
@@ -202,6 +270,39 @@ Future<void> whenAll(std::vector<Future<void>>& futures);
/**
* Executes a function returning a Future until the function does not return an error status or
* until one of the provided error codes is returned.
+ */
+template <class LoopBodyFn, class ShouldRetryFn>
+Future<FutureContinuationResult<LoopBodyFn>> doWhile(AsyncWorkScheduler& scheduler,
+ boost::optional<Backoff> backoff,
+ ShouldRetryFn&& shouldRetryFn,
+ LoopBodyFn&& f) {
+ using ReturnType = typename decltype(f())::value_type;
+ auto future = f();
+ return std::move(future).onCompletion([
+ &scheduler,
+ backoff = std::move(backoff),
+ shouldRetryFn = std::forward<ShouldRetryFn>(shouldRetryFn),
+ f = std::forward<LoopBodyFn>(f)
+ ](StatusOrStatusWith<ReturnType> s) mutable {
+ if (!shouldRetryFn(s))
+ return Future<ReturnType>(std::move(s));
+
+ // Retry after a delay.
+ const auto delayMillis = (backoff ? backoff->nextSleep() : Milliseconds(0));
+ return scheduler.scheduleWorkIn(delayMillis, [](OperationContext* opCtx) {}).then([
+ &scheduler,
+ backoff = std::move(backoff),
+ shouldRetryFn = std::move(shouldRetryFn),
+ f = std::move(f)
+ ]() mutable {
+ return doWhile(scheduler, std::move(backoff), std::move(shouldRetryFn), std::move(f));
+ });
+ });
+}
+
+/**
+ * Executes a function returning a Future until the function does not return an error status or
+ * until one of the provided error codes is returned.
*
* TODO (SERVER-37880): Implement backoff for retries.
*/
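
For orientation, here is a minimal caller-side sketch (not part of this patch) of the API declared in the header above. It assumes a valid ServiceContext* named serviceContext and an initialized sharding Grid; the shard id, command, and result handling are hypothetical, and includes (e.g. mongo/rpc/get_status_from_command_result.h) are elided:

// Sketch only (hypothetical usage, not from the patch).
AsyncWorkScheduler scheduler(serviceContext);

// Local work runs on the fixed executor with a fresh OperationContext.
auto localFuture = scheduler.scheduleWork([](OperationContext* opCtx) { return 42; });

// Remote work resolves once the command has been delivered to the shard.
auto remoteFuture =
    scheduler
        .scheduleRemoteCommand(ShardId("shard0"),
                               ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                               BSON("ping" << 1))
        .then([](executor::TaskExecutor::ResponseStatus response) {
            // A delivered {ok: 0} response still lands here; turn it into a Status.
            uassertStatusOK(getStatusFromCommandResult(response.data));
        });

Note that scheduleRemoteCommand() only fails the returned Future when the command could not be delivered; a delivered response with {ok: 0} is surfaced as a success and must be inspected by the caller, which is what the ScheduledRemoteCommandRespondsNotOK test below verifies.
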
diff --git a/src/mongo/db/transaction_coordinator_futures_util_test.cpp b/src/mongo/db/transaction_coordinator_futures_util_test.cpp
index 198f7af34a9..86754e7ef60 100644
--- a/src/mongo/db/transaction_coordinator_futures_util_test.cpp
+++ b/src/mongo/db/transaction_coordinator_futures_util_test.cpp
@@ -29,10 +29,16 @@
#include "mongo/platform/basic.h"
+#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/transaction_coordinator_futures_util.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/shard_server_test_fixture.h"
+#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
+namespace txn {
namespace {
TEST(TransactionCoordinatorFuturesUtilTest, CollectReturnsInitValueWhenInputIsEmptyVector) {
@@ -83,5 +89,270 @@ TEST(TransactionCoordinatorFuturesUtilTest, CollectReturnsCombinedResultWithSeve
ASSERT_EQ(resultFuture.get(), std::accumulate(futureValues.begin(), futureValues.end(), 0));
}
+
+class AsyncWorkSchedulerTest : public ShardServerTestFixture {
+protected:
+ void setUp() override {
+ ShardServerTestFixture::setUp();
+
+ for (const auto& shardId : kShardIds) {
+ auto shardTargeter = RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))
+ ->getTargeter());
+ shardTargeter->setFindHostReturnValue(HostAndPort(str::stream() << shardId << ":123"));
+ }
+ }
+
+ void assertCommandSentAndRespondWith(const StringData& commandName,
+ const StatusWith<BSONObj>& response,
+ boost::optional<BSONObj> expectedWriteConcern) {
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(request.cmdObj.firstElement().fieldNameStringData(), commandName);
+ if (expectedWriteConcern) {
+ ASSERT_BSONOBJ_EQ(
+ *expectedWriteConcern,
+ request.cmdObj.getObjectField(WriteConcernOptions::kWriteConcernField));
+ }
+ return response;
+ });
+ }
+
+ // Override the CatalogClient to make CatalogClient::getAllShards automatically return the
+ // expected shards. We cannot mock the network responses for the ShardRegistry reload, since the
+ // ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no
+ // DBClientMock analogous to the NetworkInterfaceMock.
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) override {
+
+ class StaticCatalogClient final : public ShardingCatalogClientMock {
+ public:
+ StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {}
+
+ StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+ OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
+ std::vector<ShardType> shardTypes;
+ for (const auto& shardId : makeThreeShardIdsList()) {
+ const ConnectionString cs = ConnectionString::forReplicaSet(
+ shardId.toString(), {HostAndPort(str::stream() << shardId << ":123")});
+ ShardType sType;
+ sType.setName(cs.getSetName());
+ sType.setHost(cs.toString());
+ shardTypes.push_back(std::move(sType));
+ };
+ return repl::OpTimeWith<std::vector<ShardType>>(shardTypes);
+ }
+ };
+
+ return stdx::make_unique<StaticCatalogClient>();
+ }
+
+ static std::vector<ShardId> makeThreeShardIdsList() {
+ return std::vector<ShardId>{{"s1"}, {"s2"}, {"s3"}};
+ }
+ const std::vector<ShardId> kShardIds = makeThreeShardIdsList();
+};
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkSucceeds) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ unittest::Barrier barrier(2);
+ auto pf = makePromiseFuture<int>();
+ auto future =
+ async.scheduleWork([&barrier, future = std::move(pf.future) ](OperationContext * opCtx) {
+ barrier.countDownAndWait();
+ return future.get(opCtx);
+ });
+
+ barrier.countDownAndWait();
+ ASSERT(!future.isReady());
+
+ pf.promise.emplaceValue(5);
+ ASSERT_EQ(5, future.get(operationContext()));
+}
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkThrowsException) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ unittest::Barrier barrier(2);
+ auto pf = makePromiseFuture<int>();
+ auto future =
+ async.scheduleWork([&barrier, future = std::move(pf.future) ](OperationContext * opCtx) {
+ barrier.countDownAndWait();
+ future.get(opCtx);
+ uasserted(ErrorCodes::InternalError, "Test error");
+ });
+
+ barrier.countDownAndWait();
+ ASSERT(!future.isReady());
+
+ pf.promise.emplaceValue(5);
+ ASSERT_THROWS_CODE(
+ future.get(operationContext()), AssertionException, ErrorCodes::InternalError);
+}
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledBlockingWorkInSucceeds) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ auto pf = makePromiseFuture<int>();
+ auto future = async.scheduleWorkIn(
+ Milliseconds{10},
+ [future = std::move(pf.future)](OperationContext * opCtx) { return future.get(opCtx); });
+
+ pf.promise.emplaceValue(5);
+ ASSERT(!future.isReady());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->runUntil(network()->now() + Milliseconds{5});
+ ASSERT(!future.isReady());
+ }
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->runUntil(network()->now() + Milliseconds{5});
+ ASSERT(future.isReady());
+ }
+
+ ASSERT_EQ(5, future.get(operationContext()));
+}
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandRespondsOK) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ auto future = async.scheduleRemoteCommand(
+ kShardIds[1], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 1));
+ ASSERT(!future.isReady());
+
+ const auto objResponse = BSON("ok" << 1 << "responseData" << 2);
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(BSON("TestCommand" << 1), request.cmdObj);
+ return objResponse;
+ });
+
+ const auto& response = future.get(operationContext());
+ ASSERT(response.isOK());
+ ASSERT_BSONOBJ_EQ(objResponse, response.data);
+}
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandRespondsNotOK) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ auto future = async.scheduleRemoteCommand(
+ kShardIds[1], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 2));
+ ASSERT(!future.isReady());
+
+ const auto objResponse = BSON("ok" << 0 << "responseData" << 3);
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(BSON("TestCommand" << 2), request.cmdObj);
+ return objResponse;
+ });
+
+ const auto& response = future.get(operationContext());
+ ASSERT(response.isOK());
+ ASSERT_BSONOBJ_EQ(objResponse, response.data);
+}
+
+TEST_F(AsyncWorkSchedulerTest, ScheduledRemoteCommandsOneOKAndOneError) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ auto future1 = async.scheduleRemoteCommand(
+ kShardIds[1], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 2));
+ auto future2 = async.scheduleRemoteCommand(
+ kShardIds[2], ReadPreferenceSetting{ReadPreference::PrimaryOnly}, BSON("TestCommand" << 3));
+
+ ASSERT(!future1.isReady());
+ ASSERT(!future2.isReady());
+
+ onCommand([](const executor::RemoteCommandRequest& request) {
+ return BSON("ok" << 1 << "responseData" << 3);
+ });
+ onCommand([](const executor::RemoteCommandRequest& request) {
+ return BSON("ok" << 0 << "responseData" << 3);
+ });
+
+ const auto& response2 = future2.get(operationContext());
+ ASSERT(response2.isOK());
+
+ const auto& response1 = future1.get(operationContext());
+ ASSERT(response1.isOK());
+}
+
+
+using DoWhileTest = AsyncWorkSchedulerTest;
+
+TEST_F(DoWhileTest, LoopBodyExecutesAtLeastOnceWithBackoff) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ int numLoops = 0;
+ auto future = doWhile(async,
+ Backoff(Seconds(1), Milliseconds::max()),
+ [](const StatusWith<int>& status) {
+ uassertStatusOK(status);
+ return false;
+ },
+ [&numLoops] { return Future<int>::makeReady(++numLoops); });
+
+ ASSERT(future.isReady());
+ ASSERT_EQ(1, numLoops);
+ ASSERT_EQ(1, future.get(operationContext()));
+}
+
+TEST_F(DoWhileTest, LoopBodyExecutesManyIterationsWithoutBackoff) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ int remainingLoops = 100'000;
+ auto future = doWhile(async,
+ boost::none,
+ [&remainingLoops](const StatusWith<int>& status) {
+ uassertStatusOK(status);
+ return remainingLoops > 0;
+ },
+ [&remainingLoops] { return Future<int>::makeReady(--remainingLoops); });
+
+ ASSERT_EQ(0, future.get(operationContext()));
+ ASSERT_EQ(0, remainingLoops);
+}
+
+TEST_F(DoWhileTest, LoopObeysBackoff) {
+ AsyncWorkScheduler async(getServiceContext());
+
+ int numLoops = 0;
+ auto future = doWhile(async,
+ Backoff(Seconds(1), Milliseconds::max()),
+ [](const StatusWith<int>& status) { return uassertStatusOK(status) < 3; },
+ [&numLoops] { return Future<int>::makeReady(++numLoops); });
+
+ // The loop body needs to execute at least once
+ ASSERT(!future.isReady());
+ ASSERT_EQ(1, numLoops);
+
+ // Back-off is 1 millisecond now
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->runUntil(network()->now() + Milliseconds{1});
+ ASSERT(!future.isReady());
+ ASSERT_EQ(2, numLoops);
+ }
+
+ // Back-off is 2 milliseconds now, so advancing the time by 1 millisecond will not cause the
+ // loop body to run
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->runUntil(network()->now() + Milliseconds{1});
+ ASSERT(!future.isReady());
+ ASSERT_EQ(2, numLoops);
+ }
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->runUntil(network()->now() + Seconds{1});
+ ASSERT(future.isReady());
+ ASSERT_EQ(3, numLoops);
+ }
+
+ ASSERT_EQ(3, future.get(operationContext()));
+}
+
} // namespace
+} // namespace txn
} // namespace mongo
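
As a closing orientation note (again not part of the patch): the doWhile() helper declared in the header can be combined with scheduleRemoteCommand() to retry delivery failures with backoff, along the lines exercised by the DoWhileTest cases above. A minimal sketch, reusing the scheduler instance from the earlier sketch and a hypothetical shard id, command, and retry predicate:

// Sketch only: retry while the command cannot be delivered, backing off between
// attempts (1ms, 2ms, 4ms, ... per the Backoff behavior shown in LoopObeysBackoff).
auto responseFuture = doWhile(
    scheduler,
    Backoff(Seconds(1), Milliseconds::max()),
    [](const StatusWith<executor::TaskExecutor::ResponseStatus>& swResponse) {
        return !swResponse.isOK();  // Stop as soon as any response arrives.
    },
    [&scheduler] {
        return scheduler.scheduleRemoteCommand(
            ShardId("shard0"),
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            BSON("ping" << 1));
    });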