| author | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2018-06-21 15:41:26 -0400 |
|---|---|---|
| committer | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2018-08-08 11:12:22 -0400 |
| commit | 1ba1423693a268d3b40b56da9d23600b393a31e3 | |
| tree | e1c2f173c81575ce3523c6e7adf81b9232c3a25e | |
| parent | d700ec58536c3191fd052a4f23cc694363911989 | |
| download | mongo-1ba1423693a268d3b40b56da9d23600b393a31e3.tar.gz | |
SERVER-27725 Use batch insert when migrating chunks
(cherry picked from commit b20b8c2a6930a43004c776f011212af2f2fcd59a)
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 20 |
| -rw-r--r-- | src/mongo/db/ops/write_ops_exec.h | 6 |
| -rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 95 |
| -rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 2 |
| -rw-r--r-- | src/mongo/db/s/migration_destination_manager_test.cpp | 10 |

5 files changed, 70 insertions(+), 63 deletions(-)
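The heart of the change is in `migration_destination_manager.cpp` below: the recipient's per-document `Helpers::upsert` loop is replaced by one ordered `write_ops::Insert` command per donor batch, executed through `performInserts` with `fromMigrate` set so the resulting oplog entries are tagged as migration-induced rather than ordinary client writes. Here is a condensed sketch of that new insert path, not part of the commit itself; `_nss` (the namespace being migrated) and the incoming `arr` batch refer to the class member and lambda argument used in the real code:

```cpp
#include "mongo/db/ops/write_ops_exec.h"

// Sketch: build one ordered insert command from a donor batch ('arr' is a
// BSON array of documents) instead of upserting document-by-document.
auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) {
    write_ops::Insert insertOp(_nss);  // _nss: namespace being migrated
    insertOp.getWriteCommandBase().setOrdered(true);
    insertOp.setDocuments([&] {
        std::vector<BSONObj> toInsert;
        for (const auto& doc : arr) {
            toInsert.push_back(doc.Obj());
        }
        return toInsert;
    }());

    // fromMigrate = true marks the generated oplog entries as coming from a
    // chunk migration rather than an ordinary client write.
    const WriteResult reply = performInserts(opCtx, insertOp, true /* fromMigrate */);
    for (const auto& result : reply.results) {
        uassertStatusOK(result);  // any per-document failure aborts the migration
    }
};
```

Batching this way lets lock acquisition and write-unit-of-work overhead be paid once per batch rather than once per cloned document.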
```diff
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index b6100df6286..d880d089fc1 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -308,7 +308,8 @@ WriteResult performCreateIndexes(OperationContext* opCtx, const write_ops::Inser
 void insertDocuments(OperationContext* opCtx,
                      Collection* collection,
                      std::vector<InsertStatement>::iterator begin,
-                     std::vector<InsertStatement>::iterator end) {
+                     std::vector<InsertStatement>::iterator end,
+                     bool fromMigrate) {
     // Intentionally not using writeConflictRetry. That is handled by the caller so it can react to
     // oversized batches.
     WriteUnitOfWork wuow(opCtx);
@@ -333,7 +334,7 @@ void insertDocuments(OperationContext* opCtx,
     }
 
     uassertStatusOK(collection->insertDocuments(
-        opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true));
+        opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true, fromMigrate));
     wuow.commit();
 }
 
@@ -344,7 +345,8 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
                                 const write_ops::Insert& wholeOp,
                                 std::vector<InsertStatement>& batch,
                                 LastOpFixer* lastOpFixer,
-                                WriteResult* out) {
+                                WriteResult* out,
+                                bool fromMigrate) {
     if (batch.empty())
         return true;
 
@@ -383,7 +385,8 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
         // First try doing it all together. If all goes well, this is all we need to do.
         // See Collection::_insertDocuments for why we do all capped inserts one-at-a-time.
         lastOpFixer->startingOp();
-        insertDocuments(opCtx, collection->getCollection(), batch.begin(), batch.end());
+        insertDocuments(
+            opCtx, collection->getCollection(), batch.begin(), batch.end(), fromMigrate);
         lastOpFixer->finishedOpSuccessfully();
         globalOpCounters.gotInserts(batch.size());
         SingleWriteResult result;
@@ -410,7 +413,7 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
             if (!collection)
                 acquireCollection();
             lastOpFixer->startingOp();
-            insertDocuments(opCtx, collection->getCollection(), it, it + 1);
+            insertDocuments(opCtx, collection->getCollection(), it, it + 1, fromMigrate);
             lastOpFixer->finishedOpSuccessfully();
             SingleWriteResult result;
             result.setN(1);
@@ -449,7 +452,9 @@ SingleWriteResult makeWriteResultForInsertOrDeleteRetry() {
 
 }  // namespace
 
-WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& wholeOp) {
+WriteResult performInserts(OperationContext* opCtx,
+                           const write_ops::Insert& wholeOp,
+                           bool fromMigrate) {
     invariant(!opCtx->lockState()->inAWriteUnitOfWork());  // Does own retries.
     auto& curOp = *CurOp::get(opCtx);
     ON_BLOCK_EXIT([&] {
@@ -524,7 +529,8 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who
             continue;  // Add more to batch before inserting.
         }
 
-        bool canContinue = insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out);
+        bool canContinue =
+            insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out, fromMigrate);
         batch.clear();  // We won't need the current batch any more.
         bytesInBatch = 0;
 
diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h
index 7483b78a236..4bfc212f196 100644
--- a/src/mongo/db/ops/write_ops_exec.h
+++ b/src/mongo/db/ops/write_ops_exec.h
@@ -70,8 +70,12 @@ struct WriteResult {
  * LastError is updated for failures of individual writes, but not for batch errors reported by an
  * exception being thrown from these functions. Callers are responsible for managing LastError in
  * that case. This should generally be combined with LastError handling from parse failures.
+ *
+ * 'fromMigrate' indicates whether the operation was induced by a chunk migration
  */
-WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& op);
+WriteResult performInserts(OperationContext* opCtx,
+                           const write_ops::Insert& op,
+                           bool fromMigrate = false);
 WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& op);
 WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& op);
 
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index f6e468324c5..1cd53e3bd2f 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -46,6 +46,7 @@
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/ops/delete.h"
+#include "mongo/db/ops/write_ops_exec.h"
 #include "mongo/db/repl/repl_client_info.h"
 #include "mongo/db/repl/replication_coordinator_global.h"
 #include "mongo/db/s/collection_sharding_state.h"
@@ -367,7 +368,7 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
 
 void MigrationDestinationManager::cloneDocumentsFromDonor(
     OperationContext* opCtx,
-    stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
+    stdx::function<void(OperationContext*, BSONObj)> insertBatchFn,
     stdx::function<BSONObj(OperationContext*)> fetchBatchFn) {
 
     ProducerConsumerQueue<BSONObj> batches(1);
@@ -382,7 +383,7 @@ void MigrationDestinationManager::cloneDocumentsFromDonor(
             if (arr.isEmpty()) {
                 return;
             }
-            insertBatchFn(inserterOpCtx.get(), BSONObjIterator(arr));
+            insertBatchFn(inserterOpCtx.get(), arr);
         }
     } catch (...) {
         stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -762,56 +763,52 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
 
         _chunkMarkedPending = true;  // no lock needed, only the migrate thread looks.
 
-        auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
-            while (docs.more()) {
-                opCtx->checkForInterrupt();
+        auto assertNotAborted = [&](OperationContext* opCtx) {
+            opCtx->checkForInterrupt();
+            uassert(50664, "Migration aborted while copying documents", getState() != ABORT);
+        };
 
-                if (getState() == ABORT) {
-                    auto message = "Migration aborted while copying documents";
-                    log() << message;
-                    uasserted(50664, message);
+        auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) {
+            int batchNumCloned = 0;
+            int batchClonedBytes = 0;
+
+            assertNotAborted(opCtx);
+
+            write_ops::Insert insertOp(_nss);
+            insertOp.getWriteCommandBase().setOrdered(true);
+            insertOp.setDocuments([&] {
+                std::vector<BSONObj> toInsert;
+                for (const auto& doc : arr) {
+                    BSONObj docToClone = doc.Obj();
+                    toInsert.push_back(docToClone);
+                    batchNumCloned++;
+                    batchClonedBytes += docToClone.objsize();
                 }
+                return toInsert;
+            }());
 
-                BSONObj docToClone = docs.next().Obj();
-                {
-                    OldClientWriteContext cx(opCtx, _nss.ns());
-                    BSONObj localDoc;
-                    if (willOverrideLocalId(opCtx,
-                                            _nss,
-                                            min,
-                                            max,
-                                            shardKeyPattern,
-                                            cx.db(),
-                                            docToClone,
-                                            &localDoc)) {
-                        const std::string errMsg = str::stream()
-                            << "cannot migrate chunk, local document " << redact(localDoc)
-                            << " has same _id as cloned "
-                            << "remote document " << redact(docToClone);
-                        warning() << errMsg;
-
-                        // Exception will abort migration cleanly
-                        uasserted(16976, errMsg);
-                    }
-                    Helpers::upsert(opCtx, _nss.ns(), docToClone, true);
-                }
-                {
-                    stdx::lock_guard<stdx::mutex> statsLock(_mutex);
-                    _numCloned++;
-                    _clonedBytes += docToClone.objsize();
-                }
-                if (writeConcern.shouldWaitForOtherNodes()) {
-                    repl::ReplicationCoordinator::StatusAndDuration replStatus =
-                        repl::getGlobalReplicationCoordinator()->awaitReplication(
-                            opCtx,
-                            repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
-                            writeConcern);
-                    if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
-                        warning() << "secondaryThrottle on, but doc insert timed out; "
-                                     "continuing";
-                    } else {
-                        massertStatusOK(replStatus.status);
-                    }
+            const WriteResult reply = performInserts(opCtx, insertOp, true);
+
+            for (unsigned long i = 0; i < reply.results.size(); ++i) {
+                uassertStatusOK(reply.results[i]);
+            }
+
+            {
+                stdx::lock_guard<stdx::mutex> statsLock(_mutex);
+                _numCloned += batchNumCloned;
+                _clonedBytes += batchClonedBytes;
+            }
+            if (writeConcern.shouldWaitForOtherNodes()) {
+                repl::ReplicationCoordinator::StatusAndDuration replStatus =
+                    repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+                        opCtx,
+                        repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+                        writeConcern);
+                if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+                    warning() << "secondaryThrottle on, but doc insert timed out; "
+                                 "continuing";
+                } else {
+                    uassertStatusOK(replStatus.status);
                 }
             }
         };
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index ebf3bc25544..c0c5735e068 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -109,7 +109,7 @@ public:
      */
     static void cloneDocumentsFromDonor(
         OperationContext* opCtx,
-        stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
+        stdx::function<void(OperationContext*, BSONObj)> insertBatchFn,
         stdx::function<BSONObj(OperationContext*)> fetchBatchFn);
 
     /**
diff --git a/src/mongo/db/s/migration_destination_manager_test.cpp b/src/mongo/db/s/migration_destination_manager_test.cpp
index 6dd3d456a2f..1b694156e86 100644
--- a/src/mongo/db/s/migration_destination_manager_test.cpp
+++ b/src/mongo/db/s/migration_destination_manager_test.cpp
@@ -84,9 +84,9 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsFromDonorWorksCorrectly) {
 
     std::vector<BSONObj> resultDocs;
 
-    auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
-        while (docs.more()) {
-            resultDocs.push_back(docs.next().Obj().getOwned());
+    auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {
+        for (auto&& docToClone : docs) {
+            resultDocs.push_back(docToClone.Obj().getOwned());
         }
     };
 
@@ -122,7 +122,7 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsThrowsFetchErrors) {
         return fetchBatchResultBuilder.obj();
    };
 
-    auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {};
+    auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {};
 
     ASSERT_THROWS_CODE_AND_WHAT(MigrationDestinationManager::cloneDocumentsFromDonor(
                                     operationContext(), insertBatchFn, fetchBatchFn),
@@ -140,7 +140,7 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsCatchesInsertErrors) {
         return fetchBatchResultBuilder.obj();
     };
 
-    auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
+    auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {
         uasserted(ErrorCodes::FailedToParse, "insertion error");
     };
 
```
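For orientation, here is a minimal usage sketch of the refactored `cloneDocumentsFromDonor` with the new `BSONObj`-based callback, modeled on the first unit test above. It assumes an `opCtx` in scope (as the test fixture's `operationContext()` provides), and the `"objects"` field name for the donor batch payload is an assumption about the response shape, which this diff does not show:

```cpp
std::vector<BSONObj> resultDocs;
int fetchCount = 0;

// Donor side: return one batch, then an empty array, which the cloner
// treats as end-of-stream (see the arr.isEmpty() check in the diff above).
// NOTE: the "objects" payload field name is assumed, not shown in this diff.
auto fetchBatchFn = [&](OperationContext* opCtx) -> BSONObj {
    if (fetchCount++ == 0) {
        return BSON("objects" << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2)));
    }
    return BSON("objects" << BSONArray());  // empty batch terminates cloning
};

// Recipient side: the callback now receives the whole batch as one BSONObj
// array and iterates it directly, instead of pulling from a BSONObjIterator.
auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {
    for (auto&& docToClone : docs) {
        resultDocs.push_back(docToClone.Obj().getOwned());
    }
};

MigrationDestinationManager::cloneDocumentsFromDonor(opCtx, insertBatchFn, fetchBatchFn);
// resultDocs now holds {_id: 1} and {_id: 2}.
```

Handing the callback the whole array rather than an iterator is what allows the production lambda to assemble a single `write_ops::Insert` command, and per-batch document and byte counts, for each fetched batch.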