path: root/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp
diff options
Diffstat (limited to 'src/mongo/db/s/global_index/global_index_cloning_service_test.cpp')
1 files changed, 484 insertions, 0 deletions
diff --git a/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp b/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp
new file mode 100644
index 00000000000..187271df7cf
--- /dev/null
+++ b/src/mongo/db/s/global_index/global_index_cloning_service_test.cpp
@@ -0,0 +1,484 @@
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/db/commands/list_collections_filter.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/op_observer/op_observer_noop.h"
+#include "mongo/db/op_observer/op_observer_registry.h"
+#include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h"
+#include "mongo/db/repl/database_cloner_gen.h"
+#include "mongo/db/repl/primary_only_service_test_fixture.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/db/s/global_index/global_index_cloning_external_state.h"
+#include "mongo/db/s/global_index/global_index_cloning_service.h"
+#include "mongo/db/s/global_index/global_index_util.h"
+#include "mongo/db/session/logical_session_cache_noop.h"
+#include "mongo/db/session/session_catalog_mongod.h"
+#include "mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/logv2/log.h"
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+namespace mongo {
+namespace global_index {
+namespace {
+const ShardId kRecipientShardId{"myShardId"};
+const NamespaceString kSourceNss{"sourcedb", "sourcecollection"};
+constexpr auto kSourceShardKey = "key"_sd;
+class GlobalIndexExternalStateForTest : public GlobalIndexCloningService::CloningExternalState {
+ ShardId myShardId(ServiceContext* serviceContext) const override {
+ return kRecipientShardId;
+ }
+ ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& nss) const override {
+ invariant(nss == kSourceNss);
+ const OID epoch = OID::gen();
+ std::vector<ChunkType> chunks = {
+ ChunkType{_sourceUUID,
+ ChunkRange{BSON(kSourceShardKey << MINKEY), BSON(kSourceShardKey << MAXKEY)},
+ ChunkVersion({epoch, Timestamp(1, 1)}, {100, 0}),
+ _someDonorId}};
+ auto rt = RoutingTableHistory::makeNew(kSourceNss,
+ _sourceUUID,
+ BSON(kSourceShardKey << 1),
+ nullptr /* defaultCollator */,
+ false /* unique */,
+ std::move(epoch),
+ Timestamp(1, 1),
+ boost::none /* timeseriesFields */,
+ boost::none /* reshardingFields */,
+ boost::none /* chunkSizeBytes */,
+ true /* allowMigrations */,
+ chunks);
+ return ChunkManager(_someDonorId,
+ DatabaseVersion(UUID::gen(), Timestamp(1, 1)),
+ _makeStandaloneRoutingTableHistory(std::move(rt)),
+ boost::none /* clusterTime */);
+ }
+ RoutingTableHistoryValueHandle _makeStandaloneRoutingTableHistory(
+ RoutingTableHistory rt) const {
+ const auto version = rt.getVersion();
+ return RoutingTableHistoryValueHandle(
+ std::make_shared<RoutingTableHistory>(std::move(rt)),
+ ComparableChunkVersion::makeComparableChunkVersion(version));
+ }
+ const UUID _sourceUUID{UUID::gen()};
+ const ShardId _someDonorId{"otherShardId"};
+class MockGlobalIndexClonerFetcher : public GlobalIndexClonerFetcherInterface {
+ void setResultList(std::list<FetchedEntry> newResults) {
+ _docs = std::move(newResults);
+ }
+ boost::optional<FetchedEntry> getNext(OperationContext* opCtx) override {
+ if (_docs.empty()) {
+ return boost::none;
+ }
+ auto ret = _docs.front();
+ _docs.pop_front();
+ return ret;
+ }
+ std::list<FetchedEntry> _docs;
+class GlobalIndexCloningFetcherFactoryForTest : public GlobalIndexClonerFetcherFactoryInterface {
+ explicit GlobalIndexCloningFetcherFactoryForTest(MockGlobalIndexClonerFetcher* mockFetcher)
+ : _mockFetcher(mockFetcher) {}
+ std::unique_ptr<GlobalIndexClonerFetcherInterface> make(
+ NamespaceString nss,
+ UUID collUUId,
+ UUID indexUUID,
+ ShardId myShardId,
+ Timestamp minFetchTimestamp,
+ KeyPattern sourceShardKeyPattern,
+ KeyPattern globalIndexPattern) override {
+ return std::make_unique<MockGlobalIndexClonerFetcher>(*_mockFetcher);
+ }
+ MockGlobalIndexClonerFetcher* _mockFetcher;
+class GlobalIndexCloningServiceForTest : public GlobalIndexCloningService {
+ explicit GlobalIndexCloningServiceForTest(ServiceContext* serviceContext,
+ MockGlobalIndexClonerFetcher* mockFetcher)
+ : GlobalIndexCloningService(serviceContext),
+ _serviceContext(serviceContext),
+ _mockFetcher(mockFetcher) {}
+ std::shared_ptr<repl::PrimaryOnlyService::Instance> constructInstance(
+ BSONObj initialState) override {
+ return std::make_shared<GlobalIndexCloningService::CloningStateMachine>(
+ _serviceContext,
+ this,
+ std::make_unique<GlobalIndexExternalStateForTest>(),
+ std::make_unique<GlobalIndexCloningFetcherFactoryForTest>(_mockFetcher),
+ GlobalIndexClonerDoc::parse(IDLParserContext{"GlobalIndexCloningServiceForTest"},
+ initialState));
+ }
+ ServiceContext* _serviceContext;
+ MockGlobalIndexClonerFetcher* _mockFetcher;
+class Blocker {
+ ~Blocker() {
+ stdx::unique_lock lk(_mutex);
+ _shouldBlock = false;
+ _cvBlocked.notify_all();
+ }
+ void blockIfActivated(OperationContext* opCtx) {
+ stdx::unique_lock lk(_mutex);
+ _blockedOnce = true;
+ opCtx->waitForConditionOrInterrupt(_cvBlocked, lk, [this] { return !_shouldBlock; });
+ }
+ void waitUntilBlockedOccurred(OperationContext* opCtx) {
+ stdx::unique_lock lk(_mutex);
+ opCtx->waitForConditionOrInterrupt(_cvBlocked, lk, [this] { return _blockedOnce; });
+ }
+ void block() {
+ stdx::unique_lock lk(_mutex);
+ _shouldBlock = true;
+ }
+ void unblock() {
+ stdx::unique_lock lk(_mutex);
+ _shouldBlock = false;
+ }
+ Mutex _mutex = MONGO_MAKE_LATCH("GlobalIndexCloningServiceTestBlocker::_mutex");
+ stdx::condition_variable _cvBlocked;
+ bool _shouldBlock{false};
+ bool _blockedOnce{false};
+class OpObserverForTest : public OpObserverNoop {
+ OpObserverForTest(Blocker* insertBlocker, Blocker* deleteBlocker)
+ : _insertBlocker(insertBlocker), _deleteBlocker(deleteBlocker) {}
+ void onInserts(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate) override {
+ if (NamespaceString::kGlobalIndexClonerNamespace == coll->ns()) {
+ _insertBlocker->blockIfActivated(opCtx);
+ }
+ }
+ void onDelete(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ StmtId stmtId,
+ const OplogDeleteEntryArgs& args) override {
+ if (NamespaceString::kGlobalIndexClonerNamespace == nss) {
+ _deleteBlocker->blockIfActivated(opCtx);
+ }
+ }
+ Blocker* _insertBlocker;
+ Blocker* _deleteBlocker;
+GlobalIndexCloningService::InstanceID extractInstanceId(const GlobalIndexClonerDoc& doc) {
+ return BSON("_id" << doc.getIndexCollectionUUID());
+using GlobalIndexStateMachine = GlobalIndexCloningServiceForTest::CloningStateMachine;
+class GlobalIndexClonerServiceTest : public repl::PrimaryOnlyServiceMongoDTest {
+ std::unique_ptr<repl::PrimaryOnlyService> makeService(ServiceContext* serviceContext) override {
+ return std::make_unique<GlobalIndexCloningServiceForTest>(serviceContext, &_mockFetcher);
+ }
+ void setUp() override {
+ repl::PrimaryOnlyServiceMongoDTest::setUp();
+ auto serviceContext = getServiceContext();
+ auto storageMock = std::make_unique<repl::StorageInterfaceMock>();
+ repl::StorageInterface::set(serviceContext, std::move(storageMock));
+ // The ReadWriteConcernDefaults decoration on the service context won't always be created,
+ // so we should manually instantiate it to ensure it exists in our tests.
+ ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn());
+ _opObserverRegistry->addObserver(
+ std::make_unique<OpObserverForTest>(&_stateDocInsertBlocker, &_stateDocDeleteBlocker));
+ // Create config.transactions collection
+ auto opCtx = serviceContext->makeOperationContext(Client::getCurrent());
+ DBDirectClient client(opCtx.get());
+ client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
+ MongoDSessionCatalog::set(
+ getServiceContext(),
+ std::make_unique<MongoDSessionCatalog>(
+ std::make_unique<MongoDSessionCatalogTransactionInterfaceImpl>()));
+ // Session cache is needed otherwise client session info will ignored.
+ LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
+ std::list<GlobalIndexClonerFetcherInterface::FetchedEntry> fetcherResults;
+ fetcherResults.push_front(
+ {BSON("_id" << 10 << kSourceShardKey << 20), BSON(_indexKey << 30)});
+ replaceFetcherResultList(std::move(fetcherResults));
+ const auto& indexNs = globalIndexNss(kSourceNss, _indexName);
+ client.createCollection(indexNs.ns());
+ auto all = client.getCollectionInfos(indexNs.db().toString(),
+ BSON("name" << indexNs.coll().toString()));
+ ASSERT_EQ(1, all.size());
+ _indexCollectionUUID.emplace(uassertStatusOK(UUID::parse(all.front()["info"]["uuid"])));
+ }
+ void checkIndexCollection(OperationContext* opCtx) {
+ DBDirectClient client(opCtx);
+ MockGlobalIndexClonerFetcher fetcherCopy(_fetcherCopyForVerification);
+ while (auto next = fetcherCopy.getNext(opCtx)) {
+ FindCommandRequest query(globalIndexNss(kSourceNss, _indexName));
+ query.setFilter(BSON("_id" << next->indexKeyValues));
+ auto doc = client.findOne(query);
+ ASSERT_TRUE(!doc.isEmpty())
+ << "doc with index key: " << next->indexKeyValues
+ << " missing in global index output collection: " << dumpOutputColl(opCtx);
+ }
+ }
+ GlobalIndexClonerDoc makeStateDocument() {
+ return GlobalIndexClonerDoc(*_indexCollectionUUID,
+ kSourceNss,
+ _collectionUUID,
+ _indexName,
+ _indexSpec,
+ {},
+ GlobalIndexClonerStateEnum::kUnused);
+ }
+ bool doesCollectionExist(OperationContext* opCtx, const NamespaceString& nss) {
+ DBDirectClient client(opCtx);
+ auto collectionInfos = client.getCollectionInfos(
+ nss.db().toString(), ListCollectionsFilter::makeTypeCollectionFilter());
+ for (auto&& info : collectionInfos) {
+ auto coll =
+ repl::ListCollectionResult::parse(IDLParserContext("doesCollectionExist"), info);
+ if (coll.getName() == nss.coll()) {
+ return true;
+ }
+ }
+ return false;
+ }
+ Blocker* getStateDocInsertBlocker() {
+ return &_stateDocInsertBlocker;
+ }
+ Blocker* getStateDocDeleteBlocker() {
+ return &_stateDocDeleteBlocker;
+ }
+ void replaceFetcherResultList(
+ std::list<GlobalIndexClonerFetcherInterface::FetchedEntry> newResults) {
+ _mockFetcher.setResultList(std::move(newResults));
+ _fetcherCopyForVerification = _mockFetcher;
+ }
+ StringData indexKey() const {
+ return _indexKey;
+ }
+ std::string dumpOutputColl(OperationContext* opCtx) {
+ DBDirectClient client(opCtx);
+ FindCommandRequest query(globalIndexNss(kSourceNss, _indexName));
+ std::ostringstream outputStr;
+ auto res = client.find(query);
+ if (!res || !res->more()) {
+ return "<empty>";
+ }
+ outputStr << "docs: " << std::endl;
+ while (res->more()) {
+ auto doc = res->next();
+ outputStr << doc.toString() << std::endl;
+ }
+ return outputStr.str();
+ }
+ boost::optional<UUID> _indexCollectionUUID;
+ const UUID _collectionUUID{UUID::gen()};
+ const std::string _indexName{"global_x_1"};
+ const StringData _indexKey{"x"};
+ const BSONObj _indexSpec{BSON("key" << BSON(_indexKey << 1) << "unique" << true)};
+ ReadWriteConcernDefaultsLookupMock _lookupMock;
+ Blocker _stateDocInsertBlocker;
+ Blocker _stateDocDeleteBlocker;
+ MockGlobalIndexClonerFetcher _mockFetcher;
+ MockGlobalIndexClonerFetcher _fetcherCopyForVerification;
+TEST_F(GlobalIndexClonerServiceTest, CloneInsertsToGlobalIndexCollection) {
+ auto doc = makeStateDocument();
+ auto opCtx = makeOperationContext();
+ auto rawOpCtx = opCtx.get();
+ auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+ auto future = cloner->getCompletionFuture();
+ future.get();
+ ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName())));
+ checkIndexCollection(rawOpCtx);
+TEST_F(GlobalIndexClonerServiceTest, ShouldBeSafeToRetryOnStepDown) {
+ auto doc = makeStateDocument();
+ auto opCtx = makeOperationContext();
+ auto rawOpCtx = opCtx.get();
+ auto stateDocInsertBlocker = getStateDocInsertBlocker();
+ stateDocInsertBlocker->block();
+ auto stateDocDeleteBlocker = getStateDocDeleteBlocker();
+ stateDocDeleteBlocker->block();
+ {
+ auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+ stateDocInsertBlocker->waitUntilBlockedOccurred(rawOpCtx);
+ stepDown();
+ ASSERT_THROWS(cloner->getCompletionFuture().get(), DBException);
+ }
+ stepUp(rawOpCtx);
+ {
+ auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+ stateDocInsertBlocker->unblock();
+ stateDocDeleteBlocker->waitUntilBlockedOccurred(rawOpCtx);
+ stepDown();
+ ASSERT_THROWS(cloner->getCompletionFuture().get(), DBException);
+ }
+ stepUp(rawOpCtx);
+ // It is possible for the primary only service to run to completion and no longer exists.
+ {
+ auto cloner = GlobalIndexStateMachine::lookup(rawOpCtx, _service, extractInstanceId(doc));
+ stateDocDeleteBlocker->unblock();
+ (*cloner)->getCompletionFuture().get();
+ }
+ checkIndexCollection(rawOpCtx);
+TEST_F(GlobalIndexClonerServiceTest, ShouldBeAbleToConsumeMultipleBatchesWorthofDocs) {
+ std::list<GlobalIndexClonerFetcherInterface::FetchedEntry> fetcherResults;
+ RAIIServerParameterControllerForTest batchSizeForTest(
+ "globalIndexClonerServiceFetchBatchMaxSizeBytes", 50);
+ std::string padding(50, 'x');
+ // Populate enough to have more than one batch worth of documents.
+ for (int x = 0; x < 4; x++) {
+ fetcherResults.push_front({BSON("_id" << x << kSourceShardKey << x),
+ BSON(indexKey() << (std::to_string(x) + padding))});
+ }
+ replaceFetcherResultList(std::move(fetcherResults));
+ auto doc = makeStateDocument();
+ auto opCtx = makeOperationContext();
+ auto rawOpCtx = opCtx.get();
+ auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+ auto future = cloner->getCompletionFuture();
+ future.get();
+ ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName())));
+ checkIndexCollection(rawOpCtx);
+TEST_F(GlobalIndexClonerServiceTest, ShouldWorkWithEmptyCollection) {
+ replaceFetcherResultList({});
+ auto doc = makeStateDocument();
+ auto opCtx = makeOperationContext();
+ auto rawOpCtx = opCtx.get();
+ auto cloner = GlobalIndexStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+ auto future = cloner->getCompletionFuture();
+ future.get();
+ ASSERT_TRUE(doesCollectionExist(rawOpCtx, skipIdNss(doc.getNss(), doc.getIndexName())));
+ checkIndexCollection(rawOpCtx);
+} // namespace
+} // namespace global_index
+} // namespace mongo