summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2021-04-09 17:03:05 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-09 17:26:13 +0000
commitd7c268102917688e9763063d009ad04fdf1d3532 (patch)
tree251fd43f16e034245642e17a751cbd882c2df416
parenta59ed06e5f2984cf7b7a1d9dac138297ace1f0a0 (diff)
downloadmongo-d7c268102917688e9763063d009ad04fdf1d3532.tar.gz
SERVER-55288 Move recipient helpers into resharding_data_replication.
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/resharding/resharding_data_replication.cpp76
-rw-r--r--src/mongo/db/s/resharding/resharding_data_replication.h21
-rw-r--r--src/mongo/db/s/resharding/resharding_data_replication_test.cpp231
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp47
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h13
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_test.cpp128
8 files changed, 311 insertions, 208 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 2f8fd92ab26..ec1dac6b245 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -494,6 +494,7 @@ env.CppUnitTest(
'resharding_destined_recipient_test.cpp',
'resharding/resharding_agg_test.cpp',
'resharding/resharding_collection_cloner_test.cpp',
+ 'resharding/resharding_data_replication_test.cpp',
'resharding/resharding_donor_oplog_iterator_test.cpp',
'resharding/resharding_donor_recipient_common_test.cpp',
'resharding/resharding_metrics_test.cpp',
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp
index 3864357c299..3bbdf20e590 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp
@@ -35,11 +35,11 @@
#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/s/resharding/resharding_collection_cloner.h"
+#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_future_util.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_oplog_applier.h"
#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
-#include "mongo/db/s/resharding/resharding_recipient_service.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding/resharding_txn_cloner.h"
#include "mongo/db/s/resharding_util.h"
@@ -83,14 +83,14 @@ std::unique_ptr<ReshardingCollectionCloner> ReshardingDataReplication::_makeColl
ReshardingMetrics* metrics,
const CommonReshardingMetadata& metadata,
const ShardId& myShardId,
- Timestamp fetchTimestamp) {
+ Timestamp cloneTimestamp) {
return std::make_unique<ReshardingCollectionCloner>(
std::make_unique<ReshardingCollectionCloner::Env>(metrics),
ShardKeyPattern{metadata.getReshardingKey()},
metadata.getSourceNss(),
metadata.getSourceUUID(),
myShardId,
- fetchTimestamp,
+ cloneTimestamp,
metadata.getTempReshardingNss());
}
@@ -121,18 +121,18 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::
for (const auto& donor : donorShards) {
auto oplogBufferNss =
getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId());
- auto fetchTimestamp = *donor.getMinFetchTimestamp();
- auto idToResumeFrom =
- resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNss, fetchTimestamp);
- invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}));
+ auto minFetchTimestamp = *donor.getMinFetchTimestamp();
+ auto idToResumeFrom = getOplogFetcherResumeId(opCtx, oplogBufferNss, minFetchTimestamp);
+ invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}));
oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>(
std::make_unique<ReshardingOplogFetcher::Env>(opCtx->getServiceContext(), metrics),
metadata.getReshardingUUID(),
metadata.getSourceUUID(),
// The recipient fetches oplog entries from the donor starting from the largest _id
- // value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds
- // to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value.
+ // value in the oplog buffer. Otherwise, it starts at minFetchTimestamp, which
+ // corresponds to {clusterTime: minFetchTimestamp, ts: minFetchTimestamp} as a resume
+ // token value.
std::move(idToResumeFrom),
donor.getShardId(),
myShardId,
@@ -175,10 +175,9 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
for (size_t i = 0; i < donorShards.size(); ++i) {
auto sourceId =
ReshardingSourceId{metadata.getReshardingUUID(), donorShards[i].getShardId()};
- auto minOplogTimestamp = *donorShards[i].getMinFetchTimestamp();
- auto idToResumeFrom =
- resharding::getApplierIdToResumeFrom(opCtx, sourceId, minOplogTimestamp);
- invariant((idToResumeFrom >= ReshardingDonorOplogId{minOplogTimestamp, minOplogTimestamp}));
+ auto minFetchTimestamp = *donorShards[i].getMinFetchTimestamp();
+ auto idToResumeFrom = getOplogApplierResumeId(opCtx, sourceId, minFetchTimestamp);
+ invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}));
const auto& oplogBufferNss =
getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShards[i].getShardId());
@@ -224,9 +223,7 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m
auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShards.size());
- auto stashCollections = resharding::ensureStashCollectionsExist(
- opCtx, sourceChunkMgr, metadata.getSourceUUID(), donorShards);
-
+ auto stashCollections = ensureStashCollectionsExist(opCtx, sourceChunkMgr, donorShards);
auto oplogAppliers = _makeOplogAppliers(opCtx,
metrics,
metadata,
@@ -386,7 +383,7 @@ std::vector<SharedSemiFuture<void>> ReshardingDataReplication::_runTxnCloners(
}
// ReshardingTxnCloners must complete before the recipient transitions to kApplying to avoid
- // errors caused by donor shards unpinning the fetchTimestamp.
+ // errors caused by donor shards unpinning their minFetchTimestamp.
return txnClonerFutures;
}
@@ -455,4 +452,49 @@ void ReshardingDataReplication::shutdown() {
_oplogFetcherExecutor->shutdown();
}
+std::vector<NamespaceString> ReshardingDataReplication::ensureStashCollectionsExist(
+ OperationContext* opCtx,
+ const ChunkManager& sourceChunkMgr,
+ const std::vector<DonorShardFetchTimestamp>& donorShards) {
+ // Use the same collation for the stash collections as the temporary resharding collection
+ // (which is also the same as the collation for the collection being resharded).
+ CollectionOptions options;
+ if (auto collator = sourceChunkMgr.getDefaultCollator()) {
+ options.collation = collator->getSpec().toBSON();
+ }
+
+ std::vector<NamespaceString> stashCollections;
+ stashCollections.reserve(donorShards.size());
+
+ for (const auto& donor : donorShards) {
+ stashCollections.emplace_back(ReshardingOplogApplier::ensureStashCollectionExists(
+ opCtx, *sourceChunkMgr.getUUID(), donor.getShardId(), options));
+ }
+
+ return stashCollections;
+}
+
+ReshardingDonorOplogId ReshardingDataReplication::getOplogFetcherResumeId(
+ OperationContext* opCtx, const NamespaceString& oplogBufferNss, Timestamp minFetchTimestamp) {
+ invariant(!opCtx->lockState()->isLocked());
+
+ AutoGetCollection coll(opCtx, oplogBufferNss, MODE_IS);
+ if (coll) {
+ auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *coll);
+ if (!highestOplogBufferId.missing()) {
+ return ReshardingDonorOplogId::parse({"getOplogFetcherResumeId"},
+ highestOplogBufferId.getDocument().toBson());
+ }
+ }
+
+ return ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp};
+}
+
+ReshardingDonorOplogId ReshardingDataReplication::getOplogApplierResumeId(
+ OperationContext* opCtx, const ReshardingSourceId& sourceId, Timestamp minFetchTimestamp) {
+ auto applierProgress = ReshardingOplogApplier::checkStoredProgress(opCtx, sourceId);
+ return applierProgress ? applierProgress->getProgress()
+ : ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp};
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h
index 9653c7f02ae..73bb12ba856 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.h
+++ b/src/mongo/db/s/resharding/resharding_data_replication.h
@@ -35,6 +35,7 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/cancelable_operation_context.h"
+#include "mongo/db/s/resharding/donor_oplog_id_gen.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/resharding/common_types_gen.h"
#include "mongo/s/shard_id.h"
@@ -120,7 +121,7 @@ public:
/**
* Returns a future that becomes ready when either
* (a) the recipient with respect to each donor shard has applied through the timestamp it has
- * finished cloning at (the fetchTimestamp), or
+ * finished cloning at (the cloneTimestamp), or
* (b) the recipient has encountered an operation-fatal error.
*/
virtual SharedSemiFuture<void> awaitConsistentButStale() = 0;
@@ -178,12 +179,28 @@ public:
void shutdown() override;
+ // The following methods are called by ReshardingDataReplication::make() and only exposed
+ // publicly for unit-testing purposes.
+
+ static std::vector<NamespaceString> ensureStashCollectionsExist(
+ OperationContext* opCtx,
+ const ChunkManager& sourceChunkMgr,
+ const std::vector<DonorShardFetchTimestamp>& donorShards);
+
+ static ReshardingDonorOplogId getOplogFetcherResumeId(OperationContext* opCtx,
+ const NamespaceString& oplogBufferNss,
+ Timestamp minFetchTimestamp);
+
+ static ReshardingDonorOplogId getOplogApplierResumeId(OperationContext* opCtx,
+ const ReshardingSourceId& sourceId,
+ Timestamp minFetchTimestamp);
+
private:
static std::unique_ptr<ReshardingCollectionCloner> _makeCollectionCloner(
ReshardingMetrics* metrics,
const CommonReshardingMetadata& metadata,
const ShardId& myShardId,
- Timestamp fetchTimestamp);
+ Timestamp cloneTimestamp);
static std::vector<std::unique_ptr<ReshardingTxnCloner>> _makeTxnCloners(
const CommonReshardingMetadata& metadata,
diff --git a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp
new file mode 100644
index 00000000000..10091b5f3df
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp
@@ -0,0 +1,231 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+#include <vector>
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/query/collation/collator_factory_mock.h"
+#include "mongo/db/query/collation/collator_interface_mock.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/s/resharding/resharding_data_copy_util.h"
+#include "mongo/db/s/resharding/resharding_data_replication.h"
+#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/chunk_manager.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class ReshardingDataReplicationTest : public ServiceContextMongoDTest {
+public:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+
+ auto serviceContext = getServiceContext();
+ {
+ auto opCtx = makeOperationContext();
+ auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext);
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord));
+
+ repl::createOplog(opCtx.get());
+
+ CollatorFactoryInterface::set(serviceContext, std::make_unique<CollatorFactoryMock>());
+ }
+ }
+
+ ChunkManager makeChunkManagerForSourceCollection(
+ std::unique_ptr<CollatorInterface> defaultCollator) {
+ const OID epoch = OID::gen();
+ std::vector<ChunkType> chunks = {ChunkType{
+ _sourceNss,
+ ChunkRange{BSON(_currentShardKey << MINKEY), BSON(_currentShardKey << MAXKEY)},
+ ChunkVersion(100, 0, epoch, boost::none /* timestamp */),
+ _myDonorId}};
+
+ auto rt = RoutingTableHistory::makeNew(_sourceNss,
+ _sourceUUID,
+ BSON(_currentShardKey << 1),
+ std::move(defaultCollator),
+ false /* unique */,
+ std::move(epoch),
+ boost::none /* timestamp */,
+ boost::none /* reshardingFields */,
+ true /* allowMigrations */,
+ chunks);
+
+ return ChunkManager(_myDonorId,
+ DatabaseVersion(UUID::gen()),
+ makeStandaloneRoutingTableHistory(std::move(rt)),
+ boost::none /* clusterTime */);
+ }
+
+private:
+ RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt) {
+ const auto version = rt.getVersion();
+ return RoutingTableHistoryValueHandle(
+ std::move(rt), ComparableChunkVersion::makeComparableChunkVersion(version));
+ }
+
+ const StringData _currentShardKey = "sk";
+
+ const NamespaceString _sourceNss{"test_crud", "collection_being_resharded"};
+ const CollectionUUID _sourceUUID = UUID::gen();
+
+ const ShardId _myDonorId{"myDonorId"};
+};
+
+TEST_F(ReshardingDataReplicationTest, StashCollectionsHaveSameCollationAsReshardingCollection) {
+ CollatorInterfaceMock collator(CollatorInterfaceMock::MockType::kReverseString);
+ auto sourceChunkMgr = makeChunkManagerForSourceCollection(collator.clone());
+
+ auto stashCollections = [&] {
+ auto opCtx = makeOperationContext();
+ return ReshardingDataReplication::ensureStashCollectionsExist(
+ opCtx.get(),
+ sourceChunkMgr,
+ {DonorShardFetchTimestamp{{"shard0"}}, DonorShardFetchTimestamp{{"shard1"}}});
+ }();
+
+ for (const auto& nss : stashCollections) {
+ auto opCtx = makeOperationContext();
+ AutoGetCollection stashColl(opCtx.get(), nss, MODE_IS);
+ ASSERT_TRUE(bool(stashColl->getDefaultCollator()))
+ << "Stash collection was created with 'simple' collation";
+ ASSERT_BSONOBJ_BINARY_EQ(stashColl->getDefaultCollator()->getSpec().toBSON(),
+ collator.getSpec().toBSON());
+ }
+}
+
+TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) {
+ auto opCtx = makeOperationContext();
+
+ const auto reshardingUUID = UUID::gen();
+ auto oplogBufferNss = getLocalOplogBufferNamespace(reshardingUUID, {"shard0"});
+
+ const auto minFetchTimestamp = Timestamp{10, 0};
+ const auto oplogId1 = ReshardingDonorOplogId{{20, 0}, {18, 0}};
+ const auto oplogId2 = ReshardingDonorOplogId{{20, 0}, {19, 0}};
+ const auto oplogId3 = ReshardingDonorOplogId{{20, 0}, {20, 0}};
+
+ // The minFetchTimestamp value is used when the oplog buffer collection doesn't exist.
+ ASSERT_BSONOBJ_BINARY_EQ(
+ ReshardingDataReplication::getOplogFetcherResumeId(
+ opCtx.get(), oplogBufferNss, minFetchTimestamp)
+ .toBSON(),
+ (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON()));
+
+ // The minFetchTimestamp value is used when the oplog buffer collection is empty.
+ resharding::data_copy::ensureCollectionExists(opCtx.get(), oplogBufferNss, CollectionOptions{});
+ ASSERT_BSONOBJ_BINARY_EQ(
+ ReshardingDataReplication::getOplogFetcherResumeId(
+ opCtx.get(), oplogBufferNss, minFetchTimestamp)
+ .toBSON(),
+ (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON()));
+
+ auto insertFn = [&](const ReshardingDonorOplogId& oplogId) {
+ AutoGetCollection oplogBufferColl(opCtx.get(), oplogBufferNss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx.get());
+ ASSERT_OK(oplogBufferColl->insertDocument(
+ opCtx.get(), InsertStatement{BSON("_id" << oplogId.toBSON())}, nullptr));
+ wuow.commit();
+ };
+
+ insertFn(oplogId2);
+ ASSERT_BSONOBJ_BINARY_EQ(ReshardingDataReplication::getOplogFetcherResumeId(
+ opCtx.get(), oplogBufferNss, minFetchTimestamp)
+ .toBSON(),
+ oplogId2.toBSON());
+
+ // The highest oplog ID is used regardless of the original insertion order.
+ insertFn(oplogId3);
+ insertFn(oplogId1);
+ ASSERT_BSONOBJ_BINARY_EQ(ReshardingDataReplication::getOplogFetcherResumeId(
+ opCtx.get(), oplogBufferNss, minFetchTimestamp)
+ .toBSON(),
+ oplogId3.toBSON());
+}
+
+TEST_F(ReshardingDataReplicationTest, GetOplogApplierResumeId) {
+ auto opCtx = makeOperationContext();
+
+ const auto reshardingUUID = UUID::gen();
+ const auto minFetchTimestamp = Timestamp{10, 0};
+ const ReshardingSourceId sourceId{reshardingUUID, {"shard0"}};
+
+ // The minFetchTimestamp value is used when the applier progress collection doesn't exist.
+ ASSERT_BSONOBJ_BINARY_EQ(
+ ReshardingDataReplication::getOplogApplierResumeId(opCtx.get(), sourceId, minFetchTimestamp)
+ .toBSON(),
+ (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON()));
+
+ // The minFetchTimestamp value is used when the applier progress collection is empty.
+ resharding::data_copy::ensureCollectionExists(
+ opCtx.get(), NamespaceString::kReshardingApplierProgressNamespace, CollectionOptions{});
+ ASSERT_BSONOBJ_BINARY_EQ(
+ ReshardingDataReplication::getOplogApplierResumeId(opCtx.get(), sourceId, minFetchTimestamp)
+ .toBSON(),
+ (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON()));
+
+ PersistentTaskStore<ReshardingOplogApplierProgress> store(
+ NamespaceString::kReshardingApplierProgressNamespace);
+
+ const auto expectedOplogId = ReshardingDonorOplogId{{20, 0}, {18, 0}};
+ store.add(opCtx.get(), ReshardingOplogApplierProgress{sourceId, expectedOplogId, 5});
+ ASSERT_BSONOBJ_BINARY_EQ(
+ ReshardingDataReplication::getOplogApplierResumeId(opCtx.get(), sourceId, minFetchTimestamp)
+ .toBSON(),
+ expectedOplogId.toBSON());
+
+ // The progress for the specific ReshardingSourceId is returned.
+ store.add(opCtx.get(),
+ ReshardingOplogApplierProgress{ReshardingSourceId{reshardingUUID, {"shard1"}},
+ ReshardingDonorOplogId{{20, 0}, {19, 0}},
+ 6});
+ store.add(opCtx.get(),
+ ReshardingOplogApplierProgress{ReshardingSourceId{UUID::gen(), {"shard0"}},
+ ReshardingDonorOplogId{{20, 0}, {20, 0}},
+ 7});
+ ASSERT_BSONOBJ_BINARY_EQ(
+ ReshardingDataReplication::getOplogApplierResumeId(opCtx.get(), sourceId, minFetchTimestamp)
+ .toBSON(),
+ expectedOplogId.toBSON());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
index ec292da2930..a8616102cdc 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
@@ -259,7 +259,7 @@ private:
std::move(epoch),
boost::none /* timestamp */,
boost::none /* reshardingFields */,
- false /* allowMigrations */,
+ true /* allowMigrations */,
chunks);
return ChunkManager(_myDonorId,
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index a6c67ebe2b7..6da05212806 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -152,53 +152,6 @@ void createTemporaryReshardingCollectionLocally(OperationContext* opCtx,
opCtx, reshardingNss, optionsAndIndexes);
}
-std::vector<NamespaceString> ensureStashCollectionsExist(
- OperationContext* opCtx,
- const ChunkManager& cm,
- const UUID& existingUUID,
- const std::vector<DonorShardFetchTimestamp>& donorShards) {
- // Use the same collation for the stash collections as the temporary resharding collection
- auto collator = cm.getDefaultCollator();
- BSONObj collationSpec = collator ? collator->getSpec().toBSON() : BSONObj();
-
- std::vector<NamespaceString> stashCollections;
- stashCollections.reserve(donorShards.size());
-
- {
- CollectionOptions options;
- options.collation = std::move(collationSpec);
- for (const auto& donor : donorShards) {
- stashCollections.emplace_back(ReshardingOplogApplier::ensureStashCollectionExists(
- opCtx, existingUUID, donor.getShardId(), options));
- }
- }
-
- return stashCollections;
-}
-
-ReshardingDonorOplogId getFetcherIdToResumeFrom(OperationContext* opCtx,
- NamespaceString oplogBufferNss,
- Timestamp fetchTimestamp) {
- AutoGetCollection collection(opCtx, oplogBufferNss, MODE_IS);
- if (!collection) {
- return ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp};
- }
-
- auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *collection);
- return highestOplogBufferId.missing()
- ? ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}
- : ReshardingDonorOplogId::parse({"resharding::getFetcherIdToResumeFrom"},
- highestOplogBufferId.getDocument().toBson());
-}
-
-ReshardingDonorOplogId getApplierIdToResumeFrom(OperationContext* opCtx,
- ReshardingSourceId sourceId,
- Timestamp fetchTimestamp) {
- auto applierProgress = ReshardingOplogApplier::checkStoredProgress(opCtx, sourceId);
- return !applierProgress ? ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}
- : applierProgress->getProgress();
-}
-
} // namespace resharding
std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::constructInstance(
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index b4b63efb071..b6fa21316c6 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -53,19 +53,6 @@ void createTemporaryReshardingCollectionLocally(OperationContext* opCtx,
const UUID& existingUUID,
Timestamp fetchTimestamp);
-std::vector<NamespaceString> ensureStashCollectionsExist(
- OperationContext* opCtx,
- const ChunkManager& cm,
- const UUID& existingUUID,
- const std::vector<DonorShardFetchTimestamp>& donorShards);
-
-ReshardingDonorOplogId getFetcherIdToResumeFrom(OperationContext* opCtx,
- NamespaceString oplogBufferNss,
- Timestamp fetchTimestamp);
-
-ReshardingDonorOplogId getApplierIdToResumeFrom(OperationContext* opCtx,
- ReshardingSourceId sourceId,
- Timestamp fetchTimestamp);
} // namespace resharding
class ReshardingRecipientService final : public repl::PrimaryOnlyService {
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index c4a5b594045..8d118974d5b 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -542,133 +542,5 @@ TEST_F(ReshardingRecipientServiceTest,
verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes);
}
-TEST_F(ReshardingRecipientServiceTest, StashCollectionsHaveSameCollationAsReshardingCollection) {
- auto shards = setupNShards(2);
-
- std::unique_ptr<CollatorInterfaceMock> collator =
- std::make_unique<CollatorInterfaceMock>(CollatorInterfaceMock::MockType::kReverseString);
- auto collationSpec = collator->getSpec().toBSON();
- auto srcChunkMgr = makeChunkManager(kOrigNss,
- ShardKeyPattern(BSON("_id" << 1)),
- std::move(collator),
- false /* unique */,
- {} /* splitPoints */);
-
- // Create stash collections for both donor shards.
- auto stashCollections = resharding::ensureStashCollectionsExist(
- operationContext(), srcChunkMgr, kOrigUUID, {ShardId("shard0"), ShardId("shard1")});
-
- // Verify that each stash collation has the collation we passed in above.
- {
- auto opCtx = operationContext();
-
- DBDirectClient client(opCtx);
- auto collInfos = client.getCollectionInfos("config");
- StringMap<BSONObj> nsToOptions;
- for (const auto& coll : collInfos) {
- nsToOptions[coll["name"].str()] = coll["options"].Obj();
- }
-
- for (const auto& coll : stashCollections) {
- auto it = nsToOptions.find(coll.coll());
- ASSERT(it != nsToOptions.end());
- auto options = it->second;
-
- ASSERT(options.hasField("collation"));
- auto collation = options["collation"].Obj();
- ASSERT_BSONOBJ_EQ(collationSpec, collation);
- }
- }
-}
-
-TEST_F(ReshardingRecipientServiceTest, FindFetcherIdToResumeFrom) {
- auto opCtx = operationContext();
- NamespaceString oplogBufferNs =
- getLocalOplogBufferNamespace(kReshardingUUID, ShardId("shard0"));
- auto timestamp0 = Timestamp(1, 0);
- auto timestamp1 = Timestamp(1, 1);
- auto timestamp2 = Timestamp(1, 2);
- auto timestamp3 = Timestamp(1, 3);
-
- // Start from FetchTimestamp since localOplogBuffer doesn't exist.
- ASSERT((resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) ==
- ReshardingDonorOplogId{timestamp0, timestamp0}));
-
-
- DBDirectClient client(opCtx);
- client.insert(oplogBufferNs.toString(),
- BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp1)));
-
- // Make sure to use the entry in localOplogBuffer.
- ASSERT((resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) ==
- ReshardingDonorOplogId{timestamp3, timestamp1}));
-
-
- client.insert(oplogBufferNs.toString(),
- BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp3)));
- client.insert(oplogBufferNs.toString(),
- BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp2)));
-
- // Make sure to choose the largest timestamp.
- ASSERT((resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) ==
- ReshardingDonorOplogId{timestamp3, timestamp3}));
-}
-
-TEST_F(ReshardingRecipientServiceTest, FindApplierIdToResumeFrom) {
- auto opCtx = operationContext();
- const ReshardingSourceId sourceId0{UUID::gen(), ShardId("shard0")};
- const ReshardingSourceId sourceId1{UUID::gen(), ShardId("shard1")};
-
- auto timestamp0 = Timestamp(1, 0);
- auto timestamp1 = Timestamp(1, 1);
- auto timestamp2 = Timestamp(1, 2);
- auto timestamp3 = Timestamp(1, 3);
-
- // Start from FetchTimestamp since reshardingApplierProgress doesn't exist.
- ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId0, timestamp0) ==
- ReshardingDonorOplogId{timestamp0, timestamp0}));
- ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId1, timestamp0) ==
- ReshardingDonorOplogId{timestamp0, timestamp0}));
-
-
- DBDirectClient client(opCtx);
- auto updateApplierProgress = [&client](auto sourceId, auto clusterTime, auto ts) {
- client.update(
- NamespaceString::kReshardingApplierProgressNamespace.ns(),
- QUERY(ReshardingOplogApplierProgress::kOplogSourceIdFieldName << sourceId.toBSON()),
- BSON("$set" << BSON(ReshardingOplogApplierProgress::kProgressFieldName
- << BSON("clusterTime" << clusterTime << "ts" << ts)
- << ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName
- << 1LL)), /* not used for this test */
- true /* upsert */,
- false /* multi */);
- };
-
- updateApplierProgress(sourceId0, timestamp3, timestamp1);
-
- // SourceId0 resumes from the progress field but sourceId1 still uses FetchTimestamp.
- ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId0, timestamp0) ==
- ReshardingDonorOplogId{timestamp3, timestamp1}));
- ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId1, timestamp0) ==
- ReshardingDonorOplogId{timestamp0, timestamp0}));
-
- updateApplierProgress(sourceId1, timestamp3, timestamp1);
-
- // Both resume from the progress field.
- ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId0, timestamp0) ==
- ReshardingDonorOplogId{timestamp3, timestamp1}));
- ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId1, timestamp0) ==
- ReshardingDonorOplogId{timestamp3, timestamp1}));
-
- updateApplierProgress(sourceId0, timestamp3, timestamp3);
- updateApplierProgress(sourceId1, timestamp3, timestamp2);
-
- // Resume from the updated progress value.
- ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId0, timestamp0) ==
- ReshardingDonorOplogId{timestamp3, timestamp3}));
- ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId1, timestamp0) ==
- ReshardingDonorOplogId{timestamp3, timestamp2}));
-}
-
} // namespace
} // namespace mongo