diff options
author | jannaerin <golden.janna@gmail.com> | 2020-10-22 15:56:13 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-23 18:00:09 +0000 |
commit | 1a7845dc4e68566cbcc854c91b0f3befceb4508c (patch) | |
tree | 364836afadfe50f39d3168ddcfd427031a1f9271 | |
parent | 4cf9000c24591166f4c093f4702a522a4a62097f (diff) | |
download | mongo-1a7845dc4e68566cbcc854c91b0f3befceb4508c.tar.gz |
SERVER-49901 Implement ordinary insert rule for resharding's oplog application
7 files changed, 682 insertions, 8 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index a8f6a7edde3..76a66095f9c 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -78,6 +78,7 @@ env.Library( 'resharding/resharding_op_observer.cpp', 'resharding/resharding_oplog_applier.cpp', 'resharding/resharding_oplog_applier_progress.idl', + 'resharding/resharding_oplog_application.cpp', 'resharding/resharding_oplog_fetcher.cpp', 'resharding/resharding_recipient_service.cpp', 'resharding/resharding_server_parameters.idl', diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp new file mode 100644 index 00000000000..b9232293d0a --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -0,0 +1,261 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/resharding/resharding_oplog_application.h" + +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/index/index_access_method.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/ops/delete.h" +#include "mongo/db/ops/update.h" +#include "mongo/db/stats/counters.h" +#include "mongo/logv2/log.h" + +namespace mongo { + +ReshardingOplogApplicationRules::ReshardingOplogApplicationRules(const NamespaceString& outputNss, + const NamespaceString& stashNss, + const ShardId& donorShardId, + ChunkManager sourceChunkMgr) + : _outputNss(outputNss), + _stashNss(stashNss), + _donorShardId(donorShardId), + _sourceChunkMgr(std::move(sourceChunkMgr)) {} + +Status ReshardingOplogApplicationRules::applyOperation( + OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) { + LOGV2_DEBUG( + 49901, 3, "Applying op for resharding", "op"_attr = redact(opOrGroupedInserts.toBSON())); + + invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + invariant(opCtx->writesAreReplicated()); + + auto op = opOrGroupedInserts.getOp(); + + return writeConflictRetry(opCtx, "applyOplogEntryResharding", op.getNss().ns(), [&] { + Status status = Status::OK(); + + WriteUnitOfWork wuow(opCtx); + + // Take the global lock now in order to avoid hitting the invariant that disallows unlocking + // the global lock while inside a WUOW upon releasing the DB lock. 
+ Lock::GlobalLock globalLock(opCtx, MODE_IX); + + auto opType = op.getOpType(); + switch (opType) { + case repl::OpTypeEnum::kInsert: + status = _applyInsert(opCtx, opOrGroupedInserts); + break; + case repl::OpTypeEnum::kUpdate: + status = _applyUpdate(opCtx, opOrGroupedInserts); + break; + case repl::OpTypeEnum::kDelete: + status = _applyDelete(opCtx, opOrGroupedInserts); + break; + default: + MONGO_UNREACHABLE; + } + + if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) { + throw WriteConflictException(); + } + + if (status.isOK()) + wuow.commit(); + + return status; + }); +} + +Status ReshardingOplogApplicationRules::_applyInsert( + OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) { + /** + * The rules to apply ordinary insert operations are as follows: + * + * Note that [op _id] refers to the value of op["o"]["_id"]. + * + * 1. If there exists a document with _id == [op _id] in the conflict stash collection, replace + * the contents of the doc in the conflict stash collection for this donor shard with the + * contents of 'op'. + * 2. If there does NOT exist a document with _id == [op _id] in the output collection, insert + * the contents of 'op' into the output collection. + * 3. If there exists a document with _id == [op _id] in the output collection and it is owned + * by this donor shard, replace the contents of the doc in the output collection with the + * contents of 'op'. + * 4. If there exists a document with _id == [op _id] in the output collection and it is NOT + * owned by this donor shard, insert the contents of 'op' into the conflict stash collection. + */ + auto op = opOrGroupedInserts.getOp(); + + uassert(ErrorCodes::OperationFailed, + "Cannot apply an array insert as a part of resharding oplog application", + !opOrGroupedInserts.isGroupedInserts()); + + // Writes are replicated, so use global op counters. 
+ OpCounters* opCounters = &globalOpCounters; + opCounters->gotInsert(); + + BSONObj o = op.getObject(); + + // If the 'o' field does not have an _id, the oplog entry is corrupted. + auto idField = o["_id"]; + uassert(ErrorCodes::NoSuchKey, + str::stream() << "Failed to apply insert due to missing _id: " << redact(op.toBSON()), + !idField.eoo()); + + BSONObj idQuery = idField.wrap(); + const NamespaceString outputNss = op.getNss(); + auto updateMod = write_ops::UpdateModification::parseFromClassicUpdate(o); + + // First, query the conflict stash collection using [op _id] as the query. If a doc exists, + // apply rule #1 and run a replacement update on the stash collection. + auto stashCollDoc = _queryCollForId(opCtx, _stashNss, idQuery); + if (!stashCollDoc.isEmpty()) { + auto updateStashColl = [this, idQuery, updateMod](OperationContext* opCtx, + Database* db, + const AutoGetCollection& collection) { + auto request = UpdateRequest(); + request.setNamespaceString(_stashNss); + request.setQuery(idQuery); + request.setUpdateModification(updateMod); + request.setUpsert(false); + request.setFromOplogApplication(true); + + UpdateResult ur = update(opCtx, db, request); + invariant(ur.numMatched != 0); + + return Status::OK(); + }; + + return _getCollectionAndApplyOp(opCtx, _stashNss, updateStashColl); + } + + // Query the output collection for a doc with _id == [op _id]. If a doc does not exist, apply + // rule #2 and insert this doc into the output collection. 
+ auto outputCollDoc = _queryCollForId(opCtx, _outputNss, idQuery); + + if (outputCollDoc.isEmpty()) { + auto insertToOutputColl = + [this, o](OperationContext* opCtx, Database* db, const AutoGetCollection& collection) { + OpDebug* const nullOpDebug = nullptr; + + return collection->insertDocument( + opCtx, InsertStatement(o), nullOpDebug, false /* fromMigrate */); + }; + + return _getCollectionAndApplyOp(opCtx, _outputNss, insertToOutputColl); + } + + // A doc with [op _id] already exists in the output collection. Check whether this doc belongs + // to '_donorShardId' under the original shard key. If it does, apply rule #3 and run a + // replacement update on the output collection. + if (_sourceChunkMgr.keyBelongsToShard( + _sourceChunkMgr.getShardKeyPattern().extractShardKeyFromDoc(outputCollDoc), + _donorShardId)) { + auto updateOutputCollection = + [this, idQuery, updateMod]( + OperationContext* opCtx, Database* db, const AutoGetCollection& collection) { + auto request = UpdateRequest(); + request.setNamespaceString(_outputNss); + request.setQuery(idQuery); + request.setUpdateModification(updateMod); + request.setUpsert(false); + request.setFromOplogApplication(true); + + UpdateResult ur = update(opCtx, db, request); + invariant(ur.numMatched != 0); + + return Status::OK(); + }; + + return _getCollectionAndApplyOp(opCtx, _outputNss, updateOutputCollection); + } + + // The doc does not belong to '_donorShardId' under the original shard key, so apply rule #4 + // and insert the contents of 'op' to the stash collection. 
+ auto insertToStashColl = + [this, o](OperationContext* opCtx, Database* db, const AutoGetCollection& collection) { + OpDebug* const nullOpDebug = nullptr; + return collection->insertDocument( + opCtx, InsertStatement(o), nullOpDebug, false /* fromMigrate */); + }; + + return _getCollectionAndApplyOp(opCtx, _stashNss, insertToStashColl); +} + +Status ReshardingOplogApplicationRules::_applyUpdate( + OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) { + // TODO SERVER-49903 + return Status::OK(); +} + +Status ReshardingOplogApplicationRules::_applyDelete( + OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) { + // TODO SERVER-49902 + return Status::OK(); +} + +Status ReshardingOplogApplicationRules::_getCollectionAndApplyOp( + OperationContext* opCtx, + const NamespaceString& nss, + unique_function<Status(OperationContext*, Database*, const AutoGetCollection& collection)> + applyOpFn) { + AutoGetCollection autoColl(opCtx, nss, MODE_IX); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Failed to apply op during resharding due to missing collection " + << nss.ns(), + autoColl); + + return applyOpFn(opCtx, autoColl.getDb(), autoColl); +} + +BSONObj ReshardingOplogApplicationRules::_queryCollForId(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& idQuery) { + AutoGetCollectionForRead autoRead(opCtx, nss); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Failed to apply op during resharding due to missing collection " + << nss.ns(), + autoRead); + + const IndexCatalog* indexCatalog = autoRead->getIndexCatalog(); + uassert(4990100, + str::stream() << "Missing _id index for collection " << nss.ns(), + indexCatalog->haveIdIndex(opCtx)); + + BSONObj result; + Helpers::findById(opCtx, autoRead.getDb(), nss.ns(), idQuery, result); + return result; +} +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h 
b/src/mongo/db/s/resharding/resharding_oplog_application.h new file mode 100644 index 00000000000..b2b02b325d9 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_oplog_application.h @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <functional> +#include <string> +#include <vector> + +#include "mongo/base/status.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/timestamp.h" +#include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/oplog_entry_or_grouped_inserts.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/s/chunk_manager.h" + +namespace mongo { +class Collection; +class CollectionPtr; +class NamespaceString; +class OperationContext; + +/** + * Applies an operation from an oplog entry using special rules that apply to resharding. + */ +class ReshardingOplogApplicationRules { +public: + ReshardingOplogApplicationRules(const NamespaceString& outputNss, + const NamespaceString& stashNss, + const ShardId& donorShardId, + ChunkManager sourceChunkMgr); + + /** + * Wraps the op application in a writeConflictRetry loop and is responsible for creating and + * committing the WUOW. + */ + Status applyOperation(OperationContext* opCtx, + const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts); + +private: + // Applies an insert operation + Status _applyInsert(OperationContext* opCtx, + const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts); + + // Applies an update operation + Status _applyUpdate(OperationContext* opCtx, + const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts); + + // Applies a delete operation + Status _applyDelete(OperationContext* opCtx, + const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts); + + // Takes db and collection locks in MODE_IX for 'nss' and then applies an op by calling + // 'applyOpFn'. 'nss' must either be '_outputNss' or '_stashNss'. 
+ Status _getCollectionAndApplyOp( + OperationContext* opCtx, + const NamespaceString& nss, + unique_function<Status(OperationContext*, Database*, const AutoGetCollection&)> applyOpFn); + + // Takes db and collection locks in MODE_IS for 'nss' and queries the collection using + // 'idQuery'. + BSONObj _queryCollForId(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& idQuery); + + // Namespace where operations should be applied, unless there is an _id conflict. + const NamespaceString _outputNss; + + // Namespace where operations are applied if there is an _id conflict. + const NamespaceString _stashNss; + + // ShardId of the donor shard that the operations being applied originate from. + const ShardId _donorShardId; + + // The chunk manager for the source namespace and original shard key. + const ChunkManager _sourceChunkMgr; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index 3eae9fd93e0..fac4c892b7a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -37,10 +37,14 @@ #include "mongo/base/simple_string_data_comparator.h" #include "mongo/db/catalog/create_collection.h" +#include "mongo/db/catalog/document_validation.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/oplog_applier_utils.h" #include "mongo/db/s/resharding/resharding_donor_oplog_iterator_interface.h" +#include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/transaction_participant.h" @@ -173,6 +177,7 @@ ReshardingOplogApplier::ReshardingOplogApplier( Timestamp reshardingCloneFinishedTs, 
std::unique_ptr<ReshardingDonorOplogIteratorInterface> oplogIterator, size_t batchSize, + const ChunkManager& sourceChunkMgr, OutOfLineExecutor* executor, ThreadPool* writerPool) : _sourceId(std::move(sourceId)), @@ -185,6 +190,8 @@ ReshardingOplogApplier::ReshardingOplogApplier( "{}.{}"_format(_nsBeingResharded.coll(), _oplogNs.coll())), _reshardingCloneFinishedTs(std::move(reshardingCloneFinishedTs)), _batchSize(batchSize), + _applicationRules(ReshardingOplogApplicationRules( + _outputNs, _reshardingTempNs, _sourceId.getShardId(), sourceChunkMgr)), _service(service), _executor(executor), _writerPool(writerPool), @@ -487,13 +494,32 @@ Status ReshardingOplogApplier::_applyOplogEntryOrGroupedInserts( return insertOplogAndUpdateConfigForRetryable(opCtx, entryOrGroupedInserts.getOp()); } - // We always use oplog application mode 'kInitialSync', because we're applying oplog entries to - // a cloned database the way initial sync does. - return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(opCtx, - entryOrGroupedInserts, - oplogApplicationMode, - incrementOpsAppliedStats, - nullptr /* opCounters*/); + invariant(DocumentValidationSettings::get(opCtx).isSchemaValidationDisabled()); + + auto opType = op.getOpType(); + if (opType == repl::OpTypeEnum::kNoop) { + return Status::OK(); + } else if (resharding::gUseReshardingOplogApplicationRules) { + if (opType == repl::OpTypeEnum::kInsert) { + return _applicationRules.applyOperation(opCtx, entryOrGroupedInserts); + } else { + // TODO SERVER-49902 call ReshardingOplogApplicationRules::applyOperation for deletes + // TODO SERVER-49903 call ReshardingOplogApplicationRules::applyOperation for updates + return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon( + opCtx, + entryOrGroupedInserts, + oplogApplicationMode, + incrementOpsAppliedStats, + nullptr); + } + } else { + // We always use oplog application mode 'kInitialSync', because we're applying oplog entries + // to a cloned database the way 
initial sync does. + return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon( + opCtx, entryOrGroupedInserts, oplogApplicationMode, incrementOpsAppliedStats, nullptr); + } + + MONGO_UNREACHABLE; } // TODO: use MutableOplogEntry to handle prePostImageOps? Because OplogEntry tries to create BSON diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h index a95bb3e7029..a711bd1de88 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.h +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h @@ -34,7 +34,9 @@ #include "mongo/db/repl/oplog_entry_or_grouped_inserts.h" #include "mongo/db/s/resharding/donor_oplog_id_gen.h" #include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h" +#include "mongo/db/s/resharding/resharding_oplog_application.h" #include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h" +#include "mongo/s/chunk_manager.h" #include "mongo/util/future.h" namespace mongo { @@ -58,6 +60,7 @@ public: Timestamp reshardingCloneFinishedTs, std::unique_ptr<ReshardingDonorOplogIteratorInterface> oplogIterator, size_t batchSize, + const ChunkManager& sourceChunkMgr, OutOfLineExecutor* executor, ThreadPool* writerPool); @@ -174,6 +177,10 @@ private: // before deciding to apply all oplog entries currently in the buffer. const size_t _batchSize; + // Actually applies the ops, using special rules that apply only to resharding. Only used when + // the 'useReshardingOplogApplicationRules' server parameter is set to true. + ReshardingOplogApplicationRules _applicationRules; + Mutex _mutex = MONGO_MAKE_LATCH("ReshardingOplogApplier::_mutex"); // Member variable concurrency access rules: @@ -209,7 +216,7 @@ private: int _remainingWritersToWait{0}; // (M) Keeps track of the status from writer vectors. Will only keep one error if there are - // mulitple occurrances. + // multiple occurrances. 
Status _currentBatchConsolidatedStatus{Status::OK()}; // (R) The source of the oplog entries to be applied. diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index d6dd459d982..c20a32d2169 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -34,19 +34,25 @@ #include <fmt/format.h> #include "mongo/db/catalog/create_collection.h" +#include "mongo/db/catalog_raii.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_session_cache_noop.h" #include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/resharding/resharding_donor_oplog_iterator_interface.h" #include "mongo/db/s/resharding/resharding_oplog_applier.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/s/sharding_mongod_test_fixture.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/transaction_participant.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/logv2/log.h" +#include "mongo/s/catalog_cache_loader_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/uuid.h" @@ -93,11 +99,25 @@ private: class ReshardingOplogApplierTest : public ShardingMongodTestFixture { public: + const HostAndPort kConfigHostAndPort{"DummyConfig", 12345}; + const std::string kOriginalShardKey = "sk"; + const BSONObj kOriginalShardKeyPattern{BSON(kOriginalShardKey << 1)}; + void setUp() override { ShardingMongodTestFixture::setUp(); serverGlobalParams.clusterRole = ClusterRole::ShardServer; + auto clusterId = OID::gen(); + ShardingState::get(getServiceContext()) + 
->setInitialized(_sourceId.getShardId().toString(), clusterId); + + auto mockLoader = std::make_unique<CatalogCacheLoaderMock>(); + CatalogCacheLoader::set(getServiceContext(), std::move(mockLoader)); + + uassertStatusOK( + initializeGlobalShardingStateForMongodForTest(ConnectionString(kConfigHostAndPort))); + auto mockNetwork = std::make_unique<executor::NetworkInterfaceMock>(); _executor = executor::makeThreadPoolTestExecutor(std::move(mockNetwork)); _executor->startup(); @@ -107,6 +127,38 @@ public: uassertStatusOK(createCollection(operationContext(), kAppliedToNs.db().toString(), BSON("create" << kAppliedToNs.coll()))); + + _cm = createChunkManagerForOriginalColl(); + } + + ChunkManager createChunkManagerForOriginalColl() { + // Create two chunks, one that is owned by this donor shard and the other owned by some + // other shard. + const OID epoch = OID::gen(); + std::vector<ChunkType> chunks = { + ChunkType{kCrudNs, + ChunkRange{BSON(kOriginalShardKey << MINKEY), BSON(kOriginalShardKey << 0)}, + ChunkVersion(1, 0, epoch), + kOtherShardId}, + ChunkType{kCrudNs, + ChunkRange{BSON(kOriginalShardKey << 0), BSON(kOriginalShardKey << MAXKEY)}, + ChunkVersion(1, 0, epoch), + _sourceId.getShardId()}}; + + auto rt = RoutingTableHistory::makeNew(kCrudNs, + kCrudUUID, + kOriginalShardKeyPattern, + nullptr, + false, + epoch, + boost::none, + false, + chunks); + + return ChunkManager(_sourceId.getShardId(), + DatabaseVersion(UUID::gen(), 1), + makeStandaloneRoutingTableHistory(std::move(rt)), + boost::none); } ThreadPool* writerPool() { @@ -147,6 +199,13 @@ public: Value(id.toBSON())); } + void setReshardingOplogApplicationServerParameterTrue() { + const ServerParameter::Map& parameterMap = ServerParameterSet::getGlobal()->getMap(); + invariant(parameterMap.size()); + const auto param = parameterMap.find("useReshardingOplogApplicationRules"); + uassertStatusOK(param->second->setFromString("true")); + } + const NamespaceString& oplogNs() { return kOplogNs; } @@ -163,6 
+222,10 @@ public: return kAppliedToNs; } + const NamespaceString& stashNs() { + return kStashNs; + } + executor::ThreadPoolTaskExecutor* getExecutor() { return _executor.get(); } @@ -171,14 +234,21 @@ public: return _sourceId; } + const ChunkManager& chunkManager() { + return _cm.get(); + } + protected: static constexpr int kWriterPoolSize = 4; const NamespaceString kOplogNs{"config.localReshardingOplogBuffer.xxx.yyy"}; const NamespaceString kCrudNs{"foo.bar"}; const UUID kCrudUUID = UUID::gen(); const NamespaceString kAppliedToNs{"foo", "system.resharding.{}"_format(kCrudUUID.toString())}; + const NamespaceString kStashNs{"foo", "{}.{}"_format(kCrudNs.coll(), kOplogNs.coll())}; const ShardId kMyShardId{"shard1"}; + const ShardId kOtherShardId{"shard2"}; UUID _crudNsUuid = UUID::gen(); + boost::optional<ChunkManager> _cm; const ReshardingSourceId _sourceId{UUID::gen(), kMyShardId}; @@ -198,6 +268,7 @@ TEST_F(ReshardingOplogApplierTest, NothingToIterate) { Timestamp(6, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -236,6 +307,7 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) { Timestamp(6, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -283,6 +355,7 @@ TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) { Timestamp(8, 3), std::move(iterator), 3 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -338,6 +411,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringBatchApplyCloningPhase) { Timestamp(7, 3), std::move(iterator), 4 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -381,6 +455,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringBatchApplyCatchUpPhase) { Timestamp(6, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -425,6 +500,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplogCloningPhase) { Timestamp(6, 3), std::move(iterator), 2 /* 
batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -466,6 +542,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplogCatchUpPhase) { Timestamp(5, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -507,6 +584,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatchCloningPhase) { Timestamp(8, 3), std::move(iterator), 4 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -552,6 +630,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatchCatchUpPhase) { Timestamp(6, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -598,6 +677,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatchCloningPhase) { Timestamp(7, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -655,6 +735,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatchCatchUpPhase) { Timestamp(6, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -703,6 +784,7 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDownCloningPhase) { Timestamp(5, 3), std::move(iterator), 4 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -743,6 +825,7 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDownCatchUpPhase) { Timestamp(5, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -780,6 +863,7 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCloningPhase) { Timestamp(5, 3), std::move(iterator), 4 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -820,6 +904,7 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCatchUpPhase) { Timestamp(5, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -841,6 +926,176 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCatchUpPhase) { ASSERT_EQ(Timestamp(6, 3), 
progressDoc->getProgress().getTs()); } +TEST_F(ReshardingOplogApplierTest, InsertOpIntoOuputCollectionUseReshardingApplicationRules) { + // This case tests applying rule #2 described in ReshardingOplogApplicationRules::_applyInsert. + setReshardingOplogApplicationServerParameterTrue(); + + std::queue<repl::OplogEntry> crudOps; + crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none)); + crudOps.push(makeOplog(repl::OpTime(Timestamp(6, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 2), + boost::none)); + crudOps.push(makeOplog(repl::OpTime(Timestamp(7, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 3), + boost::none)); + crudOps.push(makeOplog(repl::OpTime(Timestamp(8, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 4), + boost::none)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + Timestamp(6, 3), + std::move(iterator), + 2 /* batchSize */, + chunkManager(), + getExecutor(), + writerPool()); + + auto future = applier.applyUntilCloneFinishedTs(); + future.get(); + + DBDirectClient client(operationContext()); + auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc); + + doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 2), doc); + + future = applier.applyUntilDone(); + future.get(); + + doc = client.findOne(appliedToNs().ns(), BSON("_id" << 3)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 3), doc); + + doc = client.findOne(appliedToNs().ns(), BSON("_id" << 4)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 4), doc); + + auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId()); + ASSERT_TRUE(progressDoc); + ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getClusterTime()); + ASSERT_EQ(Timestamp(8, 3), 
progressDoc->getProgress().getTs()); +} + +TEST_F(ReshardingOplogApplierTest, + InsertOpShouldTurnIntoReplacementUpdateOnOutputCollectionUseReshardingApplicationRules) { + // This case tests applying rule #3 described in ReshardingOplogApplicationRules::_applyInsert. + setReshardingOplogApplicationServerParameterTrue(); + + std::queue<repl::OplogEntry> crudOps; + crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1 << "sk" << 2), + boost::none)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + Timestamp(5, 3), + std::move(iterator), + 1 /* batchSize */, + chunkManager(), + getExecutor(), + writerPool()); + + // Make sure a doc with {_id: 1} exists in the output collection before applying an insert with + // the same _id. This donor shard owns these docs under the original shard key (it owns the + // range {sk: 0} -> {sk: maxKey}). + DBDirectClient client(operationContext()); + client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << 1)); + + auto future = applier.applyUntilCloneFinishedTs(); + future.get(); + + // We should have replaced the existing doc in the output collection. + auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc); + + future = applier.applyUntilDone(); + future.get(); + + auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId()); + ASSERT_TRUE(progressDoc); + ASSERT_EQ(Timestamp(5, 3), progressDoc->getProgress().getClusterTime()); + ASSERT_EQ(Timestamp(5, 3), progressDoc->getProgress().getTs()); +} + +TEST_F(ReshardingOplogApplierTest, + InsertOpShouldWriteToStashCollectionUseReshardingApplicationRules) { + // This case tests applying rules #1 and #4 described in + // ReshardingOplogApplicationRules::_applyInsert. 
+ setReshardingOplogApplicationServerParameterTrue(); + + std::queue<repl::OplogEntry> crudOps; + crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1 << "sk" << 2), + boost::none)); + crudOps.push(makeOplog(repl::OpTime(Timestamp(6, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1 << "sk" << 3), + boost::none)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + Timestamp(5, 3), + std::move(iterator), + 1 /* batchSize */, + chunkManager(), + getExecutor(), + writerPool()); + + // Make sure a doc with {_id: 1} exists in the output collection before applying inserts with + // the same _id. This donor shard does not own the doc {_id: 1, sk: -1} under the original shard + // key, so we should apply rule #4 and insert the doc into the stash collection. + DBDirectClient client(operationContext()); + client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << -1)); + + auto future = applier.applyUntilCloneFinishedTs(); + future.get(); + + // The output collection should still hold the doc {_id: 1, sk: -1}, and the doc with {_id: 1, + // sk: 2} should have been inserted into the stash collection. + auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << -1), doc); + + doc = client.findOne(stashNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc); + + future = applier.applyUntilDone(); + future.get(); + + // The output collection should still hold the doc {_id: 1, sk: -1}. We should have applied rule + // #1 and turned the last insert op into a replacement update on the stash collection, so the + // doc {_id: 1, sk: 3} should now exist in the stash collection. 
+ doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << -1), doc); + + doc = client.findOne(stashNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 3), doc); + + auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId()); + ASSERT_TRUE(progressDoc); + ASSERT_EQ(Timestamp(6, 3), progressDoc->getProgress().getClusterTime()); + ASSERT_EQ(Timestamp(6, 3), progressDoc->getProgress().getTs()); +} + class ReshardingOplogApplierRetryableTest : public ReshardingOplogApplierTest { public: void setUp() override { @@ -976,6 +1231,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, CrudWithEmptyConfigTransactions) { Timestamp(6, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -1055,6 +1311,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, MultipleTxnSameLsidInOneBatch) { Timestamp(6, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -1108,6 +1365,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxn) { Timestamp(6, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -1154,6 +1412,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithHigherExistingTxnNum) { Timestamp(6, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -1210,6 +1469,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxnNum) { Timestamp(6, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -1257,6 +1517,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithEqualExistingTxnNum) { Timestamp(6, 3), std::move(iterator), 2 /* batchSize */, + chunkManager(), getExecutor(), writerPool()); @@ -1304,6 +1565,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithStmtIdAlreadyExecuted) Timestamp(6, 3), std::move(iterator), 2 
/* batchSize */, + chunkManager(), + getExecutor(), + writerPool()); diff --git a/src/mongo/db/s/resharding/resharding_server_parameters.idl b/src/mongo/db/s/resharding/resharding_server_parameters.idl index 0699a2272c5..9a8ca535741 100644 --- a/src/mongo/db/s/resharding/resharding_server_parameters.idl +++ b/src/mongo/db/s/resharding/resharding_server_parameters.idl @@ -46,3 +46,11 @@ server_parameters: expr: 100 * 1024 validator: gte: 1 + useReshardingOplogApplicationRules: + description: >- + Whether or not the ReshardingOplogApplier should use ReshardingOplogApplicationRules + when applying operations during resharding's oplog application. If false, the applier + will use standard oplog application rules. + set_at: "startup" + cpp_vartype: bool + cpp_varname: gUseReshardingOplogApplicationRules + default: false |