| author | Geert Bosch <geert@mongodb.com> | 2015-11-13 22:38:55 -0500 |
|---|---|---|
| committer | Geert Bosch <geert@mongodb.com> | 2015-11-13 22:38:55 -0500 |
| commit | 4d9c15d3bdc6ada6bd6cdddb5ba15e10f89337f6 | |
| tree | 5499bd3b8a0956a42297fde25943061a8361831f /src/mongo/db/repl/sync_tail.cpp | |
| parent | ba4281986e7aed9830ce47cce1b76006cbb94e67 | |
Revert "SERVER-21229 group replicated inserts"
This reverts commit a5e6480126f503cf116276a535ca2bbdc334464f.
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 94 |
1 file changed, 13 insertions(+), 81 deletions(-)
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 391c64d3a2a..dd85ce8573c 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -281,11 +281,11 @@ Status SyncTail::syncApply(OperationContext* txn,
         txn->setReplicatedWrites(false);
         DisableDocumentValidation validationDisabler(txn);
 
-        Status status =
-            applyOperationInLock(txn, db, op, convertUpdateToUpsert, incrementOpsAppliedStats);
+        Status status = applyOperationInLock(txn, db, op, convertUpdateToUpsert);
         if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) {
             throw WriteConflictException();
         }
+        incrementOpsAppliedStats();
         return status;
     };
@@ -343,12 +343,12 @@ Status SyncTail::syncApply(OperationContext* txn,
 }
 
 Status SyncTail::syncApply(OperationContext* txn, const BSONObj& op, bool convertUpdateToUpsert) {
-    return SyncTail::syncApply(txn,
-                               op,
-                               convertUpdateToUpsert,
-                               applyOperation_inlock,
-                               applyCommand_inlock,
-                               stdx::bind(&Counter64::increment, &opsAppliedStats, 1ULL));
+    return syncApply(txn,
+                     op,
+                     convertUpdateToUpsert,
+                     applyOperation_inlock,
+                     applyCommand_inlock,
+                     stdx::bind(&Counter64::increment, &opsAppliedStats, 1ULL));
 }
@@ -923,15 +923,6 @@ static void initializeWriterThread() {
 
 // This free function is used by the writer threads to apply each op
 void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) {
-    using OplogEntry = SyncTail::OplogEntry;
-
-    std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end());
-
-    if (oplogEntries.size() > 1) {
-        std::stable_sort(oplogEntries.begin(),
-                         oplogEntries.end(),
-                         [](const OplogEntry& l, const OplogEntry& r) { return l.ns < r.ns; });
-    }
     initializeWriterThread();
 
     OperationContextImpl txn;
@@ -942,76 +933,17 @@ void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) {
     txn.lockState()->setIsBatchWriter(true);
 
     bool convertUpdatesToUpserts = true;
 
-    for (std::vector<OplogEntry>::iterator oplogEntriesIterator = oplogEntries.begin();
-         oplogEntriesIterator != oplogEntries.end();
-         ++oplogEntriesIterator) {
-        try {
-            const OplogEntry& entry = *oplogEntriesIterator;
-
-            if (entry.opType[0] == 'i') {
-                // Attempt to group inserts if possible.
-                std::vector<BSONObj> toInsert;
-                auto endOfGroupableOpsIterator =
-                    std::find_if(oplogEntriesIterator + 1,
-                                 oplogEntries.end(),
-                                 [&](const OplogEntry& nextEntry) {
-                                     return nextEntry.opType[0] != 'i' || nextEntry.ns != entry.ns;
-                                 });
-                if (endOfGroupableOpsIterator != oplogEntriesIterator + 1) {
-                    // Since we found more than one document, create grouped insert of many
-                    // docs.
-                    BSONObjBuilder groupedInsertBuilder;
-                    // Generate an op object of all elements except for "o", since we need to
-                    // make the "o" field an array of all the o's.
-                    for (auto elem : entry.raw) {
-                        if (elem.fieldNameStringData() != "o") {
-                            groupedInsertBuilder.append(elem);
-                        }
-                    }
-
-                    BSONArrayBuilder insertArrayBuilder(groupedInsertBuilder.subarrayStart("o"));
-                    // Group as many inserts as will fit in a single BSONObj.
-                    for (std::vector<OplogEntry>::iterator groupingInterator = oplogEntriesIterator;
-                         groupingInterator != endOfGroupableOpsIterator;
-                         ++groupingInterator) {
-                        if (groupedInsertBuilder.len() + groupingInterator->o.Obj().objsize() >
-                            BSONObjMaxUserSize) {
-                            // End group early, because we hit max bson size limit.
-                            endOfGroupableOpsIterator = groupingInterator;
-                            break;
-                        }
-                        insertArrayBuilder.append(groupingInterator->o.Obj());
-                    }
-                    insertArrayBuilder.done();
-
-                    try {
-                        // Apply the group of inserts.
-                        uassertStatusOK(SyncTail::syncApply(
-                            &txn, groupedInsertBuilder.done(), convertUpdatesToUpserts));
-                        // It succeeded, advance the oplogEntriesIterator to the end of the
-                        // group of inserts.
-                        oplogEntriesIterator = endOfGroupableOpsIterator - 1;
-                        continue;
-                    } catch (const DBException& e) {
-                        // The group insert failed, log an error and fall through to the
-                        // application of an individual op.
-                        error() << "Error applying inserts in bulk " << causedBy(e)
-                                << " trying first insert as a lone insert";
-                    }
-                }
-            }
-
-            // Apply an individual (non-grouped) op.
-            const Status s = SyncTail::syncApply(&txn, entry.raw, convertUpdatesToUpserts);
+    for (std::vector<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
+        try {
+            const Status s = SyncTail::syncApply(&txn, *it, convertUpdatesToUpserts);
             if (!s.isOK()) {
-                severe() << "Error applying operation (" << oplogEntriesIterator->raw.toString()
-                         << "): " << s;
+                severe() << "Error applying operation (" << it->toString() << "): " << s;
                 fassertFailedNoTrace(16359);
             }
         } catch (const DBException& e) {
             severe() << "writer worker caught exception: " << causedBy(e)
-                     << " on: " << oplogEntriesIterator->raw.toString();
+                     << " on: " << it->toString();
             if (inShutdown()) {
                 return;
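For context, the change being reverted here (commit a5e6480) had each writer thread sort its batch by namespace and then fold runs of consecutive inserts into the same namespace into one grouped insert op, capped at the maximum BSON object size. The sketch below models only that grouping step in isolation; `Entry`, `groupInserts`, and `kMaxGroupBytes` are illustrative stand-ins, not MongoDB internals.

```cpp
// Minimal, standalone sketch of the insert-grouping idea removed by this revert.
// Names and types here are hypothetical simplifications of the real oplog entries.
#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct Entry {
    char opType;        // 'i' for insert, 'u' for update, etc.
    std::string ns;     // target namespace, e.g. "test.coll"
    std::size_t bytes;  // approximate size of the "o" payload
};

// Fold a run of consecutive inserts into the same namespace into one group,
// ending the group early once the accumulated size would exceed the cap.
std::vector<std::vector<Entry>> groupInserts(const std::vector<Entry>& ops,
                                             std::size_t kMaxGroupBytes) {
    std::vector<std::vector<Entry>> groups;
    for (std::size_t i = 0; i < ops.size();) {
        std::vector<Entry> group{ops[i]};
        std::size_t total = ops[i].bytes;
        std::size_t j = i + 1;
        // Only inserts to the same namespace as the first op are groupable.
        while (j < ops.size() && ops[i].opType == 'i' && ops[j].opType == 'i' &&
               ops[j].ns == ops[i].ns && total + ops[j].bytes <= kMaxGroupBytes) {
            total += ops[j].bytes;
            group.push_back(ops[j]);
            ++j;
        }
        groups.push_back(std::move(group));
        i = j;  // resume after the last op that was grouped
    }
    return groups;
}

int main() {
    std::vector<Entry> ops = {
        {'i', "test.a", 100}, {'i', "test.a", 100}, {'i', "test.b", 100},
        {'u', "test.a", 100}, {'i', "test.a", 100},
    };
    for (const auto& g : groupInserts(ops, 16 * 1024 * 1024))
        std::cout << g.size() << " op(s) to " << g.front().ns << "\n";
    // Prints: 2 op(s) to test.a, then 1 each for test.b, test.a (update), test.a (insert).
}
```

As the removed hunk above shows, the real implementation also had to rebuild the grouped oplog entry's BSON (copying every field except "o" and turning "o" into an array of documents) and fell back to applying each insert individually if the grouped apply threw.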