summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_tail.cpp
diff options
context:
space:
mode:
authorGeert Bosch <geert@mongodb.com>2015-11-13 22:38:55 -0500
committerGeert Bosch <geert@mongodb.com>2015-11-13 22:38:55 -0500
commit4d9c15d3bdc6ada6bd6cdddb5ba15e10f89337f6 (patch)
tree5499bd3b8a0956a42297fde25943061a8361831f /src/mongo/db/repl/sync_tail.cpp
parentba4281986e7aed9830ce47cce1b76006cbb94e67 (diff)
downloadmongo-4d9c15d3bdc6ada6bd6cdddb5ba15e10f89337f6.tar.gz
Revert "SERVER-21229 group replicated inserts"
This reverts commit a5e6480126f503cf116276a535ca2bbdc334464f.
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--src/mongo/db/repl/sync_tail.cpp94
1 files changed, 13 insertions, 81 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 391c64d3a2a..dd85ce8573c 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -281,11 +281,11 @@ Status SyncTail::syncApply(OperationContext* txn,
txn->setReplicatedWrites(false);
DisableDocumentValidation validationDisabler(txn);
- Status status =
- applyOperationInLock(txn, db, op, convertUpdateToUpsert, incrementOpsAppliedStats);
+ Status status = applyOperationInLock(txn, db, op, convertUpdateToUpsert);
if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) {
throw WriteConflictException();
}
+ incrementOpsAppliedStats();
return status;
};
@@ -343,12 +343,12 @@ Status SyncTail::syncApply(OperationContext* txn,
}
Status SyncTail::syncApply(OperationContext* txn, const BSONObj& op, bool convertUpdateToUpsert) {
- return SyncTail::syncApply(txn,
- op,
- convertUpdateToUpsert,
- applyOperation_inlock,
- applyCommand_inlock,
- stdx::bind(&Counter64::increment, &opsAppliedStats, 1ULL));
+ return syncApply(txn,
+ op,
+ convertUpdateToUpsert,
+ applyOperation_inlock,
+ applyCommand_inlock,
+ stdx::bind(&Counter64::increment, &opsAppliedStats, 1ULL));
}
@@ -923,15 +923,6 @@ static void initializeWriterThread() {
// This free function is used by the writer threads to apply each op
void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) {
- using OplogEntry = SyncTail::OplogEntry;
-
- std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end());
-
- if (oplogEntries.size() > 1) {
- std::stable_sort(oplogEntries.begin(),
- oplogEntries.end(),
- [](const OplogEntry& l, const OplogEntry& r) { return l.ns < r.ns; });
- }
initializeWriterThread();
OperationContextImpl txn;
@@ -942,76 +933,17 @@ void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) {
txn.lockState()->setIsBatchWriter(true);
bool convertUpdatesToUpserts = true;
- for (std::vector<OplogEntry>::iterator oplogEntriesIterator = oplogEntries.begin();
- oplogEntriesIterator != oplogEntries.end();
- ++oplogEntriesIterator) {
- try {
- const OplogEntry& entry = *oplogEntriesIterator;
-
- if (entry.opType[0] == 'i') {
- // Attempt to group inserts if possible.
- std::vector<BSONObj> toInsert;
- auto endOfGroupableOpsIterator =
- std::find_if(oplogEntriesIterator + 1,
- oplogEntries.end(),
- [&](const OplogEntry& nextEntry) {
- return nextEntry.opType[0] != 'i' || nextEntry.ns != entry.ns;
- });
- if (endOfGroupableOpsIterator != oplogEntriesIterator + 1) {
- // Since we found more than one document, create grouped insert of many
- // docs.
- BSONObjBuilder groupedInsertBuilder;
- // Generate an op object of all elements except for "o", since we need to
- // make the "o" field an array of all the o's.
- for (auto elem : entry.raw) {
- if (elem.fieldNameStringData() != "o") {
- groupedInsertBuilder.append(elem);
- }
- }
-
- BSONArrayBuilder insertArrayBuilder(groupedInsertBuilder.subarrayStart("o"));
- // Group as many inserts as will fit in a single BSONObj.
- for (std::vector<OplogEntry>::iterator groupingInterator = oplogEntriesIterator;
- groupingInterator != endOfGroupableOpsIterator;
- ++groupingInterator) {
- if (groupedInsertBuilder.len() + groupingInterator->o.Obj().objsize() >
- BSONObjMaxUserSize) {
- // End group early, because we hit max bson size limit.
- endOfGroupableOpsIterator = groupingInterator;
- break;
- }
- insertArrayBuilder.append(groupingInterator->o.Obj());
- }
- insertArrayBuilder.done();
-
- try {
- // Apply the group of inserts.
- uassertStatusOK(SyncTail::syncApply(
- &txn, groupedInsertBuilder.done(), convertUpdatesToUpserts));
- // It succeeded, advance the oplogEntriesIterator to the end of the
- // group of inserts.
- oplogEntriesIterator = endOfGroupableOpsIterator - 1;
- continue;
- } catch (const DBException& e) {
- // The group insert failed, log an error and fall through to the
- // application of an individual op.
- error() << "Error applying inserts in bulk " << causedBy(e)
- << " trying first insert as a lone insert";
- }
- }
- }
-
- // Apply an individual (non-grouped) op.
- const Status s = SyncTail::syncApply(&txn, entry.raw, convertUpdatesToUpserts);
+ for (std::vector<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
+ try {
+ const Status s = SyncTail::syncApply(&txn, *it, convertUpdatesToUpserts);
if (!s.isOK()) {
- severe() << "Error applying operation (" << oplogEntriesIterator->raw.toString()
- << "): " << s;
+ severe() << "Error applying operation (" << it->toString() << "): " << s;
fassertFailedNoTrace(16359);
}
} catch (const DBException& e) {
severe() << "writer worker caught exception: " << causedBy(e)
- << " on: " << oplogEntriesIterator->raw.toString();
+ << " on: " << it->toString();
if (inShutdown()) {
return;