Diffstat (limited to 'src/mongo/db/s/transaction_coordinator_futures_util.cpp')
-rw-r--r--  src/mongo/db/s/transaction_coordinator_futures_util.cpp  236
1 file changed, 236 insertions, 0 deletions
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
new file mode 100644
index 00000000000..f4374e7c5a2
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
@@ -0,0 +1,236 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/transaction_coordinator_futures_util.h"
+
+#include "mongo/client/remote_command_retry_scheduler.h"
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/transport/service_entry_point.h"
+#include "mongo/util/fail_point_service.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace txn {
+namespace {
+
+MONGO_FAIL_POINT_DEFINE(hangWhileTargetingRemoteHost);
+MONGO_FAIL_POINT_DEFINE(hangWhileTargetingLocalHost);
+
+using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
+using ResponseStatus = executor::TaskExecutor::ResponseStatus;
+
+}  // namespace
+
+AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext)
+    : _serviceContext(serviceContext),
+      _executor(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) {}
+
+AsyncWorkScheduler::~AsyncWorkScheduler() {
+    {
+        stdx::lock_guard<stdx::mutex> lg(_mutex);
+        invariant(_activeOpContexts.empty());
+        invariant(_activeHandles.empty());
+        invariant(_childSchedulers.empty());
+    }
+
+    if (!_parent)
+        return;
+
+    stdx::lock_guard<stdx::mutex> lg(_parent->_mutex);
+    _parent->_childSchedulers.erase(_itToRemove);
+    _parent = nullptr;
+}
+
+Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand(
+    const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) {
+
+    bool isSelfShard = [this, shardId] {
+        if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+            return shardId == ShardRegistry::kConfigServerShardId;
+        }
+        if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
+            return shardId == ShardingState::get(_serviceContext)->shardId();
+        }
+        MONGO_UNREACHABLE;  // Only sharded systems should use the two-phase commit path.
+    }();
+
+    if (isSelfShard) {
+        // If sending a command to the same shard as this node is in, send it directly to this
+        // node rather than going through the host targeting below. This ensures that the state
+        // changes for the participant and coordinator occur sequentially on a single branch of
+        // replica set history. See SERVER-38142 for details.
+        return scheduleWork(
+            [this, shardId, commandObj = commandObj.getOwned()](OperationContext* opCtx) {
+                // Note: This internal authorization is tied to the lifetime of 'opCtx', which is
+                // destroyed by 'scheduleWork' immediately after this lambda ends.
+                AuthorizationSession::get(Client::getCurrent())->grantInternalAuthorization();
+
+                if (MONGO_FAIL_POINT(hangWhileTargetingLocalHost)) {
+                    LOG(0) << "Hit hangWhileTargetingLocalHost failpoint";
+                    MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx,
+                                                                    hangWhileTargetingLocalHost);
+                }
+
+                const auto service = opCtx->getServiceContext();
+                auto start = _executor->now();
+
+                auto requestOpMsg =
+                    OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, commandObj).serialize();
+                const auto replyOpMsg = OpMsg::parseOwned(
+                    service->getServiceEntryPoint()->handleRequest(opCtx, requestOpMsg).response);
+
+                // Document sequences are not yet being used for responses.
+                invariant(replyOpMsg.sequences.empty());
+
+                // 'ResponseStatus' is the response format of a remote request sent over the
+                // network so we simulate that format manually here, since we sent the request
+                // over the loopback.
+                return ResponseStatus{replyOpMsg.body.getOwned(), _executor->now() - start};
+            });
+    }
+
+    return _targetHostAsync(shardId, readPref)
+        .then([this, shardId, commandObj = commandObj.getOwned(), readPref](
+                  HostAndPort shardHostAndPort) mutable {
+            executor::RemoteCommandRequest request(shardHostAndPort,
+                                                   NamespaceString::kAdminDb.toString(),
+                                                   commandObj,
+                                                   readPref.toContainingBSON(),
+                                                   nullptr);
+
+            auto pf = makePromiseFuture<ResponseStatus>();
+
+            stdx::unique_lock<stdx::mutex> ul(_mutex);
+            uassertStatusOK(_shutdownStatus);
+
+            auto scheduledCommandHandle = uassertStatusOK(_executor->scheduleRemoteCommand(
+                request,
+                [this,
+                 commandObj = std::move(commandObj),
+                 shardId = std::move(shardId),
+                 promise = std::make_shared<Promise<ResponseStatus>>(std::move(pf.promise))](
+                    const RemoteCommandCallbackArgs& args) mutable noexcept {
+                    auto status = args.response.status;
+                    // Only consider actual failures to send the command as errors.
+                    if (status.isOK()) {
+                        promise->emplaceValue(std::move(args.response));
+                    } else {
+                        promise->setError([&] {
+                            if (status == ErrorCodes::CallbackCanceled) {
+                                stdx::unique_lock<stdx::mutex> ul(_mutex);
+                                return _shutdownStatus.isOK() ? status : _shutdownStatus;
+                            }
+                            return status;
+                        }());
+                    }
+                }));
+
+            auto it =
+                _activeHandles.emplace(_activeHandles.begin(), std::move(scheduledCommandHandle));
+
+            ul.unlock();
+
+            return std::move(pf.future).tapAll(
+                [this, it = std::move(it)](StatusWith<ResponseStatus> s) {
+                    stdx::lock_guard<stdx::mutex> lg(_mutex);
+                    _activeHandles.erase(it);
+                });
+        });
+}
+
+std::unique_ptr<AsyncWorkScheduler> AsyncWorkScheduler::makeChildScheduler() {
+    auto child = stdx::make_unique<AsyncWorkScheduler>(_serviceContext);
+
+    stdx::lock_guard<stdx::mutex> lg(_mutex);
+    if (!_shutdownStatus.isOK())
+        child->shutdown(_shutdownStatus);
+
+    child->_parent = this;
+    child->_itToRemove = _childSchedulers.emplace(_childSchedulers.begin(), child.get());
+
+    return child;
+}
+
+void AsyncWorkScheduler::shutdown(Status status) {
+    invariant(!status.isOK());
+
+    stdx::lock_guard<stdx::mutex> lg(_mutex);
+    if (!_shutdownStatus.isOK())
+        return;
+
+    _shutdownStatus = std::move(status);
+
+    for (const auto& it : _activeOpContexts) {
+        stdx::lock_guard<Client> clientLock(*it->getClient());
+        _serviceContext->killOperation(clientLock, it.get(), _shutdownStatus.code());
+    }
+
+    for (const auto& it : _activeHandles) {
+        _executor->cancel(it);
+    }
+
+    for (auto& child : _childSchedulers) {
+        child->shutdown(_shutdownStatus);
+    }
+}
+
+Future<HostAndPort> AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId,
+                                                         const ReadPreferenceSetting& readPref) {
+    return scheduleWork([shardId, readPref](OperationContext* opCtx) {
+        const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+        const auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
+
+        if (MONGO_FAIL_POINT(hangWhileTargetingRemoteHost)) {
+            LOG(0) << "Hit hangWhileTargetingRemoteHost failpoint";
+            MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, hangWhileTargetingRemoteHost);
+        }
+
+        // TODO (SERVER-35678): Return a SemiFuture<HostAndPort> rather than using a blocking call
+        return shard->getTargeter()->findHostWithMaxWait(readPref, Seconds(20)).get(opCtx);
+    });
+}
+
+Future<void> whenAll(std::vector<Future<void>>& futures) {
+    std::vector<Future<int>> dummyFutures;
+    for (auto&& f : futures) {
+        dummyFutures.push_back(std::move(f).then([]() { return 0; }));
+    }
+    return collect(
+               std::move(dummyFutures), 0, [](int, const int&) { return ShouldStopIteration::kNo; })
+        .ignoreValue();
+}
+
+}  // namespace txn
+}  // namespace mongo
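
The remote branch of `scheduleRemoteCommand` bridges the executor's callback-based API into a `Future`: the `Promise` half of a promise/future pair is moved into the callback through a `shared_ptr` and fulfilled exactly once, with `CallbackCanceled` translated into the scheduler's shutdown status. Below is a minimal standalone sketch of that bridging pattern using `std::promise`/`std::future` in place of MongoDB's futures library; `scheduleCallbackStyle` and `scheduleAsFuture` are hypothetical stand-ins, not names from the tree.

```cpp
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

// Stand-in for a callback-based executor API (hypothetical, not MongoDB's):
// runs the callback later, on another thread.
void scheduleCallbackStyle(std::function<void(std::string)> cb) {
    std::thread([cb = std::move(cb)] { cb("response-body"); }).detach();
}

// Bridge: wrap the callback API in a future. The promise is held through a
// shared_ptr so the copyable std::function callback can own it; it is
// fulfilled exactly once, from whichever thread runs the callback.
std::future<std::string> scheduleAsFuture() {
    auto promise = std::make_shared<std::promise<std::string>>();
    auto future = promise->get_future();
    scheduleCallbackStyle(
        [promise](std::string response) { promise->set_value(std::move(response)); });
    return future;
}

int main() {
    auto f = scheduleAsFuture();
    std::cout << "got: " << f.get() << "\n";  // blocks until the callback fires
    return 0;
}
```

The patch adds one wrinkle on top of this pattern: on `CallbackCanceled` it consults `_shutdownStatus` under the mutex, so a caller whose command was cancelled by `shutdown()` sees the shutdown reason rather than a generic cancellation error.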
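
`makeChildScheduler`, `shutdown`, and the destructor together implement a small ownership tree: a child registers itself in the parent's list under the parent's mutex, deregisters in its own destructor, and `shutdown` both cascades to live children and is applied immediately to children created after the fact. The following stripped-down sketch isolates just that lifecycle bookkeeping; the scheduling machinery is omitted and all names are illustrative.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <mutex>
#include <string>

// Lifecycle skeleton of the parent/child scheduler relationship: children
// register with their parent, deregister on destruction, and shutdown
// propagates to every live descendant.
class Scheduler {
public:
    Scheduler() = default;

    ~Scheduler() {
        {
            std::lock_guard<std::mutex> lg(_mutex);
            assert(_children.empty());  // children must not outlive the parent
        }
        if (!_parent)
            return;
        std::lock_guard<std::mutex> lg(_parent->_mutex);
        _parent->_children.erase(_itToRemove);
    }

    std::unique_ptr<Scheduler> makeChild() {
        auto child = std::make_unique<Scheduler>();
        std::lock_guard<std::mutex> lg(_mutex);
        if (!_shutdownReason.empty())
            child->shutdown(_shutdownReason);  // born already shut down
        child->_parent = this;
        child->_itToRemove = _children.insert(_children.begin(), child.get());
        return child;
    }

    void shutdown(std::string reason) {
        std::lock_guard<std::mutex> lg(_mutex);
        if (!_shutdownReason.empty())
            return;  // first shutdown wins
        _shutdownReason = std::move(reason);
        for (auto* child : _children)
            child->shutdown(_shutdownReason);  // cascade while holding our lock
    }

private:
    std::mutex _mutex;
    std::string _shutdownReason;  // empty means "still running"
    Scheduler* _parent{nullptr};
    std::list<Scheduler*> _children;
    std::list<Scheduler*>::iterator _itToRemove;
};

int main() {
    Scheduler root;
    auto child = root.makeChild();
    auto grandchild = child->makeChild();
    root.shutdown("stepping down");  // cascades to child and grandchild
    grandchild.reset();              // deregisters from child
    child.reset();                   // deregisters from root
    return 0;
}
```

Note the lock discipline mirrored from the patch: the destructor releases its own mutex (the inner brace scope) before taking the parent's, so locks are never held in child-then-parent order at the same time as the parent-then-child order used by `makeChild` and `shutdown`.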
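
Finally, `whenAll` works by mapping each `Future<void>` onto a `Future<int>` so the generic `collect` helper can be reused; `collect` resolves its output future once every input has resolved. `std::future` has no continuations, so a self-contained approximation has to block somewhere. The sketch below (assumed names, illustrative only) reproduces the "resolve when all inputs resolve" semantics by draining the inputs on a helper thread, which is not how `collect` does it internally.

```cpp
#include <future>
#include <iostream>
#include <thread>
#include <utility>
#include <vector>

// Resolve once all input futures have resolved. A helper thread waits on each
// input in turn and then fulfills the aggregate promise. Error handling is
// omitted for brevity: a throwing task would escape the helper thread here,
// whereas the patch's collect() forwards the first error into the result.
std::future<void> whenAll(std::vector<std::future<void>> futures) {
    std::promise<void> done;
    auto result = done.get_future();
    std::thread([futures = std::move(futures), done = std::move(done)]() mutable {
        for (auto& f : futures)
            f.get();
        done.set_value();
    }).detach();
    return result;
}

int main() {
    std::vector<std::future<void>> fs;
    for (int i = 0; i < 3; ++i)
        fs.push_back(std::async(std::launch::async,
                                [i] { std::cout << "task " << i << " done\n"; }));
    whenAll(std::move(fs)).get();
    std::cout << "all done\n";
    return 0;
}
```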