diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-08-29 11:15:47 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-08-29 14:30:16 -0400 |
commit | f49d4876eca3341f6c0539b8525598e31a590748 (patch) | |
tree | 76aca5335a41814d36a8d7fa08fcb33a5dbad468 | |
parent | 0a30697f653ab8a58b509a52ef0c8337c90c7552 (diff) | |
download | mongo-f49d4876eca3341f6c0539b8525598e31a590748.tar.gz |
SERVER-27725 Use batch insert when migrating chunks
(cherry picked from commit b20b8c2a6930a43004c776f011212af2f2fcd59a)
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 96 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 2 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager_test.cpp | 10 |
5 files changed, 67 insertions, 64 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 6ea4d88293a..ad307b508bf 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -52,6 +52,7 @@ #include "mongo/db/ops/parsed_update.h" #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/ops/update_request.h" +#include "mongo/db/ops/write_ops.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" @@ -301,12 +302,13 @@ static WriteResult performCreateIndexes(OperationContext* txn, const InsertOp& w static void insertDocuments(OperationContext* txn, Collection* collection, std::vector<BSONObj>::const_iterator begin, - std::vector<BSONObj>::const_iterator end) { + std::vector<BSONObj>::const_iterator end, + bool fromMigrate) { // Intentionally not using a WRITE_CONFLICT_RETRY_LOOP. That is handled by the caller so it can // react to oversized batches. WriteUnitOfWork wuow(txn); uassertStatusOK(collection->insertDocuments( - txn, begin, end, &CurOp::get(txn)->debug(), /*enforceQuota*/ true)); + txn, begin, end, &CurOp::get(txn)->debug(), /*enforceQuota*/ true, fromMigrate)); wuow.commit(); } @@ -317,7 +319,8 @@ static bool insertBatchAndHandleErrors(OperationContext* txn, const InsertOp& wholeOp, const std::vector<BSONObj>& batch, LastOpFixer* lastOpFixer, - WriteResult* out) { + WriteResult* out, + bool fromMigrate) { if (batch.empty()) return true; @@ -350,7 +353,8 @@ static bool insertBatchAndHandleErrors(OperationContext* txn, // First try doing it all together. If all goes well, this is all we need to do. // See Collection::_insertDocuments for why we do all capped inserts one-at-a-time. lastOpFixer->startingOp(); - insertDocuments(txn, collection->getCollection(), batch.begin(), batch.end()); + insertDocuments( + txn, collection->getCollection(), batch.begin(), batch.end(), fromMigrate); lastOpFixer->finishedOpSuccessfully(); globalOpCounters.gotInserts(batch.size()); std::fill_n( @@ -374,7 +378,7 @@ static bool insertBatchAndHandleErrors(OperationContext* txn, if (!collection) acquireCollection(); lastOpFixer->startingOp(); - insertDocuments(txn, collection->getCollection(), it, it + 1); + insertDocuments(txn, collection->getCollection(), it, it + 1, fromMigrate); lastOpFixer->finishedOpSuccessfully(); out->results.emplace_back(WriteResult::SingleResult{1}); curOp.debug().ninserted++; @@ -396,7 +400,7 @@ static bool insertBatchAndHandleErrors(OperationContext* txn, return true; } -WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp) { +WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp, bool fromMigrate) { invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries. auto& curOp = *CurOp::get(txn); ON_BLOCK_EXIT([&] { @@ -453,7 +457,8 @@ WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp) { continue; // Add more to batch before inserting. } - bool canContinue = insertBatchAndHandleErrors(txn, wholeOp, batch, &lastOpFixer, &out); + bool canContinue = + insertBatchAndHandleErrors(txn, wholeOp, batch, &lastOpFixer, &out, fromMigrate); batch.clear(); // We won't need the current batch any more. bytesInBatch = 0; diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h index 49d3d2e0cf1..362bf5a4782 100644 --- a/src/mongo/db/ops/write_ops_exec.h +++ b/src/mongo/db/ops/write_ops_exec.h @@ -75,8 +75,10 @@ struct WriteResult { * LastError is updated for failures of individual writes, but not for batch errors reported by an * exception being thrown from these functions. Callers are responsible for managing LastError in * that case. This should generally be combined with LastError handling from parse failures. + * + * 'fromMigrate' indicates whether the operation was induced by a chunk migration */ -WriteResult performInserts(OperationContext* txn, const InsertOp& op); +WriteResult performInserts(OperationContext* txn, const InsertOp& op, bool fromMigrate = false); WriteResult performUpdates(OperationContext* txn, const UpdateOp& op); WriteResult performDeletes(OperationContext* txn, const DeleteOp& op); diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 72284059c07..b589b6831e0 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -47,6 +47,8 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/ops/delete.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/range_deleter_service.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" @@ -349,7 +351,7 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, void MigrationDestinationManager::cloneDocumentsFromDonor( OperationContext* txn, - stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn, + stdx::function<void(OperationContext*, BSONObj)> insertBatchFn, stdx::function<BSONObj(OperationContext*)> fetchBatchFn) { ProducerConsumerQueue<BSONObj> batches(1); @@ -364,7 +366,7 @@ void MigrationDestinationManager::cloneDocumentsFromDonor( if (arr.isEmpty()) { return; } - insertBatchFn(inserterTxn.get(), BSONObjIterator(arr)); + insertBatchFn(inserterTxn.get(), arr); } } catch (...) { stdx::lock_guard<Client> lk(*txn->getClient()); @@ -710,56 +712,50 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId); - auto insertBatchFn = [&](OperationContext* txn, BSONObjIterator docs) { - while (docs.more()) { - txn->checkForInterrupt(); + auto assertNotAborted = [&](OperationContext* opCtx) { + opCtx->checkForInterrupt(); + uassert(40655, "Migration aborted while copying documents", getState() != ABORT); + }; - if (getState() == ABORT) { - auto message = "Migration aborted while copying documents"; - log() << message << migrateLog; - uasserted(40655, message); - } + auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) { + int batchNumCloned = 0; + int batchClonedBytes = 0; - BSONObj docToClone = docs.next().Obj(); - { - OldClientWriteContext cx(txn, _nss.ns()); - BSONObj localDoc; - if (willOverrideLocalId(txn, - _nss.ns(), - min, - max, - shardKeyPattern, - cx.db(), - docToClone, - &localDoc)) { - const std::string errMsg = str::stream() - << "cannot migrate chunk, local document " << redact(localDoc) - << " has same _id as cloned " - << "remote document " << redact(docToClone); - warning() << errMsg; - - // Exception will abort migration cleanly - uasserted(16976, errMsg); - } - Helpers::upsert(txn, _nss.ns(), docToClone, true); - } - { - stdx::lock_guard<stdx::mutex> statsLock(_mutex); - _numCloned++; - _clonedBytes += docToClone.objsize(); - } - if (writeConcern.shouldWaitForOtherNodes()) { - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::ReplicationCoordinator::get(txn)->awaitReplication( - txn, - repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(), - writeConcern); - if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { - warning() << "secondaryThrottle on, but doc insert timed out; " - "continuing"; - } else { - massertStatusOK(replStatus.status); - } + assertNotAborted(opCtx); + + std::vector<BSONObj> toInsert; + for (const auto& doc : arr) { + BSONObj docToClone = doc.Obj(); + toInsert.push_back(docToClone); + batchNumCloned++; + batchClonedBytes += docToClone.objsize(); + } + InsertOp insertOp; + insertOp.ns = _nss; + insertOp.documents = toInsert; + + const WriteResult reply = performInserts(opCtx, insertOp, true); + + for (unsigned long i = 0; i < reply.results.size(); ++i) { + uassertStatusOK(reply.results[i]); + } + + { + stdx::lock_guard<stdx::mutex> statsLock(_mutex); + _numCloned += batchNumCloned; + _clonedBytes += batchClonedBytes; + } + if (writeConcern.shouldWaitForOtherNodes()) { + repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::ReplicationCoordinator::get(opCtx)->awaitReplication( + opCtx, + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), + writeConcern); + if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { + warning() << "secondaryThrottle on, but doc insert timed out; " + "continuing"; + } else { + uassertStatusOK(replStatus.status); } } }; diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 836761530d4..791aa393116 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -105,7 +105,7 @@ public: */ static void cloneDocumentsFromDonor( OperationContext* txn, - stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn, + stdx::function<void(OperationContext*, BSONObj)> insertBatchFn, stdx::function<BSONObj(OperationContext*)> fetchBatchFn); /** diff --git a/src/mongo/db/s/migration_destination_manager_test.cpp b/src/mongo/db/s/migration_destination_manager_test.cpp index f1ce14d3147..b9daad4ad9f 100644 --- a/src/mongo/db/s/migration_destination_manager_test.cpp +++ b/src/mongo/db/s/migration_destination_manager_test.cpp @@ -84,9 +84,9 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsFromDonorWorksCorrectly) { std::vector<BSONObj> resultDocs; - auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) { - while (docs.more()) { - resultDocs.push_back(docs.next().Obj().getOwned()); + auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) { + for (auto&& docToClone : docs) { + resultDocs.push_back(docToClone.Obj().getOwned()); } }; @@ -122,7 +122,7 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsThrowsFetchErrors) { return fetchBatchResultBuilder.obj(); }; - auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {}; + auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {}; ASSERT_THROWS_CODE_AND_WHAT(MigrationDestinationManager::cloneDocumentsFromDonor( operationContext(), insertBatchFn, fetchBatchFn), @@ -140,7 +140,7 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsCatchesInsertErrors) { return fetchBatchResultBuilder.obj(); }; - auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) { + auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) { uasserted(ErrorCodes::FailedToParse, "insertion error"); }; |