/**
* Copyright (C) 2018 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/operation_context_session_mongod.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/transaction_coordinator_commands_impl.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/shard_server_test_fixture.h"
#include "mongo/transport/transport_layer_manager.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
const std::vector shardIds{{"s1"}, {"s2"}, {"s3"}};
const int kMaxNumFailedHostRetryAttempts = 3;
/**
 * Builds the host the tests associate with 'shardId': the shard id followed by ":123".
 */
HostAndPort makeHostAndPort(const ShardId& shardId) {
    const std::string hostAndPortText = str::stream() << shardId << ":123";
    return HostAndPort(hostAndPortText);
}
/**
* This test fixture allows testing the behavior of a shard server acting as a transaction
* coordinator. The fixture provides:
*
* - helpers that simulate running voteAbortTransaction, voteCommitTransaction, and
* coordinateCommitTransaction
* - methods to expect the coordinator to send commitTransaction and abortTransaction over the
* network and respond with success or error.
*/
class TransactionCoordinatorTestFixture : public ShardServerTestFixture {
protected:
void setUp() final {
ShardServerTestFixture::setUp();
SessionCatalog::get(getServiceContext())->onStepUp(operationContext());
auto scopedSession =
SessionCatalog::get(operationContext())->getOrCreateSession(operationContext(), _lsid);
TransactionCoordinator::create(scopedSession.get());
for_each(shardIds.begin(), shardIds.end(), [this](const ShardId& shardId) {
auto shardTargeter = RemoteCommandTargeterMock::get(
uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))
->getTargeter());
shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
});
}
void tearDown() final {
SessionCatalog::get(getServiceContext())->reset_forTest();
ShardServerTestFixture::tearDown();
}
/**
* This is a generic helper that simulates that this shard server received a command by
* launching a thread to run the command. The 'commandBody' argument should be a function that
* simulates the command's run().
*/
template
auto simulateHandleRequest(Lambda commandBody) {
auto future = launchAsync([this, commandBody] {
try {
// Set up OperationContext for this thread.
ON_BLOCK_EXIT([&] { Client::destroy(); });
Client::initThreadIfNotAlready();
auto opCtxPtr = cc().makeOperationContext();
auto opCtx = opCtxPtr.get();
// Required in order for OperationContextSession to check out the session.
opCtx->setLogicalSessionId(this->_lsid);
// Check out the session.
OperationContextSession ocs(opCtx, true);
auto session = OperationContextSession::get(opCtx);
invariant(session);
// Call the command's "run".
commandBody(opCtx);
} catch (DBException& e) {
log() << "Caught exception while running command: " << e.toStatus();
MONGO_UNREACHABLE;
}
});
return future;
}
auto receiveCoordinateCommit(std::set participantList) {
auto commandFn =
std::bind(txn::recvCoordinateCommit, std::placeholders::_1, participantList);
return simulateHandleRequest(commandFn);
}
auto receiveVoteCommit(ShardId shardId, int prepareTimestamp) {
auto commandFn =
std::bind(txn::recvVoteCommit, std::placeholders::_1, shardId, prepareTimestamp);
return simulateHandleRequest(commandFn);
}
auto receiveVoteAbort(ShardId shardId) {
auto commandFn = std::bind(txn::recvVoteAbort, std::placeholders::_1, shardId);
return simulateHandleRequest(commandFn);
}
void expectSendAbortAndReturnRetryableErrror() {
for (int i = 0; i <= kMaxNumFailedHostRetryAttempts; i++) {
onCommand([](const executor::RemoteCommandRequest& request) -> Status {
ASSERT_EQUALS("abortTransaction",
request.cmdObj.firstElement().fieldNameStringData());
return {ErrorCodes::HostUnreachable, ""};
});
}
}
void expectSendAbortAndReturnSuccess() {
onCommand([](const executor::RemoteCommandRequest& request) {
ASSERT_EQUALS("abortTransaction", request.cmdObj.firstElement().fieldNameStringData());
return BSON("ok" << 1);
});
}
void expectSendCommitAndReturnRetryableError() {
for (int i = 0; i <= kMaxNumFailedHostRetryAttempts; i++) {
onCommand([](const executor::RemoteCommandRequest& request) -> Status {
ASSERT_EQUALS("commitTransaction",
request.cmdObj.firstElement().fieldNameStringData());
return {ErrorCodes::HostUnreachable, ""};
});
}
}
void expectSendCommitAndReturnSuccess() {
onCommand([](const executor::RemoteCommandRequest& request) {
ASSERT_EQUALS("commitTransaction", request.cmdObj.firstElement().fieldNameStringData());
return BSON("ok" << 1);
});
}
private:
// Override the CatalogClient to make CatalogClient::getAllShards automatically return the
// expected shards. We cannot mock the network responses for the ShardRegistry reload, since the
// ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no
// DBClientMock analogous to the NetworkInterfaceMock.
std::unique_ptr makeShardingCatalogClient(
std::unique_ptr distLockManager) override {
class StaticCatalogClient final : public ShardingCatalogClientMock {
public:
StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {}
StatusWith>> getAllShards(
OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
std::vector shardTypes;
for_each(shardIds.begin(), shardIds.end(), [&shardTypes](const ShardId& shardId) {
const ConnectionString cs = ConnectionString::forReplicaSet(
shardId.toString(), {makeHostAndPort(shardId)});
ShardType sType;
sType.setName(cs.getSetName());
sType.setHost(cs.toString());
shardTypes.push_back(std::move(sType));
});
return repl::OpTimeWith>(shardTypes);
}
};
return stdx::make_unique();
}
const LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
};
//
// VoteCommit tests
//
TEST_F(TransactionCoordinatorTestFixture,
       VoteCommitDoesNotSendCommitIfParticipantListNotYetReceived) {
    // A lone voteCommit, with no participant list delivered; no network responses are scheduled.
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentVoteCommitDoesNotSendCommitIfParticipantListNotYetReceived) {
    // The same voteCommit is delivered twice; no network responses are scheduled.
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       VoteCommitDoesNotSendCommitIfSomeParticipantsNotYetVoted) {
    // Participant list names two shards, but only the first one votes.
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentVoteCommitDoesNotSendCommitIfSomeParticipantsNotYetVoted) {
    // Participant list names two shards; the first shard's voteCommit arrives twice and the
    // second shard never votes.
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture, FinalVoteCommitSendsCommit) {
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);

    // The last outstanding vote: two commitTransaction requests are expected and acknowledged
    // before the handler is allowed to finish.
    auto finalVote = receiveVoteCommit(shardIds[1], 0);
    expectSendCommitAndReturnSuccess();
    expectSendCommitAndReturnSuccess();
    finalVote.timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentFinalVoteCommitOnlySendsCommitToNonAckedParticipants) {
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);

    // First delivery of the final vote: one commitTransaction succeeds, the next ones are
    // answered with a retryable error.
    auto finalVote = receiveVoteCommit(shardIds[1], 0);
    expectSendCommitAndReturnSuccess();
    expectSendCommitAndReturnRetryableError();
    finalVote.timed_get(kFutureTimeout);

    // Redelivery of the final vote: exactly one more commitTransaction is expected.
    auto resentFinalVote = receiveVoteCommit(shardIds[1], 0);
    expectSendCommitAndReturnSuccess();
    resentFinalVote.timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentFinalVoteCommitDoesNotSendCommitIfAllParticipantsAcked) {
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);

    // Final vote: both commitTransaction requests are acknowledged.
    auto finalVote = receiveVoteCommit(shardIds[1], 0);
    expectSendCommitAndReturnSuccess();
    expectSendCommitAndReturnSuccess();
    finalVote.timed_get(kFutureTimeout);

    // Redelivery of the final vote with no network responses scheduled.
    receiveVoteCommit(shardIds[1], 0).timed_get(kFutureTimeout);
}
//
// VoteAbort tests
//
TEST_F(TransactionCoordinatorTestFixture, VoteAbortDoesNotSendAbortIfIsOnlyVoteReceivedSoFar) {
    // A lone voteAbort; no network responses are scheduled.
    receiveVoteAbort(shardIds[0]).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentVoteAbortDoesNotSendAbortIfIsOnlyVoteReceivedSoFar) {
    // The same voteAbort is delivered twice; no network responses are scheduled.
    receiveVoteAbort(shardIds[0]).timed_get(kFutureTimeout);
    receiveVoteAbort(shardIds[0]).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture, VoteAbortDoesNotSendAbortIfAllVotesSoFarWereToAbort) {
    // Each of the three shards votes to abort, one after the other; no network responses are
    // scheduled.
    receiveVoteAbort(shardIds[0]).timed_get(kFutureTimeout);
    receiveVoteAbort(shardIds[1]).timed_get(kFutureTimeout);
    receiveVoteAbort(shardIds[2]).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentVoteAbortDoesNotSendAbortIfAllVotesSoFarWereToAbort) {
    // All three shards vote to abort, then the third shard's vote is delivered again; no network
    // responses are scheduled.
    receiveVoteAbort(shardIds[0]).timed_get(kFutureTimeout);
    receiveVoteAbort(shardIds[1]).timed_get(kFutureTimeout);
    receiveVoteAbort(shardIds[2]).timed_get(kFutureTimeout);
    receiveVoteAbort(shardIds[2]).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture, VoteAbortSendsAbortIfSomeParticipantsHaveVotedCommit) {
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);

    // An abort vote after a commit vote: one abortTransaction request is expected and acked.
    auto abortVote = receiveVoteAbort(shardIds[1]);
    expectSendAbortAndReturnSuccess();
    abortVote.timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       VoteAbortDoesNotSendAbortIfAlreadySentAbortToAllParticipantsWhoHaveVotedSoFar) {
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);

    // First abort vote: one abortTransaction request is expected and acked.
    auto firstAbortVote = receiveVoteAbort(shardIds[1]);
    expectSendAbortAndReturnSuccess();
    firstAbortVote.timed_get(kFutureTimeout);

    // Abort vote from a different shard, with no network responses scheduled.
    receiveVoteAbort(shardIds[2]).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentVoteAbortDoesNotSendAbortIfAlreadySentAbortToAllParticipantsWhoHaveVotedSoFar) {
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);

    // First abort vote: one abortTransaction request is expected and acked.
    auto abortVote = receiveVoteAbort(shardIds[1]);
    expectSendAbortAndReturnSuccess();
    abortVote.timed_get(kFutureTimeout);

    // The same shard's abort vote is delivered again, with no network responses scheduled.
    receiveVoteAbort(shardIds[1]).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentVoteAbortDoesNotSendAbortEvenIfMoreParticipantsHaveVotedCommit) {
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);

    // First abort vote: one abortTransaction request is expected and acked.
    auto abortVote = receiveVoteAbort(shardIds[1]);
    expectSendAbortAndReturnSuccess();
    abortVote.timed_get(kFutureTimeout);

    // A further commit vote followed by a redelivery of the abort vote; neither step has any
    // network responses scheduled.
    receiveVoteCommit(shardIds[2], 0).timed_get(kFutureTimeout);
    receiveVoteAbort(shardIds[1]).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       VoteAbortAfterReceivingParticipantListSendsAbortToAllParticipantsWhoHaventVotedAbort) {
    receiveCoordinateCommit({shardIds[0], shardIds[1], shardIds[2]}).timed_get(kFutureTimeout);
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);

    // With a three-shard participant list known, one abort vote is followed by two
    // abortTransaction requests, both acked.
    auto abortVote = receiveVoteAbort(shardIds[1]);
    expectSendAbortAndReturnSuccess();
    expectSendAbortAndReturnSuccess();
    abortVote.timed_get(kFutureTimeout);
}
//
// CoordinateCommit tests
//
TEST_F(TransactionCoordinatorTestFixture, CoordinateCommitDoesNotSendCommitIfNoParticipantsVoted) {
    // Only the participant list is delivered; no votes and no network responses.
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentCoordinateCommitDoesNotSendCommitIfNoParticipantsVoted) {
    // The same participant list is delivered twice; no votes and no network responses.
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       CoordinateCommitDoesNotSendCommitIfSomeParticipantsNotYetVoted) {
    // One shard votes commit before the two-shard participant list arrives; the other shard
    // never votes and no network responses are scheduled.
    receiveVoteCommit(shardIds[1], 0).timed_get(kFutureTimeout);
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentCoordinateCommitDoesNotSendCommitIfSomeParticipantsNotYetVoted) {
    // One shard votes commit, then the two-shard participant list is delivered twice; no
    // network responses are scheduled.
    receiveVoteCommit(shardIds[1], 0).timed_get(kFutureTimeout);
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       CoordinateCommitDoesNotSendAbortEvenIfSomeParticipantsVotedAbort) {
    // An abort vote precedes the participant list; no network responses are scheduled.
    receiveVoteAbort(shardIds[0]).timed_get(kFutureTimeout);
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentCoordinateCommitDoesNotSendAbortEvenIfSomeParticipantsVotedAbort) {
    // An abort vote, then the participant list delivered twice; no network responses are
    // scheduled.
    receiveVoteAbort(shardIds[0]).timed_get(kFutureTimeout);
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
}
TEST_F(TransactionCoordinatorTestFixture,
       ResentCoordinateCommitDoesNotSendCommitEvenIfAllParticipantsAlreadyVotedCommit) {
    receiveVoteCommit(shardIds[0], 0).timed_get(kFutureTimeout);
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);

    // The second shard's vote completes the set: two commitTransaction requests are expected
    // and acked.
    auto finalVote = receiveVoteCommit(shardIds[1], 0);
    expectSendCommitAndReturnSuccess();
    expectSendCommitAndReturnSuccess();
    finalVote.timed_get(kFutureTimeout);

    // Redelivery of the participant list with no network responses scheduled.
    receiveCoordinateCommit({shardIds[0], shardIds[1]}).timed_get(kFutureTimeout);
}
} // namespace
} // namespace mongo