summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-08-29 11:15:47 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-08-29 14:30:16 -0400
commitf49d4876eca3341f6c0539b8525598e31a590748 (patch)
tree76aca5335a41814d36a8d7fa08fcb33a5dbad468
parent0a30697f653ab8a58b509a52ef0c8337c90c7552 (diff)
downloadmongo-f49d4876eca3341f6c0539b8525598e31a590748.tar.gz
SERVER-27725 Use batch insert when migrating chunks
(cherry picked from commit b20b8c2a6930a43004c776f011212af2f2fcd59a)
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp19
-rw-r--r--src/mongo/db/ops/write_ops_exec.h4
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp96
-rw-r--r--src/mongo/db/s/migration_destination_manager.h2
-rw-r--r--src/mongo/db/s/migration_destination_manager_test.cpp10
5 files changed, 67 insertions, 64 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 6ea4d88293a..ad307b508bf 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -52,6 +52,7 @@
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update_request.h"
+#include "mongo/db/ops/write_ops.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
@@ -301,12 +302,13 @@ static WriteResult performCreateIndexes(OperationContext* txn, const InsertOp& w
static void insertDocuments(OperationContext* txn,
Collection* collection,
std::vector<BSONObj>::const_iterator begin,
- std::vector<BSONObj>::const_iterator end) {
+ std::vector<BSONObj>::const_iterator end,
+ bool fromMigrate) {
// Intentionally not using a WRITE_CONFLICT_RETRY_LOOP. That is handled by the caller so it can
// react to oversized batches.
WriteUnitOfWork wuow(txn);
uassertStatusOK(collection->insertDocuments(
- txn, begin, end, &CurOp::get(txn)->debug(), /*enforceQuota*/ true));
+ txn, begin, end, &CurOp::get(txn)->debug(), /*enforceQuota*/ true, fromMigrate));
wuow.commit();
}
@@ -317,7 +319,8 @@ static bool insertBatchAndHandleErrors(OperationContext* txn,
const InsertOp& wholeOp,
const std::vector<BSONObj>& batch,
LastOpFixer* lastOpFixer,
- WriteResult* out) {
+ WriteResult* out,
+ bool fromMigrate) {
if (batch.empty())
return true;
@@ -350,7 +353,8 @@ static bool insertBatchAndHandleErrors(OperationContext* txn,
// First try doing it all together. If all goes well, this is all we need to do.
// See Collection::_insertDocuments for why we do all capped inserts one-at-a-time.
lastOpFixer->startingOp();
- insertDocuments(txn, collection->getCollection(), batch.begin(), batch.end());
+ insertDocuments(
+ txn, collection->getCollection(), batch.begin(), batch.end(), fromMigrate);
lastOpFixer->finishedOpSuccessfully();
globalOpCounters.gotInserts(batch.size());
std::fill_n(
@@ -374,7 +378,7 @@ static bool insertBatchAndHandleErrors(OperationContext* txn,
if (!collection)
acquireCollection();
lastOpFixer->startingOp();
- insertDocuments(txn, collection->getCollection(), it, it + 1);
+ insertDocuments(txn, collection->getCollection(), it, it + 1, fromMigrate);
lastOpFixer->finishedOpSuccessfully();
out->results.emplace_back(WriteResult::SingleResult{1});
curOp.debug().ninserted++;
@@ -396,7 +400,7 @@ static bool insertBatchAndHandleErrors(OperationContext* txn,
return true;
}
-WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp) {
+WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp, bool fromMigrate) {
invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries.
auto& curOp = *CurOp::get(txn);
ON_BLOCK_EXIT([&] {
@@ -453,7 +457,8 @@ WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp) {
continue; // Add more to batch before inserting.
}
- bool canContinue = insertBatchAndHandleErrors(txn, wholeOp, batch, &lastOpFixer, &out);
+ bool canContinue =
+ insertBatchAndHandleErrors(txn, wholeOp, batch, &lastOpFixer, &out, fromMigrate);
batch.clear(); // We won't need the current batch any more.
bytesInBatch = 0;
diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h
index 49d3d2e0cf1..362bf5a4782 100644
--- a/src/mongo/db/ops/write_ops_exec.h
+++ b/src/mongo/db/ops/write_ops_exec.h
@@ -75,8 +75,10 @@ struct WriteResult {
* LastError is updated for failures of individual writes, but not for batch errors reported by an
* exception being thrown from these functions. Callers are responsible for managing LastError in
* that case. This should generally be combined with LastError handling from parse failures.
+ *
+ * 'fromMigrate' indicates whether the operation was induced by a chunk migration.
*/
-WriteResult performInserts(OperationContext* txn, const InsertOp& op);
+WriteResult performInserts(OperationContext* txn, const InsertOp& op, bool fromMigrate = false);
WriteResult performUpdates(OperationContext* txn, const UpdateOp& op);
WriteResult performDeletes(OperationContext* txn, const DeleteOp& op);
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 72284059c07..b589b6831e0 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -47,6 +47,8 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/delete.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/range_deleter_service.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
@@ -349,7 +351,7 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
void MigrationDestinationManager::cloneDocumentsFromDonor(
OperationContext* txn,
- stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
+ stdx::function<void(OperationContext*, BSONObj)> insertBatchFn,
stdx::function<BSONObj(OperationContext*)> fetchBatchFn) {
ProducerConsumerQueue<BSONObj> batches(1);
@@ -364,7 +366,7 @@ void MigrationDestinationManager::cloneDocumentsFromDonor(
if (arr.isEmpty()) {
return;
}
- insertBatchFn(inserterTxn.get(), BSONObjIterator(arr));
+ insertBatchFn(inserterTxn.get(), arr);
}
} catch (...) {
stdx::lock_guard<Client> lk(*txn->getClient());
@@ -710,56 +712,50 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId);
- auto insertBatchFn = [&](OperationContext* txn, BSONObjIterator docs) {
- while (docs.more()) {
- txn->checkForInterrupt();
+ auto assertNotAborted = [&](OperationContext* opCtx) {
+ opCtx->checkForInterrupt();
+ uassert(40655, "Migration aborted while copying documents", getState() != ABORT);
+ };
- if (getState() == ABORT) {
- auto message = "Migration aborted while copying documents";
- log() << message << migrateLog;
- uasserted(40655, message);
- }
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) {
+ int batchNumCloned = 0;
+ int batchClonedBytes = 0;
- BSONObj docToClone = docs.next().Obj();
- {
- OldClientWriteContext cx(txn, _nss.ns());
- BSONObj localDoc;
- if (willOverrideLocalId(txn,
- _nss.ns(),
- min,
- max,
- shardKeyPattern,
- cx.db(),
- docToClone,
- &localDoc)) {
- const std::string errMsg = str::stream()
- << "cannot migrate chunk, local document " << redact(localDoc)
- << " has same _id as cloned "
- << "remote document " << redact(docToClone);
- warning() << errMsg;
-
- // Exception will abort migration cleanly
- uasserted(16976, errMsg);
- }
- Helpers::upsert(txn, _nss.ns(), docToClone, true);
- }
- {
- stdx::lock_guard<stdx::mutex> statsLock(_mutex);
- _numCloned++;
- _clonedBytes += docToClone.objsize();
- }
- if (writeConcern.shouldWaitForOtherNodes()) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::ReplicationCoordinator::get(txn)->awaitReplication(
- txn,
- repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(),
- writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- warning() << "secondaryThrottle on, but doc insert timed out; "
- "continuing";
- } else {
- massertStatusOK(replStatus.status);
- }
+ assertNotAborted(opCtx);
+
+ std::vector<BSONObj> toInsert;
+ for (const auto& doc : arr) {
+ BSONObj docToClone = doc.Obj();
+ toInsert.push_back(docToClone);
+ batchNumCloned++;
+ batchClonedBytes += docToClone.objsize();
+ }
+ InsertOp insertOp;
+ insertOp.ns = _nss;
+ insertOp.documents = toInsert;
+
+ const WriteResult reply = performInserts(opCtx, insertOp, true);
+
+ for (unsigned long i = 0; i < reply.results.size(); ++i) {
+ uassertStatusOK(reply.results[i]);
+ }
+
+ {
+ stdx::lock_guard<stdx::mutex> statsLock(_mutex);
+ _numCloned += batchNumCloned;
+ _clonedBytes += batchClonedBytes;
+ }
+ if (writeConcern.shouldWaitForOtherNodes()) {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ warning() << "secondaryThrottle on, but doc insert timed out; "
+ "continuing";
+ } else {
+ uassertStatusOK(replStatus.status);
}
}
};
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 836761530d4..791aa393116 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -105,7 +105,7 @@ public:
*/
static void cloneDocumentsFromDonor(
OperationContext* txn,
- stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
+ stdx::function<void(OperationContext*, BSONObj)> insertBatchFn,
stdx::function<BSONObj(OperationContext*)> fetchBatchFn);
/**
diff --git a/src/mongo/db/s/migration_destination_manager_test.cpp b/src/mongo/db/s/migration_destination_manager_test.cpp
index f1ce14d3147..b9daad4ad9f 100644
--- a/src/mongo/db/s/migration_destination_manager_test.cpp
+++ b/src/mongo/db/s/migration_destination_manager_test.cpp
@@ -84,9 +84,9 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsFromDonorWorksCorrectly) {
std::vector<BSONObj> resultDocs;
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
- while (docs.more()) {
- resultDocs.push_back(docs.next().Obj().getOwned());
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {
+ for (auto&& docToClone : docs) {
+ resultDocs.push_back(docToClone.Obj().getOwned());
}
};
@@ -122,7 +122,7 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsThrowsFetchErrors) {
return fetchBatchResultBuilder.obj();
};
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {};
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {};
ASSERT_THROWS_CODE_AND_WHAT(MigrationDestinationManager::cloneDocumentsFromDonor(
operationContext(), insertBatchFn, fetchBatchFn),
@@ -140,7 +140,7 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsCatchesInsertErrors) {
return fetchBatchResultBuilder.obj();
};
- auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {
uasserted(ErrorCodes::FailedToParse, "insertion error");
};