summary refs log tree commit diff
diff options
context:
space:
mode:
author Blake Oler <blake.oler@mongodb.com> 2020-10-15 02:28:28 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-11-05 21:21:11 +0000
commit 20b397389eedea51faf8699790eb62963db40d7d (patch)
tree 897ac87824dae7de1222664a349220cd938aca4b
parent 0e400a736bacfa291dbda2971f7381ad392000c9 (diff)
download mongo-20b397389eedea51faf8699790eb62963db40d7d.tar.gz
SERVER-51398 Write state transition and other data from resharding donor/recipient shards to the coordinator
-rw-r--r-- jstests/sharding/reshard_collection_basic.js 52
-rw-r--r-- src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp 12
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_service.cpp 4
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp 4
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp 6
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_service.cpp 53
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_service.h 3
-rw-r--r-- src/mongo/db/s/resharding/resharding_recipient_service.cpp 27
-rw-r--r-- src/mongo/db/s/resharding/resharding_recipient_service.h 2
9 files changed, 106 insertions, 57 deletions
diff --git a/jstests/sharding/reshard_collection_basic.js b/jstests/sharding/reshard_collection_basic.js
index 0bb5537f14f..c536503c82a 100644
--- a/jstests/sharding/reshard_collection_basic.js
+++ b/jstests/sharding/reshard_collection_basic.js
@@ -12,15 +12,26 @@ const collName = '.foo';
const ns = kDbName + collName;
const mongos = st.s0;
+let removeAllReshardingCollections = () => {
+ mongos.getDB(kDbName).foo.drop();
+ mongos.getDB('config').reshardingOperations.remove({nss: ns});
+ mongos.getDB('config').collections.remove({reshardingFields: {$exists: true}});
+ st.rs0.getPrimary().getDB('config').localReshardingOperations.donor.remove({nss: ns});
+ st.rs0.getPrimary().getDB('config').localReshardingOperations.recipient.remove({nss: ns});
+ st.rs1.getPrimary().getDB('config').localReshardingOperations.donor.remove({nss: ns});
+ st.rs1.getPrimary().getDB('config').localReshardingOperations.recipient.remove({nss: ns});
+};
+
// Fail if sharding is disabled.
assert.commandFailedWithCode(mongos.adminCommand({reshardCollection: ns, key: {_id: 1}}),
ErrorCodes.NamespaceNotFound);
+assert.commandWorked(mongos.adminCommand({enableSharding: kDbName}));
+
// Fail if collection is unsharded.
assert.commandFailedWithCode(mongos.adminCommand({reshardCollection: ns, key: {_id: 1}}),
- ErrorCodes.NamespaceNotFound);
+ ErrorCodes.NamespaceNotSharded);
-assert.commandWorked(mongos.adminCommand({enableSharding: kDbName}));
assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}}));
// Fail if missing required key.
@@ -35,7 +46,7 @@ assert.commandFailedWithCode(
assert.commandWorked(
mongos.adminCommand({reshardCollection: ns, key: {_id: 1}, collation: {locale: 'simple'}}));
-mongos.getDB(kDbName).foo.drop();
+removeAllReshardingCollections();
assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}}));
@@ -44,7 +55,8 @@ assert.commandFailedWithCode(
mongos.adminCommand({reshardCollection: ns, key: {_id: 1}, unique: true}), ErrorCodes.BadValue);
// Succeed if unique is specified and is false.
assert.commandWorked(mongos.adminCommand({reshardCollection: ns, key: {_id: 1}, unique: false}));
-mongos.getDB(kDbName).foo.drop();
+
+removeAllReshardingCollections();
// Succeed if _presetReshardedChunks is provided and test commands are enabled (default).
assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}}));
@@ -54,7 +66,8 @@ assert.commandWorked(mongos.adminCommand({
_presetReshardedChunks:
[{recipientShardId: st.shard1.shardName, min: {_id: MinKey}, max: {_id: MaxKey}}]
}));
-mongos.getDB(kDbName).foo.drop();
+
+removeAllReshardingCollections();
assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}}));
@@ -80,7 +93,8 @@ assert.commandWorked(mongos.adminCommand({
collation: {locale: 'simple'},
numInitialChunks: 2,
}));
-mongos.getDB(kDbName).foo.drop();
+
+removeAllReshardingCollections();
// Succeed if all optional fields and _presetReshardedChunks are provided with correct values and
// test commands are enabled (default).
@@ -95,7 +109,8 @@ assert.commandWorked(mongos.adminCommand({
{recipientShardId: st.shard0.shardName, min: {_id: MinKey}, max: {_id: 0}}
]
}));
-mongos.getDB(kDbName).foo.drop();
+
+removeAllReshardingCollections();
const existingZoneName = 'x1';
@@ -141,28 +156,7 @@ assert.commandWorked(mongos.adminCommand({
]
}));
-assert.commandWorked(mongos.getDB(kDbName).dropDatabase());
-
-// TODO remove test case below once the resharding command actually writes to config.collections
-// itself Test that shards correctly cache 'reshardingFields' in config.cache.collections
-assert.commandWorked(mongos.adminCommand({enableSharding: kDbName}));
-st.ensurePrimaryShard(kDbName, st.shard0.shardName);
-assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}}));
-
-let donorReshardingFields = {
- "uuid": UUID(),
- "state": "initializing",
- "donorFields": {"reshardingKey": {x: 1}}
-};
-assert.commandWorked(st.configRS.getPrimary().getDB("config").collections.update(
- {_id: ns}, {"$set": {"reshardingFields": donorReshardingFields}}));
-
-// Run moveChunk to force shard0 to pick up the new info in config.collections
-assert.commandWorked(mongos.adminCommand({moveChunk: ns, find: {_id: 0}, to: st.shard1.shardName}));
-
-let cachedEntry = st.rs0.getPrimary().getDB('config').cache.collections.findOne({_id: ns});
-assert.docEq(cachedEntry.reshardingFields, donorReshardingFields);
+removeAllReshardingCollections();
-assert.commandWorked(mongos.getDB(kDbName).dropDatabase());
st.stop();
})();
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index 1f4e3fec8df..ccebb894ecf 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -209,11 +209,15 @@ public:
instance->setInitialChunksAndZones(std::move(initialChunks), std::move(newZones));
- // This promise will currently be falsely fulfilled by a call to interrupt() inside
- // the ReshardingCoordinatorService. This is to enable jsTests to pass while code
- // is still being committed.
- // TODO SERVER-51398 Change this comment and assess the current call to .wait().
instance->getObserver()->awaitAllDonorsReadyToDonate().wait();
+ // This promise is currently automatically filled by recipient shards after creating
+ // their ReshardingRecipientStateMachines.
+ // TODO SERVER-51217 Update this comment to reflect that the temporary collection
+ // has been created.
+ instance->getObserver()->awaitAllRecipientsFinishedApplying().wait();
+
+ instance->interrupt(
+ {ErrorCodes::InternalError, "Artificial interruption to enable jsTests"});
}
private:
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 9af822b3d87..7d81fe332c7 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -407,6 +407,7 @@ stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNofityEnum> notifyForSta
{CoordinatorStateEnum::kInitializing, ParticipantsToNofityEnum::kNone},
{CoordinatorStateEnum::kPreparingToDonate, ParticipantsToNofityEnum::kDonors},
{CoordinatorStateEnum::kCloning, ParticipantsToNofityEnum::kRecipients},
+ {CoordinatorStateEnum::kApplying, ParticipantsToNofityEnum::kDonors},
{CoordinatorStateEnum::kMirroring, ParticipantsToNofityEnum::kDonors},
{CoordinatorStateEnum::kCommitted, ParticipantsToNofityEnum::kNone},
{CoordinatorStateEnum::kRenaming, ParticipantsToNofityEnum::kRecipients},
@@ -767,9 +768,6 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat
return ExecutorFuture<void>(**executor, Status::OK());
}
- // TODO SERVER-51398 Remove this call.
- interrupt({ErrorCodes::InternalError, "Early exit to support jsTesting"});
-
return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate()
.thenRunOn(**executor)
.then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
index 8b848a95271..6acfb708340 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
@@ -144,8 +144,8 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields(
createDonorShardMirroringEntriesFromDonorShardIds(
reshardingFields.getRecipientFields()->getDonorShardIds());
- auto recipientDoc =
- ReshardingRecipientDocument(RecipientStateEnum::kCloning, std::move(donorShards));
+ auto recipientDoc = ReshardingRecipientDocument(RecipientStateEnum::kCreatingCollection,
+ std::move(donorShards));
auto commonMetadata =
CommonReshardingMetadata(reshardingFields.getUuid(),
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
index f479d02942e..9df204a68e6 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
@@ -181,7 +181,7 @@ protected:
metadata.getShardKeyPattern().toBSON(),
recipientDoc);
- ASSERT(recipientDoc.getState() == RecipientStateEnum::kCloning);
+ ASSERT(recipientDoc.getState() == RecipientStateEnum::kCreatingCollection);
ASSERT(recipientDoc.getFetchTimestamp() ==
reshardingFields.getRecipientFields()->getFetchTimestamp());
@@ -294,6 +294,8 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) {
ReshardingDonorDocument>(opCtx, kReshardingUUID);
ASSERT(donorStateMachine != boost::none);
+
+ donorStateMachine.get()->interrupt({ErrorCodes::InternalError, "Shut down for test"});
}
TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) {
@@ -315,6 +317,8 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) {
kReshardingUUID);
ASSERT(recipientStateMachine != boost::none);
+
+ recipientStateMachine.get()->interrupt({ErrorCodes::InternalError, "Shut down for test"});
}
TEST_F(ReshardingDonorRecipientCommonTest,
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 17c2bb431b8..8135fbfbaa3 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -37,7 +37,10 @@
#include "mongo/db/op_observer.h"
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/sharding_state.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/grid.h"
namespace mongo {
namespace {
@@ -103,7 +106,6 @@ ReshardingDonorService::DonorStateMachine::~DonorStateMachine() {
void ReshardingDonorService::DonorStateMachine::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
ExecutorFuture<void>(**executor)
- .then([this] { _transitionToPreparingToDonate(); })
.then(
[this] { _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData(); })
.then([this, executor] {
@@ -166,14 +168,6 @@ void ReshardingDonorService::DonorStateMachine::interrupt(Status status) {
void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
boost::optional<TypeCollectionReshardingFields> reshardingFields) {}
-void ReshardingDonorService::DonorStateMachine::_transitionToPreparingToDonate() {
- if (_donorDoc.getState() > DonorStateEnum::kUnused) {
- return;
- }
-
- _transitionState(DonorStateEnum::kPreparingToDonate);
-}
-
void ReshardingDonorService::DonorStateMachine::
_onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData() {
if (_donorDoc.getState() > DonorStateEnum::kPreparingToDonate) {
@@ -181,8 +175,13 @@ void ReshardingDonorService::DonorStateMachine::
return;
}
+ _insertDonorDocument(_donorDoc);
+
auto minFetchTimestamp = generateMinFetchTimestamp(_donorDoc);
- _transitionState(DonorStateEnum::kDonatingInitialData, minFetchTimestamp);
+ _transitionStateAndUpdateCoordinator(DonorStateEnum::kDonatingInitialData, minFetchTimestamp);
+
+ // TODO SERVER-XXXXX Remove this line.
+ interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"});
}
ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
@@ -193,7 +192,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
}
return _allRecipientsDoneCloning.getFuture().thenRunOn(**executor).then([this]() {
- _transitionState(DonorStateEnum::kDonatingOplogEntries);
+ _transitionStateAndUpdateCoordinator(DonorStateEnum::kDonatingOplogEntries);
});
}
@@ -235,18 +234,13 @@ void ReshardingDonorService::DonorStateMachine::_dropOriginalCollectionThenDelet
return;
}
- _transitionState(DonorStateEnum::kDone);
+ _transitionStateAndUpdateCoordinator(DonorStateEnum::kDone);
}
void ReshardingDonorService::DonorStateMachine::_transitionState(
DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp) {
ReshardingDonorDocument replacementDoc(_donorDoc);
replacementDoc.setState(endState);
- if (endState == DonorStateEnum::kPreparingToDonate) {
- _insertDonorDocument(std::move(replacementDoc));
- return;
- }
-
if (minFetchTimestamp) {
auto& minFetchTimestampStruct = replacementDoc.getMinFetchTimestampStruct();
if (minFetchTimestampStruct.getMinFetchTimestamp())
@@ -259,6 +253,31 @@ void ReshardingDonorService::DonorStateMachine::_transitionState(
_updateDonorDocument(std::move(replacementDoc));
}
+void ReshardingDonorService::DonorStateMachine::_transitionStateAndUpdateCoordinator(
+ DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp) {
+ _transitionState(endState, minFetchTimestamp);
+
+ auto opCtx = cc().makeOperationContext();
+ auto shardId = ShardingState::get(opCtx.get())->shardId();
+
+ BSONObjBuilder updateBuilder;
+ updateBuilder.append("donorShards.$.state", DonorState_serializer(endState));
+
+ if (minFetchTimestamp) {
+ updateBuilder.append("donorShards.$.minFetchTimestamp", minFetchTimestamp.get());
+ }
+
+ uassertStatusOK(
+ Grid::get(opCtx.get())
+ ->catalogClient()
+ ->updateConfigDocument(opCtx.get(),
+ NamespaceString::kConfigReshardingOperationsNamespace,
+ BSON("_id" << _donorDoc.get_id() << "donorShards.id" << shardId),
+ BSON("$set" << updateBuilder.done()),
+ false /* upsert */,
+ ShardingCatalogClient::kMajorityWriteConcern));
+}
+
void ReshardingDonorService::DonorStateMachine::_transitionStateToError(const Status& status) {
ReshardingDonorDocument replacementDoc(_donorDoc);
replacementDoc.setState(DonorStateEnum::kError);
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index dfd37678c94..ff34a651894 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -120,6 +120,9 @@ private:
void _transitionState(DonorStateEnum endState,
boost::optional<Timestamp> minFetchTimestamp = boost::none);
+ void _transitionStateAndUpdateCoordinator(
+ DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp = boost::none);
+
// Transitions the state on-disk and in-memory to kError.
void _transitionStateToError(const Status& status);
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 07980cd9740..25c87941614 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/s/migration_destination_manager.h"
#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/sharding_state.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/cluster_commands_helpers.h"
@@ -234,7 +235,8 @@ void ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSt
return;
}
- _transitionState(RecipientStateEnum::kSteadyState);
+ _transitionStateAndUpdateCoordinator(RecipientStateEnum::kSteadyState);
+ interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"});
}
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
@@ -284,6 +286,29 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
_updateRecipientDocument(std::move(replacementDoc));
}
+void ReshardingRecipientService::RecipientStateMachine::_transitionStateAndUpdateCoordinator(
+ RecipientStateEnum endState) {
+ _transitionState(endState, boost::none);
+
+ auto opCtx = cc().makeOperationContext();
+
+ auto shardId = ShardingState::get(opCtx.get())->shardId();
+
+ BSONObjBuilder updateBuilder;
+ updateBuilder.append("recipientShards.$.state", RecipientState_serializer(endState));
+
+ uassertStatusOK(
+ Grid::get(opCtx.get())
+ ->catalogClient()
+ ->updateConfigDocument(
+ opCtx.get(),
+ NamespaceString::kConfigReshardingOperationsNamespace,
+ BSON("_id" << _recipientDoc.get_id() << "recipientShards.id" << shardId),
+ BSON("$set" << updateBuilder.done()),
+ false /* upsert */,
+ ShardingCatalogClient::kMajorityWriteConcern));
+}
+
void ReshardingRecipientService::RecipientStateMachine::_transitionStateToError(
const Status& status) {
ReshardingRecipientDocument replacementDoc(_recipientDoc);
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index da95ff3513f..9c53b20ca8a 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -135,6 +135,8 @@ private:
void _transitionState(RecipientStateEnum endState,
boost::optional<Timestamp> fetchTimestamp = boost::none);
+ void _transitionStateAndUpdateCoordinator(RecipientStateEnum endState);
+
// Transitions the state on-disk and in-memory to kError.
void _transitionStateToError(const Status& status);