diff options
author | Eric Maynard <eric.maynard@mongodb.com> | 2020-09-21 14:23:08 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-10-02 16:56:24 +0000 |
commit | 0b805a4119272e83f9ffe9bf7990397be9b0c7e9 (patch) | |
tree | f9efd343c873da5250d2ee8382e339e3748aa18d /src | |
parent | b02bba04f8379a0ef6bc76ba475f1b063a6e1323 (diff) | |
download | mongo-0b805a4119272e83f9ffe9bf7990397be9b0c7e9.tar.gz |
SERVER-49899 Create config.transactions cloner for resharding (only pulls txns, doesn't merge)
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_txn_cloner.cpp | 110 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_txn_cloner.h | 54 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_txn_cloner_test.cpp | 208 |
4 files changed, 374 insertions, 0 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 76740dcbd9e..66bc7641c00 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -136,6 +136,7 @@ env.Library( env.Library( target='resharding_util', source=[ + 'resharding_txn_cloner.cpp', 'resharding_util.cpp', env.Idlc('resharding/donor_oplog_id.idl')[0], ], @@ -449,6 +450,7 @@ env.CppUnitTest( 'start_chunk_clone_request_test.cpp', 'type_shard_identity_test.cpp', 'vector_clock_shard_server_test.cpp', + 'resharding_txn_cloner_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', diff --git a/src/mongo/db/s/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding_txn_cloner.cpp new file mode 100644 index 00000000000..f5e4721d3b4 --- /dev/null +++ b/src/mongo/db/s/resharding_txn_cloner.cpp @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/resharding_txn_cloner.h" + +#include <fmt/format.h> + +#include "mongo/bson/bsonobj.h" +#include "mongo/client/dbclient_connection.h" +#include "mongo/client/fetcher.h" +#include "mongo/client/read_preference.h" +#include "mongo/client/remote_command_targeter.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" +#include "mongo/db/query/query_request.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/s/shard_id.h" + +namespace mongo { + +using namespace fmt::literals; + +std::unique_ptr<Fetcher> cloneConfigTxnsForResharding( + OperationContext* opCtx, + const ShardId& shardId, + Timestamp fetchTimestamp, + boost::optional<LogicalSessionId> startAfter, + std::function<void(StatusWith<BSONObj>)> merge) { + boost::intrusive_ptr<ExpressionContext> expCtx = make_intrusive<ExpressionContext>( + opCtx, nullptr, NamespaceString::kSessionTransactionsTableNamespace); + auto pipeline = + createConfigTxnCloningPipelineForResharding(expCtx, fetchTimestamp, std::move(startAfter)); + AggregationRequest request(NamespaceString::kSessionTransactionsTableNamespace, + pipeline->serializeToBson()); + + request.setReadConcern(BSON(repl::ReadConcernArgs::kLevelFieldName + << repl::ReadConcernLevel::kMajorityReadConcern + << repl::ReadConcernArgs::kAfterClusterTimeFieldName + << fetchTimestamp)); + request.setHint(BSON("_id_" << 1)); + + auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); + const auto targetHost = uassertStatusOK( + shard->getTargeter()->findHost(opCtx, ReadPreferenceSetting{ReadPreference::Nearest})); + + auto fetcherCallback = [merge](const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + if (!dataStatus.isOK()) { + merge(dataStatus.getStatus()); + return; + } + + auto data = dataStatus.getValue(); + for (BSONObj doc : data.documents) { + merge(doc); + } + + if (!getMoreBob) { + return; + } + getMoreBob->append("getMore", data.cursorId); + getMoreBob->append("collection", data.nss.coll()); + }; + + auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + + auto fetcher = std::make_unique<Fetcher>( + executor.get(), + targetHost, + "config", + request.serializeToCommandObj().toBson(), + fetcherCallback, + ReadPreferenceSetting(ReadPreference::Nearest).toContainingBSON()); + uassertStatusOK(fetcher->schedule()); + return fetcher; +} + +} // namespace mongo diff --git a/src/mongo/db/s/resharding_txn_cloner.h b/src/mongo/db/s/resharding_txn_cloner.h new file mode 100644 index 00000000000..e7fe58360ef --- /dev/null +++ b/src/mongo/db/s/resharding_txn_cloner.h @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include <vector> + +#include "mongo/bson/bsonobj.h" +#include "mongo/client/fetcher.h" +#include "mongo/db/operation_context.h" +#include "mongo/s/shard_id.h" + +namespace mongo { + +/** + * Clone config.transactions from source and updates the config.transactions on itself. + * The parameter merge is a function called on every transaction received and should be used + * to merge the transaction into this machine's own congig.transactions collection. + * + * returns a pointer to the fetcher object sending the command. + */ +std::unique_ptr<Fetcher> cloneConfigTxnsForResharding( + OperationContext* opCtx, + const ShardId& shardId, + Timestamp fetchTimestamp, + boost::optional<LogicalSessionId> startAfter, + std::function<void(StatusWith<BSONObj>)> merge); + +} // namespace mongo diff --git a/src/mongo/db/s/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding_txn_cloner_test.cpp new file mode 100644 index 00000000000..c5993ee3582 --- /dev/null +++ b/src/mongo/db/s/resharding_txn_cloner_test.cpp @@ -0,0 +1,208 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include <vector> + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/s/resharding_txn_cloner.h" +#include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/db/session_txn_record_gen.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class ReshardingTxnClonerTest : public ShardServerTestFixture { + void setUp() { + ShardServerTestFixture::setUp(); + for (const auto& shardId : kTwoShardIdList) { + auto shardTargeter = RemoteCommandTargeterMock::get( + uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId)) + ->getTargeter()); + shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId)); + } + + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); + } + + void tearDown() { + WaitForMajorityService::get(getServiceContext()).shutDown(); + ShardServerTestFixture::tearDown(); + } + + /** + * Override the CatalogClient to make CatalogClient::getAllShards automatically return the + * expected shards. We cannot mock the network responses for the ShardRegistry reload, since the + * ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no + * DBClientMock analogous to the NetworkInterfaceMock. + */ + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) { + + class StaticCatalogClient final : public ShardingCatalogClientMock { + public: + StaticCatalogClient(std::vector<ShardId> shardIds) + : ShardingCatalogClientMock(nullptr), _shardIds(std::move(shardIds)) {} + + StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( + OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { + std::vector<ShardType> shardTypes; + for (const auto& shardId : _shardIds) { + const ConnectionString cs = ConnectionString::forReplicaSet( + shardId.toString(), {makeHostAndPort(shardId)}); + ShardType sType; + sType.setName(cs.getSetName()); + sType.setHost(cs.toString()); + shardTypes.push_back(std::move(sType)); + }; + return repl::OpTimeWith<std::vector<ShardType>>(shardTypes); + } + + private: + const std::vector<ShardId> _shardIds; + }; + + return std::make_unique<StaticCatalogClient>(kTwoShardIdList); + } + +protected: + const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}}; + + BSONObj makeTxn() { + return SessionTxnRecord( + makeLogicalSessionIdForTest(), 0, repl::OpTime(Timestamp::min(), 0), Date_t()) + .toBSON(); + ; + } + +private: + static HostAndPort makeHostAndPort(const ShardId& shardId) { + return HostAndPort(str::stream() << shardId << ":123"); + } +}; + +TEST_F(ReshardingTxnClonerTest, TxnAggregation) { + std::vector<BSONObj> expectedTransactions{ + makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn()}; + std::vector<BSONObj> retrievedTransactions; + + auto future = launchAsync([&, this] { + auto fetcher = + cloneConfigTxnsForResharding(operationContext(), + kTwoShardIdList[1], + Timestamp::max(), + boost::none, + [&](StatusWith<BSONObj> statusWithTransaction) { + auto transaction = + unittest::assertGet(statusWithTransaction); + retrievedTransactions.push_back(transaction); + }); + + fetcher->join(); + }); + + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace, + CursorId{123}, + std::vector<BSONObj>(expectedTransactions.begin(), + expectedTransactions.begin() + 4)) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace, + CursorId{0}, + std::vector<BSONObj>(expectedTransactions.begin() + 4, + expectedTransactions.end())) + .toBSON(CursorResponse::ResponseType::SubsequentResponse); + }); + + future.default_timed_get(); + + ASSERT(std::equal(expectedTransactions.begin(), + expectedTransactions.end(), + retrievedTransactions.begin(), + [](BSONObj a, BSONObj b) { return a.binaryEqual(b); })); +} + +TEST_F(ReshardingTxnClonerTest, CursorNotFoundError) { + std::vector<BSONObj> expectedTransactions{ + makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn()}; + std::vector<BSONObj> retrievedTransactions; + int errorsReturned = 0; + Status error = Status::OK(); + + auto future = launchAsync([&, this] { + auto fetcher = cloneConfigTxnsForResharding( + operationContext(), + kTwoShardIdList[1], + Timestamp::max(), + boost::none, + [&](StatusWith<BSONObj> statusWithTransaction) { + if (statusWithTransaction.isOK()) { + retrievedTransactions.push_back(statusWithTransaction.getValue()); + } else { + errorsReturned++; + error = statusWithTransaction.getStatus(); + } + }); + fetcher->join(); + }); + + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace, + CursorId{124}, + expectedTransactions) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + onCommand([&](const executor::RemoteCommandRequest& request) { + return Status(ErrorCodes::CursorNotFound, "Simulate cursor not found error"); + }); + + future.default_timed_get(); + + ASSERT(std::equal(expectedTransactions.begin(), + expectedTransactions.end(), + retrievedTransactions.begin(), + [](BSONObj a, BSONObj b) { return a.binaryEqual(b); })); + ASSERT_EQ(errorsReturned, 1); + ASSERT_EQ(error, ErrorCodes::CursorNotFound); +} + +} // namespace +} // namespace mongo |