/**
 * Copyright (C) 2018 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication

#include "mongo/platform/basic.h"

#include "mongo/db/repl/applier_helpers.h"

#include <algorithm>
#include <vector>

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/sync_tail.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"

namespace mongo {
namespace repl {
namespace {

// Must not create too large an object.
const auto kInsertGroupMaxBatchSize = insertVectorMaxBytes;

// Limit number of ops in a single group.
constexpr auto kInsertGroupMaxBatchCount = 64; } // namespace // static void ApplierHelpers::stableSortByNamespace(MultiApplier::OperationPtrs* oplogEntryPointers) { if (oplogEntryPointers->size() < 1U) { return; } auto nssComparator = [](const OplogEntry* l, const OplogEntry* r) { return l->getNamespace() < r->getNamespace(); }; std::stable_sort(oplogEntryPointers->begin(), oplogEntryPointers->end(), nssComparator); } using InsertGroup = ApplierHelpers::InsertGroup; InsertGroup::InsertGroup(ApplierHelpers::OperationPtrs* ops, OperationContext* opCtx, InsertGroup::Mode mode) : _doNotGroupBeforePoint(ops->cbegin()), _end(ops->cend()), _opCtx(opCtx), _mode(mode) {} StatusWith InsertGroup::groupAndApplyInserts(ConstIterator it) { const auto& entry = **it; // The following conditions must be met before attempting to group the oplog entries starting // at 'oplogEntriesIterator': // 1) The CRUD operation must an insert; // 2) The namespace that we are inserting into cannot be a capped collection; // 3) We have not attempted to group this insert during a previous call to this function. if (entry.getOpType() != OpTypeEnum::kInsert) { return Status(ErrorCodes::TypeMismatch, "Can only group insert operations."); } if (entry.isForCappedCollection) { return Status(ErrorCodes::InvalidOptions, "Cannot group insert operations on capped collections."); } if (it <= _doNotGroupBeforePoint) { return Status(ErrorCodes::InvalidPath, "Cannot group an insert operation that we previously attempted to group."); } // Attempt to group 'insert' ops if possible. std::vector toInsert; // Make sure to include the first op in the batch size. auto batchSize = entry.getObject().objsize(); auto batchCount = OperationPtrs::size_type(1); auto batchNamespace = entry.getNamespace(); /** * Search for the op that delimits this insert batch, and save its position * in endOfGroupableOpsIterator. 
For example, given the following list of oplog * entries with a sequence of groupable inserts: * * S--------------E * u, u, u, i, i, i, i, i, d, d * * S: start of insert group * E: end of groupable ops * * E is the position of endOfGroupableOpsIterator. i.e. endOfGroupableOpsIterator * will point to the first op that *can't* be added to the current insert group. */ auto endOfGroupableOpsIterator = std::find_if(it + 1, _end, [&](const OplogEntry* nextEntry) -> bool { auto opNamespace = nextEntry->getNamespace(); batchSize += nextEntry->getObject().objsize(); batchCount += 1; // Only add the op to this batch if it passes the criteria. return nextEntry->getOpType() != OpTypeEnum::kInsert // Must be an insert. || opNamespace != batchNamespace // Must be in the same namespace. || batchSize > kInsertGroupMaxBatchSize // Must not create too large an object. || batchCount > kInsertGroupMaxBatchCount; // Limit number of ops in a single group. }); // See if we were able to create a group that contains more than a single op. if (std::distance(it, endOfGroupableOpsIterator) == 1) { return Status(ErrorCodes::NoSuchKey, "Not able to create a group with more than a single insert operation"); } // Since we found more than one document, create grouped insert of many docs. // We are going to group many 'i' ops into one big 'i' op, with array fields for // 'ts', 't', and 'o', corresponding to each individual op. // For example: // { ts: Timestamp(1,1), t:1, ns: "test.foo", op:"i", o: {_id:1} } // { ts: Timestamp(1,2), t:1, ns: "test.foo", op:"i", o: {_id:2} } // become: // { ts: [Timestamp(1, 1), Timestamp(1, 2)], // t: [1, 1], // o: [{_id: 1}, {_id: 2}], // ns: "test.foo", // op: "i" } BSONObjBuilder groupedInsertBuilder; // Populate the "ts" field with an array of all the grouped inserts' timestamps. 
{ BSONArrayBuilder tsArrayBuilder(groupedInsertBuilder.subarrayStart("ts")); for (auto groupingIt = it; groupingIt != endOfGroupableOpsIterator; ++groupingIt) { tsArrayBuilder.append((*groupingIt)->getTimestamp()); } } // Populate the "t" (term) field with an array of all the grouped inserts' terms. { BSONArrayBuilder tArrayBuilder(groupedInsertBuilder.subarrayStart("t")); for (auto groupingIt = it; groupingIt != endOfGroupableOpsIterator; ++groupingIt) { auto parsedTerm = (*groupingIt)->getTerm(); long long term = OpTime::kUninitializedTerm; // Term may not be present (pv0) if (parsedTerm) { term = parsedTerm.get(); } tArrayBuilder.append(term); } } // Populate the "o" field with an array of all the grouped inserts. { BSONArrayBuilder oArrayBuilder(groupedInsertBuilder.subarrayStart("o")); for (auto groupingIt = it; groupingIt != endOfGroupableOpsIterator; ++groupingIt) { oArrayBuilder.append((*groupingIt)->getObject()); } } // Generate an op object of all elements except for "ts", "t", and "o", since we // need to make those fields arrays of all the ts's, t's, and o's. groupedInsertBuilder.appendElementsUnique(entry.raw); auto groupedInsertObj = groupedInsertBuilder.done(); try { // Apply the group of inserts. uassertStatusOK(SyncTail::syncApply(_opCtx, groupedInsertObj, _mode)); // It succeeded, advance the oplogEntriesIterator to the end of the // group of inserts. return endOfGroupableOpsIterator - 1; } catch (...) { // The group insert failed, log an error and fall through to the // application of an individual op. auto status = mongo::exceptionToStatus(); error() << "Error applying inserts in bulk " << causedBy(redact(status)) << ": " << redact(groupedInsertObj) << ". Trying first insert as a lone insert: " << redact(entry.raw); // Avoid quadratic run time from failed insert by not retrying until we // are beyond this group of ops. 
_doNotGroupBeforePoint = endOfGroupableOpsIterator - 1; return status.withContext(str::stream() << "Error applying inserts in bulk: " << redact(groupedInsertObj) << ". Trying first insert as a lone insert: " << redact(entry.raw)); } MONGO_UNREACHABLE; } } // namespace repl } // namespace mongo