diff options
-rw-r--r-- | src/mongo/db/prefetch.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/prefetch.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback_test.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 28 |
7 files changed, 103 insertions, 45 deletions
diff --git a/src/mongo/db/prefetch.cpp b/src/mongo/db/prefetch.cpp index a2dc36be478..5a43e2ab982 100644 --- a/src/mongo/db/prefetch.cpp +++ b/src/mongo/db/prefetch.cpp @@ -40,6 +40,7 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_global.h" @@ -149,39 +150,31 @@ void prefetchRecordPages(OperationContext* opCtx, } // namespace // prefetch for an oplog operation -void prefetchPagesForReplicatedOp(OperationContext* opCtx, Database* db, const BSONObj& op) { +void prefetchPagesForReplicatedOp(OperationContext* opCtx, + Database* db, + const OplogEntry& oplogEntry) { invariant(db); const ReplSettings::IndexPrefetchConfig prefetchConfig = getGlobalReplicationCoordinator()->getIndexPrefetchConfig(); - const char* opField; - const char* opType = op.getStringField("op"); - switch (*opType) { - case 'i': // insert - case 'd': // delete - opField = "o"; - break; - case 'u': // update - opField = "o2"; - break; - default: - // prefetch ignores other ops - return; - } - BSONObj obj = op.getObjectField(opField); - const char* ns = op.getStringField("ns"); + // Prefetch ignores non-CRUD operations. + if (!oplogEntry.isCrudOpType()) { + return; + } // This will have to change for engines other than MMAP V1, because they might not have // means for directly prefetching pages from the collection. For this purpose, acquire S // lock on the database, instead of optimizing with IS. 
- Lock::CollectionLock collLock(opCtx->lockState(), ns, MODE_S); + const auto& nss = oplogEntry.getNamespace(); + Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_S); - Collection* collection = db->getCollection(opCtx, ns); + Collection* collection = db->getCollection(opCtx, nss); if (!collection) { return; } - LOG(4) << "index prefetch for op " << *opType; + auto opType = oplogEntry.getOpType(); + LOG(4) << "index prefetch for op " << OpType_serializer(opType); // should we prefetch index pages on updates? if the update is in-place and doesn't change // indexed values, it is actually slower - a lot slower if there are a dozen indexes or @@ -198,6 +191,8 @@ void prefetchPagesForReplicatedOp(OperationContext* opCtx, Database* db, const B // a way to achieve that would be to prefetch the record first, and then afterwards do // this part. // + auto obj = oplogEntry.getOperationToApply(); + invariant(!obj.isEmpty()); prefetchIndexPages(opCtx, collection, prefetchConfig, obj); // do not prefetch the data for inserts; it doesn't exist yet @@ -206,11 +201,11 @@ void prefetchPagesForReplicatedOp(OperationContext* opCtx, Database* db, const B // when we delete. note if done we only want to touch the first page. // // update: do record prefetch. - if ((*opType == 'u') && + if ((opType == OpTypeEnum::kUpdate) && // do not prefetch the data for capped collections because // they typically do not have an _id index for findById() to use. 
!collection->isCapped()) { - prefetchRecordPages(opCtx, db, ns, obj); + prefetchRecordPages(opCtx, db, nss.ns().c_str(), obj); } } diff --git a/src/mongo/db/prefetch.h b/src/mongo/db/prefetch.h index 8de85948a05..1f5576e31e7 100644 --- a/src/mongo/db/prefetch.h +++ b/src/mongo/db/prefetch.h @@ -28,12 +28,19 @@ #pragma once namespace mongo { + class BSONObj; class Database; class OperationContext; + namespace repl { +class OplogEntry; + // page in possible index and/or data pages for an op from the oplog -void prefetchPagesForReplicatedOp(OperationContext* opCtx, Database* db, const BSONObj& op); +void prefetchPagesForReplicatedOp(OperationContext* opCtx, + Database* db, + const OplogEntry& oplogEntry); + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index f9f4a11896f..d70b518b795 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -208,12 +208,26 @@ bool OplogEntry::isCrudOpType() const { BSONElement OplogEntry::getIdElement() const { invariant(isCrudOpType()); if (getOpType() == OpTypeEnum::kUpdate) { + // We cannot use getOperationToApply() here because the BSONObj will go out of scope + // after we return the BSONElement. 
return getObject2()->getField("_id"); } else { return getObject()["_id"]; } } +BSONObj OplogEntry::getOperationToApply() const { + if (getOpType() != OpTypeEnum::kUpdate) { + return getObject(); + } + + if (auto optionalObj = getObject2()) { + return *optionalObj; + } + + return {}; +} + OplogEntry::CommandType OplogEntry::getCommandType() const { invariant(isCommand()); invariant(_commandType != OplogEntry::CommandType::kNotCommand); diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index ae21d23b446..d2ae43d65b4 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -103,6 +103,14 @@ public: BSONElement getIdElement() const; /** + * Returns the document representing the operation to apply. + * For commands and insert/delete operations, this will be the document in the 'o' field. + * For update operations, this will be the document in the 'o2' field. + * An empty document returned by this function indicates that we have a malformed OplogEntry. + */ + BSONObj getOperationToApply() const; + + /** * Returns the type of command of the oplog entry. Must be called on a command op. */ CommandType getCommandType() const; diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index c7f2c922d4b..7a73c7e20ee 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -235,7 +235,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf BSONObj fixedObj = bob.obj(); // Parse the oplog entry. - auto oplogEntry = OplogEntry(fixedObj); + const OplogEntry oplogEntry(fixedObj); if (isNestedApplyOpsCommand) { LOG(2) << "Updating rollback FixUpInfo for nested applyOps oplog entry: " @@ -254,9 +254,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf << redact(oplogEntry.toBSON())); } - BSONObj obj = - oplogEntry.raw.getObjectField(oplogEntry.getOpType() == OpTypeEnum::kUpdate ? 
"o2" : "o"); - + auto obj = oplogEntry.getOperationToApply(); if (obj.isEmpty()) { throw RSFatalException(str::stream() << "Local op on rollback has no object field: " << redact(oplogEntry.toBSON())); @@ -264,7 +262,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf // If the operation being rolled back has a txnNumber, then the corresponding entry in the // session transaction table needs to be refetched. - auto operationSessionInfo = oplogEntry.getOperationSessionInfo(); + const auto& operationSessionInfo = oplogEntry.getOperationSessionInfo(); auto txnNumber = operationSessionInfo.getTxnNumber(); if (txnNumber) { auto sessionId = operationSessionInfo.getSessionId(); @@ -567,7 +565,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf // If we are inserting/updating/deleting a document in the oplog entry, we will update // the doc._id field when we actually insert the docID into the docsToRefetch set. - DocID doc = DocID(oplogEntry.raw, BSONElement(), *uuid); + DocID doc = DocID(fixedObj, BSONElement(), *uuid); doc._id = oplogEntry.getIdElement(); if (doc._id.eoo()) { diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index 5e310eaf9aa..f6ca06237cd 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -2068,7 +2068,45 @@ TEST(RSRollbackTest, LocalEntryWithoutOIsFatal) { RSFatalException); } -TEST(RSRollbackTest, LocalEntryWithoutO2IsFatal) { +TEST(RSRollbackTest, LocalUpdateEntryWithoutO2IsFatal) { + const auto validOplogEntry = BSON("op" + << "u" + << "ui" + << UUID::gen() + << "ts" + << Timestamp(1, 1) + << "t" + << 1LL + << "h" + << 1LL + << "ns" + << "test.t" + << "o" + << BSON("_id" << 1 << "a" << 1) + << "o2" + << BSON("_id" << 1)); + FixUpInfo fui; + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false)); + const auto invalidOplogEntry = BSON("op" + << "u" + << "ui" + << UUID::gen() + 
<< "ts" + << Timestamp(1, 1) + << "t" + << 1LL + << "h" + << 1LL + << "ns" + << "test.t" + << "o" + << BSON("_id" << 1 << "a" << 1)); + ASSERT_THROWS( + updateFixUpInfoFromLocalOplogEntry(fui, invalidOplogEntry, false).transitional_ignore(), + RSFatalException); +} + +TEST(RSRollbackTest, LocalUpdateEntryWithEmptyO2IsFatal) { const auto validOplogEntry = BSON("op" << "u" << "ui" diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 3fa8f87db9f..f25e398443a 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -424,20 +424,20 @@ Status SyncTail::syncApply(OperationContext* opCtx, namespace { // The pool threads call this to prefetch each op -void prefetchOp(const BSONObj& op) { +void prefetchOp(const OplogEntry& oplogEntry) { initializePrefetchThread(); - const char* ns = op.getStringField("ns"); - if (ns && (ns[0] != '\0')) { + const auto& nss = oplogEntry.getNamespace(); + if (!nss.isEmpty()) { try { // one possible tweak here would be to stay in the read lock for this database // for multiple prefetches if they are for the same database. 
const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); OperationContext& opCtx = *opCtxPtr; - AutoGetCollectionForReadCommand ctx(&opCtx, NamespaceString(ns)); + AutoGetCollectionForReadCommand ctx(&opCtx, nss); Database* db = ctx.getDb(); if (db) { - prefetchPagesForReplicatedOp(&opCtx, db, op); + prefetchPagesForReplicatedOp(&opCtx, db, oplogEntry); } } catch (const DBException& e) { LOG(2) << "ignoring exception in prefetchOp(): " << redact(e) << endl; @@ -452,7 +452,7 @@ void prefetchOp(const BSONObj& op) { void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherPool) { invariant(prefetcherPool); for (auto&& op : ops) { - prefetcherPool->schedule([&] { prefetchOp(op.raw); }); + prefetcherPool->schedule([&] { prefetchOp(op); }); } prefetcherPool->join(); } @@ -928,10 +928,8 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) { } // Extract some info from ops that we'll need after releasing the batch below. - const auto firstOpTimeInBatch = - fassertStatusOK(40299, OpTime::parseFromOplogEntry(ops.front().raw)); - const auto lastOpTimeInBatch = - fassertStatusOK(28773, OpTime::parseFromOplogEntry(ops.back().raw)); + const auto firstOpTimeInBatch = ops.front().getOpTime(); + const auto lastOpTimeInBatch = ops.back().getOpTime(); // Make sure the oplog doesn't go back in time or repeat an entry. 
if (firstOpTimeInBatch <= replCoord->getMyLastAppliedOpTime()) { @@ -1029,7 +1027,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, int curVersion = entry.getVersion(); if (curVersion != OplogEntry::kOplogVersion) { severe() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version " - << curVersion << " in oplog entry: " << redact(entry.raw); + << curVersion << " in oplog entry: " << redact(entry.toBSON()); fassertFailedNoTrace(18820); } @@ -1391,13 +1389,13 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, const Status status = syncApply(opCtx, entry->raw, oplogApplicationMode); if (!status.isOK()) { - severe() << "Error applying operation (" << redact(entry->raw) + severe() << "Error applying operation (" << redact(entry->toBSON()) << "): " << causedBy(redact(status)); return status; } } catch (const DBException& e) { severe() << "writer worker caught exception: " << redact(e) - << " on: " << redact(entry->raw); + << " on: " << redact(entry->toBSON()); return e.toStatus(); } } @@ -1443,7 +1441,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* opCtx, // sync source. if (s != ErrorCodes::UpdateOperationFailed) { error() << "Error applying operation: " << redact(s) << " (" - << redact(entry.raw) << ")"; + << redact(entry.toBSON()) << ")"; return s; } @@ -1459,7 +1457,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* opCtx, } severe() << "writer worker caught exception: " << causedBy(redact(e)) - << " on: " << redact(entry.raw); + << " on: " << redact(entry.toBSON()); return e.toStatus(); } } |