summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEric Maynard <eric.maynard@mongodb.com>2020-09-21 14:23:08 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-10-02 16:56:24 +0000
commit0b805a4119272e83f9ffe9bf7990397be9b0c7e9 (patch)
treef9efd343c873da5250d2ee8382e339e3748aa18d /src
parentb02bba04f8379a0ef6bc76ba475f1b063a6e1323 (diff)
downloadmongo-0b805a4119272e83f9ffe9bf7990397be9b0c7e9.tar.gz
SERVER-49899 Create config.transactions cloner for resharding (only pulls txns, doesn't merge)
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/resharding_txn_cloner.cpp110
-rw-r--r--src/mongo/db/s/resharding_txn_cloner.h54
-rw-r--r--src/mongo/db/s/resharding_txn_cloner_test.cpp208
4 files changed, 374 insertions, 0 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 76740dcbd9e..66bc7641c00 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -136,6 +136,7 @@ env.Library(
env.Library(
target='resharding_util',
source=[
+ 'resharding_txn_cloner.cpp',
'resharding_util.cpp',
env.Idlc('resharding/donor_oplog_id.idl')[0],
],
@@ -449,6 +450,7 @@ env.CppUnitTest(
'start_chunk_clone_request_test.cpp',
'type_shard_identity_test.cpp',
'vector_clock_shard_server_test.cpp',
+ 'resharding_txn_cloner_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
diff --git a/src/mongo/db/s/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding_txn_cloner.cpp
new file mode 100644
index 00000000000..f5e4721d3b4
--- /dev/null
+++ b/src/mongo/db/s/resharding_txn_cloner.cpp
@@ -0,0 +1,110 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/resharding_txn_cloner.h"
+
+#include <fmt/format.h>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/client/dbclient_connection.h"
+#include "mongo/client/fetcher.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
+#include "mongo/db/query/query_request.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/s/shard_id.h"
+
+namespace mongo {
+
+using namespace fmt::literals;
+
+std::unique_ptr<Fetcher> cloneConfigTxnsForResharding(
+ OperationContext* opCtx,
+ const ShardId& shardId,
+ Timestamp fetchTimestamp,
+ boost::optional<LogicalSessionId> startAfter,
+ std::function<void(StatusWith<BSONObj>)> merge) {
+ boost::intrusive_ptr<ExpressionContext> expCtx = make_intrusive<ExpressionContext>(
+ opCtx, nullptr, NamespaceString::kSessionTransactionsTableNamespace);
+ auto pipeline =
+ createConfigTxnCloningPipelineForResharding(expCtx, fetchTimestamp, std::move(startAfter));
+ AggregationRequest request(NamespaceString::kSessionTransactionsTableNamespace,
+ pipeline->serializeToBson());
+
+ request.setReadConcern(BSON(repl::ReadConcernArgs::kLevelFieldName
+ << repl::ReadConcernLevel::kMajorityReadConcern
+ << repl::ReadConcernArgs::kAfterClusterTimeFieldName
+ << fetchTimestamp));
+ request.setHint(BSON("_id_" << 1));
+
+ auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
+ const auto targetHost = uassertStatusOK(
+ shard->getTargeter()->findHost(opCtx, ReadPreferenceSetting{ReadPreference::Nearest}));
+
+ auto fetcherCallback = [merge](const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ if (!dataStatus.isOK()) {
+ merge(dataStatus.getStatus());
+ return;
+ }
+
+ auto data = dataStatus.getValue();
+ for (BSONObj doc : data.documents) {
+ merge(doc);
+ }
+
+ if (!getMoreBob) {
+ return;
+ }
+ getMoreBob->append("getMore", data.cursorId);
+ getMoreBob->append("collection", data.nss.coll());
+ };
+
+ auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+
+ auto fetcher = std::make_unique<Fetcher>(
+ executor.get(),
+ targetHost,
+ "config",
+ request.serializeToCommandObj().toBson(),
+ fetcherCallback,
+ ReadPreferenceSetting(ReadPreference::Nearest).toContainingBSON());
+ uassertStatusOK(fetcher->schedule());
+ return fetcher;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding_txn_cloner.h b/src/mongo/db/s/resharding_txn_cloner.h
new file mode 100644
index 00000000000..e7fe58360ef
--- /dev/null
+++ b/src/mongo/db/s/resharding_txn_cloner.h
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include <vector>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/client/fetcher.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/s/shard_id.h"
+
+namespace mongo {
+
+/**
+ * Clone config.transactions from source and updates the config.transactions on itself.
+ * The parameter merge is a function called on every transaction received and should be used
+ * to merge the transaction into this machine's own congig.transactions collection.
+ *
+ * returns a pointer to the fetcher object sending the command.
+ */
+std::unique_ptr<Fetcher> cloneConfigTxnsForResharding(
+ OperationContext* opCtx,
+ const ShardId& shardId,
+ Timestamp fetchTimestamp,
+ boost::optional<LogicalSessionId> startAfter,
+ std::function<void(StatusWith<BSONObj>)> merge);
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding_txn_cloner_test.cpp
new file mode 100644
index 00000000000..c5993ee3582
--- /dev/null
+++ b/src/mongo/db/s/resharding_txn_cloner_test.cpp
@@ -0,0 +1,208 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include <vector>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/s/resharding_txn_cloner.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class ReshardingTxnClonerTest : public ShardServerTestFixture {
+ void setUp() {
+ ShardServerTestFixture::setUp();
+ for (const auto& shardId : kTwoShardIdList) {
+ auto shardTargeter = RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))
+ ->getTargeter());
+ shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
+ }
+
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+ }
+
+ void tearDown() {
+ WaitForMajorityService::get(getServiceContext()).shutDown();
+ ShardServerTestFixture::tearDown();
+ }
+
+ /**
+ * Override the CatalogClient to make CatalogClient::getAllShards automatically return the
+ * expected shards. We cannot mock the network responses for the ShardRegistry reload, since the
+ * ShardRegistry reload is done over DBClient, not the NetworkInterface, and there is no
+ * DBClientMock analogous to the NetworkInterfaceMock.
+ */
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) {
+
+ class StaticCatalogClient final : public ShardingCatalogClientMock {
+ public:
+ StaticCatalogClient(std::vector<ShardId> shardIds)
+ : ShardingCatalogClientMock(nullptr), _shardIds(std::move(shardIds)) {}
+
+ StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+ OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
+ std::vector<ShardType> shardTypes;
+ for (const auto& shardId : _shardIds) {
+ const ConnectionString cs = ConnectionString::forReplicaSet(
+ shardId.toString(), {makeHostAndPort(shardId)});
+ ShardType sType;
+ sType.setName(cs.getSetName());
+ sType.setHost(cs.toString());
+ shardTypes.push_back(std::move(sType));
+ };
+ return repl::OpTimeWith<std::vector<ShardType>>(shardTypes);
+ }
+
+ private:
+ const std::vector<ShardId> _shardIds;
+ };
+
+ return std::make_unique<StaticCatalogClient>(kTwoShardIdList);
+ }
+
+protected:
+ const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}};
+
+ BSONObj makeTxn() {
+ return SessionTxnRecord(
+ makeLogicalSessionIdForTest(), 0, repl::OpTime(Timestamp::min(), 0), Date_t())
+ .toBSON();
+ ;
+ }
+
+private:
+ static HostAndPort makeHostAndPort(const ShardId& shardId) {
+ return HostAndPort(str::stream() << shardId << ":123");
+ }
+};
+
+TEST_F(ReshardingTxnClonerTest, TxnAggregation) {
+ std::vector<BSONObj> expectedTransactions{
+ makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn()};
+ std::vector<BSONObj> retrievedTransactions;
+
+ auto future = launchAsync([&, this] {
+ auto fetcher =
+ cloneConfigTxnsForResharding(operationContext(),
+ kTwoShardIdList[1],
+ Timestamp::max(),
+ boost::none,
+ [&](StatusWith<BSONObj> statusWithTransaction) {
+ auto transaction =
+ unittest::assertGet(statusWithTransaction);
+ retrievedTransactions.push_back(transaction);
+ });
+
+ fetcher->join();
+ });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace,
+ CursorId{123},
+ std::vector<BSONObj>(expectedTransactions.begin(),
+ expectedTransactions.begin() + 4))
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace,
+ CursorId{0},
+ std::vector<BSONObj>(expectedTransactions.begin() + 4,
+ expectedTransactions.end()))
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ });
+
+ future.default_timed_get();
+
+ ASSERT(std::equal(expectedTransactions.begin(),
+ expectedTransactions.end(),
+ retrievedTransactions.begin(),
+ [](BSONObj a, BSONObj b) { return a.binaryEqual(b); }));
+}
+
+TEST_F(ReshardingTxnClonerTest, CursorNotFoundError) {
+ std::vector<BSONObj> expectedTransactions{
+ makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn(), makeTxn()};
+ std::vector<BSONObj> retrievedTransactions;
+ int errorsReturned = 0;
+ Status error = Status::OK();
+
+ auto future = launchAsync([&, this] {
+ auto fetcher = cloneConfigTxnsForResharding(
+ operationContext(),
+ kTwoShardIdList[1],
+ Timestamp::max(),
+ boost::none,
+ [&](StatusWith<BSONObj> statusWithTransaction) {
+ if (statusWithTransaction.isOK()) {
+ retrievedTransactions.push_back(statusWithTransaction.getValue());
+ } else {
+ errorsReturned++;
+ error = statusWithTransaction.getStatus();
+ }
+ });
+ fetcher->join();
+ });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(NamespaceString::kSessionTransactionsTableNamespace,
+ CursorId{124},
+ expectedTransactions)
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return Status(ErrorCodes::CursorNotFound, "Simulate cursor not found error");
+ });
+
+ future.default_timed_get();
+
+ ASSERT(std::equal(expectedTransactions.begin(),
+ expectedTransactions.end(),
+ retrievedTransactions.begin(),
+ [](BSONObj a, BSONObj b) { return a.binaryEqual(b); }));
+ ASSERT_EQ(errorsReturned, 1);
+ ASSERT_EQ(error, ErrorCodes::CursorNotFound);
+}
+
+} // namespace
+} // namespace mongo