summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2018-03-16 11:07:53 -0400
committerBlake Oler <blake.oler@mongodb.com>2018-03-16 11:07:53 -0400
commit1cac032e38456b8cbb11921183198e43bb59c95e (patch)
treefdec9fd1af9039328940b0de678a7c39b1a5b262 /src/mongo
parent1848f026aa094d6d143dc3b664a272fbd64155e5 (diff)
downloadmongo-1cac032e38456b8cbb11921183198e43bb59c95e.tar.gz
Revert "SERVER-32885 Overlap chunk clone application on the donor with fetching documents from the recipient"
This reverts commit 1848f026aa094d6d143dc3b664a272fbd64155e5.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp98
-rw-r--r--src/mongo/db/s/migration_destination_manager.h8
-rw-r--r--src/mongo/db/s/migration_destination_manager_test.cpp160
4 files changed, 29 insertions, 238 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index b30de109a04..902b29089e9 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -269,7 +269,6 @@ env.CppUnitTest(
'active_move_primaries_registry_test.cpp',
'catalog_cache_loader_mock.cpp',
'migration_chunk_cloner_source_legacy_test.cpp',
- 'migration_destination_manager_test.cpp',
'namespace_metadata_change_notifications_test.cpp',
'sharding_state_test.cpp',
'shard_server_catalog_cache_loader_test.cpp',
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index e5831ac3570..9b72f198891 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -60,8 +60,6 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
-#include "mongo/util/producer_consumer_queue.h"
-#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {
@@ -370,51 +368,6 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
return Status::OK();
}
-void MigrationDestinationManager::cloneDocumentsFromDonor(
- OperationContext* opCtx,
- stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
- stdx::function<BSONObj(OperationContext*)> fetchBatchFn) {
-
- ProducerConsumerQueue<BSONObj> batches(1);
- stdx::thread inserterThread{[&] {
- Client::initThreadIfNotAlready("chunkInserter");
- auto inserterOpCtx = Client::getCurrent()->makeOperationContext();
- auto consumerGuard = MakeGuard([&] { batches.closeConsumerEnd(); });
- try {
- while (true) {
- auto nextBatch = batches.pop(inserterOpCtx.get());
- auto arr = nextBatch["objects"].Obj();
- if (arr.isEmpty()) {
- return;
- }
- insertBatchFn(inserterOpCtx.get(), BSONObjIterator(arr));
- }
- } catch (...) {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- opCtx->getServiceContext()->killOperation(opCtx, exceptionToStatus().code());
- log() << "Batch insertion failed " << causedBy(redact(exceptionToStatus()));
- }
- }};
- auto inserterThreadJoinGuard = MakeGuard([&] {
- batches.closeProducerEnd();
- inserterThread.join();
- });
-
- while (true) {
- opCtx->checkForInterrupt();
-
- auto res = fetchBatchFn(opCtx);
- batches.push(res.getOwned(), opCtx);
- auto arr = res["objects"].Obj();
- if (arr.isEmpty()) {
- inserterThreadJoinGuard.Dismiss();
- inserterThread.join();
- opCtx->checkForInterrupt();
- break;
- }
- }
-}
-
Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -509,8 +462,10 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
try {
_migrateDriver(
opCtx.get(), min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
+ } catch (std::exception& e) {
+ setStateFail(str::stream() << "migrate failed: " << redact(e.what()));
} catch (...) {
- setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
+ setStateFail("migrate failed with unknown exception: UNKNOWN ERROR");
}
if (getState() != DONE && !MONGO_FAIL_POINT(failMigrationLeaveOrphans)) {
@@ -761,19 +716,32 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
_chunkMarkedPending = true; // no lock needed, only the migrate thread looks.
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
- while (docs.more()) {
+ while (true) {
+ BSONObj res;
+ if (!conn->runCommand("admin",
+ migrateCloneRequest,
+ res)) { // gets array of objects to copy, in disk order
+ setStateFail(str::stream() << "_migrateClone failed: " << redact(res.toString()));
+ conn.done();
+ return;
+ }
+
+ BSONObj arr = res["objects"].Obj();
+ int thisTime = 0;
+
+ BSONObjIterator i(arr);
+ while (i.more()) {
opCtx->checkForInterrupt();
if (getState() == ABORT) {
- auto message = "Migration aborted while copying documents";
- log() << message;
- uasserted(50746, message);
+ log() << "Migration aborted while copying documents";
+ return;
}
- BSONObj docToClone = docs.next().Obj();
+ BSONObj docToClone = i.next().Obj();
{
OldClientWriteContext cx(opCtx, _nss.ns());
+
BSONObj localDoc;
if (willOverrideLocalId(opCtx,
_nss,
@@ -792,13 +760,17 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
// Exception will abort migration cleanly
uasserted(16976, errMsg);
}
+
Helpers::upsert(opCtx, _nss.ns(), docToClone, true);
}
+ thisTime++;
+
{
stdx::lock_guard<stdx::mutex> statsLock(_mutex);
_numCloned++;
_clonedBytes += docToClone.objsize();
}
+
if (writeConcern.shouldWaitForOtherNodes()) {
repl::ReplicationCoordinator::StatusAndDuration replStatus =
repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
@@ -813,22 +785,10 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
}
}
- };
- auto fetchBatchFn = [&](OperationContext* opCtx) {
- BSONObj res;
- if (!conn->runCommand("admin",
- migrateCloneRequest,
- res)) { // gets array of objects to copy, in disk order
- conn.done();
- const std::string errMsg = str::stream() << "_migrateClone failed: "
- << redact(res.toString());
- uasserted(50747, errMsg);
- }
- return res;
- };
-
- cloneDocumentsFromDonor(opCtx, insertBatchFn, fetchBatchFn);
+ if (thisTime == 0)
+ break;
+ }
timing.done(3);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep3);
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 79d8eaacd86..290c7f2218c 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -118,14 +118,6 @@ public:
const WriteConcernOptions& writeConcern);
/**
- * Clones documents from a donor shard.
- */
- static void cloneDocumentsFromDonor(
- OperationContext* opCtx,
- stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
- stdx::function<BSONObj(OperationContext*)> fetchBatchFn);
-
- /**
* Idempotent method, which causes the current ongoing migration to abort only if it has the
* specified session id. If the migration is already aborted, does nothing.
*/
diff --git a/src/mongo/db/s/migration_destination_manager_test.cpp b/src/mongo/db/s/migration_destination_manager_test.cpp
deleted file mode 100644
index 6dd3d456a2f..00000000000
--- a/src/mongo/db/s/migration_destination_manager_test.cpp
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Copyright (C) 2018 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/s/migration_destination_manager.h"
-#include "mongo/s/shard_server_test_fixture.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace {
-
-using unittest::assertGet;
-
-class MigrationDestinationManagerTest : public ShardServerTestFixture {
-protected:
- /**
- * Instantiates a BSON object in which both "_id" and "X" are set to value.
- */
- static BSONObj createDocument(int value) {
- return BSON("_id" << value << "X" << value);
- }
-
- /**
- * Creates a list of documents to clone.
- */
- static std::vector<BSONObj> createDocumentsToClone() {
- return {createDocument(1), createDocument(2), createDocument(3)};
- }
-
- /**
- * Creates a list of documents to clone and converts it to a BSONArray.
- */
- static BSONArray createDocumentsToCloneArray() {
- BSONArrayBuilder arrayBuilder;
- for (auto& doc : createDocumentsToClone()) {
- arrayBuilder.append(doc);
- }
- return arrayBuilder.arr();
- }
-};
-
-// Tests that documents will ferry from the fetch logic to the insert logic successfully.
-TEST_F(MigrationDestinationManagerTest, CloneDocumentsFromDonorWorksCorrectly) {
- bool ranOnce = false;
-
- auto fetchBatchFn = [&](OperationContext* opCtx) {
- BSONObjBuilder fetchBatchResultBuilder;
-
- if (ranOnce) {
- fetchBatchResultBuilder.append("objects", BSONObj());
- } else {
- ranOnce = true;
- fetchBatchResultBuilder.append("objects", createDocumentsToCloneArray());
- }
-
- return fetchBatchResultBuilder.obj();
- };
-
- std::vector<BSONObj> resultDocs;
-
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
- while (docs.more()) {
- resultDocs.push_back(docs.next().Obj().getOwned());
- }
- };
-
- MigrationDestinationManager::cloneDocumentsFromDonor(
- operationContext(), insertBatchFn, fetchBatchFn);
-
- std::vector<BSONObj> originalDocs = createDocumentsToClone();
-
- ASSERT_EQ(originalDocs.size(), resultDocs.size());
-
- for (auto originalDocsIt = originalDocs.begin(), resultDocsIt = resultDocs.begin();
- originalDocsIt != originalDocs.end() && resultDocsIt != resultDocs.end();
- ++originalDocsIt, ++resultDocsIt) {
- ASSERT_BSONOBJ_EQ(*originalDocsIt, *resultDocsIt);
- }
-}
-
-// Tests that an exception in the fetch logic will successfully throw an exception on the main
-// thread.
-TEST_F(MigrationDestinationManagerTest, CloneDocumentsThrowsFetchErrors) {
- bool ranOnce = false;
-
- auto fetchBatchFn = [&](OperationContext* opCtx) {
- BSONObjBuilder fetchBatchResultBuilder;
-
- if (ranOnce) {
- uasserted(ErrorCodes::NetworkTimeout, "network error");
- }
-
- ranOnce = true;
- fetchBatchResultBuilder.append("objects", createDocumentsToCloneArray());
-
- return fetchBatchResultBuilder.obj();
- };
-
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {};
-
- ASSERT_THROWS_CODE_AND_WHAT(MigrationDestinationManager::cloneDocumentsFromDonor(
- operationContext(), insertBatchFn, fetchBatchFn),
- DBException,
- ErrorCodes::NetworkTimeout,
- "network error");
-}
-
-// Tests that an exception in the insertion logic will successfully throw an exception on the
-// main thread.
-TEST_F(MigrationDestinationManagerTest, CloneDocumentsCatchesInsertErrors) {
- auto fetchBatchFn = [&](OperationContext* opCtx) {
- BSONObjBuilder fetchBatchResultBuilder;
- fetchBatchResultBuilder.append("objects", createDocumentsToCloneArray());
- return fetchBatchResultBuilder.obj();
- };
-
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
- uasserted(ErrorCodes::FailedToParse, "insertion error");
- };
-
- // Since the error is thrown on another thread, the message becomes "operation was interrupted"
- // on the main thread.
-
- ASSERT_THROWS_CODE_AND_WHAT(MigrationDestinationManager::cloneDocumentsFromDonor(
- operationContext(), insertBatchFn, fetchBatchFn),
- DBException,
- ErrorCodes::FailedToParse,
- "operation was interrupted");
-
- ASSERT_EQ(operationContext()->getKillStatus(), ErrorCodes::FailedToParse);
-}
-
-} // namespace
-} // namespace mongo