summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding
diff options
context:
space:
mode:
authorLaMont Nelson <lamont.nelson@mongodb.com>2020-12-09 04:35:38 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-12-23 03:49:04 +0000
commite3372897dcea110233204133a8e62141ba2db70d (patch)
tree07a5ec45bcee28d71da45e326d0beaa0fca3dcc6 /src/mongo/db/s/resharding
parent288354349eb98af037fe157220d3e161a58703b7 (diff)
downloadmongo-e3372897dcea110233204133a8e62141ba2db70d.tar.gz
SERVER-52795 Resharding donors write final oplog entry upon entering the preparing-to-mirror state.
Diffstat (limited to 'src/mongo/db/s/resharding')
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp5
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp212
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h258
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp101
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.h4
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service_test.cpp247
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp4
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp9
8 files changed, 623 insertions, 217 deletions
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
index b999afdf103..e29851f0d29 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
@@ -83,9 +83,8 @@ public:
ReshardingDonorOplogId oplogId(id, id);
const BSONObj oField(BSON("msg"
<< "Created temporary resharding collection"));
- const BSONObj o2Field(BSON("type"
- << "reshardFinalOp"
- << "reshardingUUID" << UUID::gen()));
+ const BSONObj o2Field(
+ BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << UUID::gen()));
return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId);
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
index 3894b722254..c89a11da1a3 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
@@ -29,6 +29,8 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/s/resharding/resharding_donor_recipient_common_test.h"
+
#include "mongo/db/catalog_raii.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
@@ -45,166 +47,6 @@ namespace {
using namespace fmt::literals;
-class ReshardingDonorRecipientCommonInternalsTest : public ShardServerTestFixture {
-public:
- const UUID kExistingUUID = UUID::gen();
- const NamespaceString kOriginalNss = NamespaceString("db", "foo");
-
- const NamespaceString kTemporaryReshardingNss =
- constructTemporaryReshardingNss("db", kExistingUUID);
- const std::string kOriginalShardKey = "oldKey";
- const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1);
- const std::string kReshardingKey = "newKey";
- const BSONObj kReshardingKeyPattern = BSON(kReshardingKey << 1);
- const OID kOriginalEpoch = OID::gen();
- const OID kReshardingEpoch = OID::gen();
- const UUID kReshardingUUID = UUID::gen();
-
- const ShardId kShardOne = ShardId("shardOne");
- const ShardId kShardTwo = ShardId("shardTwo");
-
- const std::vector<ShardId> kShardIds = {kShardOne, kShardTwo};
-
- const Timestamp kFetchTimestamp = Timestamp(1, 0);
-
-protected:
- CollectionMetadata makeShardedMetadataForOriginalCollection(OperationContext* opCtx) {
- return makeShardedMetadata(opCtx,
- kOriginalNss,
- kOriginalShardKey,
- kOriginalShardKeyPattern,
- kExistingUUID,
- kOriginalEpoch);
- }
-
- CollectionMetadata makeShardedMetadataForTemporaryReshardingCollection(
- OperationContext* opCtx) {
- return makeShardedMetadata(opCtx,
- kTemporaryReshardingNss,
- kReshardingKey,
- kReshardingKeyPattern,
- kReshardingUUID,
- kReshardingEpoch);
- }
-
- CollectionMetadata makeShardedMetadata(OperationContext* opCtx,
- const NamespaceString& nss,
- const std::string& shardKey,
- const BSONObj& shardKeyPattern,
- const UUID& uuid,
- const OID& epoch) {
- auto range = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << MAXKEY));
- auto chunk = ChunkType(nss,
- std::move(range),
- ChunkVersion(1, 0, epoch, boost::none /* timestamp */),
- kShardTwo);
- ChunkManager cm(kShardOne,
- DatabaseVersion(uuid),
- makeStandaloneRoutingTableHistory(
- RoutingTableHistory::makeNew(nss,
- uuid,
- shardKeyPattern,
- nullptr,
- false,
- epoch,
- boost::none /* timestamp */,
- boost::none,
- true,
- {std::move(chunk)})),
- boost::none);
-
- if (!OperationShardingState::isOperationVersioned(opCtx)) {
- const auto version = cm.getVersion(kShardOne);
- BSONObjBuilder builder;
- version.appendToCommand(&builder);
-
- auto& oss = OperationShardingState::get(opCtx);
- oss.initializeClientRoutingVersionsFromCommand(nss, builder.obj());
- }
-
- return CollectionMetadata(std::move(cm), kShardOne);
- }
-
- ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID,
- CoordinatorStateEnum state) {
- auto fields = ReshardingFields(reshardingUUID);
- fields.setState(state);
- return fields;
- };
-
- void appendDonorFieldsToReshardingFields(ReshardingFields& fields,
- const BSONObj& reshardingKey) {
- fields.setDonorFields(TypeCollectionDonorFields(reshardingKey));
- }
-
- void appendRecipientFieldsToReshardingFields(
- ReshardingFields& fields,
- const std::vector<ShardId> donorShardIds,
- const UUID& existingUUID,
- const NamespaceString& originalNss,
- const boost::optional<Timestamp>& fetchTimestamp = boost::none) {
- auto recipientFields =
- TypeCollectionRecipientFields(donorShardIds, existingUUID, originalNss);
- emplaceFetchTimestampIfExists(recipientFields, fetchTimestamp);
- fields.setRecipientFields(std::move(recipientFields));
- }
-
- template <class ReshardingDocument>
- void assertCommonDocFieldsMatchReshardingFields(const NamespaceString& nss,
- const UUID& reshardingUUID,
- const UUID& existingUUID,
- const BSONObj& reshardingKey,
- const ReshardingDocument& reshardingDoc) {
- ASSERT_EQ(reshardingDoc.get_id(), reshardingUUID);
- ASSERT_EQ(reshardingDoc.getNss(), nss);
- ASSERT_EQ(reshardingDoc.getExistingUUID(), existingUUID);
- ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey().toBSON(), reshardingKey);
- }
-
- void assertDonorDocMatchesReshardingFields(const NamespaceString& nss,
- const UUID& existingUUID,
- const ReshardingFields& reshardingFields,
- const ReshardingDonorDocument& donorDoc) {
- assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>(
- nss,
- reshardingFields.getUuid(),
- existingUUID,
- reshardingFields.getDonorFields()->getReshardingKey().toBSON(),
- donorDoc);
- ASSERT(donorDoc.getState() == DonorStateEnum::kPreparingToDonate);
- ASSERT(donorDoc.getMinFetchTimestamp() == boost::none);
- }
-
- void assertRecipientDocMatchesReshardingFields(
- const CollectionMetadata& metadata,
- const ReshardingFields& reshardingFields,
- const ReshardingRecipientDocument& recipientDoc) {
- assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>(
- reshardingFields.getRecipientFields()->getOriginalNamespace(),
- reshardingFields.getUuid(),
- reshardingFields.getRecipientFields()->getExistingUUID(),
- metadata.getShardKeyPattern().toBSON(),
- recipientDoc);
-
- ASSERT(recipientDoc.getState() == RecipientStateEnum::kCreatingCollection);
- ASSERT(recipientDoc.getFetchTimestamp() ==
- reshardingFields.getRecipientFields()->getFetchTimestamp());
-
- auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds();
- auto donorShardIdsSet = std::set<ShardId>(donorShardIds.begin(), donorShardIds.end());
-
- for (const auto& donorShardMirroringEntry : recipientDoc.getDonorShardsMirroring()) {
- ASSERT_EQ(donorShardMirroringEntry.getMirroring(), false);
- auto reshardingFieldsDonorShardId =
- donorShardIdsSet.find(donorShardMirroringEntry.getId());
- ASSERT(reshardingFieldsDonorShardId != donorShardIdsSet.end());
- donorShardIdsSet.erase(reshardingFieldsDonorShardId);
- }
-
- ASSERT(donorShardIdsSet.empty());
- }
-};
-
TEST_F(ReshardingDonorRecipientCommonInternalsTest, ConstructDonorDocumentFromReshardingFields) {
OperationContext* opCtx = operationContext();
auto metadata = makeShardedMetadataForOriginalCollection(opCtx);
@@ -234,55 +76,6 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest,
assertRecipientDocMatchesReshardingFields(metadata, reshardingFields, recipientDoc);
}
-class ReshardingDonorRecipientCommonTest : public ReshardingDonorRecipientCommonInternalsTest {
-public:
- void setUp() override {
- ShardServerTestFixture::setUp();
-
- WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
-
- _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
-
- std::unique_ptr<ReshardingDonorService> donorService =
- std::make_unique<ReshardingDonorService>(getServiceContext());
- _registry->registerService(std::move(donorService));
-
- std::unique_ptr<ReshardingRecipientService> recipientService =
- std::make_unique<ReshardingRecipientService>(getServiceContext());
- _registry->registerService(std::move(recipientService));
- _registry->onStartup(operationContext());
-
- stepUp();
- }
-
- void tearDown() override {
- WaitForMajorityService::get(getServiceContext()).shutDown();
-
- Grid::get(operationContext())->getExecutorPool()->shutdownAndJoin();
-
- _registry->onShutdown();
-
- ShardServerTestFixture::tearDown();
- }
-
- void stepUp() {
- auto replCoord = repl::ReplicationCoordinator::get(getServiceContext());
-
- // Advance term
- _term++;
-
- ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
- ASSERT_OK(replCoord->updateTerm(operationContext(), _term));
- replCoord->setMyLastAppliedOpTimeAndWallTime(
- repl::OpTimeAndWallTime(repl::OpTime(Timestamp(1, 1), _term), Date_t()));
-
- _registry->onStepUpComplete(operationContext(), _term);
- }
-
-protected:
- repl::PrimaryOnlyServiceRegistry* _registry;
- long long _term = 0;
-};
TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) {
OperationContext* opCtx = operationContext();
@@ -341,7 +134,6 @@ TEST_F(ReshardingDonorRecipientCommonTest,
resharding::tryGetReshardingStateMachine<ReshardingDonorService,
ReshardingDonorService::DonorStateMachine,
ReshardingDonorDocument>(opCtx, kReshardingUUID);
-
ASSERT(donorStateMachine == boost::none);
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
new file mode 100644
index 00000000000..e284b721667
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
@@ -0,0 +1,258 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/util/fail_point.h"
+
+namespace mongo {
+namespace {
+
+using namespace fmt::literals;
+
+class ReshardingDonorRecipientCommonInternalsTest : public ShardServerTestFixture {
+public:
+ const UUID kExistingUUID = UUID::gen();
+ const NamespaceString kOriginalNss = NamespaceString("db", "foo");
+
+ const NamespaceString kTemporaryReshardingNss =
+ constructTemporaryReshardingNss("db", kExistingUUID);
+ const std::string kOriginalShardKey = "oldKey";
+ const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1);
+ const std::string kReshardingKey = "newKey";
+ const BSONObj kReshardingKeyPattern = BSON(kReshardingKey << 1);
+ const OID kOriginalEpoch = OID::gen();
+ const OID kReshardingEpoch = OID::gen();
+ const UUID kReshardingUUID = UUID::gen();
+
+ const ShardId kShardOne = ShardId("shardOne");
+ const ShardId kShardTwo = ShardId("shardTwo");
+
+ const std::vector<ShardId> kShardIds = {kShardOne, kShardTwo};
+
+ const Timestamp kFetchTimestamp = Timestamp(1, 0);
+
+protected:
+ CollectionMetadata makeShardedMetadataForOriginalCollection(OperationContext* opCtx) {
+ return makeShardedMetadata(opCtx,
+ kOriginalNss,
+ kOriginalShardKey,
+ kOriginalShardKeyPattern,
+ kExistingUUID,
+ kOriginalEpoch);
+ }
+
+ CollectionMetadata makeShardedMetadataForTemporaryReshardingCollection(
+ OperationContext* opCtx) {
+ return makeShardedMetadata(opCtx,
+ kTemporaryReshardingNss,
+ kReshardingKey,
+ kReshardingKeyPattern,
+ kReshardingUUID,
+ kReshardingEpoch);
+ }
+
+ CollectionMetadata makeShardedMetadata(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::string& shardKey,
+ const BSONObj& shardKeyPattern,
+ const UUID& uuid,
+ const OID& epoch) {
+ auto range = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << MAXKEY));
+ auto chunk =
+ ChunkType(nss, std::move(range), ChunkVersion(1, 0, epoch, boost::none), kShardTwo);
+ ChunkManager cm(
+ kShardOne,
+ DatabaseVersion(uuid),
+ makeStandaloneRoutingTableHistory(RoutingTableHistory::makeNew(nss,
+ uuid,
+ shardKeyPattern,
+ nullptr,
+ false,
+ epoch,
+ boost::none,
+ boost::none,
+ true,
+ {std::move(chunk)})),
+ boost::none);
+
+ if (!OperationShardingState::isOperationVersioned(opCtx)) {
+ const auto version = cm.getVersion(kShardOne);
+ BSONObjBuilder builder;
+ version.appendToCommand(&builder);
+
+ auto& oss = OperationShardingState::get(opCtx);
+ oss.initializeClientRoutingVersionsFromCommand(nss, builder.obj());
+ }
+
+ return CollectionMetadata(std::move(cm), kShardOne);
+ }
+
+ ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID,
+ CoordinatorStateEnum state) {
+ auto fields = ReshardingFields(reshardingUUID);
+ fields.setState(state);
+ return fields;
+ };
+
+ void appendDonorFieldsToReshardingFields(ReshardingFields& fields,
+ const BSONObj& reshardingKey) {
+ fields.setDonorFields(TypeCollectionDonorFields(reshardingKey));
+ }
+
+ void appendRecipientFieldsToReshardingFields(
+ ReshardingFields& fields,
+ const std::vector<ShardId> donorShardIds,
+ const UUID& existingUUID,
+ const NamespaceString& originalNss,
+ const boost::optional<Timestamp>& fetchTimestamp = boost::none) {
+ auto recipientFields =
+ TypeCollectionRecipientFields(donorShardIds, existingUUID, originalNss);
+ emplaceFetchTimestampIfExists(recipientFields, fetchTimestamp);
+ fields.setRecipientFields(std::move(recipientFields));
+ }
+
+ template <class ReshardingDocument>
+ void assertCommonDocFieldsMatchReshardingFields(const NamespaceString& nss,
+ const UUID& reshardingUUID,
+ const UUID& existingUUID,
+ const BSONObj& reshardingKey,
+ const ReshardingDocument& reshardingDoc) {
+ ASSERT_EQ(reshardingDoc.get_id(), reshardingUUID);
+ ASSERT_EQ(reshardingDoc.getNss(), nss);
+ ASSERT_EQ(reshardingDoc.getExistingUUID(), existingUUID);
+ ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey().toBSON(), reshardingKey);
+ }
+
+ void assertDonorDocMatchesReshardingFields(const NamespaceString& nss,
+ const UUID& existingUUID,
+ const ReshardingFields& reshardingFields,
+ const ReshardingDonorDocument& donorDoc) {
+ assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>(
+ nss,
+ reshardingFields.getUuid(),
+ existingUUID,
+ reshardingFields.getDonorFields()->getReshardingKey().toBSON(),
+ donorDoc);
+ ASSERT(donorDoc.getState() == DonorStateEnum::kPreparingToDonate);
+ ASSERT(donorDoc.getMinFetchTimestamp() == boost::none);
+ }
+
+ void assertRecipientDocMatchesReshardingFields(
+ const CollectionMetadata& metadata,
+ const ReshardingFields& reshardingFields,
+ const ReshardingRecipientDocument& recipientDoc) {
+ assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>(
+ reshardingFields.getRecipientFields()->getOriginalNamespace(),
+ reshardingFields.getUuid(),
+ reshardingFields.getRecipientFields()->getExistingUUID(),
+ metadata.getShardKeyPattern().toBSON(),
+ recipientDoc);
+
+ ASSERT(recipientDoc.getState() == RecipientStateEnum::kCreatingCollection);
+ ASSERT(recipientDoc.getFetchTimestamp() ==
+ reshardingFields.getRecipientFields()->getFetchTimestamp());
+
+ auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds();
+ auto donorShardIdsSet = std::set<ShardId>(donorShardIds.begin(), donorShardIds.end());
+
+ for (const auto& donorShardMirroringEntry : recipientDoc.getDonorShardsMirroring()) {
+ ASSERT_EQ(donorShardMirroringEntry.getMirroring(), false);
+ auto reshardingFieldsDonorShardId =
+ donorShardIdsSet.find(donorShardMirroringEntry.getId());
+ ASSERT(reshardingFieldsDonorShardId != donorShardIdsSet.end());
+ donorShardIdsSet.erase(reshardingFieldsDonorShardId);
+ }
+
+ ASSERT(donorShardIdsSet.empty());
+ }
+};
+
+class ReshardingDonorRecipientCommonTest : public ReshardingDonorRecipientCommonInternalsTest {
+public:
+ void setUp() override {
+ ShardServerTestFixture::setUp();
+
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+
+ _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
+
+ std::unique_ptr<ReshardingDonorService> donorService =
+ std::make_unique<ReshardingDonorService>(getServiceContext());
+ _registry->registerService(std::move(donorService));
+
+ std::unique_ptr<ReshardingRecipientService> recipientService =
+ std::make_unique<ReshardingRecipientService>(getServiceContext());
+ _registry->registerService(std::move(recipientService));
+ _registry->onStartup(operationContext());
+
+ stepUp();
+ }
+
+ void tearDown() override {
+ WaitForMajorityService::get(getServiceContext()).shutDown();
+
+ Grid::get(operationContext())->getExecutorPool()->shutdownAndJoin();
+
+ _registry->onShutdown();
+
+ ShardServerTestFixture::tearDown();
+ }
+
+ void stepUp() {
+ auto replCoord = repl::ReplicationCoordinator::get(getServiceContext());
+
+ // Advance term
+ _term++;
+
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ ASSERT_OK(replCoord->updateTerm(operationContext(), _term));
+ replCoord->setMyLastAppliedOpTimeAndWallTime(
+ repl::OpTimeAndWallTime(repl::OpTime(Timestamp(1, 1), _term), Date_t()));
+
+ _registry->onStepUpComplete(operationContext(), _term);
+ }
+
+protected:
+ repl::PrimaryOnlyServiceRegistry* _registry;
+ long long _term = 0;
+};
+
+} // namespace
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 15fa5aa4a2c..0e81debce95 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -118,6 +118,12 @@ void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) {
sp.emplaceValue();
}
}
+
+void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp, Status error) {
+ if (!sp.getFuture().isReady()) {
+ sp.setError(error);
+ }
+}
} // namespace
std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::constructInstance(
@@ -200,6 +206,10 @@ void ReshardingDonorService::DonorStateMachine::interrupt(Status status) {
_allRecipientsDoneApplying.setError(status);
}
+ if (!_finalOplogEntriesWritten.getFuture().isReady()) {
+ _finalOplogEntriesWritten.setError(status);
+ }
+
if (!_coordinatorHasCommitted.getFuture().isReady()) {
_coordinatorHasCommitted.setError(status);
}
@@ -296,9 +306,93 @@ void ReshardingDonorService::DonorStateMachine::
return;
}
+ {
+ const auto& nss = _donorDoc.getNss();
+ const auto& nssUUID = _donorDoc.getExistingUUID();
+ const auto& reshardingUUID = _donorDoc.get_id();
+ auto opCtx = cc().makeOperationContext();
+ auto rawOpCtx = opCtx.get();
+
+ auto generateOplogEntry = [&](ShardId destinedRecipient) {
+ repl::MutableOplogEntry oplog;
+ oplog.setNss(nss);
+ oplog.setOpType(repl::OpTypeEnum::kNoop);
+ oplog.setUuid(nssUUID);
+ oplog.setDestinedRecipient(destinedRecipient);
+ oplog.setObject(
+ BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.",
+ nss.toString())));
+ oplog.setObject2(
+ BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << reshardingUUID));
+ oplog.setOpTime(OplogSlot());
+ oplog.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
+ return oplog;
+ };
+
+ try {
+ Timer latency;
+ const auto& tempNss = constructTemporaryReshardingNss(_donorDoc.getNss().db(),
+ _donorDoc.getExistingUUID());
+ auto* catalogCache = Grid::get(rawOpCtx)->catalogCache();
+ auto cm = uassertStatusOK(catalogCache->getCollectionRoutingInfo(rawOpCtx, tempNss));
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Expected collection " << tempNss << " to be sharded",
+ cm.isSharded());
+
+ std::set<ShardId> recipients;
+ cm.getAllShardIds(&recipients);
+
+ for (const auto& recipient : recipients) {
+ auto oplog = generateOplogEntry(recipient);
+ writeConflictRetry(
+ rawOpCtx,
+ "ReshardingBlockWritesOplog",
+ NamespaceString::kRsOplogNamespace.ns(),
+ [&] {
+ AutoGetOplog oplogWrite(rawOpCtx, OplogAccessMode::kWrite);
+ WriteUnitOfWork wunit(rawOpCtx);
+ const auto& oplogOpTime = repl::logOp(rawOpCtx, &oplog);
+ uassert(5279507,
+ str::stream()
+ << "Failed to create new oplog entry for oplog with opTime: "
+ << oplog.getOpTime().toString() << ": "
+ << redact(oplog.toBSON()),
+ !oplogOpTime.isNull());
+ wunit.commit();
+ });
+ }
+
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ LOGV2_DEBUG(5279504,
+ 0,
+ "Committed oplog entries to temporarily block writes for resharding",
+ "namespace"_attr = nss,
+ "reshardingUUID"_attr = reshardingUUID,
+ "numRecipients"_attr = recipients.size(),
+ "duration"_attr = duration_cast<Milliseconds>(latency.elapsed()));
+ ensureFulfilledPromise(lg, _finalOplogEntriesWritten);
+ }
+ } catch (const DBException& e) {
+ const auto& status = e.toStatus();
+ stdx::lock_guard<Latch> lg(_mutex);
+ LOGV2_ERROR(5279508,
+ "Exception while writing resharding final oplog entries",
+ "reshardingUUID"_attr = reshardingUUID,
+ "error"_attr = status);
+ ensureFulfilledPromise(lg, _finalOplogEntriesWritten, status);
+ uassertStatusOK(status);
+ }
+ }
+
_transitionState(DonorStateEnum::kMirroring);
}
+SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitFinalOplogEntriesWritten() {
+ return _finalOplogEntriesWritten.getFuture();
+}
+
ExecutorFuture<void>
ReshardingDonorService::DonorStateMachine::_awaitCoordinatorHasCommittedThenTransitionToDropping(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
@@ -354,6 +448,13 @@ void ReshardingDonorService::DonorStateMachine::_transitionState(
ReshardingDonorDocument replacementDoc(_donorDoc);
replacementDoc.setState(endState);
emplaceMinFetchTimestampIfExists(replacementDoc, minFetchTimestamp);
+
+ LOGV2_INFO(5279505,
+ "Transition resharding donor state",
+ "newState"_attr = DonorState_serializer(replacementDoc.getState()),
+ "oldState"_attr = DonorState_serializer(_donorDoc.getState()),
+ "reshardingUUID"_attr = _donorDoc.get_id());
+
_updateDonorDocument(std::move(replacementDoc));
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index f131f8d1d1c..efbc4e78ae3 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -97,6 +97,8 @@ public:
void onReshardingFieldsChanges(const TypeCollectionReshardingFields& reshardingFields);
+ SharedSemiFuture<void> awaitFinalOplogEntriesWritten();
+
private:
// The following functions correspond to the actions to take at a particular donor state.
void _transitionToPreparingToDonate();
@@ -155,6 +157,8 @@ private:
SharedPromise<void> _allRecipientsDoneApplying;
+ SharedPromise<void> _finalOplogEntriesWritten;
+
SharedPromise<void> _coordinatorHasCommitted;
SharedPromise<void> _completionPromise;
diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
new file mode 100644
index 00000000000..b16c9c44dc6
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
@@ -0,0 +1,247 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include <boost/optional.hpp>
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_cache_noop.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/db/s/config/config_server_test_fixture.h"
+#include "mongo/db/s/resharding/resharding_coordinator_service.h"
+#include "mongo/db/s/resharding/resharding_donor_recipient_common_test.h"
+#include "mongo/db/s/resharding/resharding_donor_service.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/shard_metadata_util.h"
+#include "mongo/db/s/transaction_coordinator_service.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/executor/remote_command_request.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/catalog_cache_loader_mock.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
+
+namespace mongo {
+namespace {
+auto reshardingTempNss(const UUID& existingUUID) {
+ return NamespaceString(fmt::format("db.system.resharding.{}", existingUUID.toString()));
+}
+
+class ReshardingDonorServiceTest : public ReshardingDonorRecipientCommonTest {
+protected:
+ class ThreeRecipientsCatalogClient final : public ShardingCatalogClientMock {
+ public:
+ ThreeRecipientsCatalogClient(UUID existingUUID, std::vector<ShardId> recipients)
+ : ShardingCatalogClientMock(nullptr),
+ _existingUUID(std::move(existingUUID)),
+ _recipients(std::move(recipients)) {}
+
+ // Makes one chunk object per shard; the actual key ranges not relevant for the test.
+ // The output is deterministic since this function is also used to provide data to the
+ // catalog cache loader.
+ static std::vector<ChunkType> makeChunks(const NamespaceString& nss,
+ const std::vector<ShardId> shards,
+ const ChunkVersion& chunkVersion) {
+ std::vector<ChunkType> chunks;
+ constexpr auto kKeyDelta = 1000;
+
+ int curKey = 0;
+ for (size_t i = 0; i < shards.size(); ++i, curKey += kKeyDelta) {
+ const auto min = (i == 0) ? BSON("a" << MINKEY) : BSON("a" << curKey);
+ const auto max = (i == shards.size() - 1) ? BSON("a" << MAXKEY)
+ : BSON("a" << curKey + kKeyDelta);
+ chunks.push_back(ChunkType(nss, ChunkRange(min, max), chunkVersion, shards[i]));
+ }
+
+ return chunks;
+ }
+
+ StatusWith<std::vector<ChunkType>> getChunks(OperationContext* opCtx,
+ const BSONObj& filter,
+ const BSONObj& sort,
+ boost::optional<int> limit,
+ repl::OpTime* opTime,
+ repl::ReadConcernLevel readConcern) override {
+ auto version = ChunkVersion(1, 0, OID::gen(), boost::none /* timestamp */);
+ return makeChunks(reshardingTempNss(_existingUUID), _recipients, version);
+ }
+
+ StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+ OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
+ auto shardTypes = [this]() {
+ std::vector<ShardType> result;
+ std::transform(
+ kRecipientShards.begin(),
+ kRecipientShards.end(),
+ std::back_inserter(result),
+ [&](const ShardId& id) { return ShardType(id.toString(), id.toString()); });
+ return result;
+ };
+ return repl::OpTimeWith<std::vector<ShardType>>(shardTypes());
+ }
+
+ UUID _existingUUID;
+ ChunkVersion maxCollVersion = ChunkVersion(0, 0, OID::gen(), boost::none);
+ std::vector<ShardId> _recipients;
+ };
+
+ void setUp() override {
+ _reshardingUUID = UUID::gen();
+
+ auto mockLoader = std::make_unique<CatalogCacheLoaderMock>();
+ _mockCatalogCacheLoader = mockLoader.get();
+
+ // Must be called prior to setUp so that the catalog cache is wired properly.
+ setCatalogCacheLoader(std::move(mockLoader));
+ ReshardingDonorRecipientCommonTest::setUp();
+
+ mockCatalogCacheLoader(
+ NamespaceString(kReshardNs), reshardingTempNss(kExistingUUID), kRecipientShards);
+ }
+
+
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) {
+ auto mockClient = std::make_unique<ThreeRecipientsCatalogClient>(
+ uassertStatusOK(UUID::parse(kExistingUUID.toString())), kRecipientShards);
+ return mockClient;
+ }
+
+ std::shared_ptr<ReshardingDonorService::DonorStateMachine> getStateMachineInstace(
+ OperationContext* opCtx, ReshardingDonorDocument initialState) {
+ auto instanceId = BSON(ReshardingDonorDocument::k_idFieldName << initialState.get_id());
+ auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
+ auto service = registry->lookupServiceByName(ReshardingDonorService::kServiceName);
+ return ReshardingDonorService::DonorStateMachine::getOrCreate(
+ opCtx, service, initialState.toBSON());
+ }
+
+ std::vector<BSONObj> getOplogWritesForDonorDocument(const ReshardingDonorDocument& doc) {
+ auto reshardNs = doc.getNss().toString();
+ DBDirectClient client(operationContext());
+ auto result = client.query(NamespaceString(NamespaceString::kRsOplogNamespace.ns()),
+ BSON("ns" << reshardNs));
+ std::vector<BSONObj> results;
+ while (result->more()) {
+ results.push_back(result->next());
+ }
+ return results;
+ }
+
+ void mockCatalogCacheLoader(const NamespaceString& nss,
+ const NamespaceString tempNss,
+ const std::vector<ShardId>& recipients) {
+ auto version = ChunkVersion(1, 0, OID::gen(), boost::none /* timestamp */);
+ CollectionType coll(nss, version.epoch(), Date_t::now(), UUID::gen());
+ coll.setKeyPattern(BSON("y" << 1));
+ coll.setUnique(false);
+ coll.setAllowMigrations(false);
+
+ _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(
+ DatabaseType(nss.toString(), recipients.front(), true, DatabaseVersion(UUID::gen())));
+ _mockCatalogCacheLoader->setCollectionRefreshValues(
+ kTemporaryReshardingNss,
+ coll,
+ ThreeRecipientsCatalogClient::makeChunks(tempNss, recipients, version),
+ boost::none);
+ }
+
+ boost::optional<UUID> _reshardingUUID;
+ boost::optional<UUID> _existingUUID;
+ CatalogCacheLoaderMock* _mockCatalogCacheLoader;
+
+ static const inline auto kRecipientShards =
+ std::vector<ShardId>{ShardId("shard1"), ShardId("shard2"), ShardId("shard3")};
+ static constexpr auto kExpectedO2Type = "reshardFinalOp"_sd;
+ static constexpr auto kReshardNs = "db.foo"_sd;
+};
+
+TEST_F(ReshardingDonorServiceTest, ShouldWriteFinalOpLogEntryAfterTransitionToPreparingToMirror) {
+ ReshardingDonorDocument doc(DonorStateEnum::kPreparingToMirror);
+ CommonReshardingMetadata metadata(kReshardingUUID,
+ mongo::NamespaceString(kReshardNs),
+ kExistingUUID,
+ KeyPattern(kReshardingKeyPattern));
+ doc.setCommonReshardingMetadata(metadata);
+ doc.getMinFetchTimestampStruct().setMinFetchTimestamp(Timestamp{0xf00});
+
+ auto donorStateMachine = getStateMachineInstace(operationContext(), doc);
+ ASSERT(donorStateMachine);
+
+ const auto expectedRecipients =
+ std::set<ShardId>(kRecipientShards.begin(), kRecipientShards.end());
+ ASSERT(expectedRecipients.size());
+
+ donorStateMachine->awaitFinalOplogEntriesWritten().get();
+
+ const auto oplogs = getOplogWritesForDonorDocument(doc);
+ ASSERT(oplogs.size() == expectedRecipients.size());
+
+ std::set<ShardId> actualRecipients;
+ for (const auto& oplog : oplogs) {
+ LOGV2_INFO(5279502, "verify retrieved oplog document", "document"_attr = oplog);
+
+ ASSERT(oplog.hasField("ns"));
+ auto actualNs = oplog.getStringField("ns");
+ ASSERT_EQUALS(kReshardNs, actualNs);
+
+ ASSERT(oplog.hasField("o2"));
+ auto o2 = oplog.getObjectField("o2");
+ ASSERT(o2.hasField("type"));
+ auto actualType = StringData(o2.getStringField("type"));
+ ASSERT_EQUALS(kExpectedO2Type, actualType);
+ ASSERT(o2.hasField("reshardingUUID"));
+ auto actualReshardingUUIDBson = o2.getField("reshardingUUID");
+ auto actualReshardingUUID = UUID::parse(actualReshardingUUIDBson);
+ ASSERT_EQUALS(doc.get_id(), actualReshardingUUID);
+
+ ASSERT(oplog.hasField("ui"));
+ auto actualUiBson = oplog.getField("ui");
+ auto actualUi = UUID::parse(actualUiBson);
+ ASSERT_EQUALS(kExistingUUID, actualUi);
+
+ ASSERT(oplog.hasField("destinedRecipient"));
+ auto actualRecipient = oplog.getStringField("destinedRecipient");
+ actualRecipients.insert(ShardId(actualRecipient));
+ }
+
+ ASSERT(expectedRecipients == actualRecipients);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index df950d51542..79b029a9a92 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -287,9 +287,7 @@ public:
dataColl.getCollection()->uuid(),
BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.",
dataColl.getCollection()->ns().toString())),
- BSON("type"
- << "reshardFinalOp"
- << "reshardingUUID" << _reshardingUUID),
+ BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << _reshardingUUID),
boost::none,
boost::none,
boost::none,
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 9e7a4ffac0b..76a29f4d0cc 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -397,7 +397,7 @@ void ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSt
// Unless a test is prepared to write the final oplog entries itself, without this interrupt(),
// the futures returned by ReshardingOplogFetcher::schedule() would never become ready.
//
- // TODO SERVER-52795: Remove once the donor shards write the final oplog entry themselves.
+ // TODO SERVER-53372: Remove once the donor shards write the final oplog entry themselves.
if (resharding::gReshardingTempInterruptBeforeOplogApplication) {
interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"});
}
@@ -524,6 +524,13 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
RecipientStateEnum endState, boost::optional<Timestamp> fetchTimestamp) {
ReshardingRecipientDocument replacementDoc(_recipientDoc);
replacementDoc.setState(endState);
+
+ LOGV2_INFO(5279506,
+ "Transition resharding recipient state",
+ "newState"_attr = RecipientState_serializer(replacementDoc.getState()),
+ "oldState"_attr = RecipientState_serializer(_recipientDoc.getState()),
+ "reshardingUUID"_attr = _recipientDoc.get_id());
+
if (endState == RecipientStateEnum::kCreatingCollection) {
_insertRecipientDocument(replacementDoc);
return;