diff options
author | Benety Goh <benety@mongodb.com> | 2016-06-03 11:52:15 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-06-03 15:30:35 -0400 |
commit | 98b5cf70786fa1e59e1d7dd2bc18ced1ebba827c (patch) | |
tree | 48415d21d958b6be996cdde20fe2df77a98c3fbd /src/mongo/db/repl/sync_tail.cpp | |
parent | f617392d541efa7d9ac5615ab58ddeedfa1ef779 (diff) | |
download | mongo-98b5cf70786fa1e59e1d7dd2bc18ced1ebba827c.tar.gz |
added unit tests for repl::multiSyncApply
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 68 |
1 files changed, 40 insertions, 28 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 6e9ceabc098..1e01f0d11cf 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -1006,8 +1006,24 @@ static void initializeWriterThread() { // This free function is used by the writer threads to apply each op void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) { - std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end()); - std::vector<OplogEntry*> oplogEntryPointers(oplogEntries.size()); + initializeWriterThread(); + auto txn = cc().makeOperationContext(); + auto syncApply = [](OperationContext* txn, const BSONObj& op, bool convertUpdateToUpsert) { + return SyncTail::syncApply(txn, op, convertUpdateToUpsert); + }; + fassertNoTrace(16359, multiSyncApply_noAbort(txn.get(), ops, syncApply)); +} + +Status multiSyncApply_noAbort(OperationContext* txn, + const std::vector<OplogEntry>& oplogEntries, + SyncApplyFn syncApply) { + txn->setReplicatedWrites(false); + DisableDocumentValidation validationDisabler(txn); + + // allow us to get through the magic barrier + txn->lockState()->setIsBatchWriter(true); + + std::vector<const OplogEntry*> oplogEntryPointers(oplogEntries.size()); for (size_t i = 0; i < oplogEntries.size(); i++) { oplogEntryPointers[i] = &oplogEntries[i]; } @@ -1015,27 +1031,18 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) { if (oplogEntryPointers.size() > 1) { std::stable_sort(oplogEntryPointers.begin(), oplogEntryPointers.end(), - [](OplogEntry* l, OplogEntry* r) { return l->ns < r->ns; }); + [](const OplogEntry* l, const OplogEntry* r) { return l->ns < r->ns; }); } - initializeWriterThread(); - - const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); - OperationContext& txn = *txnPtr; - txn.setReplicatedWrites(false); - DisableDocumentValidation validationDisabler(&txn); - - // allow us to get through the magic barrier - txn.lockState()->setIsBatchWriter(true); bool convertUpdatesToUpserts = true; // doNotGroupBeforePoint is used to prevent retrying bad group inserts by marking the final op // of a failed group and not allowing further group inserts until that op has been processed. - std::vector<OplogEntry*>::iterator doNotGroupBeforePoint = oplogEntryPointers.begin(); + auto doNotGroupBeforePoint = oplogEntryPointers.begin(); - for (std::vector<OplogEntry*>::iterator oplogEntriesIterator = oplogEntryPointers.begin(); + for (auto oplogEntriesIterator = oplogEntryPointers.begin(); oplogEntriesIterator != oplogEntryPointers.end(); ++oplogEntriesIterator) { - OplogEntry* entry = *oplogEntriesIterator; + auto entry = *oplogEntriesIterator; if (entry->opType[0] == 'i' && !entry->isForCappedCollection && oplogEntriesIterator > doNotGroupBeforePoint) { // Attempt to group inserts if possible. @@ -1043,7 +1050,9 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) { int batchSize = 0; int batchCount = 0; auto endOfGroupableOpsIterator = std::find_if( - oplogEntriesIterator + 1, oplogEntryPointers.end(), [&](OplogEntry* nextEntry) { + oplogEntriesIterator + 1, + oplogEntryPointers.end(), + [&](const OplogEntry* nextEntry) { return nextEntry->opType[0] != 'i' || // Must be an insert. nextEntry->ns != entry->ns || // Must be the same namespace. // Must not create too large an object. @@ -1064,7 +1073,7 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) { // Populate the "o" field with all the groupable inserts. BSONArrayBuilder insertArrayBuilder(groupedInsertBuilder.subarrayStart("o")); - for (std::vector<OplogEntry*>::iterator groupingIterator = oplogEntriesIterator; + for (auto groupingIterator = oplogEntriesIterator; groupingIterator != endOfGroupableOpsIterator; ++groupingIterator) { insertArrayBuilder.append((*groupingIterator)->o.Obj()); @@ -1073,8 +1082,8 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) { try { // Apply the group of inserts. - uassertStatusOK(SyncTail::syncApply( - &txn, groupedInsertBuilder.done(), convertUpdatesToUpserts)); + uassertStatusOK( + syncApply(txn, groupedInsertBuilder.done(), convertUpdatesToUpserts)); // It succeeded, advance the oplogEntriesIterator to the end of the // group of inserts. oplogEntriesIterator = endOfGroupableOpsIterator - 1; @@ -1082,11 +1091,12 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) { } catch (const DBException& e) { // The group insert failed, log an error and fall through to the // application of an individual op. - error() << "Error applying inserts in bulk " << causedBy(e) - << " trying first insert as a lone insert"; - + str::stream msg; + msg << "Error applying inserts in bulk " << causedBy(e) + << " trying first insert as a lone insert"; + error() << std::string(msg); if (inShutdown()) { - return; + return {ErrorCodes::InterruptedAtShutdown, msg}; } // Avoid quadratic run time from failed insert by not retrying until we @@ -1098,26 +1108,28 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) { try { // Apply an individual (non-grouped) op. - const Status s = SyncTail::syncApply(&txn, entry->raw, convertUpdatesToUpserts); + const Status s = syncApply(txn, entry->raw, convertUpdatesToUpserts); if (!s.isOK()) { severe() << "Error applying operation (" << entry->raw.toString() << "): " << s; if (inShutdown()) { - return; + return {ErrorCodes::InterruptedAtShutdown, s.toString()}; } - fassertFailedNoTrace(16359); + return s; } } catch (const DBException& e) { severe() << "writer worker caught exception: " << causedBy(e) << " on: " << entry->raw.toString(); if (inShutdown()) { - return; + return {ErrorCodes::InterruptedAtShutdown, e.toString()}; } - fassertFailedNoTrace(16360); + return e.toStatus(); } } + + return Status::OK(); } // This free function is used by the initial sync writer threads to apply each op |