author    jannaerin <golden.janna@gmail.com>  2020-10-22 15:56:13 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-11-23 18:00:09 +0000
commit    1a7845dc4e68566cbcc854c91b0f3befceb4508c (patch)
tree      364836afadfe50f39d3168ddcfd427031a1f9271
parent    4cf9000c24591166f4c093f4702a522a4a62097f (diff)
download  mongo-1a7845dc4e68566cbcc854c91b0f3befceb4508c.tar.gz
SERVER-49901 Implement ordinary insert rule for resharding's oplog application
-rw-r--r--  src/mongo/db/s/SConscript                                      1
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_application.cpp   261
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_application.h     109
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_applier.cpp        40
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_applier.h           9
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp  262
-rw-r--r--  src/mongo/db/s/resharding/resharding_server_parameters.idl     8
7 files changed, 682 insertions, 8 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index a8f6a7edde3..76a66095f9c 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -78,6 +78,7 @@ env.Library(
'resharding/resharding_op_observer.cpp',
'resharding/resharding_oplog_applier.cpp',
'resharding/resharding_oplog_applier_progress.idl',
+ 'resharding/resharding_oplog_application.cpp',
'resharding/resharding_oplog_fetcher.cpp',
'resharding/resharding_recipient_service.cpp',
'resharding/resharding_server_parameters.idl',
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
new file mode 100644
index 00000000000..b9232293d0a
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
@@ -0,0 +1,261 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/resharding/resharding_oplog_application.h"
+
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/index/index_access_method.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/delete.h"
+#include "mongo/db/ops/update.h"
+#include "mongo/db/stats/counters.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+
+ReshardingOplogApplicationRules::ReshardingOplogApplicationRules(const NamespaceString& outputNss,
+ const NamespaceString& stashNss,
+ const ShardId& donorShardId,
+ ChunkManager sourceChunkMgr)
+ : _outputNss(outputNss),
+ _stashNss(stashNss),
+ _donorShardId(donorShardId),
+ _sourceChunkMgr(std::move(sourceChunkMgr)) {}
+
+Status ReshardingOplogApplicationRules::applyOperation(
+ OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) {
+ LOGV2_DEBUG(
+ 49901, 3, "Applying op for resharding", "op"_attr = redact(opOrGroupedInserts.toBSON()));
+
+ invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+ invariant(opCtx->writesAreReplicated());
+
+ auto op = opOrGroupedInserts.getOp();
+
+ return writeConflictRetry(opCtx, "applyOplogEntryResharding", op.getNss().ns(), [&] {
+ Status status = Status::OK();
+
+ WriteUnitOfWork wuow(opCtx);
+
+ // Take the global lock now in order to avoid hitting the invariant that disallows unlocking
+ // the global lock while inside a WUOW upon releasing the DB lock.
+ Lock::GlobalLock globalLock(opCtx, MODE_IX);
+
+ auto opType = op.getOpType();
+ switch (opType) {
+ case repl::OpTypeEnum::kInsert:
+ status = _applyInsert(opCtx, opOrGroupedInserts);
+ break;
+ case repl::OpTypeEnum::kUpdate:
+ status = _applyUpdate(opCtx, opOrGroupedInserts);
+ break;
+ case repl::OpTypeEnum::kDelete:
+ status = _applyDelete(opCtx, opOrGroupedInserts);
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+
+ if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) {
+ throw WriteConflictException();
+ }
+
+ if (status.isOK())
+ wuow.commit();
+
+ return status;
+ });
+}
+
+Status ReshardingOplogApplicationRules::_applyInsert(
+ OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) {
+ /**
+ * The rules to apply ordinary insert operations are as follows:
+ *
+ * Note that [op _id] refers to the value of op["o"]["_id"].
+ *
+ * 1. If there exists a document with _id == [op _id] in the conflict stash collection, replace
+ * the contents of the doc in the conflict stash collection for this donor shard with the
+ * contents of 'op'.
+ * 2. If there does NOT exist a document with _id == [op _id] in the output collection, insert
+ * the contents of 'op' into the output collection.
+ * 3. If there exists a document with _id == [op _id] in the output collection and it is owned
+ * by this donor shard, replace the contents of the doc in the output collection with the
+ * contents of 'op'.
+ * 4. If there exists a document with _id == [op _id] in the output collection and it is NOT
+ * owned by this donor shard, insert the contents of 'op' into the conflict stash collection.
+ */
+ auto op = opOrGroupedInserts.getOp();
+
+ uassert(ErrorCodes::OperationFailed,
+ "Cannot apply an array insert as a part of resharding oplog application",
+ !opOrGroupedInserts.isGroupedInserts());
+
+ // Writes are replicated, so use global op counters.
+ OpCounters* opCounters = &globalOpCounters;
+ opCounters->gotInsert();
+
+ BSONObj o = op.getObject();
+
+ // If the 'o' field does not have an _id, the oplog entry is corrupted.
+ auto idField = o["_id"];
+ uassert(ErrorCodes::NoSuchKey,
+ str::stream() << "Failed to apply insert due to missing _id: " << redact(op.toBSON()),
+ !idField.eoo());
+
+ BSONObj idQuery = idField.wrap();
+ const NamespaceString outputNss = op.getNss();
+ auto updateMod = write_ops::UpdateModification::parseFromClassicUpdate(o);
+
+ // First, query the conflict stash collection using [op _id] as the query. If a doc exists,
+ // apply rule #1 and run a replacement update on the stash collection.
+ auto stashCollDoc = _queryCollForId(opCtx, _stashNss, idQuery);
+ if (!stashCollDoc.isEmpty()) {
+ auto updateStashColl = [this, idQuery, updateMod](OperationContext* opCtx,
+ Database* db,
+ const AutoGetCollection& collection) {
+ auto request = UpdateRequest();
+ request.setNamespaceString(_stashNss);
+ request.setQuery(idQuery);
+ request.setUpdateModification(updateMod);
+ request.setUpsert(false);
+ request.setFromOplogApplication(true);
+
+ UpdateResult ur = update(opCtx, db, request);
+ invariant(ur.numMatched != 0);
+
+ return Status::OK();
+ };
+
+ return _getCollectionAndApplyOp(opCtx, _stashNss, updateStashColl);
+ }
+
+ // Query the output collection for a doc with _id == [op _id]. If a doc does not exist, apply
+ // rule #2 and insert this doc into the output collection.
+ auto outputCollDoc = _queryCollForId(opCtx, _outputNss, idQuery);
+
+ if (outputCollDoc.isEmpty()) {
+ auto insertToOutputColl =
+ [this, o](OperationContext* opCtx, Database* db, const AutoGetCollection& collection) {
+ OpDebug* const nullOpDebug = nullptr;
+
+ return collection->insertDocument(
+ opCtx, InsertStatement(o), nullOpDebug, false /* fromMigrate */);
+ };
+
+ return _getCollectionAndApplyOp(opCtx, _outputNss, insertToOutputColl);
+ }
+
+ // A doc with [op _id] already exists in the output collection. Check whether this doc belongs
+ // to '_donorShardId' under the original shard key. If it does, apply rule #3 and run a
+ // replacement update on the output collection.
+ if (_sourceChunkMgr.keyBelongsToShard(
+ _sourceChunkMgr.getShardKeyPattern().extractShardKeyFromDoc(outputCollDoc),
+ _donorShardId)) {
+ auto updateOutputCollection =
+ [this, idQuery, updateMod](
+ OperationContext* opCtx, Database* db, const AutoGetCollection& collection) {
+ auto request = UpdateRequest();
+ request.setNamespaceString(_outputNss);
+ request.setQuery(idQuery);
+ request.setUpdateModification(updateMod);
+ request.setUpsert(false);
+ request.setFromOplogApplication(true);
+
+ UpdateResult ur = update(opCtx, db, request);
+ invariant(ur.numMatched != 0);
+
+ return Status::OK();
+ };
+
+ return _getCollectionAndApplyOp(opCtx, _outputNss, updateOutputCollection);
+ }
+
+ // The doc does not belong to '_donorShardId' under the original shard key, so apply rule #4
+ // and insert the contents of 'op' to the stash collection.
+ auto insertToStashColl =
+ [this, o](OperationContext* opCtx, Database* db, const AutoGetCollection& collection) {
+ OpDebug* const nullOpDebug = nullptr;
+ return collection->insertDocument(
+ opCtx, InsertStatement(o), nullOpDebug, false /* fromMigrate */);
+ };
+
+ return _getCollectionAndApplyOp(opCtx, _stashNss, insertToStashColl);
+}
+
+Status ReshardingOplogApplicationRules::_applyUpdate(
+ OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) {
+ // TODO SERVER-49903
+ return Status::OK();
+}
+
+Status ReshardingOplogApplicationRules::_applyDelete(
+ OperationContext* opCtx, const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts) {
+ // TODO SERVER-49902
+ return Status::OK();
+}
+
+Status ReshardingOplogApplicationRules::_getCollectionAndApplyOp(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ unique_function<Status(OperationContext*, Database*, const AutoGetCollection& collection)>
+ applyOpFn) {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Failed to apply op during resharding due to missing collection "
+ << nss.ns(),
+ autoColl);
+
+ return applyOpFn(opCtx, autoColl.getDb(), autoColl);
+}
+
+BSONObj ReshardingOplogApplicationRules::_queryCollForId(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& idQuery) {
+ AutoGetCollectionForRead autoRead(opCtx, nss);
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Failed to apply op during resharding due to missing collection "
+ << nss.ns(),
+ autoRead);
+
+ const IndexCatalog* indexCatalog = autoRead->getIndexCatalog();
+ uassert(4990100,
+ str::stream() << "Missing _id index for collection " << nss.ns(),
+ indexCatalog->haveIdIndex(opCtx));
+
+ BSONObj result;
+ Helpers::findById(opCtx, autoRead.getDb(), nss.ns(), idQuery, result);
+ return result;
+}
+} // namespace mongo
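For orientation, the four-way branch in _applyInsert above reduces to the following standalone sketch (an illustration only; existsInStash, existsInOutput, and donorOwnsExistingDoc are hypothetical helpers standing in for the stash/output lookups and the chunk-ownership check, which the real code performs under locks inside a write unit of work):

    enum class InsertAction { kReplaceInStash, kInsertToOutput, kReplaceInOutput, kInsertToStash };

    InsertAction chooseInsertAction(bool existsInStash, bool existsInOutput, bool donorOwnsExistingDoc) {
        if (existsInStash)
            return InsertAction::kReplaceInStash;   // rule #1: replacement update on the stash collection
        if (!existsInOutput)
            return InsertAction::kInsertToOutput;   // rule #2: ordinary insert into the output collection
        if (donorOwnsExistingDoc)
            return InsertAction::kReplaceInOutput;  // rule #3: replacement update on the output collection
        return InsertAction::kInsertToStash;        // rule #4: insert into the stash collection
    }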
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.h b/src/mongo/db/s/resharding/resharding_oplog_application.h
new file mode 100644
index 00000000000..b2b02b325d9
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.h
@@ -0,0 +1,109 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <functional>
+#include <string>
+#include <vector>
+
+#include "mongo/base/status.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/timestamp.h"
+#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/oplog_entry_or_grouped_inserts.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/s/chunk_manager.h"
+
+namespace mongo {
+class Collection;
+class CollectionPtr;
+class NamespaceString;
+class OperationContext;
+
+/**
+ * Applies an operation from an oplog entry using special rules that apply to resharding.
+ */
+class ReshardingOplogApplicationRules {
+public:
+ ReshardingOplogApplicationRules(const NamespaceString& outputNss,
+ const NamespaceString& stashNss,
+ const ShardId& donorShardId,
+ ChunkManager sourceChunkMgr);
+
+ /**
+ * Wraps the op application in a writeConflictRetry loop and is responsible for creating and
+ * committing the WUOW.
+ */
+ Status applyOperation(OperationContext* opCtx,
+ const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts);
+
+private:
+ // Applies an insert operation
+ Status _applyInsert(OperationContext* opCtx,
+ const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts);
+
+ // Applies an update operation
+ Status _applyUpdate(OperationContext* opCtx,
+ const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts);
+
+ // Applies a delete operation
+ Status _applyDelete(OperationContext* opCtx,
+ const repl::OplogEntryOrGroupedInserts& opOrGroupedInserts);
+
+ // Takes db and collection locks in MODE_IX for 'nss' and then applies an op by calling
+ // 'applyOpFn'. 'nss' must either be '_outputNss' or '_stashNss'.
+ Status _getCollectionAndApplyOp(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ unique_function<Status(OperationContext*, Database*, const AutoGetCollection&)> applyOpFn);
+
+ // Takes db and collection locks in MODE_IS for 'nss' and queries the collection using
+ // 'idQuery'.
+ BSONObj _queryCollForId(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& idQuery);
+
+ // Namespace where operations should be applied, unless there is an _id conflict.
+ const NamespaceString _outputNss;
+
+ // Namespace where operations are applied if there is an _id conflict.
+ const NamespaceString _stashNss;
+
+ // ShardId of the donor shard that the operations being applied originate from.
+ const ShardId _donorShardId;
+
+ // The chunk manager for the source namespace and original shard key.
+ const ChunkManager _sourceChunkMgr;
+};
+
+} // namespace mongo
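As a usage sketch (hypothetical wiring; the actual call site is ReshardingOplogApplier::_applyOplogEntryOrGroupedInserts in the next file), a recipient constructs one rules instance per donor shard and funnels each fetched entry through applyOperation:

    // Assumes outputNss, stashNss, donorShardId, and sourceChunkMgr were taken
    // from the recipient's resharding state; these variable names are illustrative.
    ReshardingOplogApplicationRules rules(
        outputNss, stashNss, donorShardId, std::move(sourceChunkMgr));
    uassertStatusOK(rules.applyOperation(opCtx, entryOrGroupedInserts));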
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index 3eae9fd93e0..fac4c892b7a 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -37,10 +37,14 @@
#include "mongo/base/simple_string_data_comparator.h"
#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/repl/oplog_applier_utils.h"
#include "mongo/db/s/resharding/resharding_donor_oplog_iterator_interface.h"
+#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/transaction_participant.h"
@@ -173,6 +177,7 @@ ReshardingOplogApplier::ReshardingOplogApplier(
Timestamp reshardingCloneFinishedTs,
std::unique_ptr<ReshardingDonorOplogIteratorInterface> oplogIterator,
size_t batchSize,
+ const ChunkManager& sourceChunkMgr,
OutOfLineExecutor* executor,
ThreadPool* writerPool)
: _sourceId(std::move(sourceId)),
@@ -185,6 +190,8 @@ ReshardingOplogApplier::ReshardingOplogApplier(
"{}.{}"_format(_nsBeingResharded.coll(), _oplogNs.coll())),
_reshardingCloneFinishedTs(std::move(reshardingCloneFinishedTs)),
_batchSize(batchSize),
+ _applicationRules(ReshardingOplogApplicationRules(
+ _outputNs, _reshardingTempNs, _sourceId.getShardId(), sourceChunkMgr)),
_service(service),
_executor(executor),
_writerPool(writerPool),
@@ -487,13 +494,32 @@ Status ReshardingOplogApplier::_applyOplogEntryOrGroupedInserts(
return insertOplogAndUpdateConfigForRetryable(opCtx, entryOrGroupedInserts.getOp());
}
- // We always use oplog application mode 'kInitialSync', because we're applying oplog entries to
- // a cloned database the way initial sync does.
- return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(opCtx,
- entryOrGroupedInserts,
- oplogApplicationMode,
- incrementOpsAppliedStats,
- nullptr /* opCounters*/);
+ invariant(DocumentValidationSettings::get(opCtx).isSchemaValidationDisabled());
+
+ auto opType = op.getOpType();
+ if (opType == repl::OpTypeEnum::kNoop) {
+ return Status::OK();
+ } else if (resharding::gUseReshardingOplogApplicationRules) {
+ if (opType == repl::OpTypeEnum::kInsert) {
+ return _applicationRules.applyOperation(opCtx, entryOrGroupedInserts);
+ } else {
+ // TODO SERVER-49902 call ReshardingOplogApplicationRules::applyOperation for deletes
+ // TODO SERVER-49903 call ReshardingOplogApplicationRules::applyOperation for updates
+ return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(
+ opCtx,
+ entryOrGroupedInserts,
+ oplogApplicationMode,
+ incrementOpsAppliedStats,
+ nullptr);
+ }
+ } else {
+ // We always use oplog application mode 'kInitialSync', because we're applying oplog entries
+ // to a cloned database the way initial sync does.
+ return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(
+ opCtx, entryOrGroupedInserts, oplogApplicationMode, incrementOpsAppliedStats, nullptr);
+ }
+
+ MONGO_UNREACHABLE;
}
// TODO: use MutableOplogEntry to handle prePostImageOps? Because OplogEntry tries to create BSON
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h
index a95bb3e7029..a711bd1de88 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h
@@ -34,7 +34,9 @@
#include "mongo/db/repl/oplog_entry_or_grouped_inserts.h"
#include "mongo/db/s/resharding/donor_oplog_id_gen.h"
#include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h"
+#include "mongo/db/s/resharding/resharding_oplog_application.h"
#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h"
+#include "mongo/s/chunk_manager.h"
#include "mongo/util/future.h"
namespace mongo {
@@ -58,6 +60,7 @@ public:
Timestamp reshardingCloneFinishedTs,
std::unique_ptr<ReshardingDonorOplogIteratorInterface> oplogIterator,
size_t batchSize,
+ const ChunkManager& sourceChunkMgr,
OutOfLineExecutor* executor,
ThreadPool* writerPool);
@@ -174,6 +177,10 @@ private:
// before deciding to apply all oplog entries currently in the buffer.
const size_t _batchSize;
+ // Actually applies the ops, using special rules that apply only to resharding. Only used when
+ // the 'useReshardingOplogApplicationRules' server parameter is set to true.
+ ReshardingOplogApplicationRules _applicationRules;
+
Mutex _mutex = MONGO_MAKE_LATCH("ReshardingOplogApplier::_mutex");
// Member variable concurrency access rules:
@@ -209,7 +216,7 @@ private:
int _remainingWritersToWait{0};
// (M) Keeps track of the status from writer vectors. Will only keep one error if there are
- // mulitple occurrances.
+ // multiple occurrences.
Status _currentBatchConsolidatedStatus{Status::OK()};
// (R) The source of the oplog entries to be applied.
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index d6dd459d982..c20a32d2169 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -34,19 +34,25 @@
#include <fmt/format.h>
#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog_raii.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_cache_noop.h"
#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/resharding/resharding_donor_oplog_iterator_interface.h"
#include "mongo/db/s/resharding/resharding_oplog_applier.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/sharding_mongod_test_fixture.h"
+#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/catalog_cache_loader_mock.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/uuid.h"
@@ -93,11 +99,25 @@ private:
class ReshardingOplogApplierTest : public ShardingMongodTestFixture {
public:
+ const HostAndPort kConfigHostAndPort{"DummyConfig", 12345};
+ const std::string kOriginalShardKey = "sk";
+ const BSONObj kOriginalShardKeyPattern{BSON(kOriginalShardKey << 1)};
+
void setUp() override {
ShardingMongodTestFixture::setUp();
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ auto clusterId = OID::gen();
+ ShardingState::get(getServiceContext())
+ ->setInitialized(_sourceId.getShardId().toString(), clusterId);
+
+ auto mockLoader = std::make_unique<CatalogCacheLoaderMock>();
+ CatalogCacheLoader::set(getServiceContext(), std::move(mockLoader));
+
+ uassertStatusOK(
+ initializeGlobalShardingStateForMongodForTest(ConnectionString(kConfigHostAndPort)));
+
auto mockNetwork = std::make_unique<executor::NetworkInterfaceMock>();
_executor = executor::makeThreadPoolTestExecutor(std::move(mockNetwork));
_executor->startup();
@@ -107,6 +127,38 @@ public:
uassertStatusOK(createCollection(operationContext(),
kAppliedToNs.db().toString(),
BSON("create" << kAppliedToNs.coll())));
+
+ _cm = createChunkManagerForOriginalColl();
+ }
+
+ ChunkManager createChunkManagerForOriginalColl() {
+ // Create two chunks, one that is owned by this donor shard and the other owned by some
+ // other shard.
+ const OID epoch = OID::gen();
+ std::vector<ChunkType> chunks = {
+ ChunkType{kCrudNs,
+ ChunkRange{BSON(kOriginalShardKey << MINKEY), BSON(kOriginalShardKey << 0)},
+ ChunkVersion(1, 0, epoch),
+ kOtherShardId},
+ ChunkType{kCrudNs,
+ ChunkRange{BSON(kOriginalShardKey << 0), BSON(kOriginalShardKey << MAXKEY)},
+ ChunkVersion(1, 0, epoch),
+ _sourceId.getShardId()}};
+
+ auto rt = RoutingTableHistory::makeNew(kCrudNs,
+ kCrudUUID,
+ kOriginalShardKeyPattern,
+ nullptr,
+ false,
+ epoch,
+ boost::none,
+ false,
+ chunks);
+
+ return ChunkManager(_sourceId.getShardId(),
+ DatabaseVersion(UUID::gen(), 1),
+ makeStandaloneRoutingTableHistory(std::move(rt)),
+ boost::none);
}
ThreadPool* writerPool() {
@@ -147,6 +199,13 @@ public:
Value(id.toBSON()));
}
+ void setReshardingOplogApplicationServerParameterTrue() {
+ const ServerParameter::Map& parameterMap = ServerParameterSet::getGlobal()->getMap();
+ invariant(parameterMap.size());
+ const auto param = parameterMap.find("useReshardingOplogApplicationRules");
+ uassertStatusOK(param->second->setFromString("true"));
+ }
+
const NamespaceString& oplogNs() {
return kOplogNs;
}
@@ -163,6 +222,10 @@ public:
return kAppliedToNs;
}
+ const NamespaceString& stashNs() {
+ return kStashNs;
+ }
+
executor::ThreadPoolTaskExecutor* getExecutor() {
return _executor.get();
}
@@ -171,14 +234,21 @@ public:
return _sourceId;
}
+ const ChunkManager& chunkManager() {
+ return _cm.get();
+ }
+
protected:
static constexpr int kWriterPoolSize = 4;
const NamespaceString kOplogNs{"config.localReshardingOplogBuffer.xxx.yyy"};
const NamespaceString kCrudNs{"foo.bar"};
const UUID kCrudUUID = UUID::gen();
const NamespaceString kAppliedToNs{"foo", "system.resharding.{}"_format(kCrudUUID.toString())};
+ const NamespaceString kStashNs{"foo", "{}.{}"_format(kCrudNs.coll(), kOplogNs.coll())};
const ShardId kMyShardId{"shard1"};
+ const ShardId kOtherShardId{"shard2"};
UUID _crudNsUuid = UUID::gen();
+ boost::optional<ChunkManager> _cm;
const ReshardingSourceId _sourceId{UUID::gen(), kMyShardId};
@@ -198,6 +268,7 @@ TEST_F(ReshardingOplogApplierTest, NothingToIterate) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -236,6 +307,7 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -283,6 +355,7 @@ TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) {
Timestamp(8, 3),
std::move(iterator),
3 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -338,6 +411,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringBatchApplyCloningPhase) {
Timestamp(7, 3),
std::move(iterator),
4 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -381,6 +455,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringBatchApplyCatchUpPhase) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -425,6 +500,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplogCloningPhase) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -466,6 +542,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplogCatchUpPhase) {
Timestamp(5, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -507,6 +584,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatchCloningPhase) {
Timestamp(8, 3),
std::move(iterator),
4 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -552,6 +630,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatchCatchUpPhase) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -598,6 +677,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatchCloningPhase) {
Timestamp(7, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -655,6 +735,7 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatchCatchUpPhase) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -703,6 +784,7 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDownCloningPhase) {
Timestamp(5, 3),
std::move(iterator),
4 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -743,6 +825,7 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDownCatchUpPhase) {
Timestamp(5, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -780,6 +863,7 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCloningPhase) {
Timestamp(5, 3),
std::move(iterator),
4 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -820,6 +904,7 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCatchUpPhase) {
Timestamp(5, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -841,6 +926,176 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCatchUpPhase) {
ASSERT_EQ(Timestamp(6, 3), progressDoc->getProgress().getTs());
}
+TEST_F(ReshardingOplogApplierTest, InsertOpIntoOutputCollectionUseReshardingApplicationRules) {
+ // This case tests applying rule #2 described in ReshardingOplogApplicationRules::_applyInsert.
+ setReshardingOplogApplicationServerParameterTrue();
+
+ std::queue<repl::OplogEntry> crudOps;
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none));
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(6, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 2),
+ boost::none));
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(7, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 3),
+ boost::none));
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(8, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 4),
+ boost::none));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ Timestamp(6, 3),
+ std::move(iterator),
+ 2 /* batchSize */,
+ chunkManager(),
+ getExecutor(),
+ writerPool());
+
+ auto future = applier.applyUntilCloneFinishedTs();
+ future.get();
+
+ DBDirectClient client(operationContext());
+ auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc);
+
+ doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 2), doc);
+
+ future = applier.applyUntilDone();
+ future.get();
+
+ doc = client.findOne(appliedToNs().ns(), BSON("_id" << 3));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 3), doc);
+
+ doc = client.findOne(appliedToNs().ns(), BSON("_id" << 4));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 4), doc);
+
+ auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId());
+ ASSERT_TRUE(progressDoc);
+ ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getClusterTime());
+ ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getTs());
+}
+
+TEST_F(ReshardingOplogApplierTest,
+ InsertOpShouldTurnIntoReplacementUpdateOnOutputCollectionUseReshardingApplicationRules) {
+ // This case tests applying rule #3 described in ReshardingOplogApplicationRules::_applyInsert.
+ setReshardingOplogApplicationServerParameterTrue();
+
+ std::queue<repl::OplogEntry> crudOps;
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1 << "sk" << 2),
+ boost::none));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ Timestamp(5, 3),
+ std::move(iterator),
+ 1 /* batchSize */,
+ chunkManager(),
+ getExecutor(),
+ writerPool());
+
+ // Make sure a doc with {_id: 1} exists in the output collection before applying an insert with
+ // the same _id. This donor shard owns these docs under the original shard key (it owns the
+ // range {sk: 0} -> {sk: maxKey}).
+ DBDirectClient client(operationContext());
+ client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << 1));
+
+ auto future = applier.applyUntilCloneFinishedTs();
+ future.get();
+
+ // We should have replaced the existing doc in the output collection.
+ auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc);
+
+ future = applier.applyUntilDone();
+ future.get();
+
+ auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId());
+ ASSERT_TRUE(progressDoc);
+ ASSERT_EQ(Timestamp(5, 3), progressDoc->getProgress().getClusterTime());
+ ASSERT_EQ(Timestamp(5, 3), progressDoc->getProgress().getTs());
+}
+
+TEST_F(ReshardingOplogApplierTest,
+ InsertOpShouldWriteToStashCollectionUseReshardingApplicationRules) {
+ // This case tests applying rules #1 and #4 described in
+ // ReshardingOplogApplicationRules::_applyInsert.
+ setReshardingOplogApplicationServerParameterTrue();
+
+ std::queue<repl::OplogEntry> crudOps;
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1 << "sk" << 2),
+ boost::none));
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(6, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1 << "sk" << 3),
+ boost::none));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ Timestamp(5, 3),
+ std::move(iterator),
+ 1 /* batchSize */,
+ chunkManager(),
+ getExecutor(),
+ writerPool());
+
+ // Make sure a doc with {_id: 1} exists in the output collection before applying inserts with
+ // the same _id. This donor shard does not own the doc {_id: 1, sk: -1} under the original shard
+ // key, so we should apply rule #4 and insert the doc into the stash collection.
+ DBDirectClient client(operationContext());
+ client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << -1));
+
+ auto future = applier.applyUntilCloneFinishedTs();
+ future.get();
+
+ // The output collection should still hold the doc {_id: 1, sk: -1}, and the doc with {_id: 1,
+ // sk: 2} should have been inserted into the stash collection.
+ auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << -1), doc);
+
+ doc = client.findOne(stashNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc);
+
+ future = applier.applyUntilDone();
+ future.get();
+
+ // The output collection should still hold the doc {_id: 1, sk: -1}. We should have applied rule
+ // #1 and turned the last insert op into a replacement update on the stash collection, so the
+ // doc {_id: 1, sk: 3} should now exist in the stash collection.
+ doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << -1), doc);
+
+ doc = client.findOne(stashNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 3), doc);
+
+ auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId());
+ ASSERT_TRUE(progressDoc);
+ ASSERT_EQ(Timestamp(6, 3), progressDoc->getProgress().getClusterTime());
+ ASSERT_EQ(Timestamp(6, 3), progressDoc->getProgress().getTs());
+}
+
class ReshardingOplogApplierRetryableTest : public ReshardingOplogApplierTest {
public:
void setUp() override {
@@ -976,6 +1231,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, CrudWithEmptyConfigTransactions) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -1055,6 +1311,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, MultipleTxnSameLsidInOneBatch) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -1108,6 +1365,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxn) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -1154,6 +1412,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithHigherExistingTxnNum) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -1210,6 +1469,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxnNum) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -1257,6 +1517,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithEqualExistingTxnNum) {
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
@@ -1304,6 +1565,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithStmtIdAlreadyExecuted)
Timestamp(6, 3),
std::move(iterator),
2 /* batchSize */,
+ chunkManager(),
getExecutor(),
writerPool());
diff --git a/src/mongo/db/s/resharding/resharding_server_parameters.idl b/src/mongo/db/s/resharding/resharding_server_parameters.idl
index 0699a2272c5..9a8ca535741 100644
--- a/src/mongo/db/s/resharding/resharding_server_parameters.idl
+++ b/src/mongo/db/s/resharding/resharding_server_parameters.idl
@@ -46,3 +46,11 @@ server_parameters:
expr: 100 * 1024
validator:
gte: 1
+ useReshardingOplogApplicationRules:
+ description: >-
+ Whether or not the ReshardingOplogApplier should use ReshardingOplogApplicationRules
+ when applying operations during resharding. If false, the applier will use the standard
+ oplog application rules.
+ set_at: "startup"
+ cpp_vartype: bool
+ cpp_varname: gUseReshardingOplogApplicationRules
+ default: false
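Because set_at is "startup", the flag cannot be flipped at runtime; enabling the new code path on a test node would look something like the following (standard --setParameter startup syntax, shown as an illustration):

    mongod --setParameter useReshardingOplogApplicationRules=true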