summary refs log tree commit diff
diff options
context:
space:
mode:
author: Cheahuychou Mao <cheahuychou.mao@mongodb.com> 2018-06-21 15:41:26 -0400
committer: Cheahuychou Mao <cheahuychou.mao@mongodb.com> 2018-08-08 11:12:22 -0400
commit: 1ba1423693a268d3b40b56da9d23600b393a31e3 (patch)
tree: e1c2f173c81575ce3523c6e7adf81b9232c3a25e
parent: d700ec58536c3191fd052a4f23cc694363911989 (diff)
download: mongo-1ba1423693a268d3b40b56da9d23600b393a31e3.tar.gz
SERVER-27725 Use batch insert when migrating chunks
(cherry picked from commit b20b8c2a6930a43004c776f011212af2f2fcd59a)
-rw-r--r--  src/mongo/db/ops/write_ops_exec.cpp                    20
-rw-r--r--  src/mongo/db/ops/write_ops_exec.h                       6
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp       95
-rw-r--r--  src/mongo/db/s/migration_destination_manager.h          2
-rw-r--r--  src/mongo/db/s/migration_destination_manager_test.cpp  10
5 files changed, 70 insertions, 63 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index b6100df6286..d880d089fc1 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -308,7 +308,8 @@ WriteResult performCreateIndexes(OperationContext* opCtx, const write_ops::Inser
void insertDocuments(OperationContext* opCtx,
Collection* collection,
std::vector<InsertStatement>::iterator begin,
- std::vector<InsertStatement>::iterator end) {
+ std::vector<InsertStatement>::iterator end,
+ bool fromMigrate) {
// Intentionally not using writeConflictRetry. That is handled by the caller so it can react to
// oversized batches.
WriteUnitOfWork wuow(opCtx);
@@ -333,7 +334,7 @@ void insertDocuments(OperationContext* opCtx,
}
uassertStatusOK(collection->insertDocuments(
- opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true));
+ opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true, fromMigrate));
wuow.commit();
}
@@ -344,7 +345,8 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
const write_ops::Insert& wholeOp,
std::vector<InsertStatement>& batch,
LastOpFixer* lastOpFixer,
- WriteResult* out) {
+ WriteResult* out,
+ bool fromMigrate) {
if (batch.empty())
return true;
@@ -383,7 +385,8 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
// First try doing it all together. If all goes well, this is all we need to do.
// See Collection::_insertDocuments for why we do all capped inserts one-at-a-time.
lastOpFixer->startingOp();
- insertDocuments(opCtx, collection->getCollection(), batch.begin(), batch.end());
+ insertDocuments(
+ opCtx, collection->getCollection(), batch.begin(), batch.end(), fromMigrate);
lastOpFixer->finishedOpSuccessfully();
globalOpCounters.gotInserts(batch.size());
SingleWriteResult result;
@@ -410,7 +413,7 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
if (!collection)
acquireCollection();
lastOpFixer->startingOp();
- insertDocuments(opCtx, collection->getCollection(), it, it + 1);
+ insertDocuments(opCtx, collection->getCollection(), it, it + 1, fromMigrate);
lastOpFixer->finishedOpSuccessfully();
SingleWriteResult result;
result.setN(1);
@@ -449,7 +452,9 @@ SingleWriteResult makeWriteResultForInsertOrDeleteRetry() {
} // namespace
-WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& wholeOp) {
+WriteResult performInserts(OperationContext* opCtx,
+ const write_ops::Insert& wholeOp,
+ bool fromMigrate) {
invariant(!opCtx->lockState()->inAWriteUnitOfWork()); // Does own retries.
auto& curOp = *CurOp::get(opCtx);
ON_BLOCK_EXIT([&] {
@@ -524,7 +529,8 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who
continue; // Add more to batch before inserting.
}
- bool canContinue = insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out);
+ bool canContinue =
+ insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out, fromMigrate);
batch.clear(); // We won't need the current batch any more.
bytesInBatch = 0;
diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h
index 7483b78a236..4bfc212f196 100644
--- a/src/mongo/db/ops/write_ops_exec.h
+++ b/src/mongo/db/ops/write_ops_exec.h
@@ -70,8 +70,12 @@ struct WriteResult {
* LastError is updated for failures of individual writes, but not for batch errors reported by an
* exception being thrown from these functions. Callers are responsible for managing LastError in
* that case. This should generally be combined with LastError handling from parse failures.
+ *
+ * 'fromMigrate' indicates whether the operation was induced by a chunk migration
*/
-WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& op);
+WriteResult performInserts(OperationContext* opCtx,
+ const write_ops::Insert& op,
+ bool fromMigrate = false);
WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& op);
WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& op);
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index f6e468324c5..1cd53e3bd2f 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/delete.h"
+#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_sharding_state.h"
@@ -367,7 +368,7 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
void MigrationDestinationManager::cloneDocumentsFromDonor(
OperationContext* opCtx,
- stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
+ stdx::function<void(OperationContext*, BSONObj)> insertBatchFn,
stdx::function<BSONObj(OperationContext*)> fetchBatchFn) {
ProducerConsumerQueue<BSONObj> batches(1);
@@ -382,7 +383,7 @@ void MigrationDestinationManager::cloneDocumentsFromDonor(
if (arr.isEmpty()) {
return;
}
- insertBatchFn(inserterOpCtx.get(), BSONObjIterator(arr));
+ insertBatchFn(inserterOpCtx.get(), arr);
}
} catch (...) {
stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -762,56 +763,52 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
_chunkMarkedPending = true; // no lock needed, only the migrate thread looks.
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
- while (docs.more()) {
- opCtx->checkForInterrupt();
+ auto assertNotAborted = [&](OperationContext* opCtx) {
+ opCtx->checkForInterrupt();
+ uassert(50664, "Migration aborted while copying documents", getState() != ABORT);
+ };
- if (getState() == ABORT) {
- auto message = "Migration aborted while copying documents";
- log() << message;
- uasserted(50664, message);
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) {
+ int batchNumCloned = 0;
+ int batchClonedBytes = 0;
+
+ assertNotAborted(opCtx);
+
+ write_ops::Insert insertOp(_nss);
+ insertOp.getWriteCommandBase().setOrdered(true);
+ insertOp.setDocuments([&] {
+ std::vector<BSONObj> toInsert;
+ for (const auto& doc : arr) {
+ BSONObj docToClone = doc.Obj();
+ toInsert.push_back(docToClone);
+ batchNumCloned++;
+ batchClonedBytes += docToClone.objsize();
}
+ return toInsert;
+ }());
- BSONObj docToClone = docs.next().Obj();
- {
- OldClientWriteContext cx(opCtx, _nss.ns());
- BSONObj localDoc;
- if (willOverrideLocalId(opCtx,
- _nss,
- min,
- max,
- shardKeyPattern,
- cx.db(),
- docToClone,
- &localDoc)) {
- const std::string errMsg = str::stream()
- << "cannot migrate chunk, local document " << redact(localDoc)
- << " has same _id as cloned "
- << "remote document " << redact(docToClone);
- warning() << errMsg;
-
- // Exception will abort migration cleanly
- uasserted(16976, errMsg);
- }
- Helpers::upsert(opCtx, _nss.ns(), docToClone, true);
- }
- {
- stdx::lock_guard<stdx::mutex> statsLock(_mutex);
- _numCloned++;
- _clonedBytes += docToClone.objsize();
- }
- if (writeConcern.shouldWaitForOtherNodes()) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::getGlobalReplicationCoordinator()->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- warning() << "secondaryThrottle on, but doc insert timed out; "
- "continuing";
- } else {
- massertStatusOK(replStatus.status);
- }
+ const WriteResult reply = performInserts(opCtx, insertOp, true);
+
+ for (unsigned long i = 0; i < reply.results.size(); ++i) {
+ uassertStatusOK(reply.results[i]);
+ }
+
+ {
+ stdx::lock_guard<stdx::mutex> statsLock(_mutex);
+ _numCloned += batchNumCloned;
+ _clonedBytes += batchClonedBytes;
+ }
+ if (writeConcern.shouldWaitForOtherNodes()) {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ warning() << "secondaryThrottle on, but doc insert timed out; "
+ "continuing";
+ } else {
+ uassertStatusOK(replStatus.status);
}
}
};
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index ebf3bc25544..c0c5735e068 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -109,7 +109,7 @@ public:
*/
static void cloneDocumentsFromDonor(
OperationContext* opCtx,
- stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
+ stdx::function<void(OperationContext*, BSONObj)> insertBatchFn,
stdx::function<BSONObj(OperationContext*)> fetchBatchFn);
/**
diff --git a/src/mongo/db/s/migration_destination_manager_test.cpp b/src/mongo/db/s/migration_destination_manager_test.cpp
index 6dd3d456a2f..1b694156e86 100644
--- a/src/mongo/db/s/migration_destination_manager_test.cpp
+++ b/src/mongo/db/s/migration_destination_manager_test.cpp
@@ -84,9 +84,9 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsFromDonorWorksCorrectly) {
std::vector<BSONObj> resultDocs;
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
- while (docs.more()) {
- resultDocs.push_back(docs.next().Obj().getOwned());
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {
+ for (auto&& docToClone : docs) {
+ resultDocs.push_back(docToClone.Obj().getOwned());
}
};
@@ -122,7 +122,7 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsThrowsFetchErrors) {
return fetchBatchResultBuilder.obj();
};
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {};
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {};
ASSERT_THROWS_CODE_AND_WHAT(MigrationDestinationManager::cloneDocumentsFromDonor(
operationContext(), insertBatchFn, fetchBatchFn),
@@ -140,7 +140,7 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsCatchesInsertErrors) {
return fetchBatchResultBuilder.obj();
};
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {
uasserted(ErrorCodes::FailedToParse, "insertion error");
};