author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2016-11-04 15:44:36 -0400
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2016-12-05 09:43:27 -0500
commit     7ca7bcbfb746ef945afcda91569ea34a2c2738ec (patch)
tree       f95ac88b3eb70b7a72009e2f4fd8fbe0eefc1964
parent     c09c4d6aae7eb22bfa0981c3085c9319f83bc4de (diff)
SERVER-22657,SERVER-27215 Unit-tests for MigrationChunkClonerSourceLegacy
(cherry picked from commit ee84af83abda81bdd5c1fdf2d831c4979d739f8c) (cherry picked from commit 39c985be5b049edbbb1289eba2bb6f4b44fcb00e)
-rw-r--r--  src/mongo/db/s/SConscript                                      4
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source.h                12
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp      113
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.h         17
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp 367
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp                   11
6 files changed, 451 insertions, 73 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index aa7365abcb6..53e74c1d5e3 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -202,13 +202,15 @@ env.CppUnitTest(
)
env.CppUnitTest(
- target='sharding_state_test',
+ target='shard_test',
source=[
'active_migrations_registry_test.cpp',
'metadata_manager_test.cpp',
+ 'migration_chunk_cloner_source_legacy_test.cpp',
'sharding_state_test.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
'$BUILD_DIR/mongo/s/sharding_mongod_test_fixture',
],
)
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index 62ce24350b5..04cf9e36df2 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -38,9 +38,10 @@ class OperationContext;
class Status;
/**
- * This state machine is responsible for the actual movement of chunk documents from donor to a
- * recipient shard. Its lifetime is owned and controlled by a single migration source manager which
- * registers it for notifications from the replication subsystem.
+ * This class is responsible for producing chunk documents to be moved from donor to a recipient
+ * shard and its methods represent cloning stages. Its lifetime is owned and controlled by a single
+ * migration source manager which registers it for notifications from the replication subsystem
+ * before calling startClone.
*
* Unless explicitly indicated, the methods on this class are not thread-safe.
*
@@ -48,7 +49,7 @@ class Status;
* it begins receiving notifications from the replication subsystem through the
* on[insert/update/delete]Op methods. It is up to the creator to decide how these methods end up
* being called, but currently this is done through the CollectionShardingState. The creator then
- * kicks off the migration as soon as possible by calling startClone.
+ * kicks off the cloning as soon as possible by calling startClone.
*/
class MigrationChunkClonerSource {
MONGO_DISALLOW_COPYING(MigrationChunkClonerSource);
@@ -61,7 +62,8 @@ public:
* the recipient shard to start cloning. Before calling this method, this chunk cloner must be
* registered for notifications from the replication subsystem (not checked here).
*
- * NOTE: Must be called without any locks and must succeed, before any other methods are called.
+ * NOTE: Must be called without any locks and must succeed, before any other methods are called
+ * (except for cancelClone and [insert/update/delete]Op).
*/
virtual Status startClone(OperationContext* txn) = 0;
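The revised comment block prescribes a strict call order for the cloner: register it for notifications, then call startClone, then the later stages, with cancelClone (and the op notifications) as the only methods allowed after a failure. Below is a minimal, self-contained C++ sketch of that driver-side sequence; every name is an illustrative stand-in rather than the real MigrationChunkClonerSource API, and Status is reduced to bool:

    // Hedged sketch of the call order described above; not MongoDB code.
    #include <iostream>

    struct ClonerSketch {
        // Stand-ins for the real cloning stages.
        bool startClone() { std::cout << "startClone\n"; return true; }
        bool awaitUntilCriticalSectionIsAppropriate() { std::cout << "await catch-up\n"; return true; }
        bool commitClone() { std::cout << "commitClone\n"; return true; }
        void cancelClone() { std::cout << "cancelClone\n"; }
    };

    int main() {
        ClonerSketch cloner;  // the creator registers it for repl notifications first
        if (!cloner.startClone()) {  // must succeed before any other stage runs
            cloner.cancelClone();    // cancelClone is the only legal follow-up on failure
            return 1;
        }
        if (!cloner.awaitUntilCriticalSectionIsAppropriate() || !cloner.commitClone()) {
            cloner.cancelClone();  // with the internal guards gone, the caller owns cancellation
            return 1;
        }
        return 0;
    }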
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 1c93030657b..90714cdb71b 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -142,16 +142,14 @@ public:
stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex);
_cloner->_deleted.push_back(_idObj);
_cloner->_memoryUsed += _idObj.firstElement().size() + 5;
- break;
- }
+ } break;
case 'i':
case 'u': {
stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex);
_cloner->_reload.push_back(_idObj);
_cloner->_memoryUsed += _idObj.firstElement().size() + 5;
- break;
- }
+ } break;
default:
MONGO_UNREACHABLE;
@@ -178,23 +176,18 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ
_recipientHost(std::move(recipientHost)) {}
MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
+ invariant(_state == kDone);
invariant(!_deleteNotifyExec);
}
Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) {
+ invariant(_state == kNew);
invariant(!txn->lockState()->isLocked());
- // TODO (Kal): This can be changed to cancelClone after 3.4 is released. The reason to only do
- // internal cleanup in 3.4 is for backwards compatibility with 3.2 nodes, which cannot
- // differentiate between cancellations for different migration sessions. It is thus possible
- // that a second migration from different donor, but the same recipient would certainly abort an
- // already running migration.
- auto scopedGuard = MakeGuard([&] { _cleanup(txn); });
-
- // Prepare the currently available documents
- Status status = _storeCurrentLocs(txn);
- if (!status.isOK()) {
- return status;
+ // Load the ids of the currently available documents
+ auto storeCurrentLocsStatus = _storeCurrentLocs(txn);
+ if (!storeCurrentLocsStatus.isOK()) {
+ return storeCurrentLocsStatus;
}
// Tell the recipient shard to start cloning
@@ -216,14 +209,22 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) {
return startChunkCloneResponseStatus.getStatus();
}
- scopedGuard.Dismiss();
+ // TODO (Kal): Setting the state to kCloning below means that if cancelClone was called we will
+ // send a cancellation command to the recipient. The reason to limit the cases when we send
+ // cancellation is for backwards compatibility with 3.2 nodes, which cannot differentiate
+ // between cancellations for different migration sessions. It is thus possible that a second
+ // migration from a different donor, but the same recipient, would abort an already
+ // running migration.
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ _state = kCloning;
+
return Status::OK();
}
Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
OperationContext* txn, Milliseconds maxTimeToWait) {
+ invariant(_state == kCloning);
invariant(!txn->lockState()->isLocked());
- auto scopedGuard = MakeGuard([&] { cancelClone(txn); });
const auto startTime = Date_t::now();
@@ -261,7 +262,6 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
<< " documents remaining"};
}
- scopedGuard.Dismiss();
return Status::OK();
}
@@ -302,18 +302,13 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
}
}
- scopedGuard.Dismiss();
return {ErrorCodes::ExceededTimeLimit, "Timed out waiting for the cloner to catch up"};
}
Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* txn) {
+ invariant(_state == kCloning);
invariant(!txn->lockState()->isLocked());
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- invariant(!_cloneCompleted);
- }
-
auto responseStatus =
_callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
if (responseStatus.isOK()) {
@@ -328,14 +323,18 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* txn) {
void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* txn) {
invariant(!txn->lockState()->isLocked());
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (_cloneCompleted)
- return;
+ switch (_state) {
+ case kDone:
+ break;
+ case kCloning:
+ _callRecipient(createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId));
+ // Intentional fall through
+ case kNew:
+ _cleanup(txn);
+ break;
+ default:
+ MONGO_UNREACHABLE;
}
-
- _callRecipient(createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId));
- _cleanup(txn);
}
bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(OperationContext* txn,
@@ -435,6 +434,12 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* txn,
_cloneLocs.erase(_cloneLocs.begin(), it);
+ // If we have drained all the cloned data, there is no need to keep the delete notify executor
+ // around
+ if (_cloneLocs.empty()) {
+ _deleteNotifyExec.reset();
+ }
+
return Status::OK();
}
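The new early-reset above frees the delete-notification executor as soon as the set of pending clone locations drains. A toy sketch of that "release the watcher when the queue empties" shape, with placeholder types (a plain struct stands in for the PlanExecutor):

    // Illustrative only: drop the watcher once the pending-ids set empties.
    #include <algorithm>
    #include <cstddef>
    #include <iterator>
    #include <memory>
    #include <set>

    struct Watcher {};  // stands in for the delete-notify PlanExecutor

    struct CloneBatchSketch {
        std::set<int> cloneLocs{1, 2, 3};  // pending record ids
        std::unique_ptr<Watcher> deleteNotify = std::make_unique<Watcher>();

        void drainBatch(std::size_t n) {
            auto it = cloneLocs.begin();
            std::advance(it, std::min(n, cloneLocs.size()));
            cloneLocs.erase(cloneLocs.begin(), it);
            if (cloneLocs.empty()) {
                deleteNotify.reset();  // nothing left to clone, so deletions no longer matter
            }
        }
    };

    int main() {
        CloneBatchSketch batch;
        batch.drainBatch(3);
        return batch.deleteNotify ? 1 : 0;  // expect the watcher to be gone
    }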
@@ -461,13 +466,15 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* txn,
void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* txn) {
{
stdx::lock_guard<stdx::mutex> sl(_mutex);
- _cloneCompleted = true;
+ _state = kDone;
+ _reload.clear();
+ _deleted.clear();
}
- ScopedTransaction scopedXact(txn, MODE_IS);
- AutoGetCollection autoColl(txn, _args.getNss(), MODE_IS);
-
if (_deleteNotifyExec) {
+ ScopedTransaction scopedXact(txn, MODE_IS);
+ AutoGetCollection autoColl(txn, _args.getNss(), MODE_IS);
+
_deleteNotifyExec.reset();
}
}
@@ -476,7 +483,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO
executor::RemoteCommandResponse responseStatus(
Status{ErrorCodes::InternalError, "Uninitialized value"});
- auto executor = grid.getExecutorPool()->getArbitraryExecutor();
+ auto executor = grid.getExecutorPool()->getFixedExecutor();
auto scheduleStatus = executor->scheduleRemoteCommand(
executor::RemoteCommandRequest(_recipientHost, "admin", cmdObj, nullptr),
[&responseStatus](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
@@ -514,7 +521,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn
// Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any
// multi-key index prefixed by shard key cannot be multikey over the shard key fields.
- IndexDescriptor* idx =
+ IndexDescriptor* const idx =
collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn,
_shardKeyPattern.toBSON(),
false); // requireSingleKey
@@ -526,24 +533,19 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn
}
// Install the stage, which will listen for notifications on the collection
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
-
- invariant(!_deleteNotifyExec);
-
- // Takes ownership of 'ws' and 'dns'.
- auto statusWithPlanExecutor =
- PlanExecutor::make(txn,
- stdx::make_unique<WorkingSet>(),
- stdx::make_unique<DeleteNotificationStage>(this, txn),
- collection,
- PlanExecutor::YIELD_MANUAL);
- invariant(statusWithPlanExecutor.isOK());
-
- _deleteNotifyExec = std::move(statusWithPlanExecutor.getValue());
- _deleteNotifyExec->registerExec(collection);
+ auto statusWithDeleteNotificationPlanExecutor =
+ PlanExecutor::make(txn,
+ stdx::make_unique<WorkingSet>(),
+ stdx::make_unique<DeleteNotificationStage>(this, txn),
+ collection,
+ PlanExecutor::YIELD_MANUAL);
+ if (!statusWithDeleteNotificationPlanExecutor.isOK()) {
+ return statusWithDeleteNotificationPlanExecutor.getStatus();
}
+ _deleteNotifyExec = std::move(statusWithDeleteNotificationPlanExecutor.getValue());
+ _deleteNotifyExec->registerExec(collection);
+
// Assume both min and max non-empty, append MinKey's to make them fit chosen index
const KeyPattern kp(idx->keyPattern());
@@ -607,7 +609,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn
<< WorkingSetCommon::toStatusString(obj)};
}
- exec.reset();
+ const uint64_t collectionAverageObjectSize = collection->averageObjectSize(txn);
if (isLargeChunk) {
return {
@@ -629,7 +631,8 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn
<< _args.getMaxKey()};
}
- _averageObjectSizeForCloneLocs = static_cast<uint64_t>(collection->averageObjectSize(txn) + 12);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _averageObjectSizeForCloneLocs = collectionAverageObjectSize + 12;
return Status::OK();
}
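Taken together, the hunks above replace the MakeGuard/Dismiss error handling and the _cloneCompleted flag with an explicit kNew/kCloning/kDone state machine: a _recvChunkAbort is sent only from kCloning, so a startClone that never engaged the recipient cannot abort an unrelated migration on a 3.2 node. A standalone sketch of those transitions (names illustrative; the real class also guards _state with _mutex):

    // Hedged sketch of the new cancellation state machine; not MongoDB code.
    #include <cassert>
    #include <iostream>

    enum State { kNew, kCloning, kDone };

    struct ClonerStateSketch {
        State state = kNew;

        void startClone() {
            assert(state == kNew);
            // ... remote _recvChunkStart call; only on success do we own a session ...
            state = kCloning;
        }

        void cancelClone() {
            switch (state) {
                case kDone:
                    break;            // already committed or cancelled: nothing to do
                case kCloning:
                    std::cout << "send _recvChunkAbort to recipient\n";
                    [[fallthrough]];  // intentional: also run the local cleanup
                case kNew:
                    cleanup();
                    break;
            }
        }

        void cleanup() { state = kDone; }
    };

    int main() {
        ClonerStateSketch a;
        a.cancelClone();  // kNew: local cleanup only, no abort sent (3.2 compatibility)

        ClonerStateSketch b;
        b.startClone();
        b.cancelClone();  // kCloning: abort the recipient, then clean up
        return 0;
    }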
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index df14e57b154..c683df2be29 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -125,6 +125,9 @@ private:
friend class DeleteNotificationStage;
friend class LogOpForShardingHandler;
+ // The possible states through which the cloner transitions
+ enum State { kNew, kCloning, kDone };
+
/**
* Idempotent method, which cleans up any previously initialized state. It is safe to be called
* at any time, but no methods should be called after it.
@@ -183,24 +186,24 @@ private:
// Protects the entries below
stdx::mutex _mutex;
- // Inidicates whether commit or cancel have already been called and ensures that we do not
- // double commit or double cancel
- bool _cloneCompleted{false};
+ // The current state of the cloner
+ State _state{kNew};
// List of record ids that needs to be transferred (initial clone)
std::set<RecordId> _cloneLocs;
// The estimated average object size during the clone phase. Used for buffer size
- // pre-allocation.
+ // pre-allocation (initial clone).
uint64_t _averageObjectSizeForCloneLocs{0};
- // List of _id of documents that were modified that must be re-cloned.
+ // List of _id of documents that were modified that must be re-cloned (xfer mods)
std::list<BSONObj> _reload;
- // List of _id of documents that were deleted during clone that should be deleted later.
+ // List of _id of documents that were deleted during clone that should be deleted later (xfer
+ // mods)
std::list<BSONObj> _deleted;
- // Total bytes in _reload + _deleted
+ // Total bytes in _reload + _deleted (xfer mods)
uint64_t _memoryUsed{0};
};
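The annotations above split the members between the initial-clone phase and the transfer-mods phase. Below is a rough sketch of the transfer-mods bookkeeping; it assumes, though the diff does not say so, that the "+ 5" approximates the BSON wrapper overhead around a single _id element (4-byte length prefix plus the terminating EOO byte):

    // Illustrative transfer-mods accounting; the real code queues BSONObj _ids,
    // here we only track their element sizes.
    #include <cstdint>
    #include <list>
    #include <mutex>

    class XferModsSketch {
    public:
        void onInsertOrUpdate(std::int64_t idElementSize) {
            std::lock_guard<std::mutex> lk(_mutex);
            _reload.push_back(idElementSize);
            _memoryUsed += idElementSize + 5;  // assumed BSON wrapper overhead
        }

        void onDelete(std::int64_t idElementSize) {
            std::lock_guard<std::mutex> lk(_mutex);
            _deleted.push_back(idElementSize);
            _memoryUsed += idElementSize + 5;
        }

        std::uint64_t memoryUsed() const {
            std::lock_guard<std::mutex> lk(_mutex);
            return _memoryUsed;
        }

    private:
        mutable std::mutex _mutex;
        std::list<std::int64_t> _reload;   // ids to re-clone (xfer mods)
        std::list<std::int64_t> _deleted;  // ids to delete later (xfer mods)
        std::uint64_t _memoryUsed = 0;     // total bytes across both lists
    };

    int main() {
        XferModsSketch mods;
        mods.onInsertOrUpdate(14);  // e.g. an {_id: 1} element of ~14 bytes
        mods.onDelete(14);
        return mods.memoryUsed() > 0 ? 0 : 1;
    }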
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
new file mode 100644
index 00000000000..403a951c20e
--- /dev/null
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -0,0 +1,367 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/sharding_mongod_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using executor::RemoteCommandRequest;
+using unittest::assertGet;
+
+const NamespaceString kNss("TestDB", "TestColl");
+const BSONObj kShardKeyPattern{BSON("X" << 1)};
+const ConnectionString kConfigConnStr =
+ ConnectionString::forReplicaSet("Donor",
+ {HostAndPort("DonorHost1:1234"),
+ HostAndPort{"DonorHost2:1234"},
+ HostAndPort{"DonorHost3:1234"}});
+const ConnectionString kDonorConnStr =
+ ConnectionString::forReplicaSet("Donor",
+ {HostAndPort("DonorHost1:1234"),
+ HostAndPort{"DonorHost2:1234"},
+ HostAndPort{"DonorHost3:1234"}});
+const ConnectionString kRecipientConnStr =
+ ConnectionString::forReplicaSet("Recipient",
+ {HostAndPort("RecipientHost1:1234"),
+ HostAndPort("RecipientHost2:1234"),
+ HostAndPort("RecipientHost3:1234")});
+
+class MigrationChunkClonerSourceLegacyTest : public ShardingMongodTestFixture {
+protected:
+ void setUp() override {
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ ShardingMongodTestFixture::setUp();
+
+ // TODO: SERVER-26919 set the flag on the mock repl coordinator just for the window where it
+ // actually needs to bypass the op observer.
+ replicationCoordinator()->alwaysAllowWrites(true);
+
+ ASSERT_OK(initializeGlobalShardingStateForMongodForTest(kConfigConnStr));
+
+ _client.emplace(operationContext());
+
+ RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter())
+ ->setConnectionStringReturnValue(kConfigConnStr);
+
+ {
+ auto donorShard = assertGet(
+ shardRegistry()->getShard(operationContext(), kDonorConnStr.getSetName()));
+ RemoteCommandTargeterMock::get(donorShard->getTargeter())
+ ->setConnectionStringReturnValue(kDonorConnStr);
+ RemoteCommandTargeterMock::get(donorShard->getTargeter())
+ ->setFindHostReturnValue(kDonorConnStr.getServers()[0]);
+ }
+
+ {
+ auto recipientShard = assertGet(
+ shardRegistry()->getShard(operationContext(), kRecipientConnStr.getSetName()));
+ RemoteCommandTargeterMock::get(recipientShard->getTargeter())
+ ->setConnectionStringReturnValue(kRecipientConnStr);
+ RemoteCommandTargeterMock::get(recipientShard->getTargeter())
+ ->setFindHostReturnValue(kRecipientConnStr.getServers()[0]);
+ }
+ }
+
+ void tearDown() override {
+ _client.reset();
+
+ ShardingMongodTestFixture::tearDown();
+ }
+
+ /**
+ * Returns the DBDirectClient instance to use for writes to the database.
+ */
+ DBDirectClient* client() {
+ invariant(_client);
+ return _client.get_ptr();
+ }
+
+ /**
+ * Creates a collection containing an index corresponding to kShardKeyPattern and inserts
+ * the specified initial documents.
+ */
+ void createShardedCollection(std::vector<BSONObj> initialDocs) {
+ ASSERT(_client->createCollection(kNss.ns()));
+ _client->createIndex(kNss.ns(), kShardKeyPattern);
+
+ if (!initialDocs.empty()) {
+ _client->insert(kNss.ns(), initialDocs);
+ }
+ }
+
+ /**
+ * Shortcut to create a BSON representation of a moveChunk request for the specified range,
+ * using the fixed kDonorConnStr and kRecipientConnStr as donor and recipient, respectively.
+ */
+ static MoveChunkRequest createMoveChunkRequest(const ChunkRange& chunkRange) {
+ BSONObjBuilder cmdBuilder;
+ MoveChunkRequest::appendAsCommand(
+ &cmdBuilder,
+ kNss,
+ ChunkVersion(1, 0, OID::gen()),
+ kConfigConnStr,
+ kDonorConnStr.getSetName(),
+ kRecipientConnStr.getSetName(),
+ chunkRange,
+ ChunkVersion(1, 0, OID::gen()),
+ 1024 * 1024,
+ MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault),
+ false,
+ false);
+
+ return assertGet(MoveChunkRequest::createFromCommand(kNss, cmdBuilder.obj()));
+ }
+
+ /**
+ * Instantiates a BSON object in which both "_id" and "X" are set to value.
+ */
+ static BSONObj createCollectionDocument(int value) {
+ return BSON("_id" << value << "X" << value);
+ }
+
+private:
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) override {
+ class StaticCatalogClient final : public ShardingCatalogClientMock {
+ public:
+ StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {}
+
+ StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+ OperationContext* txn, repl::ReadConcernLevel readConcern) override {
+
+ ShardType donorShard;
+ donorShard.setName(kDonorConnStr.getSetName());
+ donorShard.setHost(kDonorConnStr.toString());
+
+ ShardType recipientShard;
+ recipientShard.setName(kRecipientConnStr.getSetName());
+ recipientShard.setHost(kRecipientConnStr.toString());
+
+ return repl::OpTimeWith<std::vector<ShardType>>({donorShard, recipientShard});
+ }
+ };
+
+ return stdx::make_unique<StaticCatalogClient>();
+ }
+
+ boost::optional<DBDirectClient> _client;
+};
+
+TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) {
+ const std::vector<BSONObj> contents = {createCollectionDocument(99),
+ createCollectionDocument(100),
+ createCollectionDocument(199),
+ createCollectionDocument(200)};
+
+ createShardedCollection(contents);
+
+ MigrationChunkClonerSourceLegacy cloner(
+ createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
+
+ auto futureStart = launchAsync([&]() {
+ onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
+ });
+
+ ASSERT_OK(cloner.startClone(operationContext()));
+
+ // Ensure the initial clone documents are available
+ {
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
+
+ {
+ BSONArrayBuilder arrBuilder;
+ ASSERT_OK(
+ cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+ ASSERT_EQ(2, arrBuilder.arrSize());
+
+ const auto arr = arrBuilder.arr();
+ ASSERT_BSONOBJ_EQ(contents[1], arr[0].Obj());
+ ASSERT_BSONOBJ_EQ(contents[2], arr[1].Obj());
+ }
+
+ {
+ BSONArrayBuilder arrBuilder;
+ ASSERT_OK(
+ cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+ ASSERT_EQ(0, arrBuilder.arrSize());
+ }
+ }
+
+ // Insert some documents in the chunk range to be included for migration
+ client()->insert(kNss.ns(), createCollectionDocument(150));
+ client()->insert(kNss.ns(), createCollectionDocument(151));
+
+ // Insert some documents which are outside of the chunk range and should not be included for
+ // migration
+ client()->insert(kNss.ns(), createCollectionDocument(90));
+ client()->insert(kNss.ns(), createCollectionDocument(210));
+
+ // Normally the inserts above and the onInsertOp/onDeleteOp callbacks below would happen
+ // under the same lock and write unit of work
+ {
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+
+ WriteUnitOfWork wuow(operationContext());
+
+ cloner.onInsertOp(operationContext(), createCollectionDocument(90));
+ cloner.onInsertOp(operationContext(), createCollectionDocument(150));
+ cloner.onInsertOp(operationContext(), createCollectionDocument(151));
+ cloner.onInsertOp(operationContext(), createCollectionDocument(210));
+
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(80));
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(199));
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(220));
+
+ wuow.commit();
+ }
+
+ {
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
+
+ {
+ BSONArrayBuilder arrBuilder;
+ ASSERT_OK(
+ cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+ ASSERT_EQ(0, arrBuilder.arrSize());
+ }
+
+ {
+ BSONObjBuilder modsBuilder;
+ ASSERT_OK(cloner.nextModsBatch(operationContext(), autoColl.getDb(), &modsBuilder));
+
+ const auto modsObj = modsBuilder.obj();
+ ASSERT_EQ(2U, modsObj["reload"].Array().size());
+ ASSERT_BSONOBJ_EQ(createCollectionDocument(150), modsObj["reload"].Array()[0].Obj());
+ ASSERT_BSONOBJ_EQ(createCollectionDocument(151), modsObj["reload"].Array()[1].Obj());
+
+ // The legacy chunk cloner cannot filter out deletes because we don't preserve the shard
+ // key on delete
+ ASSERT_EQ(3U, modsObj["deleted"].Array().size());
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 80), modsObj["deleted"].Array()[0].Obj());
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 199), modsObj["deleted"].Array()[1].Obj());
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 220), modsObj["deleted"].Array()[2].Obj());
+ }
+ }
+
+ auto futureCommit = launchAsync([&]() {
+ onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
+ });
+
+ ASSERT_OK(cloner.commitClone(operationContext()));
+}
+
+TEST_F(MigrationChunkClonerSourceLegacyTest, CollectionNotFound) {
+ MigrationChunkClonerSourceLegacy cloner(
+ createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
+
+ ASSERT_NOT_OK(cloner.startClone(operationContext()));
+ cloner.cancelClone(operationContext());
+}
+
+TEST_F(MigrationChunkClonerSourceLegacyTest, ShardKeyIndexNotFound) {
+ ASSERT(client()->createCollection(kNss.ns()));
+
+ MigrationChunkClonerSourceLegacy cloner(
+ createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
+
+ ASSERT_NOT_OK(cloner.startClone(operationContext()));
+ cloner.cancelClone(operationContext());
+}
+
+TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) {
+ const std::vector<BSONObj> contents = {createCollectionDocument(99),
+ createCollectionDocument(100),
+ createCollectionDocument(199),
+ createCollectionDocument(200)};
+
+ createShardedCollection(contents);
+
+ MigrationChunkClonerSourceLegacy cloner(
+ createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
+
+ auto future = launchAsync([&]() {
+ onCommand([&](const RemoteCommandRequest& request) {
+ return Status(ErrorCodes::NetworkTimeout, "Did not receive confirmation from donor");
+ });
+ });
+
+ auto startCloneStatus = cloner.startClone(operationContext());
+ ASSERT_EQ(ErrorCodes::NetworkTimeout, startCloneStatus.code());
+
+ // Ensure that if the recipient tries to fetch some documents, the cloner won't crash
+ {
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
+
+ {
+ BSONArrayBuilder arrBuilder;
+ ASSERT_OK(
+ cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+ ASSERT_EQ(2, arrBuilder.arrSize());
+ }
+
+ {
+ BSONArrayBuilder arrBuilder;
+ ASSERT_OK(
+ cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+ ASSERT_EQ(0, arrBuilder.arrSize());
+ }
+ }
+
+ // Cancel clone should not send a cancellation request to the recipient because we failed to
+ // engage it (see the comment in the startClone method)
+ cloner.cancelClone(operationContext());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index c51ea96696e..5a649de1bb3 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -464,7 +464,7 @@ void MigrationSourceManager::cleanupOnError(OperationContext* txn) {
void MigrationSourceManager::_cleanup(OperationContext* txn) {
invariant(_state != kDone);
- {
+ auto cloneDriver = [&]() {
// Unregister from the collection's sharding state
ScopedTransaction scopedXact(txn, MODE_IX);
AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X);
@@ -479,7 +479,9 @@ void MigrationSourceManager::_cleanup(OperationContext* txn) {
if (_critSecSignal) {
_critSecSignal->set();
}
- }
+
+ return std::move(_cloneDriver);
+ }();
// Decrement the metadata op counter outside of the collection lock in order to hold it for as
// short as possible.
@@ -487,9 +489,8 @@ void MigrationSourceManager::_cleanup(OperationContext* txn) {
ShardingStateRecovery::endMetadataOp(txn);
}
- if (_cloneDriver) {
- _cloneDriver->cancelClone(txn);
- _cloneDriver.reset();
+ if (cloneDriver) {
+ cloneDriver->cancelClone(txn);
}
_state = kDone;
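The rewritten _cleanup uses an immediately invoked lambda to move the clone driver out of the member while the collection lock is held, and only then calls cancelClone, keeping the potentially blocking network call outside the lock. A compact sketch of that pattern with placeholder types (a std::mutex stands in for the collection lock):

    // Illustrative only: extract the owned resource under the lock, use it after.
    #include <memory>
    #include <mutex>
    #include <utility>

    struct Driver {
        void cancelClone() { /* may block on the network; must not hold locks */ }
    };

    struct ManagerSketch {
        std::mutex collectionLock;
        std::unique_ptr<Driver> cloneDriver;

        void cleanup() {
            auto driver = [&]() {
                std::lock_guard<std::mutex> lk(collectionLock);
                // ... unregister from the collection's sharding state ...
                return std::move(cloneDriver);  // leaves the member empty
            }();

            if (driver) {
                driver->cancelClone();  // runs after the lock is released
            }
        }
    };

    int main() {
        ManagerSketch m;
        m.cloneDriver = std::make_unique<Driver>();
        m.cleanup();
        return 0;
    }

The design point is scope-shaped: the lambda bounds the locked region exactly, while the moved-out unique_ptr lets the slow cancellation proceed without holding the collection lock.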