summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp98
-rw-r--r--src/mongo/db/s/migration_destination_manager.h8
-rw-r--r--src/mongo/db/s/migration_destination_manager_test.cpp160
4 files changed, 238 insertions, 29 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 902b29089e9..b30de109a04 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -269,6 +269,7 @@ env.CppUnitTest(
'active_move_primaries_registry_test.cpp',
'catalog_cache_loader_mock.cpp',
'migration_chunk_cloner_source_legacy_test.cpp',
+ 'migration_destination_manager_test.cpp',
'namespace_metadata_change_notifications_test.cpp',
'sharding_state_test.cpp',
'shard_server_catalog_cache_loader_test.cpp',
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 9b72f198891..099a829b414 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -60,6 +60,8 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/producer_consumer_queue.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {
@@ -368,6 +370,51 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
return Status::OK();
}
+void MigrationDestinationManager::cloneDocumentsFromDonor(
+ OperationContext* opCtx,
+ stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
+ stdx::function<BSONObj(OperationContext*)> fetchBatchFn) {
+
+ ProducerConsumerQueue<BSONObj> batches(1);
+ stdx::thread inserterThread{[&] {
+ Client::initThreadIfNotAlready("chunkInserter");
+ auto inserterOpCtx = Client::getCurrent()->makeOperationContext();
+ auto consumerGuard = MakeGuard([&] { batches.closeConsumerEnd(); });
+ try {
+ while (true) {
+ auto nextBatch = batches.pop(inserterOpCtx.get());
+ auto arr = nextBatch["objects"].Obj();
+ if (arr.isEmpty()) {
+ return;
+ }
+ insertBatchFn(inserterOpCtx.get(), BSONObjIterator(arr));
+ }
+ } catch (...) {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ opCtx->getServiceContext()->killOperation(opCtx, exceptionToStatus().code());
+ log() << "Batch insertion failed " << causedBy(redact(exceptionToStatus()));
+ }
+ }};
+ auto inserterThreadJoinGuard = MakeGuard([&] {
+ batches.closeProducerEnd();
+ inserterThread.join();
+ });
+
+ while (true) {
+ opCtx->checkForInterrupt();
+
+ auto res = fetchBatchFn(opCtx);
+ batches.push(res.getOwned(), opCtx);
+ auto arr = res["objects"].Obj();
+ if (arr.isEmpty()) {
+ inserterThreadJoinGuard.Dismiss();
+ inserterThread.join();
+ opCtx->checkForInterrupt();
+ break;
+ }
+ }
+}
+
Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -462,10 +509,8 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
try {
_migrateDriver(
opCtx.get(), min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
- } catch (std::exception& e) {
- setStateFail(str::stream() << "migrate failed: " << redact(e.what()));
} catch (...) {
- setStateFail("migrate failed with unknown exception: UNKNOWN ERROR");
+ setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
}
if (getState() != DONE && !MONGO_FAIL_POINT(failMigrationLeaveOrphans)) {
@@ -716,32 +761,19 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
_chunkMarkedPending = true; // no lock needed, only the migrate thread looks.
- while (true) {
- BSONObj res;
- if (!conn->runCommand("admin",
- migrateCloneRequest,
- res)) { // gets array of objects to copy, in disk order
- setStateFail(str::stream() << "_migrateClone failed: " << redact(res.toString()));
- conn.done();
- return;
- }
-
- BSONObj arr = res["objects"].Obj();
- int thisTime = 0;
-
- BSONObjIterator i(arr);
- while (i.more()) {
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
+ while (docs.more()) {
opCtx->checkForInterrupt();
if (getState() == ABORT) {
- log() << "Migration aborted while copying documents";
- return;
+ auto message = "Migration aborted while copying documents";
+ log() << message;
+ uasserted(50748, message);
}
- BSONObj docToClone = i.next().Obj();
+ BSONObj docToClone = docs.next().Obj();
{
OldClientWriteContext cx(opCtx, _nss.ns());
-
BSONObj localDoc;
if (willOverrideLocalId(opCtx,
_nss,
@@ -760,17 +792,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
// Exception will abort migration cleanly
uasserted(16976, errMsg);
}
-
Helpers::upsert(opCtx, _nss.ns(), docToClone, true);
}
- thisTime++;
-
{
stdx::lock_guard<stdx::mutex> statsLock(_mutex);
_numCloned++;
_clonedBytes += docToClone.objsize();
}
-
if (writeConcern.shouldWaitForOtherNodes()) {
repl::ReplicationCoordinator::StatusAndDuration replStatus =
repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
@@ -785,10 +813,22 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
}
}
+ };
- if (thisTime == 0)
- break;
- }
+ auto fetchBatchFn = [&](OperationContext* opCtx) {
+ BSONObj res;
+ if (!conn->runCommand("admin",
+ migrateCloneRequest,
+ res)) { // gets array of objects to copy, in disk order
+ conn.done();
+ const std::string errMsg = str::stream() << "_migrateClone failed: "
+ << redact(res.toString());
+ uasserted(50747, errMsg);
+ }
+ return res;
+ };
+
+ cloneDocumentsFromDonor(opCtx, insertBatchFn, fetchBatchFn);
timing.done(3);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep3);
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 290c7f2218c..79d8eaacd86 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -118,6 +118,14 @@ public:
const WriteConcernOptions& writeConcern);
/**
+ * Clones documents from a donor shard.
+ */
+ static void cloneDocumentsFromDonor(
+ OperationContext* opCtx,
+ stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
+ stdx::function<BSONObj(OperationContext*)> fetchBatchFn);
+
+ /**
* Idempotent method, which causes the current ongoing migration to abort only if it has the
* specified session id. If the migration is already aborted, does nothing.
*/
diff --git a/src/mongo/db/s/migration_destination_manager_test.cpp b/src/mongo/db/s/migration_destination_manager_test.cpp
new file mode 100644
index 00000000000..6dd3d456a2f
--- /dev/null
+++ b/src/mongo/db/s/migration_destination_manager_test.cpp
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/migration_destination_manager.h"
+#include "mongo/s/shard_server_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using unittest::assertGet;
+
+class MigrationDestinationManagerTest : public ShardServerTestFixture {
+protected:
+ /**
+ * Instantiates a BSON object in which both "_id" and "X" are set to value.
+ */
+ static BSONObj createDocument(int value) {
+ return BSON("_id" << value << "X" << value);
+ }
+
+ /**
+ * Creates a list of documents to clone.
+ */
+ static std::vector<BSONObj> createDocumentsToClone() {
+ return {createDocument(1), createDocument(2), createDocument(3)};
+ }
+
+ /**
+ * Creates a list of documents to clone and converts it to a BSONArray.
+ */
+ static BSONArray createDocumentsToCloneArray() {
+ BSONArrayBuilder arrayBuilder;
+ for (auto& doc : createDocumentsToClone()) {
+ arrayBuilder.append(doc);
+ }
+ return arrayBuilder.arr();
+ }
+};
+
+// Tests that documents will ferry from the fetch logic to the insert logic successfully.
+TEST_F(MigrationDestinationManagerTest, CloneDocumentsFromDonorWorksCorrectly) {
+ bool ranOnce = false;
+
+ auto fetchBatchFn = [&](OperationContext* opCtx) {
+ BSONObjBuilder fetchBatchResultBuilder;
+
+ if (ranOnce) {
+ fetchBatchResultBuilder.append("objects", BSONObj());
+ } else {
+ ranOnce = true;
+ fetchBatchResultBuilder.append("objects", createDocumentsToCloneArray());
+ }
+
+ return fetchBatchResultBuilder.obj();
+ };
+
+ std::vector<BSONObj> resultDocs;
+
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
+ while (docs.more()) {
+ resultDocs.push_back(docs.next().Obj().getOwned());
+ }
+ };
+
+ MigrationDestinationManager::cloneDocumentsFromDonor(
+ operationContext(), insertBatchFn, fetchBatchFn);
+
+ std::vector<BSONObj> originalDocs = createDocumentsToClone();
+
+ ASSERT_EQ(originalDocs.size(), resultDocs.size());
+
+ for (auto originalDocsIt = originalDocs.begin(), resultDocsIt = resultDocs.begin();
+ originalDocsIt != originalDocs.end() && resultDocsIt != resultDocs.end();
+ ++originalDocsIt, ++resultDocsIt) {
+ ASSERT_BSONOBJ_EQ(*originalDocsIt, *resultDocsIt);
+ }
+}
+
+// Tests that an exception in the fetch logic will successfully throw an exception on the main
+// thread.
+TEST_F(MigrationDestinationManagerTest, CloneDocumentsThrowsFetchErrors) {
+ bool ranOnce = false;
+
+ auto fetchBatchFn = [&](OperationContext* opCtx) {
+ BSONObjBuilder fetchBatchResultBuilder;
+
+ if (ranOnce) {
+ uasserted(ErrorCodes::NetworkTimeout, "network error");
+ }
+
+ ranOnce = true;
+ fetchBatchResultBuilder.append("objects", createDocumentsToCloneArray());
+
+ return fetchBatchResultBuilder.obj();
+ };
+
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {};
+
+ ASSERT_THROWS_CODE_AND_WHAT(MigrationDestinationManager::cloneDocumentsFromDonor(
+ operationContext(), insertBatchFn, fetchBatchFn),
+ DBException,
+ ErrorCodes::NetworkTimeout,
+ "network error");
+}
+
+// Tests that an exception in the insertion logic will successfully throw an exception on the
+// main thread.
+TEST_F(MigrationDestinationManagerTest, CloneDocumentsCatchesInsertErrors) {
+ auto fetchBatchFn = [&](OperationContext* opCtx) {
+ BSONObjBuilder fetchBatchResultBuilder;
+ fetchBatchResultBuilder.append("objects", createDocumentsToCloneArray());
+ return fetchBatchResultBuilder.obj();
+ };
+
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
+ uasserted(ErrorCodes::FailedToParse, "insertion error");
+ };
+
+ // Since the error is thrown on another thread, the message becomes "operation was interrupted"
+ // on the main thread.
+
+ ASSERT_THROWS_CODE_AND_WHAT(MigrationDestinationManager::cloneDocumentsFromDonor(
+ operationContext(), insertBatchFn, fetchBatchFn),
+ DBException,
+ ErrorCodes::FailedToParse,
+ "operation was interrupted");
+
+ ASSERT_EQ(operationContext()->getKillStatus(), ErrorCodes::FailedToParse);
+}
+
+} // namespace
+} // namespace mongo