summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2017-10-25 18:23:37 -0400
committerBenety Goh <benety@mongodb.com>2018-01-16 14:40:12 -0500
commit939a30aba1703c939b0352bded4b56257c8a0bec (patch)
tree9f192f4f2000f9836eb1d5e5e36570390feb73d7
parent4eabf1ea6225f444b3b0b3b2fee785aaa306212f (diff)
downloadmongo-939a30aba1703c939b0352bded4b56257c8a0bec.tar.gz
SERVER-29200 add OplogEntry::getOperationToApply()
remove references to OplogEntry::raw from rollback, prefetchPagesForReplicatedOp() and SyncTail
-rw-r--r--src/mongo/db/prefetch.cpp39
-rw-r--r--src/mongo/db/prefetch.h9
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp14
-rw-r--r--src/mongo/db/repl/oplog_entry.h8
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp10
-rw-r--r--src/mongo/db/repl/rs_rollback_test.cpp40
-rw-r--r--src/mongo/db/repl/sync_tail.cpp28
7 files changed, 103 insertions, 45 deletions
diff --git a/src/mongo/db/prefetch.cpp b/src/mongo/db/prefetch.cpp
index a2dc36be478..5a43e2ab982 100644
--- a/src/mongo/db/prefetch.cpp
+++ b/src/mongo/db/prefetch.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_global.h"
@@ -149,39 +150,31 @@ void prefetchRecordPages(OperationContext* opCtx,
} // namespace
// prefetch for an oplog operation
-void prefetchPagesForReplicatedOp(OperationContext* opCtx, Database* db, const BSONObj& op) {
+void prefetchPagesForReplicatedOp(OperationContext* opCtx,
+ Database* db,
+ const OplogEntry& oplogEntry) {
invariant(db);
const ReplSettings::IndexPrefetchConfig prefetchConfig =
getGlobalReplicationCoordinator()->getIndexPrefetchConfig();
- const char* opField;
- const char* opType = op.getStringField("op");
- switch (*opType) {
- case 'i': // insert
- case 'd': // delete
- opField = "o";
- break;
- case 'u': // update
- opField = "o2";
- break;
- default:
- // prefetch ignores other ops
- return;
- }
- BSONObj obj = op.getObjectField(opField);
- const char* ns = op.getStringField("ns");
+ // Prefetch ignores non-CRUD operations.
+ if (!oplogEntry.isCrudOpType()) {
+ return;
+ }
// This will have to change for engines other than MMAP V1, because they might not have
// means for directly prefetching pages from the collection. For this purpose, acquire S
// lock on the database, instead of optimizing with IS.
- Lock::CollectionLock collLock(opCtx->lockState(), ns, MODE_S);
+ const auto& nss = oplogEntry.getNamespace();
+ Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_S);
- Collection* collection = db->getCollection(opCtx, ns);
+ Collection* collection = db->getCollection(opCtx, nss);
if (!collection) {
return;
}
- LOG(4) << "index prefetch for op " << *opType;
+ auto opType = oplogEntry.getOpType();
+ LOG(4) << "index prefetch for op " << OpType_serializer(opType);
// should we prefetch index pages on updates? if the update is in-place and doesn't change
// indexed values, it is actually slower - a lot slower if there are a dozen indexes or
@@ -198,6 +191,8 @@ void prefetchPagesForReplicatedOp(OperationContext* opCtx, Database* db, const B
// a way to achieve that would be to prefetch the record first, and then afterwards do
// this part.
//
+ auto obj = oplogEntry.getOperationToApply();
+ invariant(!obj.isEmpty());
prefetchIndexPages(opCtx, collection, prefetchConfig, obj);
// do not prefetch the data for inserts; it doesn't exist yet
@@ -206,11 +201,11 @@ void prefetchPagesForReplicatedOp(OperationContext* opCtx, Database* db, const B
// when we delete. note if done we only want to touch the first page.
//
// update: do record prefetch.
- if ((*opType == 'u') &&
+ if ((opType == OpTypeEnum::kUpdate) &&
// do not prefetch the data for capped collections because
// they typically do not have an _id index for findById() to use.
!collection->isCapped()) {
- prefetchRecordPages(opCtx, db, ns, obj);
+ prefetchRecordPages(opCtx, db, nss.ns().c_str(), obj);
}
}
diff --git a/src/mongo/db/prefetch.h b/src/mongo/db/prefetch.h
index 8de85948a05..1f5576e31e7 100644
--- a/src/mongo/db/prefetch.h
+++ b/src/mongo/db/prefetch.h
@@ -28,12 +28,19 @@
#pragma once
namespace mongo {
+
class BSONObj;
class Database;
class OperationContext;
+
namespace repl {
+class OplogEntry;
+
// page in possible index and/or data pages for an op from the oplog
-void prefetchPagesForReplicatedOp(OperationContext* opCtx, Database* db, const BSONObj& op);
+void prefetchPagesForReplicatedOp(OperationContext* opCtx,
+ Database* db,
+ const OplogEntry& oplogEntry);
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index f9f4a11896f..d70b518b795 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -208,12 +208,26 @@ bool OplogEntry::isCrudOpType() const {
BSONElement OplogEntry::getIdElement() const {
invariant(isCrudOpType());
if (getOpType() == OpTypeEnum::kUpdate) {
+    // We cannot use getOperationToApply() here because the BSONObj will go out of scope
+ // after we return the BSONElement.
return getObject2()->getField("_id");
} else {
return getObject()["_id"];
}
}
+BSONObj OplogEntry::getOperationToApply() const {
+ if (getOpType() != OpTypeEnum::kUpdate) {
+ return getObject();
+ }
+
+ if (auto optionalObj = getObject2()) {
+ return *optionalObj;
+ }
+
+ return {};
+}
+
OplogEntry::CommandType OplogEntry::getCommandType() const {
invariant(isCommand());
invariant(_commandType != OplogEntry::CommandType::kNotCommand);
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index ae21d23b446..d2ae43d65b4 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -103,6 +103,14 @@ public:
BSONElement getIdElement() const;
/**
+ * Returns the document representing the operation to apply.
+ * For commands and insert/delete operations, this will be the document in the 'o' field.
+ * For update operations, this will be the document in the 'o2' field.
+ * An empty document returned by this function indicates that we have a malformed OplogEntry.
+ */
+ BSONObj getOperationToApply() const;
+
+ /**
* Returns the type of command of the oplog entry. Must be called on a command op.
*/
CommandType getCommandType() const;
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index c7f2c922d4b..7a73c7e20ee 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -235,7 +235,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf
BSONObj fixedObj = bob.obj();
// Parse the oplog entry.
- auto oplogEntry = OplogEntry(fixedObj);
+ const OplogEntry oplogEntry(fixedObj);
if (isNestedApplyOpsCommand) {
LOG(2) << "Updating rollback FixUpInfo for nested applyOps oplog entry: "
@@ -254,9 +254,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf
<< redact(oplogEntry.toBSON()));
}
- BSONObj obj =
- oplogEntry.raw.getObjectField(oplogEntry.getOpType() == OpTypeEnum::kUpdate ? "o2" : "o");
-
+ auto obj = oplogEntry.getOperationToApply();
if (obj.isEmpty()) {
throw RSFatalException(str::stream() << "Local op on rollback has no object field: "
<< redact(oplogEntry.toBSON()));
@@ -264,7 +262,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf
// If the operation being rolled back has a txnNumber, then the corresponding entry in the
// session transaction table needs to be refetched.
- auto operationSessionInfo = oplogEntry.getOperationSessionInfo();
+ const auto& operationSessionInfo = oplogEntry.getOperationSessionInfo();
auto txnNumber = operationSessionInfo.getTxnNumber();
if (txnNumber) {
auto sessionId = operationSessionInfo.getSessionId();
@@ -567,7 +565,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf
// If we are inserting/updating/deleting a document in the oplog entry, we will update
// the doc._id field when we actually insert the docID into the docsToRefetch set.
- DocID doc = DocID(oplogEntry.raw, BSONElement(), *uuid);
+ DocID doc = DocID(fixedObj, BSONElement(), *uuid);
doc._id = oplogEntry.getIdElement();
if (doc._id.eoo()) {
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index 5e310eaf9aa..f6ca06237cd 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -2068,7 +2068,45 @@ TEST(RSRollbackTest, LocalEntryWithoutOIsFatal) {
RSFatalException);
}
-TEST(RSRollbackTest, LocalEntryWithoutO2IsFatal) {
+TEST(RSRollbackTest, LocalUpdateEntryWithoutO2IsFatal) {
+ const auto validOplogEntry = BSON("op"
+ << "u"
+ << "ui"
+ << UUID::gen()
+ << "ts"
+ << Timestamp(1, 1)
+ << "t"
+ << 1LL
+ << "h"
+ << 1LL
+ << "ns"
+ << "test.t"
+ << "o"
+ << BSON("_id" << 1 << "a" << 1)
+ << "o2"
+ << BSON("_id" << 1));
+ FixUpInfo fui;
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false));
+ const auto invalidOplogEntry = BSON("op"
+ << "u"
+ << "ui"
+ << UUID::gen()
+ << "ts"
+ << Timestamp(1, 1)
+ << "t"
+ << 1LL
+ << "h"
+ << 1LL
+ << "ns"
+ << "test.t"
+ << "o"
+ << BSON("_id" << 1 << "a" << 1));
+ ASSERT_THROWS(
+ updateFixUpInfoFromLocalOplogEntry(fui, invalidOplogEntry, false).transitional_ignore(),
+ RSFatalException);
+}
+
+TEST(RSRollbackTest, LocalUpdateEntryWithEmptyO2IsFatal) {
const auto validOplogEntry = BSON("op"
<< "u"
<< "ui"
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 3fa8f87db9f..f25e398443a 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -424,20 +424,20 @@ Status SyncTail::syncApply(OperationContext* opCtx,
namespace {
// The pool threads call this to prefetch each op
-void prefetchOp(const BSONObj& op) {
+void prefetchOp(const OplogEntry& oplogEntry) {
initializePrefetchThread();
- const char* ns = op.getStringField("ns");
- if (ns && (ns[0] != '\0')) {
+ const auto& nss = oplogEntry.getNamespace();
+ if (!nss.isEmpty()) {
try {
// one possible tweak here would be to stay in the read lock for this database
// for multiple prefetches if they are for the same database.
const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
OperationContext& opCtx = *opCtxPtr;
- AutoGetCollectionForReadCommand ctx(&opCtx, NamespaceString(ns));
+ AutoGetCollectionForReadCommand ctx(&opCtx, nss);
Database* db = ctx.getDb();
if (db) {
- prefetchPagesForReplicatedOp(&opCtx, db, op);
+ prefetchPagesForReplicatedOp(&opCtx, db, oplogEntry);
}
} catch (const DBException& e) {
LOG(2) << "ignoring exception in prefetchOp(): " << redact(e) << endl;
@@ -452,7 +452,7 @@ void prefetchOp(const BSONObj& op) {
void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherPool) {
invariant(prefetcherPool);
for (auto&& op : ops) {
- prefetcherPool->schedule([&] { prefetchOp(op.raw); });
+ prefetcherPool->schedule([&] { prefetchOp(op); });
}
prefetcherPool->join();
}
@@ -928,10 +928,8 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord) {
}
// Extract some info from ops that we'll need after releasing the batch below.
- const auto firstOpTimeInBatch =
- fassertStatusOK(40299, OpTime::parseFromOplogEntry(ops.front().raw));
- const auto lastOpTimeInBatch =
- fassertStatusOK(28773, OpTime::parseFromOplogEntry(ops.back().raw));
+ const auto firstOpTimeInBatch = ops.front().getOpTime();
+ const auto lastOpTimeInBatch = ops.back().getOpTime();
// Make sure the oplog doesn't go back in time or repeat an entry.
if (firstOpTimeInBatch <= replCoord->getMyLastAppliedOpTime()) {
@@ -1029,7 +1027,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
int curVersion = entry.getVersion();
if (curVersion != OplogEntry::kOplogVersion) {
severe() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
- << curVersion << " in oplog entry: " << redact(entry.raw);
+ << curVersion << " in oplog entry: " << redact(entry.toBSON());
fassertFailedNoTrace(18820);
}
@@ -1391,13 +1389,13 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
const Status status = syncApply(opCtx, entry->raw, oplogApplicationMode);
if (!status.isOK()) {
- severe() << "Error applying operation (" << redact(entry->raw)
+ severe() << "Error applying operation (" << redact(entry->toBSON())
<< "): " << causedBy(redact(status));
return status;
}
} catch (const DBException& e) {
severe() << "writer worker caught exception: " << redact(e)
- << " on: " << redact(entry->raw);
+ << " on: " << redact(entry->toBSON());
return e.toStatus();
}
}
@@ -1443,7 +1441,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* opCtx,
// sync source.
if (s != ErrorCodes::UpdateOperationFailed) {
error() << "Error applying operation: " << redact(s) << " ("
- << redact(entry.raw) << ")";
+ << redact(entry.toBSON()) << ")";
return s;
}
@@ -1459,7 +1457,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* opCtx,
}
severe() << "writer worker caught exception: " << causedBy(redact(e))
- << " on: " << redact(entry.raw);
+ << " on: " << redact(entry.toBSON());
return e.toStatus();
}
}