diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-11-04 15:44:36 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-12-05 09:43:27 -0500 |
commit | 7ca7bcbfb746ef945afcda91569ea34a2c2738ec (patch) | |
tree | f95ac88b3eb70b7a72009e2f4fd8fbe0eefc1964 | |
parent | c09c4d6aae7eb22bfa0981c3085c9319f83bc4de (diff) | |
download | mongo-7ca7bcbfb746ef945afcda91569ea34a2c2738ec.tar.gz |
SERVER-22657,SERVER-27215 Unit-tests for MigrationChunkClonerSourceLegacy
(cherry picked from commit ee84af83abda81bdd5c1fdf2d831c4979d739f8c)
(cherry picked from commit 39c985be5b049edbbb1289eba2bb6f4b44fcb00e)
-rw-r--r-- | src/mongo/db/s/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source.h | 12 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 113 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 17 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp | 367 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 11 |
6 files changed, 451 insertions, 73 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index aa7365abcb6..53e74c1d5e3 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -202,13 +202,15 @@ env.CppUnitTest( ) env.CppUnitTest( - target='sharding_state_test', + target='shard_test', source=[ 'active_migrations_registry_test.cpp', 'metadata_manager_test.cpp', + 'migration_chunk_cloner_source_legacy_test.cpp', 'sharding_state_test.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock', '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture', ], ) diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index 62ce24350b5..04cf9e36df2 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -38,9 +38,10 @@ class OperationContext; class Status; /** - * This state machine is responsible for the actual movement of chunk documents from donor to a - * recipient shard. Its lifetime is owned and controlled by a single migration source manager which - * registers it for notifications from the replication subsystem. + * This class is responsible for producing chunk documents to be moved from donor to a recipient + * shard and its methods represent cloning stages. Its lifetime is owned and controlled by a single + * migration source manager which registers it for notifications from the replication subsystem + * before calling startClone. * * Unless explicitly indicated, the methods on this class are not thread-safe. * @@ -48,7 +49,7 @@ class Status; * it begins receiving notifications from the replication subsystem through the * on[insert/update/delete]Op methods. It is up to the creator to decide how these methods end up * being called, but currently this is done through the CollectionShardingState. The creator then - * kicks off the migration as soon as possible by calling startClone. + * kicks off the cloning as soon as possible by calling startClone. */ class MigrationChunkClonerSource { MONGO_DISALLOW_COPYING(MigrationChunkClonerSource); @@ -61,7 +62,8 @@ public: * the recipient shard to start cloning. Before calling this method, this chunk cloner must be * registered for notifications from the replication subsystem (not checked here). * - * NOTE: Must be called without any locks and must succeed, before any other methods are called. + * NOTE: Must be called without any locks and must succeed, before any other methods are called + * (except for cancelClone and [insert/update/delete]Op). */ virtual Status startClone(OperationContext* txn) = 0; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 1c93030657b..90714cdb71b 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -142,16 +142,14 @@ public: stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex); _cloner->_deleted.push_back(_idObj); _cloner->_memoryUsed += _idObj.firstElement().size() + 5; - break; - } + } break; case 'i': case 'u': { stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex); _cloner->_reload.push_back(_idObj); _cloner->_memoryUsed += _idObj.firstElement().size() + 5; - break; - } + } break; default: MONGO_UNREACHABLE; @@ -178,23 +176,18 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ _recipientHost(std::move(recipientHost)) {} MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { + invariant(_state == kDone); invariant(!_deleteNotifyExec); } Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) { + invariant(_state == kNew); invariant(!txn->lockState()->isLocked()); - // TODO (Kal): This can be changed to cancelClone after 3.4 is released. The reason to only do - // internal cleanup in 3.4 is for backwards compatibility with 3.2 nodes, which cannot - // differentiate between cancellations for different migration sessions. It is thus possible - // that a second migration from different donor, but the same recipient would certainly abort an - // already running migration. - auto scopedGuard = MakeGuard([&] { _cleanup(txn); }); - - // Prepare the currently available documents - Status status = _storeCurrentLocs(txn); - if (!status.isOK()) { - return status; + // Load the ids of the currently available documents + auto storeCurrentLocsStatus = _storeCurrentLocs(txn); + if (!storeCurrentLocsStatus.isOK()) { + return storeCurrentLocsStatus; } // Tell the recipient shard to start cloning @@ -216,14 +209,22 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) { return startChunkCloneResponseStatus.getStatus(); } - scopedGuard.Dismiss(); + // TODO (Kal): Setting the state to kCloning below means that if cancelClone was called we will + // send a cancellation command to the recipient. The reason to limit the cases when we send + // cancellation is for backwards compatibility with 3.2 nodes, which cannot differentiate + // between cancellations for different migration sessions. It is thus possible that a second + // migration from different donor, but the same recipient would certainly abort an already + // running migration. + stdx::lock_guard<stdx::mutex> sl(_mutex); + _state = kCloning; + return Status::OK(); } Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( OperationContext* txn, Milliseconds maxTimeToWait) { + invariant(_state == kCloning); invariant(!txn->lockState()->isLocked()); - auto scopedGuard = MakeGuard([&] { cancelClone(txn); }); const auto startTime = Date_t::now(); @@ -261,7 +262,6 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( << " documents remaining"}; } - scopedGuard.Dismiss(); return Status::OK(); } @@ -302,18 +302,13 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( } } - scopedGuard.Dismiss(); return {ErrorCodes::ExceededTimeLimit, "Timed out waiting for the cloner to catch up"}; } Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* txn) { + invariant(_state == kCloning); invariant(!txn->lockState()->isLocked()); - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - invariant(!_cloneCompleted); - } - auto responseStatus = _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId)); if (responseStatus.isOK()) { @@ -328,14 +323,18 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* txn) { void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* txn) { invariant(!txn->lockState()->isLocked()); - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - if (_cloneCompleted) - return; + switch (_state) { + case kDone: + break; + case kCloning: + _callRecipient(createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId)); + // Intentional fall through + case kNew: + _cleanup(txn); + break; + default: + MONGO_UNREACHABLE; } - - _callRecipient(createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId)); - _cleanup(txn); } bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(OperationContext* txn, @@ -435,6 +434,12 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* txn, _cloneLocs.erase(_cloneLocs.begin(), it); + // If we have drained all the cloned data, there is no need to keep the delete notify executor + // around + if (_cloneLocs.empty()) { + _deleteNotifyExec.reset(); + } + return Status::OK(); } @@ -461,13 +466,15 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* txn, void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* txn) { { stdx::lock_guard<stdx::mutex> sl(_mutex); - _cloneCompleted = true; + _state = kDone; + _reload.clear(); + _deleted.clear(); } - ScopedTransaction scopedXact(txn, MODE_IS); - AutoGetCollection autoColl(txn, _args.getNss(), MODE_IS); - if (_deleteNotifyExec) { + ScopedTransaction scopedXact(txn, MODE_IS); + AutoGetCollection autoColl(txn, _args.getNss(), MODE_IS); + _deleteNotifyExec.reset(); } } @@ -476,7 +483,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO executor::RemoteCommandResponse responseStatus( Status{ErrorCodes::InternalError, "Uninitialized value"}); - auto executor = grid.getExecutorPool()->getArbitraryExecutor(); + auto executor = grid.getExecutorPool()->getFixedExecutor(); auto scheduleStatus = executor->scheduleRemoteCommand( executor::RemoteCommandRequest(_recipientHost, "admin", cmdObj, nullptr), [&responseStatus](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { @@ -514,7 +521,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn // Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any // multi-key index prefixed by shard key cannot be multikey over the shard key fields. - IndexDescriptor* idx = + IndexDescriptor* const idx = collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, _shardKeyPattern.toBSON(), false); // requireSingleKey @@ -526,24 +533,19 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn } // Install the stage, which will listen for notifications on the collection - { - stdx::lock_guard<stdx::mutex> sl(_mutex); - - invariant(!_deleteNotifyExec); - - // Takes ownership of 'ws' and 'dns'. - auto statusWithPlanExecutor = - PlanExecutor::make(txn, - stdx::make_unique<WorkingSet>(), - stdx::make_unique<DeleteNotificationStage>(this, txn), - collection, - PlanExecutor::YIELD_MANUAL); - invariant(statusWithPlanExecutor.isOK()); - - _deleteNotifyExec = std::move(statusWithPlanExecutor.getValue()); - _deleteNotifyExec->registerExec(collection); + auto statusWithDeleteNotificationPlanExecutor = + PlanExecutor::make(txn, + stdx::make_unique<WorkingSet>(), + stdx::make_unique<DeleteNotificationStage>(this, txn), + collection, + PlanExecutor::YIELD_MANUAL); + if (!statusWithDeleteNotificationPlanExecutor.isOK()) { + return statusWithDeleteNotificationPlanExecutor.getStatus(); } + _deleteNotifyExec = std::move(statusWithDeleteNotificationPlanExecutor.getValue()); + _deleteNotifyExec->registerExec(collection); + // Assume both min and max non-empty, append MinKey's to make them fit chosen index const KeyPattern kp(idx->keyPattern()); @@ -607,7 +609,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn << WorkingSetCommon::toStatusString(obj)}; } - exec.reset(); + const uint64_t collectionAverageObjectSize = collection->averageObjectSize(txn); if (isLargeChunk) { return { @@ -629,7 +631,8 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn << _args.getMaxKey()}; } - _averageObjectSizeForCloneLocs = static_cast<uint64_t>(collection->averageObjectSize(txn) + 12); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _averageObjectSizeForCloneLocs = collectionAverageObjectSize + 12; return Status::OK(); } diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index df14e57b154..c683df2be29 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -125,6 +125,9 @@ private: friend class DeleteNotificationStage; friend class LogOpForShardingHandler; + // Represents the states in which the cloner can be + enum State { kNew, kCloning, kDone }; + /** * Idempotent method, which cleans up any previously initialized state. It is safe to be called * at any time, but no methods should be called after it. @@ -183,24 +186,24 @@ private: // Protects the entries below stdx::mutex _mutex; - // Inidicates whether commit or cancel have already been called and ensures that we do not - // double commit or double cancel - bool _cloneCompleted{false}; + // The current state of the cloner + State _state{kNew}; // List of record ids that needs to be transferred (initial clone) std::set<RecordId> _cloneLocs; // The estimated average object size during the clone phase. Used for buffer size - // pre-allocation. + // pre-allocation (initial clone). uint64_t _averageObjectSizeForCloneLocs{0}; - // List of _id of documents that were modified that must be re-cloned. + // List of _id of documents that were modified that must be re-cloned (xfer mods) std::list<BSONObj> _reload; - // List of _id of documents that were deleted during clone that should be deleted later. + // List of _id of documents that were deleted during clone that should be deleted later (xfer + // mods) std::list<BSONObj> _deleted; - // Total bytes in _reload + _deleted + // Total bytes in _reload + _deleted (xfer mods) uint64_t _memoryUsed{0}; }; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp new file mode 100644 index 00000000000..403a951c20e --- /dev/null +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -0,0 +1,367 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/s/migration_chunk_cloner_source_legacy.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/sharding_mongod_test_fixture.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using executor::RemoteCommandRequest; +using unittest::assertGet; + +const NamespaceString kNss("TestDB", "TestColl"); +const BSONObj kShardKeyPattern{BSON("X" << 1)}; +const ConnectionString kConfigConnStr = + ConnectionString::forReplicaSet("Donor", + {HostAndPort("DonorHost1:1234"), + HostAndPort{"DonorHost2:1234"}, + HostAndPort{"DonorHost3:1234"}}); +const ConnectionString kDonorConnStr = + ConnectionString::forReplicaSet("Donor", + {HostAndPort("DonorHost1:1234"), + HostAndPort{"DonorHost2:1234"}, + HostAndPort{"DonorHost3:1234"}}); +const ConnectionString kRecipientConnStr = + ConnectionString::forReplicaSet("Recipient", + {HostAndPort("RecipientHost1:1234"), + HostAndPort("RecipientHost2:1234"), + HostAndPort("RecipientHost3:1234")}); + +class MigrationChunkClonerSourceLegacyTest : public ShardingMongodTestFixture { +protected: + void setUp() override { + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + ShardingMongodTestFixture::setUp(); + + // TODO: SERVER-26919 set the flag on the mock repl coordinator just for the window where it + // actually needs to bypass the op observer. + replicationCoordinator()->alwaysAllowWrites(true); + + ASSERT_OK(initializeGlobalShardingStateForMongodForTest(kConfigConnStr)); + + _client.emplace(operationContext()); + + RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()) + ->setConnectionStringReturnValue(kConfigConnStr); + + { + auto donorShard = assertGet( + shardRegistry()->getShard(operationContext(), kDonorConnStr.getSetName())); + RemoteCommandTargeterMock::get(donorShard->getTargeter()) + ->setConnectionStringReturnValue(kDonorConnStr); + RemoteCommandTargeterMock::get(donorShard->getTargeter()) + ->setFindHostReturnValue(kDonorConnStr.getServers()[0]); + } + + { + auto recipientShard = assertGet( + shardRegistry()->getShard(operationContext(), kRecipientConnStr.getSetName())); + RemoteCommandTargeterMock::get(recipientShard->getTargeter()) + ->setConnectionStringReturnValue(kRecipientConnStr); + RemoteCommandTargeterMock::get(recipientShard->getTargeter()) + ->setFindHostReturnValue(kRecipientConnStr.getServers()[0]); + } + } + + void tearDown() override { + _client.reset(); + + ShardingMongodTestFixture::tearDown(); + } + + /** + * Returns the DBDirectClient instance to use for writes to the database. + */ + DBDirectClient* client() { + invariant(_client); + return _client.get_ptr(); + } + + /** + * Creates a collection, which contains an index corresponding to kShardKeyPattern and insers + * the specified initial documents. + */ + void createShardedCollection(std::vector<BSONObj> initialDocs) { + ASSERT(_client->createCollection(kNss.ns())); + _client->createIndex(kNss.ns(), kShardKeyPattern); + + if (!initialDocs.empty()) { + _client->insert(kNss.ns(), initialDocs); + } + } + + /** + * Shortcut to create BSON represenation of a moveChunk request for the specified range with + * fixed kDonorConnStr and kRecipientConnStr, respectively. + */ + static MoveChunkRequest createMoveChunkRequest(const ChunkRange& chunkRange) { + BSONObjBuilder cmdBuilder; + MoveChunkRequest::appendAsCommand( + &cmdBuilder, + kNss, + ChunkVersion(1, 0, OID::gen()), + kConfigConnStr, + kDonorConnStr.getSetName(), + kRecipientConnStr.getSetName(), + chunkRange, + ChunkVersion(1, 0, OID::gen()), + 1024 * 1024, + MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault), + false, + false); + + return assertGet(MoveChunkRequest::createFromCommand(kNss, cmdBuilder.obj())); + } + + /** + * Instantiates a BSON object in which both "_id" and "X" are set to value. + */ + static BSONObj createCollectionDocument(int value) { + return BSON("_id" << value << "X" << value); + } + +private: + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) override { + class StaticCatalogClient final : public ShardingCatalogClientMock { + public: + StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {} + + StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( + OperationContext* txn, repl::ReadConcernLevel readConcern) override { + + ShardType donorShard; + donorShard.setName(kDonorConnStr.getSetName()); + donorShard.setHost(kDonorConnStr.toString()); + + ShardType recipientShard; + recipientShard.setName(kRecipientConnStr.getSetName()); + recipientShard.setHost(kRecipientConnStr.toString()); + + return repl::OpTimeWith<std::vector<ShardType>>({donorShard, recipientShard}); + } + }; + + return stdx::make_unique<StaticCatalogClient>(); + } + + boost::optional<DBDirectClient> _client; +}; + +TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) { + const std::vector<BSONObj> contents = {createCollectionDocument(99), + createCollectionDocument(100), + createCollectionDocument(199), + createCollectionDocument(200)}; + + createShardedCollection(contents); + + MigrationChunkClonerSourceLegacy cloner( + createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); + + auto futureStart = launchAsync([&]() { + onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); }); + }); + + ASSERT_OK(cloner.startClone(operationContext())); + + // Ensure the initial clone documents are available + { + AutoGetCollection autoColl(operationContext(), kNss, MODE_IS); + + { + BSONArrayBuilder arrBuilder; + ASSERT_OK( + cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder)); + ASSERT_EQ(2, arrBuilder.arrSize()); + + const auto arr = arrBuilder.arr(); + ASSERT_BSONOBJ_EQ(contents[1], arr[0].Obj()); + ASSERT_BSONOBJ_EQ(contents[2], arr[1].Obj()); + } + + { + BSONArrayBuilder arrBuilder; + ASSERT_OK( + cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder)); + ASSERT_EQ(0, arrBuilder.arrSize()); + } + } + + // Insert some documents in the chunk range to be included for migration + client()->insert(kNss.ns(), createCollectionDocument(150)); + client()->insert(kNss.ns(), createCollectionDocument(151)); + + // Insert some documents which are outside of the chunk range and should not be included for + // migration + client()->insert(kNss.ns(), createCollectionDocument(90)); + client()->insert(kNss.ns(), createCollectionDocument(210)); + + // Normally the insert above and the onInsert/onDelete callbacks below will happen under the + // same lock and write unit of work + { + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + + WriteUnitOfWork wuow(operationContext()); + + cloner.onInsertOp(operationContext(), createCollectionDocument(90)); + cloner.onInsertOp(operationContext(), createCollectionDocument(150)); + cloner.onInsertOp(operationContext(), createCollectionDocument(151)); + cloner.onInsertOp(operationContext(), createCollectionDocument(210)); + + cloner.onDeleteOp(operationContext(), createCollectionDocument(80)); + cloner.onDeleteOp(operationContext(), createCollectionDocument(199)); + cloner.onDeleteOp(operationContext(), createCollectionDocument(220)); + + wuow.commit(); + } + + { + AutoGetCollection autoColl(operationContext(), kNss, MODE_IS); + + { + BSONArrayBuilder arrBuilder; + ASSERT_OK( + cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder)); + ASSERT_EQ(0, arrBuilder.arrSize()); + } + + { + BSONObjBuilder modsBuilder; + ASSERT_OK(cloner.nextModsBatch(operationContext(), autoColl.getDb(), &modsBuilder)); + + const auto modsObj = modsBuilder.obj(); + ASSERT_EQ(2U, modsObj["reload"].Array().size()); + ASSERT_BSONOBJ_EQ(createCollectionDocument(150), modsObj["reload"].Array()[0].Obj()); + ASSERT_BSONOBJ_EQ(createCollectionDocument(151), modsObj["reload"].Array()[1].Obj()); + + // The legacy chunk cloner cannot filter out deletes because we don't preserve the shard + // key on delete + ASSERT_EQ(3U, modsObj["deleted"].Array().size()); + ASSERT_BSONOBJ_EQ(BSON("_id" << 80), modsObj["deleted"].Array()[0].Obj()); + ASSERT_BSONOBJ_EQ(BSON("_id" << 199), modsObj["deleted"].Array()[1].Obj()); + ASSERT_BSONOBJ_EQ(BSON("_id" << 220), modsObj["deleted"].Array()[2].Obj()); + } + } + + auto futureCommit = launchAsync([&]() { + onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); }); + }); + + ASSERT_OK(cloner.commitClone(operationContext())); +} + +TEST_F(MigrationChunkClonerSourceLegacyTest, CollectionNotFound) { + MigrationChunkClonerSourceLegacy cloner( + createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); + + ASSERT_NOT_OK(cloner.startClone(operationContext())); + cloner.cancelClone(operationContext()); +} + +TEST_F(MigrationChunkClonerSourceLegacyTest, ShardKeyIndexNotFound) { + ASSERT(client()->createCollection(kNss.ns())); + + MigrationChunkClonerSourceLegacy cloner( + createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); + + ASSERT_NOT_OK(cloner.startClone(operationContext())); + cloner.cancelClone(operationContext()); +} + +TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) { + const std::vector<BSONObj> contents = {createCollectionDocument(99), + createCollectionDocument(100), + createCollectionDocument(199), + createCollectionDocument(200)}; + + createShardedCollection(contents); + + MigrationChunkClonerSourceLegacy cloner( + createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); + + auto future = launchAsync([&]() { + onCommand([&](const RemoteCommandRequest& request) { + return Status(ErrorCodes::NetworkTimeout, "Did not receive confirmation from donor"); + }); + }); + + auto startCloneStatus = cloner.startClone(operationContext()); + ASSERT_EQ(ErrorCodes::NetworkTimeout, startCloneStatus.code()); + + // Ensure that if the recipient tries to fetch some documents, the cloner won't crash + { + AutoGetCollection autoColl(operationContext(), kNss, MODE_IS); + + { + BSONArrayBuilder arrBuilder; + ASSERT_OK( + cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder)); + ASSERT_EQ(2, arrBuilder.arrSize()); + } + + { + BSONArrayBuilder arrBuilder; + ASSERT_OK( + cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder)); + ASSERT_EQ(0, arrBuilder.arrSize()); + } + } + + // Cancel clone should not send a cancellation request to the donor because we failed to engage + // it (see comment in the startClone method) + cloner.cancelClone(operationContext()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index c51ea96696e..5a649de1bb3 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -464,7 +464,7 @@ void MigrationSourceManager::cleanupOnError(OperationContext* txn) { void MigrationSourceManager::_cleanup(OperationContext* txn) { invariant(_state != kDone); - { + auto cloneDriver = [&]() { // Unregister from the collection's sharding state ScopedTransaction scopedXact(txn, MODE_IX); AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X); @@ -479,7 +479,9 @@ void MigrationSourceManager::_cleanup(OperationContext* txn) { if (_critSecSignal) { _critSecSignal->set(); } - } + + return std::move(_cloneDriver); + }(); // Decrement the metadata op counter outside of the collection lock in order to hold it for as // short as possible. @@ -487,9 +489,8 @@ void MigrationSourceManager::_cleanup(OperationContext* txn) { ShardingStateRecovery::endMetadataOp(txn); } - if (_cloneDriver) { - _cloneDriver->cancelClone(txn); - _cloneDriver.reset(); + if (cloneDriver) { + cloneDriver->cancelClone(txn); } _state = kDone; |