diff options
author | LaMont Nelson <lamont.nelson@mongodb.com> | 2020-12-09 04:35:38 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-12-23 03:49:04 +0000 |
commit | e3372897dcea110233204133a8e62141ba2db70d (patch) | |
tree | 07a5ec45bcee28d71da45e326d0beaa0fca3dcc6 /src/mongo/db/s/resharding | |
parent | 288354349eb98af037fe157220d3e161a58703b7 (diff) | |
download | mongo-e3372897dcea110233204133a8e62141ba2db70d.tar.gz |
SERVER-52795 Resharding donors write final oplog entry upon entering the preparing-to-mirror state.
Diffstat (limited to 'src/mongo/db/s/resharding')
8 files changed, 623 insertions, 217 deletions
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp index b999afdf103..e29851f0d29 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp @@ -83,9 +83,8 @@ public: ReshardingDonorOplogId oplogId(id, id); const BSONObj oField(BSON("msg" << "Created temporary resharding collection")); - const BSONObj o2Field(BSON("type" - << "reshardFinalOp" - << "reshardingUUID" << UUID::gen())); + const BSONObj o2Field( + BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << UUID::gen())); return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId); } diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp index 3894b722254..c89a11da1a3 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -29,6 +29,8 @@ #include "mongo/platform/basic.h" +#include "mongo/db/s/resharding/resharding_donor_recipient_common_test.h" + #include "mongo/db/catalog_raii.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" @@ -45,166 +47,6 @@ namespace { using namespace fmt::literals; -class ReshardingDonorRecipientCommonInternalsTest : public ShardServerTestFixture { -public: - const UUID kExistingUUID = UUID::gen(); - const NamespaceString kOriginalNss = NamespaceString("db", "foo"); - - const NamespaceString kTemporaryReshardingNss = - constructTemporaryReshardingNss("db", kExistingUUID); - const std::string kOriginalShardKey = "oldKey"; - const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1); - const std::string kReshardingKey = "newKey"; - const BSONObj kReshardingKeyPattern = BSON(kReshardingKey << 1); - const OID kOriginalEpoch = OID::gen(); - const OID kReshardingEpoch = OID::gen(); - const UUID kReshardingUUID = UUID::gen(); - - const ShardId kShardOne = ShardId("shardOne"); - const ShardId kShardTwo = ShardId("shardTwo"); - - const std::vector<ShardId> kShardIds = {kShardOne, kShardTwo}; - - const Timestamp kFetchTimestamp = Timestamp(1, 0); - -protected: - CollectionMetadata makeShardedMetadataForOriginalCollection(OperationContext* opCtx) { - return makeShardedMetadata(opCtx, - kOriginalNss, - kOriginalShardKey, - kOriginalShardKeyPattern, - kExistingUUID, - kOriginalEpoch); - } - - CollectionMetadata makeShardedMetadataForTemporaryReshardingCollection( - OperationContext* opCtx) { - return makeShardedMetadata(opCtx, - kTemporaryReshardingNss, - kReshardingKey, - kReshardingKeyPattern, - kReshardingUUID, - kReshardingEpoch); - } - - CollectionMetadata makeShardedMetadata(OperationContext* opCtx, - const NamespaceString& nss, - const std::string& shardKey, - const BSONObj& shardKeyPattern, - const UUID& uuid, - const OID& epoch) { - auto range = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << MAXKEY)); - auto chunk = ChunkType(nss, - std::move(range), - ChunkVersion(1, 0, epoch, boost::none /* timestamp */), - kShardTwo); - ChunkManager cm(kShardOne, - DatabaseVersion(uuid), - makeStandaloneRoutingTableHistory( - RoutingTableHistory::makeNew(nss, - uuid, - shardKeyPattern, - nullptr, - false, - epoch, - boost::none /* timestamp */, - boost::none, - true, - {std::move(chunk)})), - boost::none); - - if (!OperationShardingState::isOperationVersioned(opCtx)) { - const auto version = cm.getVersion(kShardOne); - BSONObjBuilder builder; - version.appendToCommand(&builder); - - auto& oss = OperationShardingState::get(opCtx); - oss.initializeClientRoutingVersionsFromCommand(nss, builder.obj()); - } - - return CollectionMetadata(std::move(cm), kShardOne); - } - - ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID, - CoordinatorStateEnum state) { - auto fields = ReshardingFields(reshardingUUID); - fields.setState(state); - return fields; - }; - - void appendDonorFieldsToReshardingFields(ReshardingFields& fields, - const BSONObj& reshardingKey) { - fields.setDonorFields(TypeCollectionDonorFields(reshardingKey)); - } - - void appendRecipientFieldsToReshardingFields( - ReshardingFields& fields, - const std::vector<ShardId> donorShardIds, - const UUID& existingUUID, - const NamespaceString& originalNss, - const boost::optional<Timestamp>& fetchTimestamp = boost::none) { - auto recipientFields = - TypeCollectionRecipientFields(donorShardIds, existingUUID, originalNss); - emplaceFetchTimestampIfExists(recipientFields, fetchTimestamp); - fields.setRecipientFields(std::move(recipientFields)); - } - - template <class ReshardingDocument> - void assertCommonDocFieldsMatchReshardingFields(const NamespaceString& nss, - const UUID& reshardingUUID, - const UUID& existingUUID, - const BSONObj& reshardingKey, - const ReshardingDocument& reshardingDoc) { - ASSERT_EQ(reshardingDoc.get_id(), reshardingUUID); - ASSERT_EQ(reshardingDoc.getNss(), nss); - ASSERT_EQ(reshardingDoc.getExistingUUID(), existingUUID); - ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey().toBSON(), reshardingKey); - } - - void assertDonorDocMatchesReshardingFields(const NamespaceString& nss, - const UUID& existingUUID, - const ReshardingFields& reshardingFields, - const ReshardingDonorDocument& donorDoc) { - assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>( - nss, - reshardingFields.getUuid(), - existingUUID, - reshardingFields.getDonorFields()->getReshardingKey().toBSON(), - donorDoc); - ASSERT(donorDoc.getState() == DonorStateEnum::kPreparingToDonate); - ASSERT(donorDoc.getMinFetchTimestamp() == boost::none); - } - - void assertRecipientDocMatchesReshardingFields( - const CollectionMetadata& metadata, - const ReshardingFields& reshardingFields, - const ReshardingRecipientDocument& recipientDoc) { - assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>( - reshardingFields.getRecipientFields()->getOriginalNamespace(), - reshardingFields.getUuid(), - reshardingFields.getRecipientFields()->getExistingUUID(), - metadata.getShardKeyPattern().toBSON(), - recipientDoc); - - ASSERT(recipientDoc.getState() == RecipientStateEnum::kCreatingCollection); - ASSERT(recipientDoc.getFetchTimestamp() == - reshardingFields.getRecipientFields()->getFetchTimestamp()); - - auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds(); - auto donorShardIdsSet = std::set<ShardId>(donorShardIds.begin(), donorShardIds.end()); - - for (const auto& donorShardMirroringEntry : recipientDoc.getDonorShardsMirroring()) { - ASSERT_EQ(donorShardMirroringEntry.getMirroring(), false); - auto reshardingFieldsDonorShardId = - donorShardIdsSet.find(donorShardMirroringEntry.getId()); - ASSERT(reshardingFieldsDonorShardId != donorShardIdsSet.end()); - donorShardIdsSet.erase(reshardingFieldsDonorShardId); - } - - ASSERT(donorShardIdsSet.empty()); - } -}; - TEST_F(ReshardingDonorRecipientCommonInternalsTest, ConstructDonorDocumentFromReshardingFields) { OperationContext* opCtx = operationContext(); auto metadata = makeShardedMetadataForOriginalCollection(opCtx); @@ -234,55 +76,6 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, assertRecipientDocMatchesReshardingFields(metadata, reshardingFields, recipientDoc); } -class ReshardingDonorRecipientCommonTest : public ReshardingDonorRecipientCommonInternalsTest { -public: - void setUp() override { - ShardServerTestFixture::setUp(); - - WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); - - _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext()); - - std::unique_ptr<ReshardingDonorService> donorService = - std::make_unique<ReshardingDonorService>(getServiceContext()); - _registry->registerService(std::move(donorService)); - - std::unique_ptr<ReshardingRecipientService> recipientService = - std::make_unique<ReshardingRecipientService>(getServiceContext()); - _registry->registerService(std::move(recipientService)); - _registry->onStartup(operationContext()); - - stepUp(); - } - - void tearDown() override { - WaitForMajorityService::get(getServiceContext()).shutDown(); - - Grid::get(operationContext())->getExecutorPool()->shutdownAndJoin(); - - _registry->onShutdown(); - - ShardServerTestFixture::tearDown(); - } - - void stepUp() { - auto replCoord = repl::ReplicationCoordinator::get(getServiceContext()); - - // Advance term - _term++; - - ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); - ASSERT_OK(replCoord->updateTerm(operationContext(), _term)); - replCoord->setMyLastAppliedOpTimeAndWallTime( - repl::OpTimeAndWallTime(repl::OpTime(Timestamp(1, 1), _term), Date_t())); - - _registry->onStepUpComplete(operationContext(), _term); - } - -protected: - repl::PrimaryOnlyServiceRegistry* _registry; - long long _term = 0; -}; TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) { OperationContext* opCtx = operationContext(); @@ -341,7 +134,6 @@ TEST_F(ReshardingDonorRecipientCommonTest, resharding::tryGetReshardingStateMachine<ReshardingDonorService, ReshardingDonorService::DonorStateMachine, ReshardingDonorDocument>(opCtx, kReshardingUUID); - ASSERT(donorStateMachine == boost::none); } diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h new file mode 100644 index 00000000000..e284b721667 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h @@ -0,0 +1,258 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/catalog_raii.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/resharding/resharding_donor_recipient_common.h" +#include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/unittest/death_test.h" +#include "mongo/util/fail_point.h" + +namespace mongo { +namespace { + +using namespace fmt::literals; + +class ReshardingDonorRecipientCommonInternalsTest : public ShardServerTestFixture { +public: + const UUID kExistingUUID = UUID::gen(); + const NamespaceString kOriginalNss = NamespaceString("db", "foo"); + + const NamespaceString kTemporaryReshardingNss = + constructTemporaryReshardingNss("db", kExistingUUID); + const std::string kOriginalShardKey = "oldKey"; + const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1); + const std::string kReshardingKey = "newKey"; + const BSONObj kReshardingKeyPattern = BSON(kReshardingKey << 1); + const OID kOriginalEpoch = OID::gen(); + const OID kReshardingEpoch = OID::gen(); + const UUID kReshardingUUID = UUID::gen(); + + const ShardId kShardOne = ShardId("shardOne"); + const ShardId kShardTwo = ShardId("shardTwo"); + + const std::vector<ShardId> kShardIds = {kShardOne, kShardTwo}; + + const Timestamp kFetchTimestamp = Timestamp(1, 0); + +protected: + CollectionMetadata makeShardedMetadataForOriginalCollection(OperationContext* opCtx) { + return makeShardedMetadata(opCtx, + kOriginalNss, + kOriginalShardKey, + kOriginalShardKeyPattern, + kExistingUUID, + kOriginalEpoch); + } + + CollectionMetadata makeShardedMetadataForTemporaryReshardingCollection( + OperationContext* opCtx) { + return makeShardedMetadata(opCtx, + kTemporaryReshardingNss, + kReshardingKey, + kReshardingKeyPattern, + kReshardingUUID, + kReshardingEpoch); + } + + CollectionMetadata makeShardedMetadata(OperationContext* opCtx, + const NamespaceString& nss, + const std::string& shardKey, + const BSONObj& shardKeyPattern, + const UUID& uuid, + const OID& epoch) { + auto range = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << MAXKEY)); + auto chunk = + ChunkType(nss, std::move(range), ChunkVersion(1, 0, epoch, boost::none), kShardTwo); + ChunkManager cm( + kShardOne, + DatabaseVersion(uuid), + makeStandaloneRoutingTableHistory(RoutingTableHistory::makeNew(nss, + uuid, + shardKeyPattern, + nullptr, + false, + epoch, + boost::none, + boost::none, + true, + {std::move(chunk)})), + boost::none); + + if (!OperationShardingState::isOperationVersioned(opCtx)) { + const auto version = cm.getVersion(kShardOne); + BSONObjBuilder builder; + version.appendToCommand(&builder); + + auto& oss = OperationShardingState::get(opCtx); + oss.initializeClientRoutingVersionsFromCommand(nss, builder.obj()); + } + + return CollectionMetadata(std::move(cm), kShardOne); + } + + ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID, + CoordinatorStateEnum state) { + auto fields = ReshardingFields(reshardingUUID); + fields.setState(state); + return fields; + }; + + void appendDonorFieldsToReshardingFields(ReshardingFields& fields, + const BSONObj& reshardingKey) { + fields.setDonorFields(TypeCollectionDonorFields(reshardingKey)); + } + + void appendRecipientFieldsToReshardingFields( + ReshardingFields& fields, + const std::vector<ShardId> donorShardIds, + const UUID& existingUUID, + const NamespaceString& originalNss, + const boost::optional<Timestamp>& fetchTimestamp = boost::none) { + auto recipientFields = + TypeCollectionRecipientFields(donorShardIds, existingUUID, originalNss); + emplaceFetchTimestampIfExists(recipientFields, fetchTimestamp); + fields.setRecipientFields(std::move(recipientFields)); + } + + template <class ReshardingDocument> + void assertCommonDocFieldsMatchReshardingFields(const NamespaceString& nss, + const UUID& reshardingUUID, + const UUID& existingUUID, + const BSONObj& reshardingKey, + const ReshardingDocument& reshardingDoc) { + ASSERT_EQ(reshardingDoc.get_id(), reshardingUUID); + ASSERT_EQ(reshardingDoc.getNss(), nss); + ASSERT_EQ(reshardingDoc.getExistingUUID(), existingUUID); + ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey().toBSON(), reshardingKey); + } + + void assertDonorDocMatchesReshardingFields(const NamespaceString& nss, + const UUID& existingUUID, + const ReshardingFields& reshardingFields, + const ReshardingDonorDocument& donorDoc) { + assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>( + nss, + reshardingFields.getUuid(), + existingUUID, + reshardingFields.getDonorFields()->getReshardingKey().toBSON(), + donorDoc); + ASSERT(donorDoc.getState() == DonorStateEnum::kPreparingToDonate); + ASSERT(donorDoc.getMinFetchTimestamp() == boost::none); + } + + void assertRecipientDocMatchesReshardingFields( + const CollectionMetadata& metadata, + const ReshardingFields& reshardingFields, + const ReshardingRecipientDocument& recipientDoc) { + assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>( + reshardingFields.getRecipientFields()->getOriginalNamespace(), + reshardingFields.getUuid(), + reshardingFields.getRecipientFields()->getExistingUUID(), + metadata.getShardKeyPattern().toBSON(), + recipientDoc); + + ASSERT(recipientDoc.getState() == RecipientStateEnum::kCreatingCollection); + ASSERT(recipientDoc.getFetchTimestamp() == + reshardingFields.getRecipientFields()->getFetchTimestamp()); + + auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds(); + auto donorShardIdsSet = std::set<ShardId>(donorShardIds.begin(), donorShardIds.end()); + + for (const auto& donorShardMirroringEntry : recipientDoc.getDonorShardsMirroring()) { + ASSERT_EQ(donorShardMirroringEntry.getMirroring(), false); + auto reshardingFieldsDonorShardId = + donorShardIdsSet.find(donorShardMirroringEntry.getId()); + ASSERT(reshardingFieldsDonorShardId != donorShardIdsSet.end()); + donorShardIdsSet.erase(reshardingFieldsDonorShardId); + } + + ASSERT(donorShardIdsSet.empty()); + } +}; + +class ReshardingDonorRecipientCommonTest : public ReshardingDonorRecipientCommonInternalsTest { +public: + void setUp() override { + ShardServerTestFixture::setUp(); + + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); + + _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext()); + + std::unique_ptr<ReshardingDonorService> donorService = + std::make_unique<ReshardingDonorService>(getServiceContext()); + _registry->registerService(std::move(donorService)); + + std::unique_ptr<ReshardingRecipientService> recipientService = + std::make_unique<ReshardingRecipientService>(getServiceContext()); + _registry->registerService(std::move(recipientService)); + _registry->onStartup(operationContext()); + + stepUp(); + } + + void tearDown() override { + WaitForMajorityService::get(getServiceContext()).shutDown(); + + Grid::get(operationContext())->getExecutorPool()->shutdownAndJoin(); + + _registry->onShutdown(); + + ShardServerTestFixture::tearDown(); + } + + void stepUp() { + auto replCoord = repl::ReplicationCoordinator::get(getServiceContext()); + + // Advance term + _term++; + + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + ASSERT_OK(replCoord->updateTerm(operationContext(), _term)); + replCoord->setMyLastAppliedOpTimeAndWallTime( + repl::OpTimeAndWallTime(repl::OpTime(Timestamp(1, 1), _term), Date_t())); + + _registry->onStepUpComplete(operationContext(), _term); + } + +protected: + repl::PrimaryOnlyServiceRegistry* _registry; + long long _term = 0; +}; + +} // namespace + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 15fa5aa4a2c..0e81debce95 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -118,6 +118,12 @@ void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) { sp.emplaceValue(); } } + +void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp, Status error) { + if (!sp.getFuture().isReady()) { + sp.setError(error); + } +} } // namespace std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::constructInstance( @@ -200,6 +206,10 @@ void ReshardingDonorService::DonorStateMachine::interrupt(Status status) { _allRecipientsDoneApplying.setError(status); } + if (!_finalOplogEntriesWritten.getFuture().isReady()) { + _finalOplogEntriesWritten.setError(status); + } + if (!_coordinatorHasCommitted.getFuture().isReady()) { _coordinatorHasCommitted.setError(status); } @@ -296,9 +306,93 @@ void ReshardingDonorService::DonorStateMachine:: return; } + { + const auto& nss = _donorDoc.getNss(); + const auto& nssUUID = _donorDoc.getExistingUUID(); + const auto& reshardingUUID = _donorDoc.get_id(); + auto opCtx = cc().makeOperationContext(); + auto rawOpCtx = opCtx.get(); + + auto generateOplogEntry = [&](ShardId destinedRecipient) { + repl::MutableOplogEntry oplog; + oplog.setNss(nss); + oplog.setOpType(repl::OpTypeEnum::kNoop); + oplog.setUuid(nssUUID); + oplog.setDestinedRecipient(destinedRecipient); + oplog.setObject( + BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.", + nss.toString()))); + oplog.setObject2( + BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << reshardingUUID)); + oplog.setOpTime(OplogSlot()); + oplog.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now()); + return oplog; + }; + + try { + Timer latency; + const auto& tempNss = constructTemporaryReshardingNss(_donorDoc.getNss().db(), + _donorDoc.getExistingUUID()); + auto* catalogCache = Grid::get(rawOpCtx)->catalogCache(); + auto cm = uassertStatusOK(catalogCache->getCollectionRoutingInfo(rawOpCtx, tempNss)); + + uassert(ErrorCodes::NamespaceNotSharded, + str::stream() << "Expected collection " << tempNss << " to be sharded", + cm.isSharded()); + + std::set<ShardId> recipients; + cm.getAllShardIds(&recipients); + + for (const auto& recipient : recipients) { + auto oplog = generateOplogEntry(recipient); + writeConflictRetry( + rawOpCtx, + "ReshardingBlockWritesOplog", + NamespaceString::kRsOplogNamespace.ns(), + [&] { + AutoGetOplog oplogWrite(rawOpCtx, OplogAccessMode::kWrite); + WriteUnitOfWork wunit(rawOpCtx); + const auto& oplogOpTime = repl::logOp(rawOpCtx, &oplog); + uassert(5279507, + str::stream() + << "Failed to create new oplog entry for oplog with opTime: " + << oplog.getOpTime().toString() << ": " + << redact(oplog.toBSON()), + !oplogOpTime.isNull()); + wunit.commit(); + }); + } + + { + stdx::lock_guard<Latch> lg(_mutex); + LOGV2_DEBUG(5279504, + 0, + "Committed oplog entries to temporarily block writes for resharding", + "namespace"_attr = nss, + "reshardingUUID"_attr = reshardingUUID, + "numRecipients"_attr = recipients.size(), + "duration"_attr = duration_cast<Milliseconds>(latency.elapsed())); + ensureFulfilledPromise(lg, _finalOplogEntriesWritten); + } + } catch (const DBException& e) { + const auto& status = e.toStatus(); + stdx::lock_guard<Latch> lg(_mutex); + LOGV2_ERROR(5279508, + "Exception while writing resharding final oplog entries", + "reshardingUUID"_attr = reshardingUUID, + "error"_attr = status); + ensureFulfilledPromise(lg, _finalOplogEntriesWritten, status); + uassertStatusOK(status); + } + } + _transitionState(DonorStateEnum::kMirroring); } +SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitFinalOplogEntriesWritten() { + return _finalOplogEntriesWritten.getFuture(); +} + ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_awaitCoordinatorHasCommittedThenTransitionToDropping( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { @@ -354,6 +448,13 @@ void ReshardingDonorService::DonorStateMachine::_transitionState( ReshardingDonorDocument replacementDoc(_donorDoc); replacementDoc.setState(endState); emplaceMinFetchTimestampIfExists(replacementDoc, minFetchTimestamp); + + LOGV2_INFO(5279505, + "Transition resharding donor state", + "newState"_attr = DonorState_serializer(replacementDoc.getState()), + "oldState"_attr = DonorState_serializer(_donorDoc.getState()), + "reshardingUUID"_attr = _donorDoc.get_id()); + _updateDonorDocument(std::move(replacementDoc)); } diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index f131f8d1d1c..efbc4e78ae3 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -97,6 +97,8 @@ public: void onReshardingFieldsChanges(const TypeCollectionReshardingFields& reshardingFields); + SharedSemiFuture<void> awaitFinalOplogEntriesWritten(); + private: // The following functions correspond to the actions to take at a particular donor state. void _transitionToPreparingToDonate(); @@ -155,6 +157,8 @@ private: SharedPromise<void> _allRecipientsDoneApplying; + SharedPromise<void> _finalOplogEntriesWritten; + SharedPromise<void> _coordinatorHasCommitted; SharedPromise<void> _completionPromise; diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp new file mode 100644 index 00000000000..b16c9c44dc6 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp @@ -0,0 +1,247 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include <boost/optional.hpp> + +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_session_cache_noop.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/db/s/config/config_server_test_fixture.h" +#include "mongo/db/s/resharding/resharding_coordinator_service.h" +#include "mongo/db/s/resharding/resharding_donor_recipient_common_test.h" +#include "mongo/db/s/resharding/resharding_donor_service.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/shard_metadata_util.h" +#include "mongo/db/s/transaction_coordinator_service.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/executor/remote_command_request.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/logv2/log.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog_cache_loader_mock.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" + +namespace mongo { +namespace { +auto reshardingTempNss(const UUID& existingUUID) { + return NamespaceString(fmt::format("db.system.resharding.{}", existingUUID.toString())); +} + +class ReshardingDonorServiceTest : public ReshardingDonorRecipientCommonTest { +protected: + class ThreeRecipientsCatalogClient final : public ShardingCatalogClientMock { + public: + ThreeRecipientsCatalogClient(UUID existingUUID, std::vector<ShardId> recipients) + : ShardingCatalogClientMock(nullptr), + _existingUUID(std::move(existingUUID)), + _recipients(std::move(recipients)) {} + + // Makes one chunk object per shard; the actual key ranges not relevant for the test. + // The output is deterministic since this function is also used to provide data to the + // catalog cache loader. + static std::vector<ChunkType> makeChunks(const NamespaceString& nss, + const std::vector<ShardId> shards, + const ChunkVersion& chunkVersion) { + std::vector<ChunkType> chunks; + constexpr auto kKeyDelta = 1000; + + int curKey = 0; + for (size_t i = 0; i < shards.size(); ++i, curKey += kKeyDelta) { + const auto min = (i == 0) ? BSON("a" << MINKEY) : BSON("a" << curKey); + const auto max = (i == shards.size() - 1) ? BSON("a" << MAXKEY) + : BSON("a" << curKey + kKeyDelta); + chunks.push_back(ChunkType(nss, ChunkRange(min, max), chunkVersion, shards[i])); + } + + return chunks; + } + + StatusWith<std::vector<ChunkType>> getChunks(OperationContext* opCtx, + const BSONObj& filter, + const BSONObj& sort, + boost::optional<int> limit, + repl::OpTime* opTime, + repl::ReadConcernLevel readConcern) override { + auto version = ChunkVersion(1, 0, OID::gen(), boost::none /* timestamp */); + return makeChunks(reshardingTempNss(_existingUUID), _recipients, version); + } + + StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( + OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { + auto shardTypes = [this]() { + std::vector<ShardType> result; + std::transform( + kRecipientShards.begin(), + kRecipientShards.end(), + std::back_inserter(result), + [&](const ShardId& id) { return ShardType(id.toString(), id.toString()); }); + return result; + }; + return repl::OpTimeWith<std::vector<ShardType>>(shardTypes()); + } + + UUID _existingUUID; + ChunkVersion maxCollVersion = ChunkVersion(0, 0, OID::gen(), boost::none); + std::vector<ShardId> _recipients; + }; + + void setUp() override { + _reshardingUUID = UUID::gen(); + + auto mockLoader = std::make_unique<CatalogCacheLoaderMock>(); + _mockCatalogCacheLoader = mockLoader.get(); + + // Must be called prior to setUp so that the catalog cache is wired properly. + setCatalogCacheLoader(std::move(mockLoader)); + ReshardingDonorRecipientCommonTest::setUp(); + + mockCatalogCacheLoader( + NamespaceString(kReshardNs), reshardingTempNss(kExistingUUID), kRecipientShards); + } + + + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) { + auto mockClient = std::make_unique<ThreeRecipientsCatalogClient>( + uassertStatusOK(UUID::parse(kExistingUUID.toString())), kRecipientShards); + return mockClient; + } + + std::shared_ptr<ReshardingDonorService::DonorStateMachine> getStateMachineInstace( + OperationContext* opCtx, ReshardingDonorDocument initialState) { + auto instanceId = BSON(ReshardingDonorDocument::k_idFieldName << initialState.get_id()); + auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); + auto service = registry->lookupServiceByName(ReshardingDonorService::kServiceName); + return ReshardingDonorService::DonorStateMachine::getOrCreate( + opCtx, service, initialState.toBSON()); + } + + std::vector<BSONObj> getOplogWritesForDonorDocument(const ReshardingDonorDocument& doc) { + auto reshardNs = doc.getNss().toString(); + DBDirectClient client(operationContext()); + auto result = client.query(NamespaceString(NamespaceString::kRsOplogNamespace.ns()), + BSON("ns" << reshardNs)); + std::vector<BSONObj> results; + while (result->more()) { + results.push_back(result->next()); + } + return results; + } + + void mockCatalogCacheLoader(const NamespaceString& nss, + const NamespaceString tempNss, + const std::vector<ShardId>& recipients) { + auto version = ChunkVersion(1, 0, OID::gen(), boost::none /* timestamp */); + CollectionType coll(nss, version.epoch(), Date_t::now(), UUID::gen()); + coll.setKeyPattern(BSON("y" << 1)); + coll.setUnique(false); + coll.setAllowMigrations(false); + + _mockCatalogCacheLoader->setDatabaseRefreshReturnValue( + DatabaseType(nss.toString(), recipients.front(), true, DatabaseVersion(UUID::gen()))); + _mockCatalogCacheLoader->setCollectionRefreshValues( + kTemporaryReshardingNss, + coll, + ThreeRecipientsCatalogClient::makeChunks(tempNss, recipients, version), + boost::none); + } + + boost::optional<UUID> _reshardingUUID; + boost::optional<UUID> _existingUUID; + CatalogCacheLoaderMock* _mockCatalogCacheLoader; + + static const inline auto kRecipientShards = + std::vector<ShardId>{ShardId("shard1"), ShardId("shard2"), ShardId("shard3")}; + static constexpr auto kExpectedO2Type = "reshardFinalOp"_sd; + static constexpr auto kReshardNs = "db.foo"_sd; +}; + +TEST_F(ReshardingDonorServiceTest, ShouldWriteFinalOpLogEntryAfterTransitionToPreparingToMirror) { + ReshardingDonorDocument doc(DonorStateEnum::kPreparingToMirror); + CommonReshardingMetadata metadata(kReshardingUUID, + mongo::NamespaceString(kReshardNs), + kExistingUUID, + KeyPattern(kReshardingKeyPattern)); + doc.setCommonReshardingMetadata(metadata); + doc.getMinFetchTimestampStruct().setMinFetchTimestamp(Timestamp{0xf00}); + + auto donorStateMachine = getStateMachineInstace(operationContext(), doc); + ASSERT(donorStateMachine); + + const auto expectedRecipients = + std::set<ShardId>(kRecipientShards.begin(), kRecipientShards.end()); + ASSERT(expectedRecipients.size()); + + donorStateMachine->awaitFinalOplogEntriesWritten().get(); + + const auto oplogs = getOplogWritesForDonorDocument(doc); + ASSERT(oplogs.size() == expectedRecipients.size()); + + std::set<ShardId> actualRecipients; + for (const auto& oplog : oplogs) { + LOGV2_INFO(5279502, "verify retrieved oplog document", "document"_attr = oplog); + + ASSERT(oplog.hasField("ns")); + auto actualNs = oplog.getStringField("ns"); + ASSERT_EQUALS(kReshardNs, actualNs); + + ASSERT(oplog.hasField("o2")); + auto o2 = oplog.getObjectField("o2"); + ASSERT(o2.hasField("type")); + auto actualType = StringData(o2.getStringField("type")); + ASSERT_EQUALS(kExpectedO2Type, actualType); + ASSERT(o2.hasField("reshardingUUID")); + auto actualReshardingUUIDBson = o2.getField("reshardingUUID"); + auto actualReshardingUUID = UUID::parse(actualReshardingUUIDBson); + ASSERT_EQUALS(doc.get_id(), actualReshardingUUID); + + ASSERT(oplog.hasField("ui")); + auto actualUiBson = oplog.getField("ui"); + auto actualUi = UUID::parse(actualUiBson); + ASSERT_EQUALS(kExistingUUID, actualUi); + + ASSERT(oplog.hasField("destinedRecipient")); + auto actualRecipient = oplog.getStringField("destinedRecipient"); + actualRecipients.insert(ShardId(actualRecipient)); + } + + ASSERT(expectedRecipients == actualRecipients); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index df950d51542..79b029a9a92 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -287,9 +287,7 @@ public: dataColl.getCollection()->uuid(), BSON("msg" << fmt::format("Writes to {} are temporarily blocked for resharding.", dataColl.getCollection()->ns().toString())), - BSON("type" - << "reshardFinalOp" - << "reshardingUUID" << _reshardingUUID), + BSON("type" << kReshardFinalOpLogType << "reshardingUUID" << _reshardingUUID), boost::none, boost::none, boost::none, diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 9e7a4ffac0b..76a29f4d0cc 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -397,7 +397,7 @@ void ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSt // Unless a test is prepared to write the final oplog entries itself, without this interrupt(), // the futures returned by ReshardingOplogFetcher::schedule() would never become ready. // - // TODO SERVER-52795: Remove once the donor shards write the final oplog entry themselves. + // TODO SERVER-53372: Remove once the donor shards write the final oplog entry themselves. if (resharding::gReshardingTempInterruptBeforeOplogApplication) { interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); } @@ -524,6 +524,13 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState( RecipientStateEnum endState, boost::optional<Timestamp> fetchTimestamp) { ReshardingRecipientDocument replacementDoc(_recipientDoc); replacementDoc.setState(endState); + + LOGV2_INFO(5279506, + "Transition resharding recipient state", + "newState"_attr = RecipientState_serializer(replacementDoc.getState()), + "oldState"_attr = RecipientState_serializer(_recipientDoc.getState()), + "reshardingUUID"_attr = _recipientDoc.get_id()); + if (endState == RecipientStateEnum::kCreatingCollection) { _insertRecipientDocument(replacementDoc); return; |