summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_tail.cpp
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-06-03 11:52:15 -0400
committerBenety Goh <benety@mongodb.com>2016-06-03 15:30:35 -0400
commit98b5cf70786fa1e59e1d7dd2bc18ced1ebba827c (patch)
tree48415d21d958b6be996cdde20fe2df77a98c3fbd /src/mongo/db/repl/sync_tail.cpp
parentf617392d541efa7d9ac5615ab58ddeedfa1ef779 (diff)
downloadmongo-98b5cf70786fa1e59e1d7dd2bc18ced1ebba827c.tar.gz
added unit tests for repl::multiSyncApply
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--src/mongo/db/repl/sync_tail.cpp68
1 files changed, 40 insertions, 28 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 6e9ceabc098..1e01f0d11cf 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -1006,8 +1006,24 @@ static void initializeWriterThread() {
// This free function is used by the writer threads to apply each op
void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) {
- std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end());
- std::vector<OplogEntry*> oplogEntryPointers(oplogEntries.size());
+ initializeWriterThread();
+ auto txn = cc().makeOperationContext();
+ auto syncApply = [](OperationContext* txn, const BSONObj& op, bool convertUpdateToUpsert) {
+ return SyncTail::syncApply(txn, op, convertUpdateToUpsert);
+ };
+ fassertNoTrace(16359, multiSyncApply_noAbort(txn.get(), ops, syncApply));
+}
+
+Status multiSyncApply_noAbort(OperationContext* txn,
+ const std::vector<OplogEntry>& oplogEntries,
+ SyncApplyFn syncApply) {
+ txn->setReplicatedWrites(false);
+ DisableDocumentValidation validationDisabler(txn);
+
+ // allow us to get through the magic barrier
+ txn->lockState()->setIsBatchWriter(true);
+
+ std::vector<const OplogEntry*> oplogEntryPointers(oplogEntries.size());
for (size_t i = 0; i < oplogEntries.size(); i++) {
oplogEntryPointers[i] = &oplogEntries[i];
}
@@ -1015,27 +1031,18 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) {
if (oplogEntryPointers.size() > 1) {
std::stable_sort(oplogEntryPointers.begin(),
oplogEntryPointers.end(),
- [](OplogEntry* l, OplogEntry* r) { return l->ns < r->ns; });
+ [](const OplogEntry* l, const OplogEntry* r) { return l->ns < r->ns; });
}
- initializeWriterThread();
-
- const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext();
- OperationContext& txn = *txnPtr;
- txn.setReplicatedWrites(false);
- DisableDocumentValidation validationDisabler(&txn);
-
- // allow us to get through the magic barrier
- txn.lockState()->setIsBatchWriter(true);
bool convertUpdatesToUpserts = true;
// doNotGroupBeforePoint is used to prevent retrying bad group inserts by marking the final op
// of a failed group and not allowing further group inserts until that op has been processed.
- std::vector<OplogEntry*>::iterator doNotGroupBeforePoint = oplogEntryPointers.begin();
+ auto doNotGroupBeforePoint = oplogEntryPointers.begin();
- for (std::vector<OplogEntry*>::iterator oplogEntriesIterator = oplogEntryPointers.begin();
+ for (auto oplogEntriesIterator = oplogEntryPointers.begin();
oplogEntriesIterator != oplogEntryPointers.end();
++oplogEntriesIterator) {
- OplogEntry* entry = *oplogEntriesIterator;
+ auto entry = *oplogEntriesIterator;
if (entry->opType[0] == 'i' && !entry->isForCappedCollection &&
oplogEntriesIterator > doNotGroupBeforePoint) {
// Attempt to group inserts if possible.
@@ -1043,7 +1050,9 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) {
int batchSize = 0;
int batchCount = 0;
auto endOfGroupableOpsIterator = std::find_if(
- oplogEntriesIterator + 1, oplogEntryPointers.end(), [&](OplogEntry* nextEntry) {
+ oplogEntriesIterator + 1,
+ oplogEntryPointers.end(),
+ [&](const OplogEntry* nextEntry) {
return nextEntry->opType[0] != 'i' || // Must be an insert.
nextEntry->ns != entry->ns || // Must be the same namespace.
// Must not create too large an object.
@@ -1064,7 +1073,7 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) {
// Populate the "o" field with all the groupable inserts.
BSONArrayBuilder insertArrayBuilder(groupedInsertBuilder.subarrayStart("o"));
- for (std::vector<OplogEntry*>::iterator groupingIterator = oplogEntriesIterator;
+ for (auto groupingIterator = oplogEntriesIterator;
groupingIterator != endOfGroupableOpsIterator;
++groupingIterator) {
insertArrayBuilder.append((*groupingIterator)->o.Obj());
@@ -1073,8 +1082,8 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) {
try {
// Apply the group of inserts.
- uassertStatusOK(SyncTail::syncApply(
- &txn, groupedInsertBuilder.done(), convertUpdatesToUpserts));
+ uassertStatusOK(
+ syncApply(txn, groupedInsertBuilder.done(), convertUpdatesToUpserts));
// It succeeded, advance the oplogEntriesIterator to the end of the
// group of inserts.
oplogEntriesIterator = endOfGroupableOpsIterator - 1;
@@ -1082,11 +1091,12 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) {
} catch (const DBException& e) {
// The group insert failed, log an error and fall through to the
// application of an individual op.
- error() << "Error applying inserts in bulk " << causedBy(e)
- << " trying first insert as a lone insert";
-
+ str::stream msg;
+ msg << "Error applying inserts in bulk " << causedBy(e)
+ << " trying first insert as a lone insert";
+ error() << std::string(msg);
if (inShutdown()) {
- return;
+ return {ErrorCodes::InterruptedAtShutdown, msg};
}
// Avoid quadratic run time from failed insert by not retrying until we
@@ -1098,26 +1108,28 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail*) {
try {
// Apply an individual (non-grouped) op.
- const Status s = SyncTail::syncApply(&txn, entry->raw, convertUpdatesToUpserts);
+ const Status s = syncApply(txn, entry->raw, convertUpdatesToUpserts);
if (!s.isOK()) {
severe() << "Error applying operation (" << entry->raw.toString() << "): " << s;
if (inShutdown()) {
- return;
+ return {ErrorCodes::InterruptedAtShutdown, s.toString()};
}
- fassertFailedNoTrace(16359);
+ return s;
}
} catch (const DBException& e) {
severe() << "writer worker caught exception: " << causedBy(e)
<< " on: " << entry->raw.toString();
if (inShutdown()) {
- return;
+ return {ErrorCodes::InterruptedAtShutdown, e.toString()};
}
- fassertFailedNoTrace(16360);
+ return e.toStatus();
}
}
+
+ return Status::OK();
}
// This free function is used by the initial sync writer threads to apply each op