summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2018-03-19 15:48:36 -0400
committerBlake Oler <blake.oler@mongodb.com>2018-03-22 13:36:25 -0400
commit7195da1959bf12fe2497e385b4b6579f43712930 (patch)
treea96da5e44ec2212ed36ad4ea2c4b1f08de74aa55
parent506ec8c26c0612f557654062b74552f978b2569e (diff)
downloadmongo-7195da1959bf12fe2497e385b4b6579f43712930.tar.gz
SERVER-32885 Overlap chunk clone application on the donor with fetching documents from the recipient
(cherry picked from dfa03b56b8334a82b223e34f02fc5cbfead4dca2)
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp121
-rw-r--r--src/mongo/db/s/migration_destination_manager.h8
-rw-r--r--src/mongo/db/s/migration_destination_manager_test.cpp160
4 files changed, 244 insertions, 46 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index b4c8b12a3d2..fc9805460f1 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -205,6 +205,7 @@ env.CppUnitTest(
'active_migrations_registry_test.cpp',
'metadata_manager_test.cpp',
'migration_chunk_cloner_source_legacy_test.cpp',
+ 'migration_destination_manager_test.cpp',
'sharding_state_test.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 1c02208ee27..a951711b0e0 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -64,6 +64,8 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/producer_consumer_queue.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -345,6 +347,51 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
return Status::OK();
}
+void MigrationDestinationManager::cloneDocumentsFromDonor(
+ OperationContext* txn,
+ stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
+ stdx::function<BSONObj(OperationContext*)> fetchBatchFn) {
+
+ ProducerConsumerQueue<BSONObj> batches(1);
+ stdx::thread inserterThread{[&] {
+ Client::initThreadIfNotAlready("chunkInserter");
+ auto inserterTxn = Client::getCurrent()->makeOperationContext();
+ auto consumerGuard = MakeGuard([&] { batches.closeConsumerEnd(); });
+ try {
+ while (true) {
+ auto nextBatch = batches.pop(inserterTxn.get());
+ auto arr = nextBatch["objects"].Obj();
+ if (arr.isEmpty()) {
+ return;
+ }
+ insertBatchFn(inserterTxn.get(), BSONObjIterator(arr));
+ }
+ } catch (...) {
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ txn->getServiceContext()->killOperation(txn, exceptionToStatus().code());
+ log() << "Batch insertion failed " << causedBy(redact(exceptionToStatus()));
+ }
+ }};
+ auto inserterThreadJoinGuard = MakeGuard([&] {
+ batches.closeProducerEnd();
+ inserterThread.join();
+ });
+
+ while (true) {
+ txn->checkForInterrupt();
+
+ auto res = fetchBatchFn(txn);
+ batches.push(res.getOwned(), txn);
+ auto arr = res["objects"].Obj();
+ if (arr.isEmpty()) {
+ inserterThreadJoinGuard.Dismiss();
+ inserterThread.join();
+ txn->checkForInterrupt();
+ break;
+ }
+ }
+}
+
bool MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -433,22 +480,14 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
try {
_migrateDriver(
opCtx.get(), min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
- } catch (std::exception& e) {
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _state = FAIL;
- _errmsg = e.what();
- }
-
- error() << "migrate failed: " << redact(e.what()) << migrateLog;
} catch (...) {
{
stdx::lock_guard<stdx::mutex> sl(_mutex);
_state = FAIL;
- _errmsg = "UNKNOWN ERROR";
+ _errmsg = exceptionToStatus().toString();
}
- error() << "migrate failed with unknown exception" << migrateLog;
+ error() << "migrate failed: " << redact(exceptionToStatus()) << migrateLog;
}
if (getState() != DONE) {
@@ -664,36 +703,19 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId);
- while (true) {
- BSONObj res;
- if (!conn->runCommand("admin",
- migrateCloneRequest,
- res)) { // gets array of objects to copy, in disk order
- setState(FAIL);
- errmsg = "_migrateClone failed: ";
- errmsg += redact(res.toString());
- error() << errmsg << migrateLog;
- conn.done();
- return;
- }
-
- BSONObj arr = res["objects"].Obj();
- int thisTime = 0;
-
- BSONObjIterator i(arr);
- while (i.more()) {
+ auto insertBatchFn = [&](OperationContext* txn, BSONObjIterator docs) {
+ while (docs.more()) {
txn->checkForInterrupt();
if (getState() == ABORT) {
- errmsg = "Migration aborted while copying documents";
- error() << errmsg << migrateLog;
- return;
+ auto message = "Migration aborted while copying documents";
+ log() << message << migrateLog;
+ uasserted(40655, message);
}
- BSONObj docToClone = i.next().Obj();
+ BSONObj docToClone = docs.next().Obj();
{
OldClientWriteContext cx(txn, _nss.ns());
-
BSONObj localDoc;
if (willOverrideLocalId(txn,
_nss.ns(),
@@ -703,30 +725,25 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
cx.db(),
docToClone,
&localDoc)) {
- string errMsg = str::stream() << "cannot migrate chunk, local document "
- << redact(localDoc)
- << " has same _id as cloned "
- << "remote document " << redact(docToClone);
-
+ const std::string errMsg = str::stream()
+ << "cannot migrate chunk, local document " << redact(localDoc)
+ << " has same _id as cloned "
+ << "remote document " << redact(docToClone);
warning() << errMsg;
// Exception will abort migration cleanly
uasserted(16976, errMsg);
}
-
Helpers::upsert(txn, _nss.ns(), docToClone, true);
}
- thisTime++;
-
{
stdx::lock_guard<stdx::mutex> statsLock(_mutex);
_numCloned++;
_clonedBytes += docToClone.objsize();
}
-
if (writeConcern.shouldWaitForOtherNodes()) {
repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::getGlobalReplicationCoordinator()->awaitReplication(
+ repl::ReplicationCoordinator::get(txn)->awaitReplication(
txn,
repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(),
writeConcern);
@@ -738,10 +755,22 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
}
}
+ };
- if (thisTime == 0)
- break;
- }
+ auto fetchBatchFn = [&](OperationContext* txn) {
+ BSONObj res;
+ if (!conn->runCommand("admin",
+ migrateCloneRequest,
+ res)) { // gets array of objects to copy, in disk order
+ conn.done();
+ const std::string errMsg = str::stream() << "_migrateClone failed: "
+ << redact(res.toString());
+ uasserted(40656, errMsg);
+ }
+ return res;
+ };
+
+ cloneDocumentsFromDonor(txn, insertBatchFn, fetchBatchFn);
timing.done(3);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep3);
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 4bd269f37fd..836761530d4 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -101,6 +101,14 @@ public:
const WriteConcernOptions& writeConcern);
/**
+ * Clones documents from a donor shard.
+ */
+ static void cloneDocumentsFromDonor(
+ OperationContext* txn,
+ stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
+ stdx::function<BSONObj(OperationContext*)> fetchBatchFn);
+
+ /**
* Idempotent method, which causes the current ongoing migration to abort only if it has the
* specified session id, otherwise returns false. If the migration is already aborted, does
* nothing.
diff --git a/src/mongo/db/s/migration_destination_manager_test.cpp b/src/mongo/db/s/migration_destination_manager_test.cpp
new file mode 100644
index 00000000000..f1ce14d3147
--- /dev/null
+++ b/src/mongo/db/s/migration_destination_manager_test.cpp
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/migration_destination_manager.h"
+#include "mongo/s/sharding_mongod_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using unittest::assertGet;
+
+class MigrationDestinationManagerTest : public ShardingMongodTestFixture {
+protected:
+ /**
+ * Instantiates a BSON object in which both "_id" and "X" are set to value.
+ */
+ static BSONObj createDocument(int value) {
+ return BSON("_id" << value << "X" << value);
+ }
+
+ /**
+ * Creates a list of documents to clone.
+ */
+ static std::vector<BSONObj> createDocumentsToClone() {
+ return {createDocument(1), createDocument(2), createDocument(3)};
+ }
+
+ /**
+ * Creates a list of documents to clone and converts it to a BSONArray.
+ */
+ static BSONArray createDocumentsToCloneArray() {
+ BSONArrayBuilder arrayBuilder;
+ for (auto& doc : createDocumentsToClone()) {
+ arrayBuilder.append(doc);
+ }
+ return arrayBuilder.arr();
+ }
+};
+
+// Tests that documents will ferry from the fetch logic to the insert logic successfully.
+TEST_F(MigrationDestinationManagerTest, CloneDocumentsFromDonorWorksCorrectly) {
+ bool ranOnce = false;
+
+ auto fetchBatchFn = [&](OperationContext* opCtx) {
+ BSONObjBuilder fetchBatchResultBuilder;
+
+ if (ranOnce) {
+ fetchBatchResultBuilder.append("objects", BSONObj());
+ } else {
+ ranOnce = true;
+ fetchBatchResultBuilder.append("objects", createDocumentsToCloneArray());
+ }
+
+ return fetchBatchResultBuilder.obj();
+ };
+
+ std::vector<BSONObj> resultDocs;
+
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
+ while (docs.more()) {
+ resultDocs.push_back(docs.next().Obj().getOwned());
+ }
+ };
+
+ MigrationDestinationManager::cloneDocumentsFromDonor(
+ operationContext(), insertBatchFn, fetchBatchFn);
+
+ std::vector<BSONObj> originalDocs = createDocumentsToClone();
+
+ ASSERT_EQ(originalDocs.size(), resultDocs.size());
+
+ for (auto originalDocsIt = originalDocs.begin(), resultDocsIt = resultDocs.begin();
+ originalDocsIt != originalDocs.end() && resultDocsIt != resultDocs.end();
+ ++originalDocsIt, ++resultDocsIt) {
+ ASSERT_BSONOBJ_EQ(*originalDocsIt, *resultDocsIt);
+ }
+}
+
+// Tests that an exception in the fetch logic will successfully throw an exception on the main
+// thread.
+TEST_F(MigrationDestinationManagerTest, CloneDocumentsThrowsFetchErrors) {
+ bool ranOnce = false;
+
+ auto fetchBatchFn = [&](OperationContext* opCtx) {
+ BSONObjBuilder fetchBatchResultBuilder;
+
+ if (ranOnce) {
+ uasserted(ErrorCodes::NetworkTimeout, "network error");
+ }
+
+ ranOnce = true;
+ fetchBatchResultBuilder.append("objects", createDocumentsToCloneArray());
+
+ return fetchBatchResultBuilder.obj();
+ };
+
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {};
+
+ ASSERT_THROWS_CODE_AND_WHAT(MigrationDestinationManager::cloneDocumentsFromDonor(
+ operationContext(), insertBatchFn, fetchBatchFn),
+ DBException,
+ ErrorCodes::NetworkTimeout,
+ "network error");
+}
+
+// Tests that an exception in the insertion logic will successfully throw an exception on the
+// main thread.
+TEST_F(MigrationDestinationManagerTest, CloneDocumentsCatchesInsertErrors) {
+ auto fetchBatchFn = [&](OperationContext* opCtx) {
+ BSONObjBuilder fetchBatchResultBuilder;
+ fetchBatchResultBuilder.append("objects", createDocumentsToCloneArray());
+ return fetchBatchResultBuilder.obj();
+ };
+
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
+ uasserted(ErrorCodes::FailedToParse, "insertion error");
+ };
+
+ // Since the error is thrown on another thread, the message becomes "operation was interrupted"
+ // on the main thread.
+
+ ASSERT_THROWS_CODE_AND_WHAT(MigrationDestinationManager::cloneDocumentsFromDonor(
+ operationContext(), insertBatchFn, fetchBatchFn),
+ DBException,
+ ErrorCodes::FailedToParse,
+ "operation was interrupted");
+
+ ASSERT_EQ(operationContext()->getKillStatus(), ErrorCodes::FailedToParse);
+}
+
+} // namespace
+} // namespace mongo