summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2019-06-24 16:44:36 -0400
committerLingzhi Deng <lingzhi.deng@mongodb.com>2019-07-29 10:36:12 -0400
commit102c20ab166050ae7357732070bc8262aa61749c (patch)
tree6f1f7149444fe753ad7442b0622d4dc230e24635 /src/mongo/db/repl
parent0ef81e97a8f60329c44640e6b76189a3b51bd42a (diff)
downloadmongo-102c20ab166050ae7357732070bc8262aa61749c.tar.gz
SERVER-37180: Use OplogEntry everywhere in oplog application
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/SConscript3
-rw-r--r--src/mongo/db/repl/applier_helpers.cpp58
-rw-r--r--src/mongo/db/repl/apply_ops.cpp24
-rw-r--r--src/mongo/db/repl/dbcheck.cpp15
-rw-r--r--src/mongo/db/repl/dbcheck.h4
-rw-r--r--src/mongo/db/repl/oplog.cpp851
-rw-r--r--src/mongo/db/repl/oplog.h8
-rw-r--r--src/mongo/db/repl/oplog_entry.h7
-rw-r--r--src/mongo/db/repl/oplog_entry_batch.cpp85
-rw-r--r--src/mongo/db/repl/oplog_entry_batch.h86
-rw-r--r--src/mongo/db/repl/sync_tail.cpp39
-rw-r--r--src/mongo/db/repl/sync_tail.h11
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp91
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.cpp8
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.h2
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.cpp2
16 files changed, 640 insertions, 654 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index a5817517955..69086a7319f 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -33,6 +33,7 @@ env.Library(
source=[
'apply_ops.cpp',
'oplog.cpp',
+ 'oplog_entry_batch.cpp',
'transaction_oplog_application.cpp',
env.Idlc('apply_ops.idl')[0],
],
@@ -1394,4 +1395,4 @@ env.Library(
'$BUILD_DIR/mongo/base',
'election_reason_counter',
],
-) \ No newline at end of file
+)
diff --git a/src/mongo/db/repl/applier_helpers.cpp b/src/mongo/db/repl/applier_helpers.cpp
index 82a4cade0ed..c12fdcbeaf1 100644
--- a/src/mongo/db/repl/applier_helpers.cpp
+++ b/src/mongo/db/repl/applier_helpers.cpp
@@ -136,58 +136,11 @@ StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts(ConstIt
"Not able to create a group with more than a single insert operation");
}
- // Since we found more than one document, create grouped insert of many docs.
- // We are going to group many 'i' ops into one big 'i' op, with array fields for
- // 'ts', 't', and 'o', corresponding to each individual op.
- // For example:
- // { ts: Timestamp(1,1), t:1, ns: "test.foo", op:"i", o: {_id:1} }
- // { ts: Timestamp(1,2), t:1, ns: "test.foo", op:"i", o: {_id:2} }
- // become:
- // { ts: [Timestamp(1, 1), Timestamp(1, 2)],
- // t: [1, 1],
- // o: [{_id: 1}, {_id: 2}],
- // ns: "test.foo",
- // op: "i" }
- BSONObjBuilder groupedInsertBuilder;
-
- // Populate the "ts" field with an array of all the grouped inserts' timestamps.
- {
- BSONArrayBuilder tsArrayBuilder(groupedInsertBuilder.subarrayStart("ts"));
- for (auto groupingIt = it; groupingIt != endOfGroupableOpsIterator; ++groupingIt) {
- tsArrayBuilder.append((*groupingIt)->getTimestamp());
- }
- }
-
- // Populate the "t" (term) field with an array of all the grouped inserts' terms.
- {
- BSONArrayBuilder tArrayBuilder(groupedInsertBuilder.subarrayStart("t"));
- for (auto groupingIt = it; groupingIt != endOfGroupableOpsIterator; ++groupingIt) {
- auto parsedTerm = (*groupingIt)->getTerm();
- long long term = OpTime::kUninitializedTerm;
- // Term may not be present (pv0)
- if (parsedTerm) {
- term = parsedTerm.get();
- }
- tArrayBuilder.append(term);
- }
- }
-
- // Populate the "o" field with an array of all the grouped inserts.
- {
- BSONArrayBuilder oArrayBuilder(groupedInsertBuilder.subarrayStart("o"));
- for (auto groupingIt = it; groupingIt != endOfGroupableOpsIterator; ++groupingIt) {
- oArrayBuilder.append((*groupingIt)->getObject());
- }
- }
-
- // Generate an op object of all elements except for "ts", "t", and "o", since we
- // need to make those fields arrays of all the ts's, t's, and o's.
- groupedInsertBuilder.appendElementsUnique(entry.getRaw());
-
- auto groupedInsertObj = groupedInsertBuilder.done();
+ // Create an oplog entry batch for grouped inserts.
+ OplogEntryBatch groupedInsertBatch(it, endOfGroupableOpsIterator);
try {
- // Apply the group of inserts.
- uassertStatusOK(SyncTail::syncApply(_opCtx, groupedInsertObj, _mode, boost::none));
+ // Apply the group of inserts by passing in groupedInsertBatch.
+ uassertStatusOK(SyncTail::syncApply(_opCtx, groupedInsertBatch, _mode, boost::none));
// It succeeded, advance the oplogEntriesIterator to the end of the
// group of inserts.
return endOfGroupableOpsIterator - 1;
@@ -195,7 +148,8 @@ StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts(ConstIt
// The group insert failed, log an error and fall through to the
// application of an individual op.
auto status = exceptionToStatus().withContext(
- str::stream() << "Error applying inserts in bulk: " << redact(groupedInsertObj)
+ str::stream() << "Error applying inserts in bulk: "
+ << redact(groupedInsertBatch.toBSON())
<< ". Trying first insert as a lone insert: " << redact(entry.getRaw()));
// It's not an error during initial sync to encounter DuplicateKey errors.
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 03e0c8ac566..76f87222aad 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -160,21 +160,26 @@ Status _applyOps(OperationContext* opCtx,
<< nss.ns() << " in atomic applyOps mode: " << redact(opObj));
}
+ BSONObjBuilder builder;
+ builder.appendElements(opObj);
+ if (!builder.hasField(OplogEntry::kTimestampFieldName)) {
+ builder.append(OplogEntry::kTimestampFieldName, Timestamp());
+ }
// Reject malformed operations in an atomic applyOps.
- try {
- ReplOperation::parse(IDLParserErrorContext("applyOps"), opObj);
- } catch (...) {
+ auto entry = OplogEntry::parse(builder.done());
+ if (!entry.isOK()) {
uasserted(ErrorCodes::AtomicityFailure,
str::stream()
<< "cannot apply a malformed operation in atomic applyOps mode: "
- << redact(opObj) << "; will retry without atomicity: "
- << exceptionToStatus().toString());
+ << redact(opObj)
+ << "; will retry without atomicity: " << entry.getStatus());
}
OldClientContext ctx(opCtx, nss.ns());
+ const auto& op = entry.getValue();
status = repl::applyOperation_inlock(
- opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode);
+ opCtx, ctx.db(), &op, alwaysUpsert, oplogApplicationMode);
if (!status.isOK())
return status;
@@ -207,12 +212,11 @@ Status _applyOps(OperationContext* opCtx,
if (!builder.hasField(OplogEntry::kHashFieldName)) {
builder.append(OplogEntry::kHashFieldName, 0LL);
}
- auto entryObj = builder.done();
- auto entry = uassertStatusOK(OplogEntry::parse(entryObj));
+ auto entry = uassertStatusOK(OplogEntry::parse(builder.done()));
if (*opType == 'c') {
invariant(opCtx->lockState()->isW());
uassertStatusOK(applyCommand_inlock(
- opCtx, opObj, entry, oplogApplicationMode, boost::none));
+ opCtx, entry, oplogApplicationMode, boost::none));
return Status::OK();
}
@@ -236,7 +240,7 @@ Status _applyOps(OperationContext* opCtx,
// ops. This is to leave the door open to parallelizing CRUD op
// application in the future.
return repl::applyOperation_inlock(
- opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode);
+ opCtx, ctx.db(), &entry, alwaysUpsert, oplogApplicationMode);
});
} catch (const DBException& ex) {
ab.append(false);
diff --git a/src/mongo/db/repl/dbcheck.cpp b/src/mongo/db/repl/dbcheck.cpp
index 289b831b795..b1d4bf2f84f 100644
--- a/src/mongo/db/repl/dbcheck.cpp
+++ b/src/mongo/db/repl/dbcheck.cpp
@@ -509,27 +509,28 @@ Status dbCheckDatabaseOnSecondary(OperationContext* opCtx,
namespace repl {
/*
- * The corresponding command run on the secondary.
+ * The corresponding command run during command application.
*/
Status dbCheckOplogCommand(OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const repl::OpTime& optime,
const repl::OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) {
+ const auto& cmd = entry.getObject();
+ OpTime opTime;
+ if (!opCtx->writesAreReplicated()) {
+ opTime = entry.getOpTime();
+ }
auto type = OplogEntries_parse(IDLParserErrorContext("type"), cmd.getStringField("type"));
IDLParserErrorContext ctx("o");
switch (type) {
case OplogEntriesEnum::Batch: {
auto invocation = DbCheckOplogBatch::parse(ctx, cmd);
- return dbCheckBatchOnSecondary(opCtx, optime, invocation);
+ return dbCheckBatchOnSecondary(opCtx, opTime, invocation);
}
case OplogEntriesEnum::Collection: {
auto invocation = DbCheckOplogCollection::parse(ctx, cmd);
- return dbCheckDatabaseOnSecondary(opCtx, optime, invocation);
+ return dbCheckDatabaseOnSecondary(opCtx, opTime, invocation);
}
}
diff --git a/src/mongo/db/repl/dbcheck.h b/src/mongo/db/repl/dbcheck.h
index 457087a9365..3f0a9f88e37 100644
--- a/src/mongo/db/repl/dbcheck.h
+++ b/src/mongo/db/repl/dbcheck.h
@@ -221,10 +221,6 @@ namespace repl {
* errors (primarily by writing to the health log), so always returns `Status::OK`.
*/
Status dbCheckOplogCommand(OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const repl::OpTime& optime,
const repl::OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery);
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 5757ddfc267..ec1fa40c985 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -404,7 +404,9 @@ OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) {
WriteUnitOfWork wuow(opCtx);
if (slot.isNull()) {
slot = oplogInfo->getNextOpTimes(opCtx, 1U)[0];
- // TODO: make the oplogEntry a const reference instead of using the guard.
+ // It would be better to make the oplogEntry a const reference. But because in some cases, a
+ // new OpTime needs to be assigned within the WUOW as explained earlier, we instead pass
+ // oplogEntry by pointer and reset the OpTime to null using a ScopeGuard.
oplogEntry->setOpTime(slot);
}
@@ -647,24 +649,25 @@ std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count
// -------------------------------------
namespace {
-NamespaceString parseNs(const string& ns, const BSONObj& cmdObj) {
+NamespaceString extractNs(const NamespaceString& ns, const BSONObj& cmdObj) {
BSONElement first = cmdObj.firstElement();
uassert(40073,
str::stream() << "collection name has invalid type " << typeName(first.type()),
first.canonicalType() == canonicalizeBSONType(mongo::String));
std::string coll = first.valuestr();
uassert(28635, "no collection name specified", !coll.empty());
- return NamespaceString(NamespaceString(ns).db().toString(), coll);
+ return NamespaceString(ns.db().toString(), coll);
}
-std::pair<OptionalCollectionUUID, NamespaceString> parseCollModUUIDAndNss(OperationContext* opCtx,
- const BSONElement& ui,
- const char* ns,
- BSONObj& cmd) {
- if (ui.eoo()) {
- return std::pair<OptionalCollectionUUID, NamespaceString>(boost::none, parseNs(ns, cmd));
+std::pair<OptionalCollectionUUID, NamespaceString> extractCollModUUIDAndNss(
+ OperationContext* opCtx,
+ const boost::optional<UUID>& ui,
+ const NamespaceString& ns,
+ const BSONObj& cmd) {
+ if (!ui) {
+ return std::pair<OptionalCollectionUUID, NamespaceString>(boost::none, extractNs(ns, cmd));
}
- CollectionUUID uuid = uassertStatusOK(UUID::parse(ui));
+ CollectionUUID uuid = ui.get();
auto& catalog = CollectionCatalog::get(opCtx);
const auto nsByUUID = catalog.lookupNSSByUUID(uuid);
uassert(ErrorCodes::NamespaceNotFound,
@@ -674,28 +677,23 @@ std::pair<OptionalCollectionUUID, NamespaceString> parseCollModUUIDAndNss(Operat
return std::pair<OptionalCollectionUUID, NamespaceString>(uuid, *nsByUUID);
}
-NamespaceString parseUUID(OperationContext* opCtx, const BSONElement& ui) {
- auto statusWithUUID = UUID::parse(ui);
- uassertStatusOK(statusWithUUID);
- auto uuid = statusWithUUID.getValue();
+NamespaceString extractNsFromUUID(OperationContext* opCtx, const boost::optional<UUID>& ui) {
+ invariant(ui);
+ auto uuid = ui.get();
auto& catalog = CollectionCatalog::get(opCtx);
auto nss = catalog.lookupNSSByUUID(uuid);
uassert(ErrorCodes::NamespaceNotFound, "No namespace with UUID " + uuid.toString(), nss);
return *nss;
}
-NamespaceString parseUUIDorNs(OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd) {
- return ui.ok() ? parseUUID(opCtx, ui) : parseNs(ns, cmd);
+NamespaceString extractNsFromUUIDorNs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const boost::optional<UUID>& ui,
+ const BSONObj& cmd) {
+ return ui ? extractNsFromUUID(opCtx, ui) : extractNs(ns, cmd);
}
using OpApplyFn = std::function<Status(OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery)>;
@@ -717,14 +715,12 @@ struct ApplyOpMetadata {
const StringMap<ApplyOpMetadata> kOpsMap = {
{"create",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
- const NamespaceString nss(parseNs(ns, cmd));
+ const auto& ui = entry.getUuid();
+ const auto& cmd = entry.getObject();
+ const NamespaceString nss(extractNs(entry.getNss(), cmd));
Lock::DBLock dbXLock(opCtx, nss.db(), MODE_X);
if (auto idIndexElem = cmd["idIndex"]) {
// Remove "idIndex" field from command.
@@ -746,14 +742,12 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
{ErrorCodes::NamespaceExists}}},
{"createIndexes",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
- const NamespaceString nss(parseUUIDorNs(opCtx, ns, ui, cmd));
+ const auto& cmd = entry.getObject();
+ const NamespaceString nss(
+ extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd));
BSONElement first = cmd.firstElement();
invariant(first.fieldNameStringData() == "createIndexes");
uassert(ErrorCodes::InvalidNamespace,
@@ -772,10 +766,6 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
ErrorCodes::NamespaceNotFound}}},
{"startIndexBuild",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
@@ -805,7 +795,9 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
"The startIndexBuild operation is not supported in applyOps mode"};
}
- const NamespaceString nss(parseUUIDorNs(opCtx, ns, ui, cmd));
+ const auto& ui = entry.getUuid();
+ const auto& cmd = entry.getObject();
+ const NamespaceString nss(extractNsFromUUIDorNs(opCtx, entry.getNss(), ui, cmd));
auto buildUUIDElem = cmd.getField("indexBuildUUID");
uassert(ErrorCodes::BadValue,
@@ -822,16 +814,15 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
"Error parsing 'startIndexBuild' oplog entry, field 'indexes' must be an array.",
indexesElem.type() == Array);
- auto collUUID = uassertStatusOK(UUID::parse(ui));
+ uassert(ErrorCodes::BadValue,
+ "Error parsing 'startIndexBuild' oplog entry, missing required field 'uuid'.",
+ ui);
+ auto collUUID = ui.get();
return startIndexBuild(opCtx, nss, collUUID, indexBuildUUID, indexesElem, mode);
}}},
{"commitIndexBuild",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
@@ -861,6 +852,7 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
"The commitIndexBuild operation is not supported in applyOps mode"};
}
+ const auto& cmd = entry.getObject();
// Ensure the collection name is specified
BSONElement first = cmd.firstElement();
invariant(first.fieldNameStringData() == "commitIndexBuild");
@@ -868,7 +860,8 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
"commitIndexBuild value must be a string",
first.type() == mongo::String);
- const NamespaceString nss(parseUUIDorNs(opCtx, ns, ui, cmd));
+ const NamespaceString nss(
+ extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd));
auto buildUUIDElem = cmd.getField("indexBuildUUID");
uassert(ErrorCodes::BadValue,
@@ -889,10 +882,6 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
}}},
{"abortIndexBuild",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTme,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
@@ -922,6 +911,7 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
"The abortIndexBuild operation is not supported in applyOps mode"};
}
+ const auto& cmd = entry.getObject();
// Ensure that the first element is the 'abortIndexBuild' field.
BSONElement first = cmd.firstElement();
invariant(first.fieldNameStringData() == "abortIndexBuild");
@@ -951,50 +941,47 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
}}},
{"collMod",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
NamespaceString nss;
BSONObjBuilder resultWeDontCareAbout;
- std::tie(std::ignore, nss) = parseCollModUUIDAndNss(opCtx, ui, ns, cmd);
+ const auto& cmd = entry.getObject();
+ std::tie(std::ignore, nss) =
+ extractCollModUUIDAndNss(opCtx, entry.getUuid(), entry.getNss(), cmd);
return collMod(opCtx, nss, cmd, &resultWeDontCareAbout);
},
{ErrorCodes::IndexNotFound, ErrorCodes::NamespaceNotFound}}},
{"dbCheck", {dbCheckOplogCommand, {}}},
{"dropDatabase",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
- return dropDatabase(opCtx, NamespaceString(ns).db().toString());
+ return dropDatabase(opCtx, entry.getNss().db().toString());
},
{ErrorCodes::NamespaceNotFound}}},
{"drop",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- auto nss = parseUUIDorNs(opCtx, ns, ui, cmd);
+ const auto& cmd = entry.getObject();
+ auto nss = extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd);
if (nss.isDropPendingNamespace()) {
log()
- << "applyCommand: " << nss << " (UUID: " << ui.toString(false)
- << "): collection is already in a drop-pending state: ignoring collection drop: "
+ << "applyCommand: " << nss
+ << " : collection is already in a drop-pending state: ignoring collection drop: "
<< redact(cmd);
return Status::OK();
}
+ // Parse optime from oplog entry unless we are applying this command in standalone or on a
+ // primary (replicated writes enabled).
+ OpTime opTime;
+ if (!opCtx->writesAreReplicated()) {
+ opTime = entry.getOpTime();
+ }
return dropCollection(opCtx,
nss,
resultWeDontCareAbout,
@@ -1005,74 +992,73 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
// deleteIndex(es) is deprecated but still works as of April 10, 2015
{"deleteIndex",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout);
+ const auto& cmd = entry.getObject();
+ return dropIndexes(opCtx,
+ extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd),
+ cmd,
+ &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"deleteIndexes",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout);
+ const auto& cmd = entry.getObject();
+ return dropIndexes(opCtx,
+ extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd),
+ cmd,
+ &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"dropIndex",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout);
+ const auto& cmd = entry.getObject();
+ return dropIndexes(opCtx,
+ extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd),
+ cmd,
+ &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"dropIndexes",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
BSONObjBuilder resultWeDontCareAbout;
- return dropIndexes(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd, &resultWeDontCareAbout);
+ const auto& cmd = entry.getObject();
+ return dropIndexes(opCtx,
+ extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd),
+ cmd,
+ &resultWeDontCareAbout);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}},
{"renameCollection",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
- return renameCollectionForApplyOps(opCtx, nsToDatabase(ns), ui, cmd, opTime);
+ // Parse optime from oplog entry unless we are applying this command in standalone or on a
+ // primary (replicated writes enabled).
+ OpTime opTime;
+ if (!opCtx->writesAreReplicated()) {
+ opTime = entry.getOpTime();
+ }
+ return renameCollectionForApplyOps(
+ opCtx, entry.getNss().db().toString(), entry.getUuid(), entry.getObject(), opTime);
},
{ErrorCodes::NamespaceNotFound, ErrorCodes::NamespaceExists}}},
{"applyOps",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
@@ -1085,35 +1071,28 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
}}},
{"convertToCapped",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
- convertToCapped(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd), cmd["size"].number());
+ const auto& cmd = entry.getObject();
+ convertToCapped(opCtx,
+ extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), cmd),
+ cmd["size"].number());
return Status::OK();
},
{ErrorCodes::NamespaceNotFound}}},
{"emptycapped",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
- return emptyCapped(opCtx, parseUUIDorNs(opCtx, ns, ui, cmd));
+ return emptyCapped(
+ opCtx,
+ extractNsFromUUIDorNs(opCtx, entry.getNss(), entry.getUuid(), entry.getObject()));
},
{ErrorCodes::NamespaceNotFound}}},
{"commitTransaction",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
@@ -1121,10 +1100,6 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
}}},
{"abortTransaction",
{[](OperationContext* opCtx,
- const char* ns,
- const BSONElement& ui,
- BSONObj& cmd,
- const OpTime& opTime,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) -> Status {
@@ -1173,11 +1148,13 @@ StatusWith<OplogApplication::Mode> OplogApplication::parseMode(const std::string
// See replset initial sync code.
Status applyOperation_inlock(OperationContext* opCtx,
Database* db,
- const BSONObj& op,
+ const OplogEntryBatch& opOrGroupedInserts,
bool alwaysUpsert,
OplogApplication::Mode mode,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats) {
- LOG(3) << "applying op: " << redact(op)
+ // Get the single oplog entry to be applied or the first oplog entry of grouped inserts.
+ auto op = opOrGroupedInserts.getOp();
+ LOG(3) << "applying op (or grouped inserts): " << redact(opOrGroupedInserts.toBSON())
<< ", oplog application mode: " << OplogApplication::modeToString(mode);
// Choose opCounters based on running on standalone/primary or secondary by checking
@@ -1187,26 +1164,8 @@ Status applyOperation_inlock(OperationContext* opCtx,
mode == repl::OplogApplication::Mode::kApplyOpsCmd || opCtx->writesAreReplicated();
OpCounters* opCounters = shouldUseGlobalOpCounters ? &globalOpCounters : &replOpCounters;
- std::array<StringData, 8> names = {"ts", "t", "o", "ui", "ns", "op", "b", "o2"};
- std::array<BSONElement, 8> fields;
- op.getFields(names, &fields);
- BSONElement& fieldTs = fields[0];
- BSONElement& fieldT = fields[1];
- BSONElement& fieldO = fields[2];
- BSONElement& fieldUI = fields[3];
- BSONElement& fieldNs = fields[4];
- BSONElement& fieldOp = fields[5];
- BSONElement& fieldB = fields[6];
- BSONElement& fieldO2 = fields[7];
-
- BSONObj o;
- if (fieldO.isABSONObj())
- o = fieldO.embeddedObject();
-
- // operation type -- see logOp() comments for types
- const char* opType = fieldOp.valuestrsafe();
-
- if (*opType == 'n') {
+ auto opType = op.getOpType();
+ if (opType == OpTypeEnum::kNoop) {
// no op
if (incrementOpsAppliedStats) {
incrementOpsAppliedStats();
@@ -1217,23 +1176,18 @@ Status applyOperation_inlock(OperationContext* opCtx,
NamespaceString requestNss;
Collection* collection = nullptr;
- if (fieldUI) {
+ if (auto uuid = op.getUuid()) {
CollectionCatalog& catalog = CollectionCatalog::get(opCtx);
- auto uuid = uassertStatusOK(UUID::parse(fieldUI));
- collection = catalog.lookupCollectionByUUID(uuid);
+ collection = catalog.lookupCollectionByUUID(uuid.get());
uassert(ErrorCodes::NamespaceNotFound,
- str::stream() << "Failed to apply operation due to missing collection (" << uuid
- << "): " << redact(op.toString()),
+ str::stream() << "Failed to apply operation due to missing collection ("
+ << uuid.get() << "): " << redact(opOrGroupedInserts.toBSON()),
collection);
requestNss = collection->ns();
dassert(opCtx->lockState()->isCollectionLockedForMode(
requestNss, supportsDocLocking() ? MODE_IX : MODE_X));
} else {
- uassert(ErrorCodes::InvalidNamespace,
- "'ns' must be of type String",
- fieldNs.type() == BSONType::String);
- const StringData ns = fieldNs.valueStringDataSafe();
- requestNss = NamespaceString(ns);
+ requestNss = op.getNss();
invariant(requestNss.coll().size());
dassert(opCtx->lockState()->isCollectionLockedForMode(
requestNss, supportsDocLocking() ? MODE_IX : MODE_X),
@@ -1241,6 +1195,8 @@ Status applyOperation_inlock(OperationContext* opCtx,
collection = db->getCollection(opCtx, requestNss);
}
+ BSONObj o = op.getObject();
+
// The feature compatibility version in the server configuration collection must not change
// during initial sync.
if ((mode == OplogApplication::Mode::kInitialSync) &&
@@ -1251,15 +1207,13 @@ Status applyOperation_inlock(OperationContext* opCtx,
return Status(ErrorCodes::OplogOperationUnsupported,
str::stream() << "Applying operation on feature compatibility version "
"document not supported in initial sync: "
- << redact(op));
+ << redact(opOrGroupedInserts.toBSON()));
}
}
BSONObj o2;
- if (fieldO2.isABSONObj())
- o2 = fieldO2.Obj();
-
- bool valueB = fieldB.booleanSafe();
+ if (op.getObject2())
+ o2 = op.getObject2().get();
IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog();
const bool haveWrappingWriteUnitOfWork = opCtx->lockState()->inAWriteUnitOfWork();
@@ -1322,377 +1276,331 @@ Status applyOperation_inlock(OperationContext* opCtx,
}
MONGO_UNREACHABLE;
}();
- invariant(!assignOperationTimestamp || !fieldTs.eoo(),
- str::stream() << "Oplog entry did not have 'ts' field when expected: " << redact(op));
-
- if (*opType == 'i') {
- uassert(ErrorCodes::NamespaceNotFound,
- str::stream() << "Failed to apply insert due to missing collection: "
- << op.toString(),
- collection);
-
- if (fieldO.type() == Array) {
- // Batched inserts.
-
- // Cannot apply an array insert with applyOps command. No support for wiping out
- // the provided timestamps and using new ones for oplog.
- uassert(ErrorCodes::OperationFailed,
- "Cannot apply an array insert with applyOps",
- !opCtx->writesAreReplicated());
-
- uassert(ErrorCodes::BadValue,
- "Expected array for field 'ts'",
- fieldTs.ok() && fieldTs.type() == Array);
- uassert(ErrorCodes::BadValue,
- "Expected array for field 't'",
- fieldT.ok() && fieldT.type() == Array);
-
- uassert(ErrorCodes::OperationFailed,
- str::stream() << "Failed to apply insert due to empty array element: "
- << op.toString(),
- !fieldO.Obj().isEmpty() && !fieldTs.Obj().isEmpty() && !fieldT.Obj().isEmpty());
-
- std::vector<InsertStatement> insertObjs;
- auto fieldOIt = BSONObjIterator(fieldO.Obj());
- auto fieldTsIt = BSONObjIterator(fieldTs.Obj());
- auto fieldTIt = BSONObjIterator(fieldT.Obj());
-
- while (true) {
- auto oElem = fieldOIt.next();
- auto tsElem = fieldTsIt.next();
- auto tElem = fieldTIt.next();
-
- // Note: we don't care about statement ids here since the secondaries don't create
- // their own oplog entries.
- insertObjs.emplace_back(oElem.Obj(), tsElem.timestamp(), tElem.Long());
- if (!fieldOIt.more()) {
- // Make sure arrays are the same length.
- uassert(ErrorCodes::OperationFailed,
- str::stream()
- << "Failed to apply insert due to invalid array elements: "
- << op.toString(),
- !fieldTsIt.more());
- break;
- }
- // Make sure arrays are the same length.
+ invariant(!assignOperationTimestamp || !op.getTimestamp().isNull(),
+ str::stream() << "Oplog entry did not have 'ts' field when expected: "
+ << redact(opOrGroupedInserts.toBSON()));
+
+ switch (opType) {
+ case OpTypeEnum::kInsert: {
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Failed to apply insert due to missing collection: "
+ << redact(opOrGroupedInserts.toBSON()),
+ collection);
+
+ if (opOrGroupedInserts.isGroupedInserts()) {
+ // Grouped inserts.
+
+ // Cannot apply an array insert with applyOps command. No support for wiping out the
+ // provided timestamps and using new ones for oplog.
uassert(ErrorCodes::OperationFailed,
- str::stream() << "Failed to apply insert due to invalid array elements: "
- << op.toString(),
- fieldTsIt.more());
- }
+ "Cannot apply an array insert with applyOps",
+ !opCtx->writesAreReplicated());
+
+ std::vector<InsertStatement> insertObjs;
+ const auto insertOps = opOrGroupedInserts.getGroupedInserts();
+ for (const auto iOp : insertOps) {
+ invariant(iOp->getTerm());
+ insertObjs.emplace_back(
+ iOp->getObject(), iOp->getTimestamp(), iOp->getTerm().get());
+ }
- WriteUnitOfWork wuow(opCtx);
- OpDebug* const nullOpDebug = nullptr;
- Status status = collection->insertDocuments(
- opCtx, insertObjs.begin(), insertObjs.end(), nullOpDebug, true);
- if (!status.isOK()) {
- return status;
- }
- wuow.commit();
- for (auto entry : insertObjs) {
+ WriteUnitOfWork wuow(opCtx);
+ OpDebug* const nullOpDebug = nullptr;
+ Status status = collection->insertDocuments(
+ opCtx, insertObjs.begin(), insertObjs.end(), nullOpDebug, true);
+ if (!status.isOK()) {
+ return status;
+ }
+ wuow.commit();
+ for (auto entry : insertObjs) {
+ opCounters->gotInsert();
+ if (shouldUseGlobalOpCounters) {
+ ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForInsert(
+ opCtx->getWriteConcern());
+ }
+ if (incrementOpsAppliedStats) {
+ incrementOpsAppliedStats();
+ }
+ }
+ } else {
+ // Single insert.
opCounters->gotInsert();
if (shouldUseGlobalOpCounters) {
ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForInsert(
opCtx->getWriteConcern());
}
+
+ // No _id.
+ // This indicates an issue with the upstream server:
+ // The oplog entry is corrupted; or
+ // The version of the upstream server is obsolete.
+ uassert(ErrorCodes::NoSuchKey,
+ str::stream()
+ << "Failed to apply insert due to missing _id: " << redact(op.toBSON()),
+ o.hasField("_id"));
+
+ // 1. Insert if
+ // a) we do not have a wrapping WriteUnitOfWork, which implies we are not part of
+ // an "applyOps" command, OR
+ // b) we are part of a multi-document transaction[1].
+ //
+ // 2. Upsert[2] if
+ // a) we have a wrapping WriteUnitOfWork AND we are not part of a transaction,
+ // which implies we are part of an "applyOps" command, OR
+ // b) the previous insert failed with a DuplicateKey error AND we are not part of
+ // a transaction.
+ //
+ // [1] Transactions should not convert inserts to upserts because on secondaries
+ // they will perform a lookup that never occurred on the primary. This may cause
+ // an unintended prepare conflict and block replication. For this reason,
+ // transactions should always fail with DuplicateKey errors and never retry
+ // inserts as upserts.
+ // [2] This upsert behavior exists to support idempotency guarantees outside
+ // steady-state replication and existing users of applyOps.
+
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ const bool inTxn = txnParticipant && txnParticipant.inMultiDocumentTransaction();
+ bool needToDoUpsert = haveWrappingWriteUnitOfWork && !inTxn;
+
+ Timestamp timestamp;
+ long long term = OpTime::kUninitializedTerm;
+ if (assignOperationTimestamp) {
+ timestamp = op.getTimestamp();
+ invariant(op.getTerm());
+ term = op.getTerm().get();
+ }
+
+ if (!needToDoUpsert) {
+ WriteUnitOfWork wuow(opCtx);
+
+ // Do not use supplied timestamps if running through applyOps, as that would
+ // allow a user to dictate what timestamps appear in the oplog.
+ if (assignOperationTimestamp) {
+ timestamp = op.getTimestamp();
+ invariant(op.getTerm());
+ term = op.getTerm().get();
+ }
+
+ OpDebug* const nullOpDebug = nullptr;
+ Status status = collection->insertDocument(
+ opCtx, InsertStatement(o, timestamp, term), nullOpDebug, true);
+
+ if (status.isOK()) {
+ wuow.commit();
+ } else if (status == ErrorCodes::DuplicateKey) {
+ // Transactions cannot be retried as upserts once they fail with a duplicate
+ // key error.
+ if (inTxn) {
+ return status;
+ }
+ // Continue to the next block to retry the operation as an upsert.
+ needToDoUpsert = true;
+ } else {
+ return status;
+ }
+ }
+
+ // Now see if we need to do an upsert.
+ if (needToDoUpsert) {
+ // Do update on DuplicateKey errors.
+ // This will only be on the _id field in replication,
+ // since we disable non-_id unique constraint violations.
+ BSONObjBuilder b;
+ b.append(o.getField("_id"));
+
+ UpdateRequest request(requestNss);
+ request.setQuery(b.done());
+ request.setUpdateModification(o);
+ request.setUpsert();
+ request.setFromOplogApplication(true);
+
+ const StringData ns = op.getNss().ns();
+ writeConflictRetry(opCtx, "applyOps_upsert", ns, [&] {
+ WriteUnitOfWork wuow(opCtx);
+ // If this is an atomic applyOps (i.e: `haveWrappingWriteUnitOfWork` is
+ // true), do not timestamp the write.
+ if (assignOperationTimestamp && timestamp != Timestamp::min()) {
+ uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp));
+ }
+
+ UpdateResult res = update(opCtx, db, request);
+ if (res.numMatched == 0 && res.upserted.isEmpty()) {
+ error() << "No document was updated even though we got a DuplicateKey "
+ "error when inserting";
+ fassertFailedNoTrace(28750);
+ }
+ wuow.commit();
+ });
+ }
+
if (incrementOpsAppliedStats) {
incrementOpsAppliedStats();
}
}
- } else {
- // Single insert.
- opCounters->gotInsert();
+ break;
+ }
+ case OpTypeEnum::kUpdate: {
+ opCounters->gotUpdate();
if (shouldUseGlobalOpCounters) {
- ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForInsert(
+ ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForUpdate(
opCtx->getWriteConcern());
}
- // No _id.
- // This indicates an issue with the upstream server:
- // The oplog entry is corrupted; or
- // The version of the upstream server is obsolete.
+ auto idField = o2["_id"];
uassert(ErrorCodes::NoSuchKey,
- str::stream() << "Failed to apply insert due to missing _id: " << op.toString(),
- o.hasField("_id"));
-
- // 1. Insert if
- // a) we do not have a wrapping WriteUnitOfWork, which implies we are not part of an
- // "applyOps" command, OR
- // b) we are part of a multi-document transaction[1].
- //
- // 2. Upsert[2] if
- // a) we have a wrapping WriteUnitOfWork AND we are not part of a transaction, which
- // implies we are part of an "applyOps" command, OR
- // b) the previous insert failed with a DuplicateKey error AND we are not part of a
- // transaction.
- //
- // [1] Transactions should not convert inserts to upserts because on secondaries they
- // will perform a lookup that never occurred on the primary. This may cause an
- // unintended prepare conflict and block replication. For this reason, transactions
- // should always fail with DuplicateKey errors and never retry inserts as upserts.
- // [2] This upsert behavior exists to support idempotency guarantees outside
- // steady-state replication and existing users of applyOps.
-
- const auto txnParticipant = TransactionParticipant::get(opCtx);
- const bool inTxn = txnParticipant && txnParticipant.inMultiDocumentTransaction();
- bool needToDoUpsert = haveWrappingWriteUnitOfWork && !inTxn;
+ str::stream() << "Failed to apply update due to missing _id: "
+ << redact(op.toBSON()),
+ !idField.eoo());
+
+ // The o2 field may contain additional fields besides the _id (like the shard key
+ // fields), but we want to do the update by just _id so we can take advantage of the
+ // IDHACK.
+ BSONObj updateCriteria = idField.wrap();
+
+ const bool upsert = alwaysUpsert || op.getUpsert().value_or(false);
+ UpdateRequest request(requestNss);
+ request.setQuery(updateCriteria);
+ request.setUpdateModification(o);
+ request.setUpsert(upsert);
+ request.setFromOplogApplication(true);
Timestamp timestamp;
- long long term = OpTime::kUninitializedTerm;
if (assignOperationTimestamp) {
- if (fieldTs) {
- timestamp = fieldTs.timestamp();
- }
- if (fieldT) {
- term = fieldT.Long();
- }
+ timestamp = op.getTimestamp();
}
- if (!needToDoUpsert) {
+ const StringData ns = op.getNss().ns();
+ auto status = writeConflictRetry(opCtx, "applyOps_update", ns, [&] {
WriteUnitOfWork wuow(opCtx);
-
- // Do not use supplied timestamps if running through applyOps, as that would allow
- // a user to dictate what timestamps appear in the oplog.
- if (assignOperationTimestamp) {
- if (fieldTs.ok()) {
- timestamp = fieldTs.timestamp();
- }
- if (fieldT.ok()) {
- term = fieldT.Long();
- }
+ if (timestamp != Timestamp::min()) {
+ uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp));
}
- OpDebug* const nullOpDebug = nullptr;
- Status status = collection->insertDocument(
- opCtx, InsertStatement(o, timestamp, term), nullOpDebug, true);
-
- if (status.isOK()) {
- wuow.commit();
- } else if (status == ErrorCodes::DuplicateKey) {
- // Transactions cannot be retried as upserts once they fail with a duplicate key
- // error.
- if (inTxn) {
- return status;
+ UpdateResult ur = update(opCtx, db, request);
+ if (ur.numMatched == 0 && ur.upserted.isEmpty()) {
+ if (ur.modifiers) {
+ if (updateCriteria.nFields() == 1) {
+ // was a simple { _id : ... } update criteria
+ string msg = str::stream()
+ << "failed to apply update: " << redact(op.toBSON());
+ error() << msg;
+ return Status(ErrorCodes::UpdateOperationFailed, msg);
+ }
+
+ // Need to check to see if it isn't present so we can exit early with a
+ // failure. Note that adds some overhead for this extra check in some cases,
+ // such as an updateCriteria of the form
+ // { _id:..., { x : {$size:...} }
+ // thus this is not ideal.
+ if (collection == nullptr ||
+ (indexCatalog->haveIdIndex(opCtx) &&
+ Helpers::findById(opCtx, collection, updateCriteria).isNull()) ||
+ // capped collections won't have an _id index
+ (!indexCatalog->haveIdIndex(opCtx) &&
+ Helpers::findOne(opCtx, collection, updateCriteria, false).isNull())) {
+ string msg = str::stream()
+ << "couldn't find doc: " << redact(op.toBSON());
+ error() << msg;
+ return Status(ErrorCodes::UpdateOperationFailed, msg);
+ }
+
+ // Otherwise, it's present; zero objects were updated because of additional
+ // specifiers in the query for idempotence
+ } else {
+ // this could happen benignly on an oplog duplicate replay of an upsert
+                        // (because we are idempotent), if a regular non-mod update fails the item
+ // is (presumably) missing.
+ if (!upsert) {
+ string msg = str::stream()
+ << "update of non-mod failed: " << redact(op.toBSON());
+ error() << msg;
+ return Status(ErrorCodes::UpdateOperationFailed, msg);
+ }
}
- // Continue to the next block to retry the operation as an upsert.
- needToDoUpsert = true;
- } else {
- return status;
}
- }
- // Now see if we need to do an upsert.
- if (needToDoUpsert) {
- // Do update on DuplicateKey errors.
- // This will only be on the _id field in replication,
- // since we disable non-_id unique constraint violations.
- BSONObjBuilder b;
- b.append(o.getField("_id"));
-
- UpdateRequest request(requestNss);
- request.setQuery(b.done());
- request.setUpdateModification(o);
- request.setUpsert();
- request.setFromOplogApplication(true);
-
- const StringData ns = fieldNs.valueStringDataSafe();
- writeConflictRetry(opCtx, "applyOps_upsert", ns, [&] {
- WriteUnitOfWork wuow(opCtx);
- // If this is an atomic applyOps (i.e: `haveWrappingWriteUnitOfWork` is true),
- // do not timestamp the write.
- if (assignOperationTimestamp && timestamp != Timestamp::min()) {
- uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp));
- }
+ wuow.commit();
+ return Status::OK();
+ });
- UpdateResult res = update(opCtx, db, request);
- if (res.numMatched == 0 && res.upserted.isEmpty()) {
- error() << "No document was updated even though we got a DuplicateKey "
- "error when inserting";
- fassertFailedNoTrace(28750);
- }
- wuow.commit();
- });
+ if (!status.isOK()) {
+ return status;
}
if (incrementOpsAppliedStats) {
incrementOpsAppliedStats();
}
+ break;
}
- } else if (*opType == 'u') {
- opCounters->gotUpdate();
- if (shouldUseGlobalOpCounters) {
- ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForUpdate(
- opCtx->getWriteConcern());
- }
-
- auto idField = o2["_id"];
- uassert(ErrorCodes::NoSuchKey,
- str::stream() << "Failed to apply update due to missing _id: " << op.toString(),
- !idField.eoo());
-
- // The o2 field may contain additional fields besides the _id (like the shard key fields),
- // but we want to do the update by just _id so we can take advantage of the IDHACK.
- BSONObj updateCriteria = idField.wrap();
- const bool upsert = valueB || alwaysUpsert;
-
- UpdateRequest request(requestNss);
- request.setQuery(updateCriteria);
- request.setUpdateModification(o);
- request.setUpsert(upsert);
- request.setFromOplogApplication(true);
-
- Timestamp timestamp;
- if (assignOperationTimestamp) {
- timestamp = fieldTs.timestamp();
- }
-
- const StringData ns = fieldNs.valueStringDataSafe();
- auto status = writeConflictRetry(opCtx, "applyOps_update", ns, [&] {
- WriteUnitOfWork wuow(opCtx);
- if (timestamp != Timestamp::min()) {
- uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp));
+ case OpTypeEnum::kDelete: {
+ opCounters->gotDelete();
+ if (shouldUseGlobalOpCounters) {
+ ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForDelete(
+ opCtx->getWriteConcern());
}
- UpdateResult ur = update(opCtx, db, request);
- if (ur.numMatched == 0 && ur.upserted.isEmpty()) {
- if (ur.modifiers) {
- if (updateCriteria.nFields() == 1) {
- // was a simple { _id : ... } update criteria
- string msg = str::stream() << "failed to apply update: " << redact(op);
- error() << msg;
- return Status(ErrorCodes::UpdateOperationFailed, msg);
- }
+ auto idField = o["_id"];
+ uassert(ErrorCodes::NoSuchKey,
+ str::stream() << "Failed to apply delete due to missing _id: "
+ << redact(op.toBSON()),
+ !idField.eoo());
- // Need to check to see if it isn't present so we can exit early with a
- // failure. Note that adds some overhead for this extra check in some cases,
- // such as an updateCriteria of the form
- // { _id:..., { x : {$size:...} }
- // thus this is not ideal.
- if (collection == nullptr ||
- (indexCatalog->haveIdIndex(opCtx) &&
- Helpers::findById(opCtx, collection, updateCriteria).isNull()) ||
- // capped collections won't have an _id index
- (!indexCatalog->haveIdIndex(opCtx) &&
- Helpers::findOne(opCtx, collection, updateCriteria, false).isNull())) {
- string msg = str::stream() << "couldn't find doc: " << redact(op);
- error() << msg;
- return Status(ErrorCodes::UpdateOperationFailed, msg);
- }
+ // The o field may contain additional fields besides the _id (like the shard key
+ // fields), but we want to do the delete by just _id so we can take advantage of the
+ // IDHACK.
+ BSONObj deleteCriteria = idField.wrap();
- // Otherwise, it's present; zero objects were updated because of additional
- // specifiers in the query for idempotence
- } else {
- // this could happen benignly on an oplog duplicate replay of an upsert
- // (because we are idempotent),
- // if an regular non-mod update fails the item is (presumably) missing.
- if (!upsert) {
- string msg = str::stream() << "update of non-mod failed: " << redact(op);
- error() << msg;
- return Status(ErrorCodes::UpdateOperationFailed, msg);
- }
- }
+ Timestamp timestamp;
+ if (assignOperationTimestamp) {
+ timestamp = op.getTimestamp();
}
- wuow.commit();
- return Status::OK();
- });
-
- if (!status.isOK()) {
- return status;
- }
-
- if (incrementOpsAppliedStats) {
- incrementOpsAppliedStats();
- }
- } else if (*opType == 'd') {
- opCounters->gotDelete();
- if (shouldUseGlobalOpCounters) {
- ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForDelete(
- opCtx->getWriteConcern());
- }
-
- auto idField = o["_id"];
- uassert(ErrorCodes::NoSuchKey,
- str::stream() << "Failed to apply delete due to missing _id: " << op.toString(),
- !idField.eoo());
-
- // The o field may contain additional fields besides the _id (like the shard key fields),
- // but we want to do the delete by just _id so we can take advantage of the IDHACK.
- BSONObj deleteCriteria = idField.wrap();
-
- Timestamp timestamp;
- if (assignOperationTimestamp) {
- timestamp = fieldTs.timestamp();
- }
+ const StringData ns = op.getNss().ns();
+ writeConflictRetry(opCtx, "applyOps_delete", ns, [&] {
+ WriteUnitOfWork wuow(opCtx);
+ if (timestamp != Timestamp::min()) {
+ uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp));
+ }
+ deleteObjects(opCtx, collection, requestNss, deleteCriteria, true /* justOne */);
+ wuow.commit();
+ });
- const StringData ns = fieldNs.valueStringDataSafe();
- writeConflictRetry(opCtx, "applyOps_delete", ns, [&] {
- WriteUnitOfWork wuow(opCtx);
- if (timestamp != Timestamp::min()) {
- uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp));
+ if (incrementOpsAppliedStats) {
+ incrementOpsAppliedStats();
}
-
- if (opType[1] == 0) {
- const auto justOne = true;
- deleteObjects(opCtx, collection, requestNss, deleteCriteria, justOne);
- } else
- verify(opType[1] == 'b'); // "db" advertisement
- wuow.commit();
- });
-
- if (incrementOpsAppliedStats) {
- incrementOpsAppliedStats();
+ break;
+ }
+ default: {
+ // Commands are processed in applyCommand_inlock().
+ invariant(false, str::stream() << "Unsupported opType " << OpType_serializer(opType));
}
- } else {
- invariant(*opType != 'c'); // commands are processed in applyCommand_inlock()
- uasserted(14825, str::stream() << "error in applyOperation : unknown opType " << *opType);
}
return Status::OK();
}
Status applyCommand_inlock(OperationContext* opCtx,
- const BSONObj& op,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery) {
// We should only have a stableTimestampForRecovery during replication recovery.
invariant(stableTimestampForRecovery == boost::none ||
mode == OplogApplication::Mode::kRecovering);
- LOG(3) << "applying command op: " << redact(op)
+ LOG(3) << "applying command op: " << redact(entry.toBSON())
<< ", oplog application mode: " << OplogApplication::modeToString(mode)
<< ", stable timestamp for recovery: " << stableTimestampForRecovery;
- std::array<StringData, 4> names = {"o", "ui", "ns", "op"};
- std::array<BSONElement, 4> fields;
- op.getFields(names, &fields);
- BSONElement& fieldO = fields[0];
- BSONElement& fieldUI = fields[1];
- BSONElement& fieldNs = fields[2];
- BSONElement& fieldOp = fields[3];
-
- const char* opType = fieldOp.valuestrsafe();
- invariant(*opType == 'c'); // only commands are processed here
+ // Only commands are processed here.
+ invariant(entry.getOpType() == OpTypeEnum::kCommand);
// Choose opCounters based on running on standalone/primary or secondary by checking
// whether writes are replicated.
OpCounters* opCounters = opCtx->writesAreReplicated() ? &globalOpCounters : &replOpCounters;
opCounters->gotCommand();
- if (fieldO.eoo()) {
- return Status(ErrorCodes::NoSuchKey, "Missing expected field 'o'");
- }
-
- if (!fieldO.isABSONObj()) {
- return Status(ErrorCodes::BadValue, "Expected object for field 'o'");
- }
+ BSONObj o = entry.getObject();
- BSONObj o = fieldO.embeddedObject();
-
- uassert(ErrorCodes::InvalidNamespace,
- "'ns' must be of type String",
- fieldNs.type() == BSONType::String);
- const NamespaceString nss(fieldNs.valueStringData());
+ const auto& nss = entry.getNss();
if (!nss.isValid()) {
return {ErrorCodes::InvalidNamespace, "invalid ns: " + std::string(nss.ns())};
}
@@ -1719,21 +1627,18 @@ Status applyCommand_inlock(OperationContext* opCtx,
if ((mode == OplogApplication::Mode::kInitialSync) &&
(std::find(whitelistedOps.begin(), whitelistedOps.end(), o.firstElementFieldName()) ==
whitelistedOps.end()) &&
- parseNs(nss.ns(), o) == NamespaceString::kServerConfigurationNamespace) {
+ extractNs(nss, o) == NamespaceString::kServerConfigurationNamespace) {
return Status(ErrorCodes::OplogOperationUnsupported,
str::stream() << "Applying command to feature compatibility version "
"collection not supported in initial sync: "
- << redact(op));
+ << redact(entry.toBSON()));
}
// Parse optime from oplog entry unless we are applying this command in standalone or on a
// primary (replicated writes enabled).
OpTime opTime;
if (!opCtx->writesAreReplicated()) {
- auto opTimeResult = OpTime::parseFromOplogEntry(op);
- if (opTimeResult.isOK()) {
- opTime = opTimeResult.getValue();
- }
+ opTime = entry.getOpTime();
}
const bool assignCommandTimestamp = [&] {
@@ -1769,7 +1674,8 @@ Status applyCommand_inlock(OperationContext* opCtx,
MONGO_UNREACHABLE;
}();
invariant(!assignCommandTimestamp || !opTime.isNull(),
- str::stream() << "Oplog entry did not have 'ts' field when expected: " << redact(op));
+ str::stream() << "Oplog entry did not have 'ts' field when expected: "
+ << redact(entry.toBSON()));
const Timestamp writeTime = (assignCommandTimestamp ? opTime.getTimestamp() : Timestamp());
@@ -1789,14 +1695,7 @@ Status applyCommand_inlock(OperationContext* opCtx,
// If 'writeTime' is not null, any writes in this scope will be given 'writeTime' as
// their timestamp at commit.
TimestampBlock tsBlock(opCtx, writeTime);
- return curOpToApply.applyFunc(opCtx,
- nss.ns().c_str(),
- fieldUI,
- o,
- opTime,
- entry,
- mode,
- stableTimestampForRecovery);
+ return curOpToApply.applyFunc(opCtx, entry, mode, stableTimestampForRecovery);
} catch (const DBException& ex) {
return ex.toStatus();
}
@@ -1826,14 +1725,14 @@ Status applyCommand_inlock(OperationContext* opCtx,
// TODO: This parse could be expensive and not worth it.
auto ns =
cmd->parse(opCtx, OpMsgRequest::fromDBAndBody(nss.db(), o))->ns().toString();
- auto swUUID = UUID::parse(fieldUI);
- if (!swUUID.isOK()) {
- error() << "Failed command " << redact(o) << " on " << ns << " with status "
- << swUUID.getStatus() << "during oplog application. Expected a UUID.";
+ auto swUUID = entry.getUuid();
+ if (!swUUID) {
+ error() << "Failed command " << redact(o) << " on " << ns
+                        << " during oplog application. Expected a UUID.";
}
BackgroundOperation::awaitNoBgOpInProgForNs(ns);
IndexBuildsCoordinator::get(opCtx)->awaitNoIndexBuildInProgressForCollection(
- swUUID.getValue());
+ swUUID.get());
opCtx->recoveryUnit()->abandonSnapshot();
opCtx->checkForInterrupt();
@@ -1853,7 +1752,7 @@ Status applyCommand_inlock(OperationContext* opCtx,
}
}
- AuthorizationManager::get(opCtx->getServiceContext())->logOp(opCtx, opType, nss, o, nullptr);
+ AuthorizationManager::get(opCtx->getServiceContext())->logOp(opCtx, "c", nss, o, nullptr);
return Status::OK();
}
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 7bf42539624..34da8931085 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -39,6 +39,7 @@
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/oplog_entry_batch.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -192,8 +193,8 @@ inline std::ostream& operator<<(std::ostream& s, OplogApplication::Mode mode) {
}
/**
- * Take a non-command op and apply it locally
- * Used for applying from an oplog
+ * Used for applying from an oplog entry or grouped inserts.
+ * @param opOrGroupedInserts a single oplog entry or grouped inserts to be applied.
* @param alwaysUpsert convert some updates to upserts for idempotency reasons
* @param mode specifies what oplog application mode we are in
* @param incrementOpsAppliedStats is called whenever an op is applied.
@@ -201,7 +202,7 @@ inline std::ostream& operator<<(std::ostream& s, OplogApplication::Mode mode) {
*/
Status applyOperation_inlock(OperationContext* opCtx,
Database* db,
- const BSONObj& op,
+ const OplogEntryBatch& opOrGroupedInserts,
bool alwaysUpsert,
OplogApplication::Mode mode,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats = {});
@@ -212,7 +213,6 @@ Status applyOperation_inlock(OperationContext* opCtx,
* Returns failure status if the op that could not be applied.
*/
Status applyCommand_inlock(OperationContext* opCtx,
- const BSONObj& op,
const OplogEntry& entry,
OplogApplication::Mode mode,
boost::optional<Timestamp> stableTimestampForRecovery);
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 24c47432508..f1379307027 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -170,7 +170,7 @@ public:
using MutableOplogEntry::kVersionFieldName;
using MutableOplogEntry::kWallClockTimeFieldName;
- // Make serialize(), toBSON() and getters accessible.
+ // Make serialize() and getters accessible.
using MutableOplogEntry::get_id;
using MutableOplogEntry::getDurableReplOperation;
using MutableOplogEntry::getFromMigrate;
@@ -193,7 +193,6 @@ public:
using MutableOplogEntry::getVersion;
using MutableOplogEntry::getWallClockTime;
using MutableOplogEntry::serialize;
- using MutableOplogEntry::toBSON;
// Make helper functions accessible.
using MutableOplogEntry::getOpTime;
@@ -323,6 +322,10 @@ public:
*/
std::string toString() const;
+ BSONObj toBSON() const {
+ return _raw;
+ }
+
private:
BSONObj _raw; // Owned.
CommandType _commandType = CommandType::kNotCommand;
diff --git a/src/mongo/db/repl/oplog_entry_batch.cpp b/src/mongo/db/repl/oplog_entry_batch.cpp
new file mode 100644
index 00000000000..ff079e8f2b0
--- /dev/null
+++ b/src/mongo/db/repl/oplog_entry_batch.cpp
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/repl/oplog_entry_batch.h"
+#include "mongo/bson/bsonobjbuilder.h"
+
+namespace mongo {
+namespace repl {
+BSONObj OplogEntryBatch::toBSON() const {
+ if (!isGroupedInserts())
+ return getOp().toBSON();
+
+ // Since we found more than one document, create grouped insert of many docs.
+ // We are going to group many 'i' ops into one big 'i' op, with array fields for
+ // 'ts', 't', and 'o', corresponding to each individual op.
+ // For example:
+ // { ts: Timestamp(1,1), t:1, ns: "test.foo", op:"i", o: {_id:1} }
+ // { ts: Timestamp(1,2), t:1, ns: "test.foo", op:"i", o: {_id:2} }
+ // become:
+ // { ts: [Timestamp(1, 1), Timestamp(1, 2)],
+ // t: [1, 1],
+ // o: [{_id: 1}, {_id: 2}],
+ // ns: "test.foo",
+ // op: "i"
+ // }
+ // This BSONObj is used for error messages logging only.
+ BSONObjBuilder groupedInsertBuilder;
+ // Populate the "ts" field with an array of all the grouped inserts' timestamps.
+ {
+ BSONArrayBuilder tsArrayBuilder(groupedInsertBuilder.subarrayStart("ts"));
+ for (auto op : _batch) {
+ tsArrayBuilder.append(op->getTimestamp());
+ }
+ }
+ // Populate the "t" (term) field with an array of all the grouped inserts' terms.
+ {
+ BSONArrayBuilder tArrayBuilder(groupedInsertBuilder.subarrayStart("t"));
+ for (auto op : _batch) {
+ long long term = OpTime::kUninitializedTerm;
+ auto parsedTerm = op->getTerm();
+ if (parsedTerm)
+ term = parsedTerm.get();
+ tArrayBuilder.append(term);
+ }
+ }
+ // Populate the "o" field with an array of all the grouped inserts.
+ {
+ BSONArrayBuilder oArrayBuilder(groupedInsertBuilder.subarrayStart("o"));
+ for (auto op : _batch) {
+ oArrayBuilder.append(op->getObject());
+ }
+ }
+ // Generate an op object of all elements except for "ts", "t", and "o", since we
+ // need to make those fields arrays of all the ts's, t's, and o's.
+ groupedInsertBuilder.appendElementsUnique(getOp().toBSON());
+ return groupedInsertBuilder.obj();
+}
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_entry_batch.h b/src/mongo/db/repl/oplog_entry_batch.h
new file mode 100644
index 00000000000..444087c0a45
--- /dev/null
+++ b/src/mongo/db/repl/oplog_entry_batch.h
@@ -0,0 +1,86 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/repl/oplog_entry.h"
+
+namespace mongo {
+namespace repl {
+/**
+ * This is a class for a single oplog entry or grouped inserts to be applied in syncApply. This
+ * class is immutable and can only be initialized using either a single oplog entry or a range of
+ * grouped inserts.
+ */
+class OplogEntryBatch {
+public:
+ using OperationPtrs = std::vector<const OplogEntry*>;
+ using ConstIterator = OperationPtrs::const_iterator;
+
+ OplogEntryBatch() = delete;
+
+ // This initializes it as a single oplog entry.
+ OplogEntryBatch(const OplogEntry* op) : _batch({op}) {}
+
+ // This initializes it as grouped inserts.
+ OplogEntryBatch(ConstIterator begin, ConstIterator end) : _batch(begin, end) {
+ // Performs sanity checks to confirm that the batch is valid.
+ invariant(!_batch.empty());
+ for (auto op : _batch) {
+ // Every oplog entry must be an insert.
+ invariant(op->getOpType() == OpTypeEnum::kInsert);
+ // Every oplog entry must be in the same namespace.
+ invariant(op->getNss() == _batch.front()->getNss());
+ }
+ }
+
+ // Return the oplog entry to be applied or the first oplog entry of the grouped inserts.
+ const OplogEntry& getOp() const {
+ return *(_batch.front());
+ }
+
+ bool isGroupedInserts() const {
+ return _batch.size() > 1;
+ }
+
+ const OperationPtrs& getGroupedInserts() const {
+ invariant(isGroupedInserts());
+ return _batch;
+ }
+
+    // Returns a BSONObj for message logging purposes.
+ BSONObj toBSON() const;
+
+private:
+ // A single oplog entry or a batch of grouped insert oplog entries to be applied.
+ OperationPtrs _batch;
+};
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index ef05c77289a..8acfbc9d001 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -226,9 +226,9 @@ NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEn
return *nss;
}
-NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const BSONObj& op) {
- if (auto ui = op["ui"]) {
- return {nss.db().toString(), uassertStatusOK(UUID::parse(ui))};
+NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const OplogEntry& op) {
+ if (auto ui = op.getUuid()) {
+ return {nss.db().toString(), ui.get()};
}
return nss;
}
@@ -240,8 +240,7 @@ NamespaceStringOrUUID getNsOrUUID(const NamespaceString& nss, const BSONObj& op)
Status finishAndLogApply(ClockSource* clockSource,
Status finalStatus,
Date_t applyStartTime,
- OpTypeEnum opType,
- const BSONObj& op) {
+ const OplogEntryBatch& batch) {
if (finalStatus.isOK()) {
auto applyEndTime = clockSource->now();
@@ -253,13 +252,13 @@ Status finishAndLogApply(ClockSource* clockSource,
StringBuilder s;
s << "applied op: ";
- if (opType == OpTypeEnum::kCommand) {
+ if (batch.getOp().getOpType() == OpTypeEnum::kCommand) {
s << "command ";
} else {
s << "CRUD ";
}
- s << redact(op);
+ s << redact(batch.toBSON());
s << ", took " << diffMS << "ms";
log() << s.str();
@@ -276,13 +275,14 @@ LockMode fixLockModeForSystemDotViewsChanges(const NamespaceString& nss, LockMod
// static
Status SyncTail::syncApply(OperationContext* opCtx,
- const BSONObj& op,
+ const OplogEntryBatch& batch,
OplogApplication::Mode oplogApplicationMode,
boost::optional<Timestamp> stableTimestampForRecovery) {
+ auto op = batch.getOp();
// Count each log op application as a separate operation, for reporting purposes
CurOp individualOp(opCtx);
- const NamespaceString nss(op.getStringField("ns"));
+ const NamespaceString nss(op.getNss());
auto incrementOpsAppliedStats = [] { opsAppliedStats.increment(1); };
@@ -302,7 +302,7 @@ Status SyncTail::syncApply(OperationContext* opCtx,
// mode (similar to initial sync) instead so we do not accidentally ignore real errors.
bool shouldAlwaysUpsert = (oplogApplicationMode != OplogApplication::Mode::kInitialSync);
Status status = applyOperation_inlock(
- opCtx, db, op, shouldAlwaysUpsert, oplogApplicationMode, incrementOpsAppliedStats);
+ opCtx, db, batch, shouldAlwaysUpsert, oplogApplicationMode, incrementOpsAppliedStats);
if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) {
throw WriteConflictException();
}
@@ -318,10 +318,10 @@ Status SyncTail::syncApply(OperationContext* opCtx,
MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterRecordingOpApplicationStartTime);
}
- auto opType = OpType_parse(IDLParserErrorContext("syncApply"), op["op"].valuestrsafe());
+ auto opType = op.getOpType();
auto finishApply = [&](Status status) {
- return finishAndLogApply(clockSource, status, applyStartTime, opType, op);
+ return finishAndLogApply(clockSource, status, applyStartTime, batch);
};
if (opType == OpTypeEnum::kNoop) {
@@ -353,19 +353,16 @@ Status SyncTail::syncApply(OperationContext* opCtx,
return Status::OK();
}
- ex.addContext(str::stream() << "Failed to apply operation: " << redact(op));
+ ex.addContext(str::stream()
+ << "Failed to apply operation: " << redact(batch.toBSON()));
throw;
}
}));
} else if (opType == OpTypeEnum::kCommand) {
return finishApply(writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] {
- // TODO SERVER-37180 Remove this double-parsing.
- // The command entry has been parsed before, so it must be valid.
- auto entry = uassertStatusOK(OplogEntry::parse(op));
-
// A special case apply for commands to avoid implicit database creation.
- Status status = applyCommand_inlock(
- opCtx, op, entry, oplogApplicationMode, stableTimestampForRecovery);
+ Status status =
+ applyCommand_inlock(opCtx, op, oplogApplicationMode, stableTimestampForRecovery);
incrementOpsAppliedStats();
return status;
}));
@@ -647,7 +644,7 @@ private:
auto oplogEntries =
fassertNoTrace(31004, _getNextApplierBatchFn(opCtx.get(), batchLimits));
for (const auto& oplogEntry : oplogEntries) {
- ops.emplace_back(oplogEntry.getRaw());
+ ops.emplace_back(oplogEntry);
}
// If we don't have anything in the queue, wait a bit for something to appear.
@@ -1052,7 +1049,7 @@ Status multiSyncApply(OperationContext* opCtx,
try {
auto stableTimestampForRecovery = st->getOptions().stableTimestampForRecovery;
const Status status = SyncTail::syncApply(
- opCtx, entry.getRaw(), oplogApplicationMode, stableTimestampForRecovery);
+ opCtx, &entry, oplogApplicationMode, stableTimestampForRecovery);
if (!status.isOK()) {
// In initial sync, update operations can cause documents to be missed during
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 2bbdfbd71c8..6a156a630aa 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -73,15 +73,14 @@ public:
WorkerMultikeyPathInfo* workerMultikeyPathInfo)>;
/**
- * Applies the operation that is in param o.
+     * Applies the operations that are in param ops.
* Functions for applying operations/commands and increment server status counters may
* be overridden for testing.
*/
static Status syncApply(OperationContext* opCtx,
- const BSONObj& o,
+ const OplogEntryBatch& batch,
OplogApplication::Mode oplogApplicationMode,
boost::optional<Timestamp> stableTimestampForRecovery);
-
/**
*
* Constructs a SyncTail.
@@ -151,10 +150,10 @@ public:
return _batch;
}
- void emplace_back(BSONObj obj) {
+ void emplace_back(OplogEntry oplog) {
invariant(!_mustShutdown);
- _bytes += obj.objsize();
- _batch.emplace_back(std::move(obj));
+ _bytes += oplog.getRawObjSizeBytes();
+ _batch.emplace_back(std::move(oplog));
}
void pop_back() {
_bytes -= back().getRawObjSizeBytes();
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 933f23f4667..79fbd315d3b 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -234,44 +234,18 @@ auto parseFromOplogEntryArray(const BSONObj& obj, int elem) {
return OpTime(tsArray.Array()[elem].timestamp(), termArray.Array()[elem].Long());
};
-TEST_F(SyncTailTest, SyncApplyNoNamespaceBadOp) {
- const BSONObj op = BSON("op"
- << "x");
- ASSERT_THROWS(
- SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync, boost::none),
- ExceptionFor<ErrorCodes::BadValue>);
-}
-
-TEST_F(SyncTailTest, SyncApplyNoNamespaceNoOp) {
- ASSERT_OK(SyncTail::syncApply(_opCtx.get(),
- BSON("op"
- << "n"),
- OplogApplication::Mode::kInitialSync,
- boost::none));
-}
-
-TEST_F(SyncTailTest, SyncApplyBadOp) {
- const BSONObj op = BSON("op"
- << "x"
- << "ns"
- << "test.t");
- ASSERT_THROWS(
- SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync, boost::none),
- ExceptionFor<ErrorCodes::BadValue>);
-}
-
TEST_F(SyncTailTest, SyncApplyInsertDocumentDatabaseMissing) {
NamespaceString nss("test.t");
auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
- ASSERT_THROWS(SyncTail::syncApply(
- _opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary, boost::none),
- ExceptionFor<ErrorCodes::NamespaceNotFound>);
+ ASSERT_THROWS(
+ SyncTail::syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary, boost::none),
+ ExceptionFor<ErrorCodes::NamespaceNotFound>);
}
TEST_F(SyncTailTest, SyncApplyDeleteDocumentDatabaseMissing) {
NamespaceString otherNss("test.othername");
auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, {});
- _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), false);
+ _testSyncApplyCrudOperation(ErrorCodes::OK, op, false);
}
TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLookupByUUIDFails) {
@@ -279,9 +253,9 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLookupByUUIDFails) {
createDatabase(_opCtx.get(), nss.db());
NamespaceString otherNss(nss.getSisterNS("othername"));
auto op = makeOplogEntry(OpTypeEnum::kInsert, otherNss, kUuid);
- ASSERT_THROWS(SyncTail::syncApply(
- _opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary, boost::none),
- ExceptionFor<ErrorCodes::NamespaceNotFound>);
+ ASSERT_THROWS(
+ SyncTail::syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary, boost::none),
+ ExceptionFor<ErrorCodes::NamespaceNotFound>);
}
TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLookupByUUIDFails) {
@@ -289,7 +263,7 @@ TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLookupByUUIDFails) {
createDatabase(_opCtx.get(), nss.db());
NamespaceString otherNss(nss.getSisterNS("othername"));
auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, kUuid);
- _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), false);
+ _testSyncApplyCrudOperation(ErrorCodes::OK, op, false);
}
TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionMissing) {
@@ -299,9 +273,9 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionMissing) {
// which in the case of this test just ignores such errors. This tests mostly that we don't
// implicitly create the collection and lock the database in MODE_X.
auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
- ASSERT_THROWS(SyncTail::syncApply(
- _opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary, boost::none),
- ExceptionFor<ErrorCodes::NamespaceNotFound>);
+ ASSERT_THROWS(
+ SyncTail::syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary, boost::none),
+ ExceptionFor<ErrorCodes::NamespaceNotFound>);
ASSERT_FALSE(collectionExists(_opCtx.get(), nss));
}
@@ -312,7 +286,7 @@ TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionMissing) {
// which in the case of this test just ignores such errors. This tests mostly that we don't
// implicitly create the collection and lock the database in MODE_X.
auto op = makeOplogEntry(OpTypeEnum::kDelete, nss, {});
- _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), false);
+ _testSyncApplyCrudOperation(ErrorCodes::OK, op, false);
ASSERT_FALSE(collectionExists(_opCtx.get(), nss));
}
@@ -320,14 +294,14 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionExists) {
const NamespaceString nss("test.t");
createCollection(_opCtx.get(), nss, {});
auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
- _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), true);
+ _testSyncApplyCrudOperation(ErrorCodes::OK, op, true);
}
TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionExists) {
const NamespaceString nss("test.t");
createCollection(_opCtx.get(), nss, {});
auto op = makeOplogEntry(OpTypeEnum::kDelete, nss, {});
- _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), false);
+ _testSyncApplyCrudOperation(ErrorCodes::OK, op, false);
}
TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLockedByUUID) {
@@ -336,7 +310,7 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLockedByUUID) {
// Test that the collection to lock is determined by the UUID and not the 'ns' field.
NamespaceString otherNss(nss.getSisterNS("othername"));
auto op = makeOplogEntry(OpTypeEnum::kInsert, otherNss, uuid);
- _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), true);
+ _testSyncApplyCrudOperation(ErrorCodes::OK, op, true);
}
TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLockedByUUID) {
@@ -348,7 +322,7 @@ TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLockedByUUID) {
// Test that the collection to lock is determined by the UUID and not the 'ns' field.
NamespaceString otherNss(nss.getSisterNS("othername"));
auto op = makeOplogEntry(OpTypeEnum::kDelete, otherNss, options.uuid);
- _testSyncApplyCrudOperation(ErrorCodes::OK, op.toBSON(), false);
+ _testSyncApplyCrudOperation(ErrorCodes::OK, op, false);
}
TEST_F(SyncTailTest, SyncApplyCommand) {
@@ -373,24 +347,12 @@ TEST_F(SyncTailTest, SyncApplyCommand) {
};
ASSERT_TRUE(_opCtx->writesAreReplicated());
ASSERT_FALSE(documentValidationDisabled(_opCtx.get()));
- ASSERT_OK(
- SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync, boost::none));
+ auto entry = OplogEntry(op);
+ ASSERT_OK(SyncTail::syncApply(
+ _opCtx.get(), &entry, OplogApplication::Mode::kInitialSync, boost::none));
ASSERT_TRUE(applyCmdCalled);
}
-TEST_F(SyncTailTest, SyncApplyCommandThrowsException) {
- const BSONObj op = BSON("op"
- << "c"
- << "ns" << 12345 << "o"
- << BSON("create"
- << "t")
- << "ts" << Timestamp(1, 1));
- // This test relies on the namespace type check of IDL.
- ASSERT_THROWS(
- SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync, boost::none),
- ExceptionFor<ErrorCodes::TypeMismatch>);
-}
-
DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") {
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(nullptr,
@@ -2364,13 +2326,13 @@ TEST_F(SyncTailTest, LogSlowOpApplicationWhenSuccessful) {
auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
startCapturingLogMessages();
- ASSERT_OK(SyncTail::syncApply(
- _opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary, boost::none));
+ ASSERT_OK(
+ SyncTail::syncApply(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary, boost::none));
// Use a builder for easier escaping. We expect the operation to be logged.
StringBuilder expected;
- expected << "applied op: CRUD { op: \"i\", ns: \"test.t\", o: { _id: 0 }, ts: Timestamp(1, 1), "
- "t: 1, v: 2 }, took "
+ expected << "applied op: CRUD { ts: Timestamp(1, 1), t: 1, v: 2, op: \"i\", ns: \"test.t\", o: "
+ "{ _id: 0 } }, took "
<< applyDuration << "ms";
ASSERT_EQUALS(1, countLogLinesContaining(expected.str()));
}
@@ -2387,8 +2349,7 @@ TEST_F(SyncTailTest, DoNotLogSlowOpApplicationWhenFailed) {
startCapturingLogMessages();
ASSERT_THROWS(
- SyncTail::syncApply(
- _opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary, boost::none),
+ SyncTail::syncApply(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary, boost::none),
ExceptionFor<ErrorCodes::NamespaceNotFound>);
// Use a builder for easier escaping. We expect the operation to *not* be logged
@@ -2412,8 +2373,8 @@ TEST_F(SyncTailTest, DoNotLogNonSlowOpApplicationWhenSuccessful) {
auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
startCapturingLogMessages();
- ASSERT_OK(SyncTail::syncApply(
- _opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary, boost::none));
+ ASSERT_OK(
+ SyncTail::syncApply(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary, boost::none));
// Use a builder for easier escaping. We expect the operation to *not* be logged,
// since it wasn't slow to apply.
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index 6ebcfa0f6fd..59860e000c2 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -154,7 +154,7 @@ StorageInterface* SyncTailTest::getStorageInterface() const {
}
void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError,
- const BSONObj& op,
+ const OplogEntry& op,
bool expectedApplyOpCalled) {
bool applyOpCalled = false;
@@ -174,7 +174,7 @@ void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError,
checkOpCtx(opCtx);
ASSERT_EQUALS(NamespaceString("test.t"), nss);
ASSERT_EQUALS(1U, docs.size());
- ASSERT_BSONOBJ_EQ(op["o"].Obj(), docs[0]);
+ ASSERT_BSONOBJ_EQ(op.getObject(), docs[0]);
return Status::OK();
};
@@ -188,13 +188,13 @@ void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError,
checkOpCtx(opCtx);
ASSERT_EQUALS(NamespaceString("test.t"), nss);
ASSERT(deletedDoc);
- ASSERT_BSONOBJ_EQ(op["o"].Obj(), *deletedDoc);
+ ASSERT_BSONOBJ_EQ(op.getObject(), *deletedDoc);
return Status::OK();
};
ASSERT_TRUE(_opCtx->writesAreReplicated());
ASSERT_FALSE(documentValidationDisabled(_opCtx.get()));
ASSERT_EQ(
- SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kSecondary, boost::none),
+ SyncTail::syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary, boost::none),
expectedError);
ASSERT_EQ(applyOpCalled, expectedApplyOpCalled);
}
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h
index 9b1adaea94f..6ce29c072f8 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.h
+++ b/src/mongo/db/repl/sync_tail_test_fixture.h
@@ -115,7 +115,7 @@ public:
protected:
void _testSyncApplyCrudOperation(ErrorCodes::Error expectedError,
- const BSONObj& op,
+ const OplogEntry& op,
bool expectedApplyOpCalled);
ServiceContext::UniqueOperationContext _opCtx;
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index c23415f61e4..fe2ee9fbc18 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -66,7 +66,7 @@ Status _applyOperationsForTransaction(OperationContext* opCtx,
try {
AutoGetCollection coll(opCtx, op.getNss(), MODE_IX);
auto status = repl::applyOperation_inlock(
- opCtx, coll.getDb(), op.toBSON(), false /*alwaysUpsert*/, oplogApplicationMode);
+ opCtx, coll.getDb(), &op, false /*alwaysUpsert*/, oplogApplicationMode);
if (!status.isOK()) {
return status;
}