diff options
author | Randolph Tan <randolph@10gen.com> | 2020-12-15 18:41:14 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-12-17 21:16:53 +0000 |
commit | 6364c95692fbf3cd48c5ba13772eede90b61fe86 (patch) | |
tree | ea832087fea452fde6ca8678277147a958bad8b2 /src | |
parent | f6abb42a511110023ceb130e45787ff881e7a2e9 (diff) | |
download | mongo-6364c95692fbf3cd48c5ba13772eede90b61fe86.tar.gz |
SERVER-49904 Create new oplog type to attach metadata during runtime
Diffstat (limited to 'src')
54 files changed, 1280 insertions, 923 deletions
diff --git a/src/mongo/db/catalog/index_build_oplog_entry.cpp b/src/mongo/db/catalog/index_build_oplog_entry.cpp index 07d965ba64a..5cb54358a3d 100644 --- a/src/mongo/db/catalog/index_build_oplog_entry.cpp +++ b/src/mongo/db/catalog/index_build_oplog_entry.cpp @@ -127,7 +127,7 @@ StatusWith<IndexBuildOplogEntry> IndexBuildOplogEntry::parse(const repl::OplogEn } auto collUUID = entry.getUuid(); - invariant(collUUID, str::stream() << redact(entry.getRaw())); + invariant(collUUID, str::stream() << redact(entry.toBSONForLogging())); return IndexBuildOplogEntry{*collUUID, commandType, diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 7d526ae95c0..66c9b35a9b0 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -70,8 +70,8 @@ #include "mongo/util/fail_point.h" namespace mongo { +using repl::DurableOplogEntry; using repl::MutableOplogEntry; -using repl::OplogEntry; const OperationContext::Decoration<boost::optional<OpObserverImpl::DocumentKey>> documentKeyDecoration = OperationContext::declareDecoration<boost::optional<OpObserverImpl::DocumentKey>>(); @@ -957,7 +957,8 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( // should be able to be applied. if (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry || (opsArray.arrSize() > 0 && - (opsArray.len() + OplogEntry::getDurableReplOperationSize(stmt) > BSONObjMaxUserSize))) + (opsArray.len() + DurableOplogEntry::getDurableReplOperationSize(stmt) > + BSONObjMaxUserSize))) break; opsArray.append(stmt.toBSON()); } diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 32828f77705..35adf966502 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -2317,13 +2317,13 @@ TEST_F(OpObserverLargeTransactionTest, LargeTransactionCreatesMultipleOplogEntri // entry. constexpr size_t kHalfTransactionSize = BSONObjMaxInternalSize / 2 - 175; std::unique_ptr<uint8_t[]> halfTransactionData(new uint8_t[kHalfTransactionSize]()); - auto operation1 = repl::OplogEntry::makeInsertOperation( + auto operation1 = repl::DurableOplogEntry::makeInsertOperation( nss, uuid, BSON( "_id" << 0 << "data" << BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral))); - auto operation2 = repl::OplogEntry::makeInsertOperation( + auto operation2 = repl::DurableOplogEntry::makeInsertOperation( nss, uuid, BSON( diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp index ba12fe0c453..521f15d4fad 100644 --- a/src/mongo/db/ops/write_ops_retryability.cpp +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -57,7 +57,7 @@ void validateFindAndModifyRetryability(const write_ops::FindAndModifyCommand& re str::stream() << "findAndModify retry request: " << redact(request.toBSON({})) << " is not compatible with previous write in the transaction of type: " << OpType_serializer(oplogEntry.getOpType()) << ", oplogTs: " - << ts.toString() << ", oplog: " << redact(oplogEntry.toBSON()), + << ts.toString() << ", oplog: " << redact(oplogEntry.toBSONForLogging()), request.getRemove().value_or(false)); uassert(40607, str::stream() << "No pre-image available for findAndModify retry request:" @@ -69,7 +69,7 @@ void validateFindAndModifyRetryability(const write_ops::FindAndModifyCommand& re str::stream() << "findAndModify retry request: " << redact(request.toBSON({})) << " is not compatible with previous write in the transaction of type: " << OpType_serializer(oplogEntry.getOpType()) << ", oplogTs: " - << ts.toString() << ", oplog: " << redact(oplogEntry.toBSON()), + << ts.toString() << ", oplog: " << redact(oplogEntry.toBSONForLogging()), request.getUpsert().value_or(false)); } else { uassert( @@ -77,7 +77,7 @@ void validateFindAndModifyRetryability(const write_ops::FindAndModifyCommand& re str::stream() << "findAndModify retry request: " << redact(request.toBSON({})) << " is not compatible with previous write in the transaction of type: " << OpType_serializer(oplogEntry.getOpType()) << ", oplogTs: " - << ts.toString() << ", oplog: " << redact(oplogEntry.toBSON()), + << ts.toString() << ", oplog: " << redact(oplogEntry.toBSONForLogging()), opType == repl::OpTypeEnum::kUpdate); if (request.getNew().value_or(false)) { @@ -85,14 +85,16 @@ void validateFindAndModifyRetryability(const write_ops::FindAndModifyCommand& re str::stream() << "findAndModify retry request: " << redact(request.toBSON({})) << " wants the document after update returned, but only before " "update document is stored, oplogTs: " - << ts.toString() << ", oplog: " << redact(oplogEntry.toBSON()), + << ts.toString() + << ", oplog: " << redact(oplogEntry.toBSONForLogging()), oplogWithCorrectLinks.getPostImageOpTime()); } else { uassert(40612, str::stream() << "findAndModify retry request: " << redact(request.toBSON({})) << " wants the document before update returned, but only after " "update document is stored, oplogTs: " - << ts.toString() << ", oplog: " << redact(oplogEntry.toBSON()), + << ts.toString() + << ", oplog: " << redact(oplogEntry.toBSONForLogging()), oplogWithCorrectLinks.getPreImageOpTime()); } } @@ -157,7 +159,7 @@ repl::OplogEntry getInnerNestedOplogEntry(const repl::OplogEntry& entry) { uassert(40635, str::stream() << "expected nested oplog entry with ts: " << entry.getTimestamp().toString() - << " to have o2 field: " << redact(entry.toBSON()), + << " to have o2 field: " << redact(entry.toBSONForLogging()), entry.getObject2()); return uassertStatusOK(repl::OplogEntry::parse(*entry.getObject2())); } @@ -185,7 +187,7 @@ SingleWriteResult parseOplogEntryForUpdate(const repl::OplogEntry& entry) { "the transaction of type: " << OpType_serializer(entry.getOpType()) << ", oplogTs: " << entry.getTimestamp().toString() - << ", oplog: " << redact(entry.toBSON())); + << ", oplog: " << redact(entry.toBSONForLogging())); } return res; diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp index 61738479b60..9ae76ce54b8 100644 --- a/src/mongo/db/ops/write_ops_retryability_test.cpp +++ b/src/mongo/db/ops/write_ops_retryability_test.cpp @@ -66,24 +66,25 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::optional<BSONObj> o2Field = boost::none, boost::optional<repl::OpTime> preImageOpTime = boost::none, boost::optional<repl::OpTime> postImageOpTime = boost::none) { - return repl::OplogEntry(opTime, // optime - boost::none, // hash - opType, // opType - nss, // namespace - boost::none, // uuid - boost::none, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - oField, // o - o2Field, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - preImageOpTime, // pre-image optime - postImageOpTime, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return { + repl::DurableOplogEntry(opTime, // optime + boost::none, // hash + opType, // opType + nss, // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + oField, // o + o2Field, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + preImageOpTime, // pre-image optime + postImageOpTime, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } void setUpReplication(ServiceContext* svcCtx) { @@ -134,7 +135,7 @@ TEST_F(WriteOpsRetryability, ParseOplogEntryForNestedUpdate) { repl::OpTypeEnum::kNoop, // op type NamespaceString("a.b"), // namespace kNestedOplog, // o - innerOplog.toBSON()); // o2 + innerOplog.getEntry().toBSON()); // o2 auto res = parseOplogEntryForUpdate(updateOplog); @@ -167,7 +168,7 @@ TEST_F(WriteOpsRetryability, ParseOplogEntryForNestedUpsert) { repl::OpTypeEnum::kNoop, // op type NamespaceString("a.b"), // namespace kNestedOplog, // o - innerOplog.toBSON()); // o2 + innerOplog.getEntry().toBSON()); // o2 auto res = parseOplogEntryForUpdate(insertOplog); @@ -421,7 +422,7 @@ TEST_F(FindAndModifyRetryability, NestedUpsert) { repl::OpTypeEnum::kNoop, // op type kNs, // namespace kNestedOplog, // o - innerOplog.toBSON()); // o2 + innerOplog.getEntry().toBSON()); // o2 auto result = constructFindAndModifyRetryResult(opCtx(), request, insertOplog); ASSERT_BSONOBJ_EQ(BSON("lastErrorObject" @@ -568,7 +569,7 @@ TEST_F(FindAndModifyRetryability, NestedUpdateWithPreImage) { repl::OpTypeEnum::kNoop, // optype kNs, // namespace kNestedOplog, // o - innerOplog.toBSON(), // o2 + innerOplog.getEntry().toBSON(), // o2 imageOpTime, // pre-image optime boost::none); // post-image optime @@ -628,7 +629,7 @@ TEST_F(FindAndModifyRetryability, NestedUpdateWithPostImage) { repl::OpTypeEnum::kNoop, // op type kNs, // namespace kNestedOplog, // o - innerOplog.toBSON(), // o2 + innerOplog.getEntry().toBSON(), // o2 boost::none, // pre-image optime imageOpTime); // post-image optime @@ -703,7 +704,7 @@ TEST_F(FindAndModifyRetryability, NestedRemove) { repl::OpTypeEnum::kNoop, // op type kNs, // namespace kNestedOplog, // o - innerOplog.toBSON(), // o2 + innerOplog.getEntry().toBSON(), // o2 imageOpTime, // pre-image optime boost::none); // post-image optime diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 89c207471a0..f68c4cca237 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -205,7 +205,7 @@ public: const boost::optional<Document> expectedInvalidate = {}, const std::vector<repl::OplogEntry> transactionEntries = {}, std::vector<Document> documentsForLookup = {}) { - vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec); + vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.getEntry().toBSON(), spec); auto closeCursor = stages.back(); getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( @@ -273,7 +273,7 @@ public: } vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) { - return makeStages(entry.toBSON(), kDefaultSpec); + return makeStages(entry.getEntry().toBSON(), kDefaultSpec); } OplogEntry createCommand(const BSONObj& oField, @@ -319,7 +319,7 @@ public: testUuid(), boost::none, // fromMigrate BSONObj()); - BSONObjBuilder builder(baseOplogEntry.toBSON()); + BSONObjBuilder builder(baseOplogEntry.getEntry().toBSON()); builder.append("lsid", lsid.toBSON()); builder.append("txnNumber", 0LL); BSONObj oplogEntry = builder.done(); @@ -371,24 +371,25 @@ public: boost::optional<repl::OpTime> prevOpTime = {}, boost::optional<repl::OpTime> preImageOpTime = boost::none) { long long hash = 1LL; - return repl::OplogEntry(opTime ? *opTime : kDefaultOpTime, // optime - hash, // hash - opType, // opType - nss, // namespace - uuid, // uuid - fromMigrate, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - object, // o - object2, // o2 - sessionInfo, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - prevOpTime, // optime of previous write within same transaction - preImageOpTime, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return { + repl::DurableOplogEntry(opTime ? *opTime : kDefaultOpTime, // optime + hash, // hash + opType, // opType + nss, // namespace + uuid, // uuid + fromMigrate, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + object, // o + object2, // o2 + sessionInfo, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + prevOpTime, // optime of previous write within same transaction + preImageOpTime, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } /** @@ -1213,24 +1214,24 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact sessionInfo.setTxnNumber(1); sessionInfo.setSessionId(makeLogicalSessionIdForTest()); auto oplogEntry = - repl::OplogEntry(kDefaultOpTime, // optime - 1LL, // hash - OpTypeEnum::kCommand, // opType - nss.getCommandNS(), // namespace - boost::none, // uuid - boost::none, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - BSON("commitTransaction" << 1), // o - boost::none, // o2 - sessionInfo, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - applyOpsOpTime, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + repl::DurableOplogEntry(kDefaultOpTime, // optime + 1LL, // hash + OpTypeEnum::kCommand, // opType + nss.getCommandNS(), // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + BSON("commitTransaction" << 1), // o + boost::none, // o2 + sessionInfo, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + applyOpsOpTime, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none); // _id // When the DocumentSourceChangeStreamTransform sees the "commitTransaction" oplog entry, we // expect it to return the insert op within our 'preparedApplyOps' oplog entry. @@ -1411,25 +1412,25 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { applyOpsOpTime1); // Create an oplog entry representing the commit for the prepared transaction. - auto commitEntry = - repl::OplogEntry(kDefaultOpTime, // optime - 1LL, // hash - OpTypeEnum::kCommand, // opType - nss.getCommandNS(), // namespace - boost::none, // uuid - boost::none, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - BSON("commitTransaction" << 1), // o - boost::none, // o2 - sessionInfo, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - applyOpsOpTime2, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + auto commitEntry = repl::DurableOplogEntry( + kDefaultOpTime, // optime + 1LL, // hash + OpTypeEnum::kCommand, // opType + nss.getCommandNS(), // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + BSON("commitTransaction" << 1), // o + boost::none, // o2 + sessionInfo, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + applyOpsOpTime2, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none); // _id // We do not use the checkTransformation() pattern that other tests use since we expect multiple // documents to be returned from one applyOps. @@ -1884,7 +1885,7 @@ TEST_F(ChangeStreamStageTest, UsesResumeTokenAsSortKeyIfNeedsMergeIsFalse) { boost::none, // fromMigrate boost::none); // o2 - auto stages = makeStages(insert.toBSON(), kDefaultSpec); + auto stages = makeStages(insert.getEntry().toBSON(), kDefaultSpec); getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(std::vector<FieldPath>{{"x"}, {"_id"}}); @@ -2212,7 +2213,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) { // entry before it so that we know we are finding the pre-image based on the given timestamp. repl::OpTime dummyOpTime{preImageOpTime.getTimestamp(), repl::OpTime::kInitialTerm}; std::vector<Document> documentsForLookup = {Document{dummyOpTime.toBSON()}, - Document{preImageEntry.toBSON()}}; + Document{preImageEntry.getEntry().toBSON()}}; // When run with {fullDocumentBeforeChange: "off"}, we do not see a pre-image even if available. auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" @@ -2301,7 +2302,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) { // entry before it so that we know we are finding the pre-image based on the given timestamp. repl::OpTime dummyOpTime{preImageOpTime.getTimestamp(), repl::OpTime::kInitialTerm}; std::vector<Document> documentsForLookup = {Document{dummyOpTime.toBSON()}, - Document{preImageEntry.toBSON()}}; + Document{preImageEntry.getEntry().toBSON()}}; // When run with {fullDocumentBeforeChange: "off"}, we do not see a pre-image even if available. auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" @@ -2398,7 +2399,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) { // entry before it so that we know we are finding the pre-image based on the given timestamp. repl::OpTime dummyOpTime{preImageOpTime.getTimestamp(), repl::OpTime::kInitialTerm}; std::vector<Document> documentsForLookup = {Document{dummyOpTime.toBSON()}, - Document{preImageEntry.toBSON()}}; + Document{preImageEntry.getEntry().toBSON()}}; // When run with {fullDocumentBeforeChange: "off"}, we do not see a pre-image even if available. auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index fb3f190f9d2..840cd23db3b 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -503,7 +503,7 @@ Status applyOps(OperationContext* opCtx, // static std::vector<OplogEntry> ApplyOps::extractOperations(const OplogEntry& applyOpsOplogEntry) { std::vector<OplogEntry> result; - extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result); + extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.getEntry().toBSON(), &result); return result; } @@ -513,12 +513,12 @@ void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry, std::vector<OplogEntry>* operations) { uassert(ErrorCodes::TypeMismatch, str::stream() << "ApplyOps::extractOperations(): not a command: " - << redact(applyOpsOplogEntry.toBSON()), + << redact(applyOpsOplogEntry.toBSONForLogging()), applyOpsOplogEntry.isCommand()); uassert(ErrorCodes::CommandNotSupported, str::stream() << "ApplyOps::extractOperations(): not applyOps command: " - << redact(applyOpsOplogEntry.toBSON()), + << redact(applyOpsOplogEntry.toBSONForLogging()), OplogEntry::CommandType::kApplyOps == applyOpsOplogEntry.getCommandType()); auto cmdObj = applyOpsOplogEntry.getOperationToApply(); diff --git a/src/mongo/db/repl/apply_ops_test.cpp b/src/mongo/db/repl/apply_ops_test.cpp index 88b36f5827c..a61db70a0b6 100644 --- a/src/mongo/db/repl/apply_ops_test.cpp +++ b/src/mongo/db/repl/apply_ops_test.cpp @@ -337,24 +337,24 @@ TEST_F(ApplyOpsTest, ApplyOpsPropagatesOplogApplicationMode) { * Generates oplog entries with the given number used for the timestamp. */ OplogEntry makeOplogEntry(OpTypeEnum opType, const BSONObj& oField) { - return OplogEntry(OpTime(Timestamp(1, 1), 1), // optime - boost::none, // hash - opType, // op type - NamespaceString("a.a"), // namespace - boost::none, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - oField, // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {DurableOplogEntry(OpTime(Timestamp(1, 1), 1), // optime + boost::none, // hash + opType, // op type + NamespaceString("a.a"), // namespace + boost::none, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + oField, // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } TEST_F(ApplyOpsTest, ExtractOperationsReturnsTypeMismatchIfNotCommand) { @@ -403,7 +403,7 @@ TEST_F(ApplyOpsTest, ExtractOperationsReturnsOperationsWithSameOpTimeAsApplyOps) auto operations = ApplyOps::extractOperations(oplogEntry); ASSERT_EQUALS(3U, operations.size()) - << "Unexpected number of operations extracted: " << oplogEntry.toBSON(); + << "Unexpected number of operations extracted: " << oplogEntry.toBSONForLogging(); // Check extracted CRUD operations. auto it = operations.cbegin(); @@ -411,7 +411,7 @@ TEST_F(ApplyOpsTest, ExtractOperationsReturnsOperationsWithSameOpTimeAsApplyOps) ASSERT(operations.cend() != it); const auto& operation1 = *(it++); ASSERT(OpTypeEnum::kInsert == operation1.getOpType()) - << "Unexpected op type: " << operation1.toBSON(); + << "Unexpected op type: " << operation1.toBSONForLogging(); ASSERT_EQUALS(ui1, *operation1.getUuid()); ASSERT_EQUALS(ns1, operation1.getNss()); ASSERT_BSONOBJ_EQ(BSON("_id" << 1), operation1.getOperationToApply()); @@ -424,7 +424,7 @@ TEST_F(ApplyOpsTest, ExtractOperationsReturnsOperationsWithSameOpTimeAsApplyOps) ASSERT(operations.cend() != it); const auto& operation2 = *(it++); ASSERT(OpTypeEnum::kInsert == operation2.getOpType()) - << "Unexpected op type: " << operation2.toBSON(); + << "Unexpected op type: " << operation2.toBSONForLogging(); ASSERT_EQUALS(ui2, *operation2.getUuid()); ASSERT_EQUALS(ns2, operation2.getNss()); ASSERT_BSONOBJ_EQ(BSON("_id" << 2), operation2.getOperationToApply()); @@ -437,7 +437,7 @@ TEST_F(ApplyOpsTest, ExtractOperationsReturnsOperationsWithSameOpTimeAsApplyOps) ASSERT(operations.cend() != it); const auto& operation3 = *(it++); ASSERT(OpTypeEnum::kUpdate == operation3.getOpType()) - << "Unexpected op type: " << operation3.toBSON(); + << "Unexpected op type: " << operation3.toBSONForLogging(); ASSERT_EQUALS(ui3, *operation3.getUuid()); ASSERT_EQUALS(ns3, operation3.getNss()); ASSERT_BSONOBJ_EQ(BSON("x" << 1), operation3.getOperationToApply()); diff --git a/src/mongo/db/repl/idempotency_test.cpp b/src/mongo/db/repl/idempotency_test.cpp index aad94b99636..f96aac80e4d 100644 --- a/src/mongo/db/repl/idempotency_test.cpp +++ b/src/mongo/db/repl/idempotency_test.cpp @@ -163,7 +163,7 @@ std::string RandomizedIdempotencyTest::getStatesString(const std::vector<Collect } else { firstIter = false; } - sb << op.toString(); + sb << op.toStringForLogging(); } sb << " ]\n"; diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index 6590ccbc031..223861d4d15 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -448,7 +448,7 @@ std::string IdempotencyTest::getStatesString(const std::vector<CollectionState>& sb << "found after applying the operations a second time, therefore breaking idempotency.\n"; sb << "Applied ops:\n"; for (const auto& op : state2Ops) { - sb << op.toString() << "\n"; + sb << op.toStringForLogging() << "\n"; } return sb.str(); } diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 5113077ce8a..2441177164c 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -623,28 +623,28 @@ OplogEntry makeOplogEntry(int t, oField = BSON("dropIndexes" << "a_1"); } - return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime - boost::none, // hash - opType, // op type - NamespaceString("a.a"), // namespace - boost::none, // uuid - boost::none, // fromMigrate - version, // version - oField, // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t() + Seconds(t), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {DurableOplogEntry(OpTime(Timestamp(t, 1), 1), // optime + boost::none, // hash + opType, // op type + NamespaceString("a.a"), // namespace + boost::none, // uuid + boost::none, // fromMigrate + version, // version + oField, // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t() + Seconds(t), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } BSONObj makeOplogEntryObj(int t, OpTypeEnum opType, int version) { - return makeOplogEntry(t, opType, version).toBSON(); + return makeOplogEntry(t, opType, version).getEntry().toBSON(); } void InitialSyncerTest::processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs) { @@ -2037,13 +2037,13 @@ TEST_F( _mock->runUntilExpectationsSatisfied(); // Simulate response to OplogFetcher so it has enough operations to reach end timestamp. - getOplogFetcher()->receiveBatch(1LL, {makeOplogEntryObj(1), lastOp.toBSON()}); + getOplogFetcher()->receiveBatch(1LL, {makeOplogEntryObj(1), lastOp.getEntry().toBSON()}); // Simulate a network error response that restarts the OplogFetcher. getOplogFetcher()->simulateResponseError(Status(ErrorCodes::NetworkTimeout, "network error")); _mock ->expect([](auto& request) { return request["find"].str() == "oplog.rs"; }, - makeCursorResponse(0LL, _options.localOplogNS, {lastOp.toBSON()})) + makeCursorResponse(0LL, _options.localOplogNS, {lastOp.getEntry().toBSON()})) .times(1); _mock->runUntilExpectationsSatisfied(); @@ -2096,7 +2096,8 @@ TEST_F(InitialSyncerTest, processSuccessfulFCVFetcherResponseLastLTS(); // Simulate response to OplogFetcher so it has enough operations to reach end timestamp. - getOplogFetcher()->receiveBatch(1LL, {makeOplogEntryObj(1), lastOp.toBSON()}); + getOplogFetcher()->receiveBatch(1LL, + {makeOplogEntryObj(1), lastOp.getEntry().toBSON()}); // Simulate a network error response that restarts the OplogFetcher. getOplogFetcher()->simulateResponseError( Status(ErrorCodes::NetworkTimeout, "network error")); @@ -2104,7 +2105,7 @@ TEST_F(InitialSyncerTest, // Oplog entry associated with the stopTimestamp. - processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()}); + processSuccessfulLastOplogEntryFetcherResponse({lastOp.getEntry().toBSON()}); request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); assertRemoteCommandNameEquals("replSetGetRBID", request); @@ -3525,7 +3526,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); // Oplog entry associated with the defaultBeginFetchingTimestamp. - processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.toBSON()}); + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.getEntry().toBSON()}); // Send an empty optime as the response to the beginFetchingOptime find request, which will // cause the beginFetchingTimestamp to be set to the defaultBeginFetchingTimestamp. @@ -3535,7 +3536,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter net->runReadyNetworkOperations(); // Oplog entry associated with the beginApplyingTimestamp. - processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.toBSON()}); + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.getEntry().toBSON()}); // Instead of fast forwarding to AllDatabaseCloner completion by returning an empty list of // database names, we'll simulate copying a single database with a single collection on the @@ -3587,7 +3588,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter } // Oplog entry associated with the stopTimestamp. - processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.toBSON()}); + processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.getEntry().toBSON()}); // Last rollback checker replSetGetRBID command. request = assertRemoteCommandNameEquals( @@ -4128,11 +4129,12 @@ OplogEntry InitialSyncerTest::doInitialSyncWithOneBatch() { processSuccessfulFCVFetcherResponseLastLTS(); // Simulate an OplogFetcher batch that has enough operations to reach end timestamp. - getOplogFetcher()->receiveBatch(1LL, {makeOplogEntryObj(1), lastOp.toBSON()}); + getOplogFetcher()->receiveBatch(1LL, + {makeOplogEntryObj(1), lastOp.getEntry().toBSON()}); } // Oplog entry associated with the stopTimestamp. - processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()}); + processSuccessfulLastOplogEntryFetcherResponse({lastOp.getEntry().toBSON()}); request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); assertRemoteCommandNameEquals("replSetGetRBID", request); @@ -4265,11 +4267,11 @@ TEST_F(InitialSyncerTest, // Simulate an OplogFetcher batch that has enough operations to reach end timestamp. getOplogFetcher()->receiveBatch( - 1LL, {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()}); + 1LL, {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.getEntry().toBSON()}); } // Oplog entry associated with the stopTimestamp. - processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()}); + processSuccessfulLastOplogEntryFetcherResponse({lastOp.getEntry().toBSON()}); // Last rollback ID. request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); diff --git a/src/mongo/db/repl/insert_group.cpp b/src/mongo/db/repl/insert_group.cpp index 2f7ec43f7c3..fc4870d77f8 100644 --- a/src/mongo/db/repl/insert_group.cpp +++ b/src/mongo/db/repl/insert_group.cpp @@ -78,7 +78,7 @@ StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts( if (entry.getOpType() != OpTypeEnum::kInsert) { return Status(ErrorCodes::TypeMismatch, "Can only group insert operations."); } - if (entry.isForCappedCollection) { + if (entry.isForCappedCollection()) { return Status(ErrorCodes::InvalidOptions, "Cannot group insert operations on capped collections."); } @@ -143,7 +143,7 @@ StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts( "Error applying inserts in bulk. Trying first insert as a lone insert"; auto status = exceptionToStatus().withContext( str::stream() << message << ". Grouped inserts: " << redact(groupedInserts.toBSON()) - << ". First insert: " << redact(entry.getRaw())); + << ". First insert: " << redact(entry.toBSONForLogging())); // It's not an error during initial sync to encounter DuplicateKey errors. if (Mode::kInitialSync == _mode && ErrorCodes::DuplicateKey == status) { @@ -151,12 +151,12 @@ StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts( 2, message, "groupedInserts"_attr = redact(groupedInserts.toBSON()), - "firstInsert"_attr = redact(entry.getRaw())); + "firstInsert"_attr = redact(entry.toBSONForLogging())); } else { LOGV2_ERROR(21204, message, "groupedInserts"_attr = redact(groupedInserts.toBSON()), - "firstInsert"_attr = redact(entry.getRaw())); + "firstInsert"_attr = redact(entry.toBSONForLogging())); } // Avoid quadratic run time from failed insert by not retrying until we diff --git a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp index cda26ff534d..bc8704c8622 100644 --- a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp +++ b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp @@ -98,7 +98,7 @@ void MockReplCoordServerFixture::insertOplogEntry(const repl::OplogEntry& entry) WriteUnitOfWork wuow(opCtx()); auto status = coll->insertDocument(opCtx(), - InsertStatement(entry.toBSON()), + InsertStatement(entry.getEntry().toBSON()), &CurOp::get(opCtx())->debug(), /* fromMigrate */ false); ASSERT_OK(status); diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp index f0636d159ae..c1f24145358 100644 --- a/src/mongo/db/repl/multiapplier_test.cpp +++ b/src/mongo/db/repl/multiapplier_test.cpp @@ -66,24 +66,24 @@ void MultiApplierTest::setUp() { * Generates oplog entries with the given number used for the timestamp. */ OplogEntry makeOplogEntry(int ts) { - return OplogEntry(OpTime(Timestamp(ts, 1), 1), // optime - boost::none, // hash - OpTypeEnum::kNoop, // op type - NamespaceString("a.a"), // namespace - boost::none, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - BSONObj(), // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {DurableOplogEntry(OpTime(Timestamp(ts, 1), 1), // optime + boost::none, // hash + OpTypeEnum::kNoop, // op type + NamespaceString("a.a"), // namespace + boost::none, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + BSONObj(), // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } TEST_F(MultiApplierTest, InvalidConstruction) { @@ -260,7 +260,7 @@ TEST_F( ASSERT_TRUE(multiApplyTxn); ASSERT_EQUALS(1U, operationsToApply.size()); - ASSERT_BSONOBJ_EQ(operations[0].getRaw(), operationsToApply[0].getRaw()); + ASSERT_BSONOBJ_EQ(operations[0].getEntry().toBSON(), operationsToApply[0].getEntry().toBSON()); ASSERT_OK(callbackResult); ASSERT_FALSE(callbackTxn); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index d8c95789f04..09e1820ccf9 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1221,8 +1221,8 @@ Status applyOperation_inlock(OperationContext* opCtx, // The oplog entry is corrupted; or // The version of the upstream server is obsolete. uassert(ErrorCodes::NoSuchKey, - str::stream() - << "Failed to apply insert due to missing _id: " << redact(op.toBSON()), + str::stream() << "Failed to apply insert due to missing _id: " + << redact(op.toBSONForLogging()), o.hasField("_id")); // 1. Insert if @@ -1346,7 +1346,7 @@ Status applyOperation_inlock(OperationContext* opCtx, auto idField = o2["_id"]; uassert(ErrorCodes::NoSuchKey, str::stream() << "Failed to apply update due to missing _id: " - << redact(op.toBSON()), + << redact(op.toBSONForLogging()), !idField.eoo()); // The o2 field may contain additional fields besides the _id (like the shard key @@ -1400,14 +1400,15 @@ Status applyOperation_inlock(OperationContext* opCtx, LOGV2_DEBUG(2170003, 2, "couldn't find doc in capped collection", - "op"_attr = redact(op.toBSON())); + "op"_attr = redact(op.toBSONForLogging())); } else if (ur.modifiers) { if (updateCriteria.nFields() == 1) { // was a simple { _id : ... } update criteria static constexpr char msg[] = "Failed to apply update"; - LOGV2_ERROR(21258, msg, "op"_attr = redact(op.toBSON())); + LOGV2_ERROR(21258, msg, "op"_attr = redact(op.toBSONForLogging())); return Status(ErrorCodes::UpdateOperationFailed, - str::stream() << msg << ": " << redact(op.toBSON())); + str::stream() + << msg << ": " << redact(op.toBSONForLogging())); } // Need to check to see if it isn't present so we can exit early with a @@ -1422,9 +1423,10 @@ Status applyOperation_inlock(OperationContext* opCtx, (!indexCatalog->haveIdIndex(opCtx) && Helpers::findOne(opCtx, collection, updateCriteria, false).isNull())) { static constexpr char msg[] = "Couldn't find document"; - LOGV2_ERROR(21259, msg, "op"_attr = redact(op.toBSON())); + LOGV2_ERROR(21259, msg, "op"_attr = redact(op.toBSONForLogging())); return Status(ErrorCodes::UpdateOperationFailed, - str::stream() << msg << ": " << redact(op.toBSON())); + str::stream() + << msg << ": " << redact(op.toBSONForLogging())); } // Otherwise, it's present; zero objects were updated because of additional @@ -1435,9 +1437,10 @@ Status applyOperation_inlock(OperationContext* opCtx, // is (presumably) missing. if (!upsert) { static constexpr char msg[] = "Update of non-mod failed"; - LOGV2_ERROR(21260, msg, "op"_attr = redact(op.toBSON())); + LOGV2_ERROR(21260, msg, "op"_attr = redact(op.toBSONForLogging())); return Status(ErrorCodes::UpdateOperationFailed, - str::stream() << msg << ": " << redact(op.toBSON())); + str::stream() + << msg << ": " << redact(op.toBSONForLogging())); } } } else if (mode == OplogApplication::Mode::kSecondary && !upsertOplogEntry && @@ -1446,7 +1449,7 @@ Status applyOperation_inlock(OperationContext* opCtx, // upsert. In steady state mode this is unexpected. LOGV2_WARNING(2170001, "update needed to be converted to upsert", - "op"_attr = redact(op.toBSON())); + "op"_attr = redact(op.toBSONForLogging())); opCounters->gotUpdateOnMissingDoc(); // We shouldn't be doing upserts in secondary mode when enforcing steady state @@ -1477,7 +1480,7 @@ Status applyOperation_inlock(OperationContext* opCtx, auto idField = o["_id"]; uassert(ErrorCodes::NoSuchKey, str::stream() << "Failed to apply delete due to missing _id: " - << redact(op.toBSON()), + << redact(op.toBSONForLogging()), !idField.eoo()); // The o field may contain additional fields besides the _id (like the shard key @@ -1502,7 +1505,7 @@ Status applyOperation_inlock(OperationContext* opCtx, LOGV2_WARNING(2170002, "Applied a delete which did not delete anything in steady state " "replication", - "op"_attr = redact(op.toBSON())); + "op"_attr = redact(op.toBSONForLogging())); if (collection) opCounters->gotDeleteWasEmpty(); else @@ -1511,7 +1514,7 @@ Status applyOperation_inlock(OperationContext* opCtx, uassert(collection ? ErrorCodes::NoSuchKey : ErrorCodes::NamespaceNotFound, str::stream() << "Applied a delete which did not delete anything in " "steady state replication : " - << redact(op.toBSON()), + << redact(op.toBSONForLogging()), !oplogApplicationEnforcesSteadyStateConstraints); } wuow.commit(); @@ -1539,7 +1542,7 @@ Status applyCommand_inlock(OperationContext* opCtx, "applying command op: {oplogEntry}, oplog application mode: " "{oplogApplicationMode}", "Applying command op", - "oplogEntry"_attr = redact(entry.toBSON()), + "oplogEntry"_attr = redact(entry.toBSONForLogging()), "oplogApplicationMode"_attr = OplogApplication::modeToString(mode)); // Only commands are processed here. @@ -1590,7 +1593,7 @@ Status applyCommand_inlock(OperationContext* opCtx, return Status(ErrorCodes::OplogOperationUnsupported, str::stream() << "Applying command to feature compatibility version " "collection not supported in initial sync: " - << redact(entry.toBSON())); + << redact(entry.toBSONForLogging())); } // Parse optime from oplog entry unless we are applying this command in standalone or on a @@ -1631,7 +1634,7 @@ Status applyCommand_inlock(OperationContext* opCtx, }(); invariant(!assignCommandTimestamp || !opTime.isNull(), str::stream() << "Oplog entry did not have 'ts' field when expected: " - << redact(entry.toBSON())); + << redact(entry.toBSONForLogging())); const Timestamp writeTime = (assignCommandTimestamp ? opTime.getTimestamp() : Timestamp()); @@ -1689,7 +1692,7 @@ Status applyCommand_inlock(OperationContext* opCtx, "Acceptable error during oplog application: background operation in " "progress for database", "db"_attr = nss.db(), - "oplogEntry"_attr = redact(entry.toBSON())); + "oplogEntry"_attr = redact(entry.toBSONForLogging())); break; } case ErrorCodes::BackgroundOperationInProgressForNamespace: { @@ -1734,7 +1737,7 @@ Status applyCommand_inlock(OperationContext* opCtx, "Acceptable error during oplog application: background operation in " "progress for namespace", "namespace"_attr = ns, - "oplogEntry"_attr = redact(entry.toBSON())); + "oplogEntry"_attr = redact(entry.toBSONForLogging())); break; } default: { @@ -1765,7 +1768,7 @@ Status applyCommand_inlock(OperationContext* opCtx, "Acceptable error during oplog application", "db"_attr = nss.db(), "error"_attr = status, - "oplogEntry"_attr = redact(entry.toBSON())); + "oplogEntry"_attr = redact(entry.toBSONForLogging())); opCounters->gotAcceptableErrorInCommand(); } else { LOGV2_DEBUG(51776, @@ -1773,7 +1776,7 @@ Status applyCommand_inlock(OperationContext* opCtx, "Acceptable error during oplog application", "db"_attr = nss.db(), "error"_attr = status, - "oplogEntry"_attr = redact(entry.toBSON())); + "oplogEntry"_attr = redact(entry.toBSONForLogging())); } } // fallthrough diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 83af80c9101..d6f5d6d4653 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -99,7 +99,7 @@ void OplogApplier::enqueue(OperationContext* opCtx, std::vector<OplogEntry>::const_iterator end) { OplogBuffer::Batch batch; for (auto i = begin; i != end; ++i) { - batch.push_back(i->getRaw()); + batch.push_back(i->getEntry().getRaw()); } enqueue(opCtx, batch.cbegin(), batch.cend()); } diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 99593165130..013fca2fdc8 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -394,9 +394,7 @@ void scheduleWritesToOplog(OperationContext* opCtx, std::vector<InsertStatement> docs; docs.reserve(end - begin); for (size_t i = begin; i < end; i++) { - // Add as unowned BSON to avoid unnecessary ref-count bumps. - // 'ops' will outlive 'docs' so the BSON lifetime will be guaranteed. - docs.emplace_back(InsertStatement{ops[i].getRaw(), + docs.emplace_back(InsertStatement{ops[i].getEntry().getRaw(), ops[i].getOpTime().getTimestamp(), ops[i].getOpTime().getTerm()}); } @@ -546,8 +544,8 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx, "{failedWriterThread}: {error}", "Failed to apply batch of operations", "numOperationsInBatch"_attr = ops.size(), - "firstOperation"_attr = redact(ops.front().toBSON()), - "lastOperation"_attr = redact(ops.back().toBSON()), + "firstOperation"_attr = redact(ops.front().toBSONForLogging()), + "lastOperation"_attr = redact(ops.back().toBSONForLogging()), "failedWriterThread"_attr = std::distance(statusVector.cbegin(), it), "error"_attr = redact(status)); return status; diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index cc42a35967c..6f0e83ef2a2 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -413,7 +413,7 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, createCollection(opCtx, nss, options); auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1)); - ASSERT_FALSE(op.isForCappedCollection); + ASSERT_FALSE(op.isForCappedCollection()); NoopOplogApplierObserver observer; TrackOpsAppliedApplier oplogApplier( @@ -430,9 +430,9 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, ASSERT_EQUALS(1U, oplogApplier.operationsApplied.size()); const auto& opApplied = oplogApplier.operationsApplied.front(); - ASSERT_EQUALS(op, opApplied); + ASSERT_EQUALS(op.getEntry(), opApplied.getEntry()); // "isForCappedCollection" is not parsed from raw oplog entry document. - return opApplied.isForCappedCollection; + return opApplied.isForCappedCollection(); } TEST_F( @@ -600,7 +600,7 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionSepar const auto expectedStartOpTime = _insertOp1->getOpTime(); ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_insertOp1})); ASSERT_EQ(1U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp1->getRaw()); + ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp1->getEntry().toBSON()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); checkTxnTable(_lsid, @@ -615,7 +615,7 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionSepar // transaction. ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_insertOp2})); ASSERT_EQ(2U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp2->getRaw()); + ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp2->getEntry().toBSON()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); // The transaction table should not have been updated for partialTxn operations that are not the @@ -633,7 +633,7 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionSepar ASSERT_EQ(3U, oplogDocs().size()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); - ASSERT_BSONOBJ_EQ(oplogDocs().back(), _commitOp->getRaw()); + ASSERT_BSONOBJ_EQ(oplogDocs().back(), _commitOp->getEntry().toBSON()); checkTxnTable(_lsid, _txnNum, _commitOp->getOpTime(), @@ -969,8 +969,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea const auto expectedStartOpTime = _insertOp1->getOpTime(); ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_insertOp1, *_insertOp2})); ASSERT_EQ(2U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]); - ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]); + ASSERT_BSONOBJ_EQ(_insertOp1->getEntry().toBSON(), oplogDocs()[0]); + ASSERT_BSONOBJ_EQ(_insertOp2->getEntry().toBSON(), oplogDocs()[1]); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); checkTxnTable(_lsid, @@ -985,7 +985,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea // nested insert in the prepare oplog entry. ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_prepareWithPrevOp})); ASSERT_EQ(3U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getEntry().toBSON(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); checkTxnTable(_lsid, @@ -998,7 +998,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the three previous entries being committed. ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_commitPrepareWithPrevOp})); - ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getEntry().toBSON(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); checkTxnTable(_lsid, @@ -1047,7 +1047,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio // Apply a batch with only the abort. This should result in the abort being put in the // oplog and the transaction table being updated accordingly. ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_abortPrepareWithPrevOp})); - ASSERT_BSONOBJ_EQ(_abortPrepareWithPrevOp->getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(_abortPrepareWithPrevOp->getEntry().toBSON(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); checkTxnTable(_lsid, @@ -1075,8 +1075,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit const auto expectedStartOpTime = _insertOp1->getOpTime(); ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_insertOp1, *_insertOp2})); ASSERT_EQ(2U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]); - ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]); + ASSERT_BSONOBJ_EQ(_insertOp1->getEntry().toBSON(), oplogDocs()[0]); + ASSERT_BSONOBJ_EQ(_insertOp2->getEntry().toBSON(), oplogDocs()[1]); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); checkTxnTable(_lsid, @@ -1090,7 +1090,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit // the oplog, but, since this is initial sync, nothing else. ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_prepareWithPrevOp})); ASSERT_EQ(3U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getEntry().toBSON(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); checkTxnTable(_lsid, @@ -1103,7 +1103,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the three previous entries being applied. ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_commitPrepareWithPrevOp})); - ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getEntry().toBSON(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); checkTxnTable(_lsid, @@ -1121,7 +1121,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco ASSERT_OK(getStorageInterface()->insertDocument( _opCtx.get(), NamespaceString::kRsOplogNamespace, - {entry.toBSON(), entry.getOpTime().getTimestamp()}, + {entry.getEntry().toBSON(), entry.getOpTime().getTimestamp()}, entry.getOpTime().getTerm())); } // Ignore docs inserted into oplog in setup. @@ -1196,7 +1196,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT // the oplog, and the nested insert being applied (but in a transaction). ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_singlePrepareApplyOp})); ASSERT_EQ(1U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getEntry().toBSON(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); checkTxnTable(_lsid, _txnNum, @@ -1208,7 +1208,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and prepared insert being committed. ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_commitSinglePrepareApplyOp})); - ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getEntry().toBSON(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); checkTxnTable(_lsid, @@ -1245,7 +1245,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr // the oplog, and the nested insert being applied (but in a transaction). ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {emptyPrepareApplyOp})); ASSERT_EQ(1U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(emptyPrepareApplyOp.getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(emptyPrepareApplyOp.getEntry().toBSON(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); checkTxnTable(_lsid, _txnNum, @@ -1257,7 +1257,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and prepared insert being committed. ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_commitSinglePrepareApplyOp})); - ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getEntry().toBSON(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); checkTxnTable(_lsid, @@ -1294,7 +1294,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep // Apply a batch with only the abort. This should result in the abort being put in the // oplog and the transaction table being updated accordingly. ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_abortSinglePrepareApplyOp})); - ASSERT_BSONOBJ_EQ(_abortSinglePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(_abortSinglePrepareApplyOp->getEntry().toBSON(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); checkTxnTable(_lsid, @@ -1324,7 +1324,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, // the oplog, but, since this is initial sync, nothing else. ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_singlePrepareApplyOp})); ASSERT_EQ(1U, oplogDocs().size()); - ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getEntry().toBSON(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); checkTxnTable(_lsid, @@ -1337,7 +1337,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the previous entry being applied. ASSERT_OK(oplogApplier.applyOplogBatch(_opCtx.get(), {*_commitSinglePrepareApplyOp})); - ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); + ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getEntry().toBSON(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); checkTxnTable(_lsid, @@ -1355,7 +1355,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, ASSERT_OK(getStorageInterface()->insertDocument( _opCtx.get(), NamespaceString::kRsOplogNamespace, - {entry.toBSON(), entry.getOpTime().getTimestamp()}, + {entry.getEntry().toBSON(), entry.getOpTime().getTimestamp()}, entry.getOpTime().getTerm())); } // Ignore docs inserted into oplog in setup. @@ -2428,24 +2428,25 @@ public: boost::optional<BSONObj> object2, const OperationSessionInfo& sessionInfo, Date_t wallClockTime) { - return repl::OplogEntry(opTime, // optime - boost::none, // hash - opType, // opType - ns, // namespace - boost::none, // uuid - boost::none, // fromMigrate - 0, // version - object, // o - object2, // o2 - sessionInfo, // sessionInfo - boost::none, // false - wallClockTime, // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {repl::DurableOplogEntry( + opTime, // optime + boost::none, // hash + opType, // opType + ns, // namespace + boost::none, // uuid + boost::none, // fromMigrate + 0, // version + object, // o + object2, // o2 + sessionInfo, // sessionInfo + boost::none, // false + wallClockTime, // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } /** @@ -2458,24 +2459,25 @@ public: boost::optional<BSONObj> object2, const OperationSessionInfo& sessionInfo, Date_t wallClockTime) { - return repl::OplogEntry(opTime, // optime - boost::none, // hash - opType, // opType - ns, // namespace - boost::none, // uuid - true, // fromMigrate - 0, // version - object, // o - object2, // o2 - sessionInfo, // sessionInfo - boost::none, // false - wallClockTime, // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {repl::DurableOplogEntry( + opTime, // optime + boost::none, // hash + opType, // opType + ns, // namespace + boost::none, // uuid + true, // fromMigrate + 0, // version + object, // o + object2, // o2 + sessionInfo, // sessionInfo + boost::none, // false + wallClockTime, // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } void checkTxnTable(const OperationSessionInfo& sessionInfo, @@ -2940,7 +2942,7 @@ TEST_F(OplogApplierImplTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnT {Timestamp(40, 0), 1}, repl::OpTypeEnum::kNoop, BSON("$sessionMigrateInfo" << 1), - innerOplog.toBSON(), + innerOplog.getEntry().toBSON(), insertSessionInfo, outerInsertDate); diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp index 65bb60be1d9..52f145ffd74 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp @@ -274,11 +274,12 @@ Status OplogApplierImplTest::runOpsInitialSync(std::vector<OplogEntry> ops) { if (!applyResult.isOK()) { std::vector<BSONObj> docsFromOps; for (const auto& opForContext : ops) { - docsFromOps.push_back(opForContext.toBSON()); + docsFromOps.push_back(opForContext.getEntry().toBSON()); } auto status = applyResult.getStatus(); - return status.withContext(str::stream() << "failed to apply operation: " << op.toBSON() - << ". " << BSON("ops" << docsFromOps)); + return status.withContext(str::stream() + << "failed to apply operation: " << op.toBSONForLogging() + << ". " << BSON("ops" << docsFromOps)); } auto lastApplied = applyResult.getValue(); const bool orderedCommit = true; @@ -354,24 +355,24 @@ OplogEntry makeOplogEntry(OpTypeEnum opType, OptionalCollectionUUID uuid, BSONObj o, boost::optional<BSONObj> o2) { - return OplogEntry(OpTime(Timestamp(1, 1), 1), // optime - boost::none, // hash - opType, // opType - nss, // namespace - uuid, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - o, // o - o2, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {DurableOplogEntry(OpTime(Timestamp(1, 1), 1), // optime + boost::none, // hash + opType, // opType + nss, // namespace + uuid, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + o, // o + o2, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } OplogEntry makeOplogEntry(OpTypeEnum opType, NamespaceString nss, OptionalCollectionUUID uuid) { diff --git a/src/mongo/db/repl/oplog_applier_utils.cpp b/src/mongo/db/repl/oplog_applier_utils.cpp index 757eafa89be..82caae399c7 100644 --- a/src/mongo/db/repl/oplog_applier_utils.cpp +++ b/src/mongo/db/repl/oplog_applier_utils.cpp @@ -97,7 +97,7 @@ void OplogApplierUtils::processCrudOp(OperationContext* opCtx, if (op->getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) { // Mark capped collection ops before storing them to ensure we do not attempt to // bulk insert them. - op->isForCappedCollection = true; + op->setIsForCappedCollection(true); } } @@ -201,7 +201,7 @@ Status OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon( if (opType == OpTypeEnum::kNoop) { incrementOpsAppliedStats(); return Status::OK(); - } else if (OplogEntry::isCrudOpType(opType)) { + } else if (DurableOplogEntry::isCrudOpType(opType)) { auto status = writeConflictRetry(opCtx, "applyOplogEntryOrGroupedInserts_CRUD", nss.ns(), [&] { // Need to throw instead of returning a status for it to be properly ignored. @@ -314,7 +314,7 @@ Status OplogApplierUtils::applyOplogBatchCommon( LOGV2_FATAL_CONTINUE(21237, "Error applying operation ({oplogEntry}): {error}", "Error applying operation", - "oplogEntry"_attr = redact(entry.toBSON()), + "oplogEntry"_attr = redact(entry.toBSONForLogging()), "error"_attr = causedBy(redact(status))); return status; } @@ -330,7 +330,7 @@ Status OplogApplierUtils::applyOplogBatchCommon( "writer worker caught exception: {error} on: {oplogEntry}", "Writer worker caught exception", "error"_attr = redact(e), - "oplogEntry"_attr = redact(entry.toBSON())); + "oplogEntry"_attr = redact(entry.toBSONForLogging())); return e.toStatus(); } } diff --git a/src/mongo/db/repl/oplog_batcher.cpp b/src/mongo/db/repl/oplog_batcher.cpp index a5ae7f3ca65..31071a90cb3 100644 --- a/src/mongo/db/repl/oplog_batcher.cpp +++ b/src/mongo/db/repl/oplog_batcher.cpp @@ -169,12 +169,12 @@ StatusWith<std::vector<OplogEntry>> OplogBatcher::getNextApplierBatch( message, "expectedVersion"_attr = OplogEntry::kOplogVersion, "foundVersion"_attr = entry.getVersion(), - "oplogEntry"_attr = redact(entry.toBSON())); + "oplogEntry"_attr = redact(entry.toBSONForLogging())); return {ErrorCodes::BadValue, str::stream() << message << ", expected oplog version " << OplogEntry::kOplogVersion << ", found version " << entry.getVersion() - << ", oplog entry: " << redact(entry.toBSON())}; + << ", oplog entry: " << redact(entry.toBSONForLogging())}; } if (batchLimits.slaveDelayLatestTimestamp) { diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp index b85fccb8c39..b747ee1c369 100644 --- a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp +++ b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp @@ -170,46 +170,46 @@ Status OplogBufferMock::seekToTimestamp(OperationContext* opCtx, */ OplogEntry makeInsertOplogEntry(int t, const NamespaceString& nss, boost::optional<UUID> uuid) { BSONObj oField = BSON("_id" << t << "a" << t); - return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime - boost::none, // hash - OpTypeEnum::kInsert, // op type - nss, // namespace - uuid, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - oField, // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t() + Seconds(t), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {DurableOplogEntry(OpTime(Timestamp(t, 1), 1), // optime + boost::none, // hash + OpTypeEnum::kInsert, // op type + nss, // namespace + uuid, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + oField, // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t() + Seconds(t), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } OplogEntry makeNoopOplogEntry(int t, const StringData& msg) { BSONObj oField = BSON("msg" << msg << "count" << t); - return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime - boost::none, // hash - OpTypeEnum::kNoop, // op type - NamespaceString(""), // namespace - boost::none, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - oField, // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t() + Seconds(t), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {DurableOplogEntry(OpTime(Timestamp(t, 1), 1), // optime + boost::none, // hash + OpTypeEnum::kNoop, // op type + NamespaceString(""), // namespace + boost::none, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + oField, // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t() + Seconds(t), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } /** @@ -226,24 +226,24 @@ OplogEntry makeApplyOpsOplogEntry(int t, bool prepare, const std::vector<OplogEn if (prepare) { oField.append("prepare", true); } - return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime - boost::none, // hash - OpTypeEnum::kCommand, // op type - nss, // namespace - boost::none, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - oField.obj(), // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t() + Seconds(t), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {DurableOplogEntry(OpTime(Timestamp(t, 1), 1), // optime + boost::none, // hash + OpTypeEnum::kCommand, // op type + nss, // namespace + boost::none, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + oField.obj(), // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t() + Seconds(t), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } /** @@ -260,24 +260,24 @@ OplogEntry makeCommitTransactionOplogEntry(int t, StringData dbName, bool prepar } else { oField = BSON("applyOps" << BSONArray() << "count" << count); } - return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime - boost::none, // hash - OpTypeEnum::kCommand, // op type - nss, // namespace - boost::none, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - oField, // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t() + Seconds(t), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {DurableOplogEntry(OpTime(Timestamp(t, 1), 1), // optime + boost::none, // hash + OpTypeEnum::kCommand, // op type + nss, // namespace + boost::none, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + oField, // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t() + Seconds(t), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } /** @@ -321,24 +321,24 @@ OplogEntry makeLargeTransactionOplogEntries(int t, } oField = oFieldBuilder.obj(); } - return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime - boost::none, // hash - OpTypeEnum::kCommand, // op type - nss, // namespace - boost::none, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - oField, // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t() + Seconds(t), // wall clock time - boost::none, // statement id - prevWriteOpTime, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {DurableOplogEntry(OpTime(Timestamp(t, 1), 1), // optime + boost::none, // hash + OpTypeEnum::kCommand, // op type + nss, // namespace + boost::none, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + oField, // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t() + Seconds(t), // wall clock time + boost::none, // statement id + prevWriteOpTime, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } /** @@ -386,7 +386,7 @@ std::string toString(const std::vector<OplogEntry>& ops) { StringBuilder sb; sb << "["; for (const auto& op : ops) { - sb << " " << op.toString(); + sb << " " << op.toStringForLogging(); } sb << " ]"; return sb.str(); diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 0c89412f0c9..e6d6350fb11 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -43,42 +43,42 @@ namespace repl { namespace { -OplogEntry::CommandType parseCommandType(const BSONObj& objectField) { +DurableOplogEntry::CommandType parseCommandType(const BSONObj& objectField) { StringData commandString(objectField.firstElementFieldName()); if (commandString == "create") { - return OplogEntry::CommandType::kCreate; + return DurableOplogEntry::CommandType::kCreate; } else if (commandString == "renameCollection") { - return OplogEntry::CommandType::kRenameCollection; + return DurableOplogEntry::CommandType::kRenameCollection; } else if (commandString == "drop") { - return OplogEntry::CommandType::kDrop; + return DurableOplogEntry::CommandType::kDrop; } else if (commandString == "collMod") { - return OplogEntry::CommandType::kCollMod; + return DurableOplogEntry::CommandType::kCollMod; } else if (commandString == "applyOps") { - return OplogEntry::CommandType::kApplyOps; + return DurableOplogEntry::CommandType::kApplyOps; } else if (commandString == "dbCheck") { - return OplogEntry::CommandType::kDbCheck; + return DurableOplogEntry::CommandType::kDbCheck; } else if (commandString == "dropDatabase") { - return OplogEntry::CommandType::kDropDatabase; + return DurableOplogEntry::CommandType::kDropDatabase; } else if (commandString == "emptycapped") { - return OplogEntry::CommandType::kEmptyCapped; + return DurableOplogEntry::CommandType::kEmptyCapped; } else if (commandString == "createIndexes") { - return OplogEntry::CommandType::kCreateIndexes; + return DurableOplogEntry::CommandType::kCreateIndexes; } else if (commandString == "startIndexBuild") { - return OplogEntry::CommandType::kStartIndexBuild; + return DurableOplogEntry::CommandType::kStartIndexBuild; } else if (commandString == "commitIndexBuild") { - return OplogEntry::CommandType::kCommitIndexBuild; + return DurableOplogEntry::CommandType::kCommitIndexBuild; } else if (commandString == "abortIndexBuild") { - return OplogEntry::CommandType::kAbortIndexBuild; + return DurableOplogEntry::CommandType::kAbortIndexBuild; } else if (commandString == "dropIndexes") { - return OplogEntry::CommandType::kDropIndexes; + return DurableOplogEntry::CommandType::kDropIndexes; } else if (commandString == "deleteIndexes") { - return OplogEntry::CommandType::kDropIndexes; + return DurableOplogEntry::CommandType::kDropIndexes; } else if (commandString == "commitTransaction") { - return OplogEntry::CommandType::kCommitTransaction; + return DurableOplogEntry::CommandType::kCommitTransaction; } else if (commandString == "abortTransaction") { - return OplogEntry::CommandType::kAbortTransaction; + return DurableOplogEntry::CommandType::kAbortTransaction; } else if (commandString == "importCollection") { - return OplogEntry::CommandType::kImportCollection; + return DurableOplogEntry::CommandType::kImportCollection; } else { uasserted(ErrorCodes::BadValue, str::stream() << "Unknown oplog entry command type: " << commandString @@ -160,8 +160,6 @@ BSONObj makeOplogEntryDoc(OpTime opTime, } // namespace -const int MutableOplogEntry::kOplogVersion = 2; - // Static ReplOperation MutableOplogEntry::makeInsertOperation(const NamespaceString& nss, UUID uuid, @@ -277,21 +275,21 @@ OpTime MutableOplogEntry::getOpTime() const { return OpTime(getTimestamp(), term); } -size_t OplogEntry::getDurableReplOperationSize(const DurableReplOperation& op) { +size_t DurableOplogEntry::getDurableReplOperationSize(const DurableReplOperation& op) { return sizeof(op) + op.getNss().size() + op.getObject().objsize() + (op.getObject2() ? op.getObject2()->objsize() : 0); } -StatusWith<OplogEntry> OplogEntry::parse(const BSONObj& object) { +StatusWith<DurableOplogEntry> DurableOplogEntry::parse(const BSONObj& object) { try { - return OplogEntry(object); + return DurableOplogEntry(object); } catch (...) { return exceptionToStatus(); } MONGO_UNREACHABLE; } -OplogEntry::OplogEntry(BSONObj rawInput) : _raw(std::move(rawInput)) { +DurableOplogEntry::DurableOplogEntry(BSONObj rawInput) : _raw(std::move(rawInput)) { _raw = _raw.getOwned(); parseProtected(IDLParserErrorContext("OplogEntryBase"), _raw); @@ -302,49 +300,49 @@ OplogEntry::OplogEntry(BSONObj rawInput) : _raw(std::move(rawInput)) { } } -OplogEntry::OplogEntry(OpTime opTime, - const boost::optional<int64_t> hash, - OpTypeEnum opType, - const NamespaceString& nss, - const boost::optional<UUID>& uuid, - const boost::optional<bool>& fromMigrate, - int version, - const BSONObj& oField, - const boost::optional<BSONObj>& o2Field, - const OperationSessionInfo& sessionInfo, - const boost::optional<bool>& isUpsert, - const mongo::Date_t& wallClockTime, - const boost::optional<StmtId>& statementId, - const boost::optional<OpTime>& prevWriteOpTimeInTransaction, - const boost::optional<OpTime>& preImageOpTime, - const boost::optional<OpTime>& postImageOpTime, - const boost::optional<ShardId>& destinedRecipient, - const boost::optional<Value>& idField) - : OplogEntry(makeOplogEntryDoc(opTime, - hash, - opType, - nss, - uuid, - fromMigrate, - version, - oField, - o2Field, - sessionInfo, - isUpsert, - wallClockTime, - statementId, - prevWriteOpTimeInTransaction, - preImageOpTime, - postImageOpTime, - destinedRecipient, - idField)) {} - -bool OplogEntry::isCommand() const { +DurableOplogEntry::DurableOplogEntry(OpTime opTime, + const boost::optional<int64_t> hash, + OpTypeEnum opType, + const NamespaceString& nss, + const boost::optional<UUID>& uuid, + const boost::optional<bool>& fromMigrate, + int version, + const BSONObj& oField, + const boost::optional<BSONObj>& o2Field, + const OperationSessionInfo& sessionInfo, + const boost::optional<bool>& isUpsert, + const mongo::Date_t& wallClockTime, + const boost::optional<StmtId>& statementId, + const boost::optional<OpTime>& prevWriteOpTimeInTransaction, + const boost::optional<OpTime>& preImageOpTime, + const boost::optional<OpTime>& postImageOpTime, + const boost::optional<ShardId>& destinedRecipient, + const boost::optional<Value>& idField) + : DurableOplogEntry(makeOplogEntryDoc(opTime, + hash, + opType, + nss, + uuid, + fromMigrate, + version, + oField, + o2Field, + sessionInfo, + isUpsert, + wallClockTime, + statementId, + prevWriteOpTimeInTransaction, + preImageOpTime, + postImageOpTime, + destinedRecipient, + idField)) {} + +bool DurableOplogEntry::isCommand() const { return getOpType() == OpTypeEnum::kCommand; } // static -bool OplogEntry::isCrudOpType(OpTypeEnum opType) { +bool DurableOplogEntry::isCrudOpType(OpTypeEnum opType) { switch (opType) { case OpTypeEnum::kInsert: case OpTypeEnum::kDelete: @@ -357,16 +355,16 @@ bool OplogEntry::isCrudOpType(OpTypeEnum opType) { MONGO_UNREACHABLE; } -bool OplogEntry::isCrudOpType() const { +bool DurableOplogEntry::isCrudOpType() const { return isCrudOpType(getOpType()); } -bool OplogEntry::shouldPrepare() const { +bool DurableOplogEntry::shouldPrepare() const { return getCommandType() == CommandType::kApplyOps && getObject()[ApplyOpsCommandInfoBase::kPrepareFieldName].booleanSafe(); } -bool OplogEntry::isSingleOplogEntryTransaction() const { +bool DurableOplogEntry::isSingleOplogEntryTransaction() const { if (getCommandType() != CommandType::kApplyOps || !getTxnNumber() || !getSessionId() || getObject()[ApplyOpsCommandInfoBase::kPartialTxnFieldName].booleanSafe()) { return false; @@ -379,7 +377,7 @@ bool OplogEntry::isSingleOplogEntryTransaction() const { return prevOptimeOpt->isNull(); } -bool OplogEntry::isEndOfLargeTransaction() const { +bool DurableOplogEntry::isEndOfLargeTransaction() const { if (getCommandType() != CommandType::kApplyOps) { // If the oplog entry is neither commit nor abort, then it must be an applyOps. Otherwise, // it cannot be a termainal oplog entry of a large transaction. @@ -397,7 +395,7 @@ bool OplogEntry::isEndOfLargeTransaction() const { return !prevOptimeOpt->isNull() && !isPartialTransaction(); } -bool OplogEntry::isSingleOplogEntryTransactionWithCommand() const { +bool DurableOplogEntry::isSingleOplogEntryTransactionWithCommand() const { if (!isSingleOplogEntryTransaction()) { return false; } @@ -416,7 +414,7 @@ bool OplogEntry::isSingleOplogEntryTransactionWithCommand() const { return false; } -bool OplogEntry::isIndexCommandType() const { +bool DurableOplogEntry::isIndexCommandType() const { return getOpType() == OpTypeEnum::kCommand && ((getCommandType() == CommandType::kCreateIndexes) || (getCommandType() == CommandType::kStartIndexBuild) || @@ -425,7 +423,7 @@ bool OplogEntry::isIndexCommandType() const { (getCommandType() == CommandType::kDropIndexes)); } -BSONElement OplogEntry::getIdElement() const { +BSONElement DurableOplogEntry::getIdElement() const { invariant(isCrudOpType()); if (getOpType() == OpTypeEnum::kUpdate) { // We cannot use getObjectContainingDocumentKey() here because the BSONObj will go out @@ -437,11 +435,11 @@ BSONElement OplogEntry::getIdElement() const { } } -BSONObj OplogEntry::getOperationToApply() const { +BSONObj DurableOplogEntry::getOperationToApply() const { return getObject(); } -BSONObj OplogEntry::getObjectContainingDocumentKey() const { +BSONObj DurableOplogEntry::getObjectContainingDocumentKey() const { invariant(isCrudOpType()); if (getOpType() == OpTypeEnum::kUpdate) { fassert(31081, getObject2() != boost::none); @@ -451,25 +449,231 @@ BSONObj OplogEntry::getObjectContainingDocumentKey() const { } } -OplogEntry::CommandType OplogEntry::getCommandType() const { +DurableOplogEntry::CommandType DurableOplogEntry::getCommandType() const { return _commandType; } -int OplogEntry::getRawObjSizeBytes() const { +int DurableOplogEntry::getRawObjSizeBytes() const { return _raw.objsize(); } -std::string OplogEntry::toString() const { +std::string DurableOplogEntry::toString() const { return _raw.toString(); } -std::ostream& operator<<(std::ostream& s, const OplogEntry& o) { +std::ostream& operator<<(std::ostream& s, const DurableOplogEntry& o) { return s << o.toString(); } +std::ostream& operator<<(std::ostream& s, const OplogEntry& o) { + return s << o.toStringForLogging(); +} + std::ostream& operator<<(std::ostream& s, const ReplOperation& o) { return s << o.toBSON().toString(); } +OplogEntry::OplogEntry(DurableOplogEntry entry) : _entry(std::move(entry)) {} + +OplogEntry::OplogEntry(const BSONObj& entry) + : OplogEntry(uassertStatusOK(DurableOplogEntry::parse(entry))) {} +void OplogEntry::setEntry(DurableOplogEntry entry) { + _entry = std::move(entry); +} + +bool operator==(const OplogEntry& lhs, const OplogEntry& rhs) { + if (lhs.isForCappedCollection() != rhs.isForCappedCollection()) { + return false; + } + + return lhs.getEntry() == rhs.getEntry(); +} + +StatusWith<OplogEntry> OplogEntry::parse(const BSONObj& object) { + auto parseStatus = DurableOplogEntry::parse(object); + + if (!parseStatus.isOK()) { + return parseStatus; + } + + return OplogEntry(std::move(parseStatus.getValue())); +} +std::string OplogEntry::toStringForLogging() const { + return toBSONForLogging().toString(); +} +BSONObj OplogEntry::toBSONForLogging() const { + BSONObjBuilder builder; + builder.append("oplogEntry", _entry.toBSON()); + + if (_isForCappedCollection) { + builder.append("isForCappedCollection", *_isForCappedCollection); + } + + return builder.obj(); +} + +bool OplogEntry::isForCappedCollection() const { + return _isForCappedCollection.get_value_or(false); +} + +void OplogEntry::setIsForCappedCollection(bool isForCappedCollection) { + _isForCappedCollection = isForCappedCollection; +} + +const boost::optional<mongo::Value>& OplogEntry::get_id() const& { + return _entry.get_id(); +} + +const boost::optional<std::int32_t> OplogEntry::getStatementId() const& { + return _entry.getStatementId(); +} + +const OperationSessionInfo& OplogEntry::getOperationSessionInfo() const { + return _entry.getOperationSessionInfo(); +} +const boost::optional<mongo::LogicalSessionId>& OplogEntry::getSessionId() const { + return _entry.getSessionId(); +} + +const boost::optional<std::int64_t> OplogEntry::getTxnNumber() const { + return _entry.getTxnNumber(); +} + +const DurableReplOperation& OplogEntry::getDurableReplOperation() const { + return _entry.getDurableReplOperation(); +} + +mongo::repl::OpTypeEnum OplogEntry::getOpType() const { + return _entry.getOpType(); +} + +const mongo::NamespaceString& OplogEntry::getNss() const { + return _entry.getNss(); +} + +const boost::optional<mongo::UUID>& OplogEntry::getUuid() const { + return _entry.getUuid(); +} + +const mongo::BSONObj& OplogEntry::getObject() const { + return _entry.getObject(); +} + +const boost::optional<mongo::BSONObj>& OplogEntry::getObject2() const { + return _entry.getObject2(); +} + +const boost::optional<bool> OplogEntry::getUpsert() const { + return _entry.getUpsert(); +} + +const boost::optional<mongo::repl::OpTime>& OplogEntry::getPreImageOpTime() const { + return _entry.getPreImageOpTime(); +} + +const boost::optional<mongo::ShardId>& OplogEntry::getDestinedRecipient() const { + return _entry.getDestinedRecipient(); +} + +const mongo::Timestamp& OplogEntry::getTimestamp() const { + return _entry.getTimestamp(); +} + +const boost::optional<std::int64_t> OplogEntry::getTerm() const { + return _entry.getTerm(); +} + +const mongo::Date_t& OplogEntry::getWallClockTime() const { + return _entry.getWallClockTime(); +} + +const boost::optional<std::int64_t> OplogEntry::getHash() const& { + return _entry.getHash(); +} + +std::int64_t OplogEntry::getVersion() const { + return _entry.getVersion(); +} + +const boost::optional<bool> OplogEntry::getFromMigrate() const& { + return _entry.getFromMigrate(); +} + +const boost::optional<mongo::UUID>& OplogEntry::getFromTenantMigration() const& { + return _entry.getFromTenantMigration(); +} + +const boost::optional<mongo::repl::OpTime>& OplogEntry::getPrevWriteOpTimeInTransaction() const& { + return _entry.getPrevWriteOpTimeInTransaction(); +} + +const boost::optional<mongo::repl::OpTime>& OplogEntry::getPostImageOpTime() const& { + return _entry.getPostImageOpTime(); +} + +OpTime OplogEntry::getOpTime() const { + return _entry.getOpTime(); +} + +bool OplogEntry::isCommand() const { + return _entry.isCommand(); +} + +bool OplogEntry::isPartialTransaction() const { + return _entry.isPartialTransaction(); +} + +bool OplogEntry::isEndOfLargeTransaction() const { + return _entry.isEndOfLargeTransaction(); +} + +bool OplogEntry::isPreparedCommit() const { + return _entry.isPreparedCommit(); +} + +bool OplogEntry::isTerminalApplyOps() const { + return _entry.isTerminalApplyOps(); +} + +bool OplogEntry::isSingleOplogEntryTransaction() const { + return _entry.isSingleOplogEntryTransaction(); +} + +bool OplogEntry::isSingleOplogEntryTransactionWithCommand() const { + return _entry.isSingleOplogEntryTransactionWithCommand(); +} + +bool OplogEntry::isCrudOpType() const { + return _entry.isCrudOpType(); +} + +bool OplogEntry::isIndexCommandType() const { + return _entry.isIndexCommandType(); +} + +bool OplogEntry::shouldPrepare() const { + return _entry.shouldPrepare(); +} + +BSONElement OplogEntry::getIdElement() const { + return _entry.getIdElement(); +} + +BSONObj OplogEntry::getOperationToApply() const { + return _entry.getOperationToApply(); +} + +BSONObj OplogEntry::getObjectContainingDocumentKey() const { + return _entry.getObjectContainingDocumentKey(); +} + +OplogEntry::CommandType OplogEntry::getCommandType() const { + return _entry.getCommandType(); +} + +int OplogEntry::getRawObjSizeBytes() const { + return _entry.getRawObjSizeBytes(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 77f0933a116..125f401b509 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -86,7 +86,7 @@ private: class MutableOplogEntry : public OplogEntryBase { public: // Current oplog version, should be the value of the v field in all oplog entries. - static const int kOplogVersion; + static constexpr int kOplogVersion = 2; // Helpers to generate ReplOperation. static ReplOperation makeInsertOperation(const NamespaceString& nss, @@ -199,7 +199,7 @@ public: * A parsed oplog entry that privately inherits from the MutableOplogEntry. * This class is immutable. All setters are hidden. */ -class OplogEntry : private MutableOplogEntry { +class DurableOplogEntry : private MutableOplogEntry { public: // Make field names accessible. using MutableOplogEntry::k_idFieldName; @@ -284,34 +284,31 @@ public: // Get the in-memory size in bytes of a ReplOperation. static size_t getDurableReplOperationSize(const DurableReplOperation& op); - static StatusWith<OplogEntry> parse(const BSONObj& object); - - OplogEntry(OpTime opTime, - const boost::optional<int64_t> hash, - OpTypeEnum opType, - const NamespaceString& nss, - const boost::optional<UUID>& uuid, - const boost::optional<bool>& fromMigrate, - int version, - const BSONObj& oField, - const boost::optional<BSONObj>& o2Field, - const OperationSessionInfo& sessionInfo, - const boost::optional<bool>& isUpsert, - const mongo::Date_t& wallClockTime, - const boost::optional<StmtId>& statementId, - const boost::optional<OpTime>& prevWriteOpTimeInTransaction, - const boost::optional<OpTime>& preImageOpTime, - const boost::optional<OpTime>& postImageOpTime, - const boost::optional<ShardId>& destinedRecipient, - const boost::optional<Value>& idField); + static StatusWith<DurableOplogEntry> parse(const BSONObj& object); + + DurableOplogEntry(OpTime opTime, + const boost::optional<int64_t> hash, + OpTypeEnum opType, + const NamespaceString& nss, + const boost::optional<UUID>& uuid, + const boost::optional<bool>& fromMigrate, + int version, + const BSONObj& oField, + const boost::optional<BSONObj>& o2Field, + const OperationSessionInfo& sessionInfo, + const boost::optional<bool>& isUpsert, + const mongo::Date_t& wallClockTime, + const boost::optional<StmtId>& statementId, + const boost::optional<OpTime>& prevWriteOpTimeInTransaction, + const boost::optional<OpTime>& preImageOpTime, + const boost::optional<OpTime>& postImageOpTime, + const boost::optional<ShardId>& destinedRecipient, + const boost::optional<Value>& idField); // DEPRECATED: This constructor can throw. Use static parse method instead. - explicit OplogEntry(BSONObj raw); - - OplogEntry() = delete; + explicit DurableOplogEntry(BSONObj raw); - // This member is not parsed from the BSON and is instead populated by fillWriterVectors. - bool isForCappedCollection = false; + DurableOplogEntry() = delete; /** * Returns if the oplog entry is for a command operation. @@ -339,7 +336,7 @@ public: * Returns if this is a prepared 'commitTransaction' oplog entry. */ bool isPreparedCommit() const { - return getCommandType() == OplogEntry::CommandType::kCommitTransaction; + return getCommandType() == DurableOplogEntry::CommandType::kCommitTransaction; } /** @@ -348,7 +345,7 @@ public: * prepared transaction or a non-final applyOps in a transaction. */ bool isTerminalApplyOps() const { - return getCommandType() == OplogEntry::CommandType::kApplyOps && !shouldPrepare() && + return getCommandType() == DurableOplogEntry::CommandType::kApplyOps && !shouldPrepare() && !isPartialTransaction() && !getObject().getBoolField("prepare"); } @@ -408,12 +405,12 @@ public: CommandType getCommandType() const; /** - * Returns the size of the original document used to create this OplogEntry. + * Returns the size of the original document used to create this DurableOplogEntry. */ int getRawObjSizeBytes() const; /** - * Returns the original document used to create this OplogEntry. + * Returns the original document used to create this DurableOplogEntry. */ const BSONObj& getRaw() const { return _raw; @@ -433,12 +430,123 @@ private: CommandType _commandType = CommandType::kNotCommand; }; +/** + * Data structure that holds a DurableOplogEntry and other different run time state variables. + */ +class OplogEntry { +public: + using CommandType = DurableOplogEntry::CommandType; + static constexpr auto k_idFieldName = DurableOplogEntry::k_idFieldName; + static constexpr auto kDestinedRecipientFieldName = + DurableOplogEntry::kDestinedRecipientFieldName; + static constexpr auto kDurableReplOperationFieldName = + DurableOplogEntry::kDurableReplOperationFieldName; + static constexpr auto kFromMigrateFieldName = DurableOplogEntry::kFromMigrateFieldName; + static constexpr auto kFromTenantMigrationFieldName = + DurableOplogEntry::kFromTenantMigrationFieldName; + static constexpr auto kHashFieldName = DurableOplogEntry::kHashFieldName; + static constexpr auto kNssFieldName = DurableOplogEntry::kNssFieldName; + static constexpr auto kObject2FieldName = DurableOplogEntry::kObject2FieldName; + static constexpr auto kObjectFieldName = DurableOplogEntry::kObjectFieldName; + static constexpr auto kOperationSessionInfoFieldName = + DurableOplogEntry::kOperationSessionInfoFieldName; + static constexpr auto kOplogVersion = DurableOplogEntry::kOplogVersion; + static constexpr auto kOpTypeFieldName = DurableOplogEntry::kOpTypeFieldName; + static constexpr auto kPostImageOpTimeFieldName = DurableOplogEntry::kPostImageOpTimeFieldName; + static constexpr auto kPreImageOpTimeFieldName = DurableOplogEntry::kPreImageOpTimeFieldName; + static constexpr auto kPrevWriteOpTimeInTransactionFieldName = + DurableOplogEntry::kPrevWriteOpTimeInTransactionFieldName; + static constexpr auto kSessionIdFieldName = DurableOplogEntry::kSessionIdFieldName; + static constexpr auto kStatementIdFieldName = DurableOplogEntry::kStatementIdFieldName; + static constexpr auto kTermFieldName = DurableOplogEntry::kTermFieldName; + static constexpr auto kTimestampFieldName = DurableOplogEntry::kTimestampFieldName; + static constexpr auto kTxnNumberFieldName = DurableOplogEntry::kTxnNumberFieldName; + static constexpr auto kUpsertFieldName = DurableOplogEntry::kUpsertFieldName; + static constexpr auto kUuidFieldName = DurableOplogEntry::kUuidFieldName; + static constexpr auto kVersionFieldName = DurableOplogEntry::kVersionFieldName; + static constexpr auto kWallClockTimeFieldName = DurableOplogEntry::kWallClockTimeFieldName; + + OplogEntry(DurableOplogEntry oplog); + OplogEntry(const BSONObj& oplog); + + const DurableOplogEntry& getEntry() const { + return _entry; + } + + void setEntry(DurableOplogEntry oplog); + + /** + * Note: will only parse fields included in DurableOplogEntry. + */ + static StatusWith<OplogEntry> parse(const BSONObj& object); + + bool isForCappedCollection() const; + void setIsForCappedCollection(bool isForCappedCollection); + + std::string toStringForLogging() const; + + /** + * Returns the BSON representation for diagnostic purposes. To get a BSON meant for storing to + * the oplog collection, use getEntry().toBSON() instead. + */ + BSONObj toBSONForLogging() const; + + // Wrapper methods for DurableOplogEntry + const boost::optional<mongo::Value>& get_id() const&; + const boost::optional<std::int32_t> getStatementId() const&; + const OperationSessionInfo& getOperationSessionInfo() const; + const boost::optional<mongo::LogicalSessionId>& getSessionId() const; + const boost::optional<std::int64_t> getTxnNumber() const; + const DurableReplOperation& getDurableReplOperation() const; + mongo::repl::OpTypeEnum getOpType() const; + const mongo::NamespaceString& getNss() const; + const boost::optional<mongo::UUID>& getUuid() const; + const mongo::BSONObj& getObject() const; + const boost::optional<mongo::BSONObj>& getObject2() const; + const boost::optional<bool> getUpsert() const; + const boost::optional<mongo::repl::OpTime>& getPreImageOpTime() const; + const boost::optional<mongo::ShardId>& getDestinedRecipient() const; + const mongo::Timestamp& getTimestamp() const; + const boost::optional<std::int64_t> getTerm() const; + const mongo::Date_t& getWallClockTime() const; + const boost::optional<std::int64_t> getHash() const&; + std::int64_t getVersion() const; + const boost::optional<bool> getFromMigrate() const&; + const boost::optional<mongo::UUID>& getFromTenantMigration() const&; + const boost::optional<mongo::repl::OpTime>& getPrevWriteOpTimeInTransaction() const&; + const boost::optional<mongo::repl::OpTime>& getPostImageOpTime() const&; + OpTime getOpTime() const; + bool isCommand() const; + bool isPartialTransaction() const; + bool isEndOfLargeTransaction() const; + bool isPreparedCommit() const; + bool isTerminalApplyOps() const; + bool isSingleOplogEntryTransaction() const; + bool isSingleOplogEntryTransactionWithCommand() const; + bool isCrudOpType() const; + bool isIndexCommandType() const; + bool shouldPrepare() const; + BSONElement getIdElement() const; + BSONObj getOperationToApply() const; + BSONObj getObjectContainingDocumentKey() const; + OplogEntry::CommandType getCommandType() const; + int getRawObjSizeBytes() const; + +private: + DurableOplogEntry _entry; + + boost::optional<bool> _isForCappedCollection; +}; + +std::ostream& operator<<(std::ostream& s, const DurableOplogEntry& o); std::ostream& operator<<(std::ostream& s, const OplogEntry& o); -inline bool operator==(const OplogEntry& lhs, const OplogEntry& rhs) { +inline bool operator==(const DurableOplogEntry& lhs, const DurableOplogEntry& rhs) { return SimpleBSONObjComparator::kInstance.evaluate(lhs.getRaw() == rhs.getRaw()); } +bool operator==(const OplogEntry& lhs, const OplogEntry& rhs); + std::ostream& operator<<(std::ostream& s, const ReplOperation& o); } // namespace repl diff --git a/src/mongo/db/repl/oplog_entry_or_grouped_inserts.cpp b/src/mongo/db/repl/oplog_entry_or_grouped_inserts.cpp index feba02a5ed5..be04c9a8463 100644 --- a/src/mongo/db/repl/oplog_entry_or_grouped_inserts.cpp +++ b/src/mongo/db/repl/oplog_entry_or_grouped_inserts.cpp @@ -34,7 +34,7 @@ namespace mongo { namespace repl { BSONObj OplogEntryOrGroupedInserts::toBSON() const { if (!isGroupedInserts()) - return getOp().toBSON(); + return getOp().getEntry().toBSON(); // Since we found more than one document, create grouped insert of many docs. // We are going to group many 'i' ops into one big 'i' op, with array fields for @@ -78,7 +78,7 @@ BSONObj OplogEntryOrGroupedInserts::toBSON() const { } // Generate an op object of all elements except for "ts", "t", and "o", since we // need to make those fields arrays of all the ts's, t's, and o's. - groupedInsertBuilder.appendElementsUnique(getOp().toBSON()); + groupedInsertBuilder.appendElementsUnique(getOp().getEntry().toBSON()); return groupedInsertBuilder.obj(); } } // namespace repl diff --git a/src/mongo/db/repl/oplog_entry_test_helpers.cpp b/src/mongo/db/repl/oplog_entry_test_helpers.cpp index 847b5517c98..a68d4824f9b 100644 --- a/src/mongo/db/repl/oplog_entry_test_helpers.cpp +++ b/src/mongo/db/repl/oplog_entry_test_helpers.cpp @@ -60,24 +60,24 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::optional<StmtId> stmtId, boost::optional<UUID> uuid, boost::optional<OpTime> prevOpTime) { - return repl::OplogEntry(opTime, // optime - boost::none, // hash - opType, // opType - nss, // namespace - uuid, // uuid - boost::none, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - object, // o - object2, // o2 - sessionInfo, // sessionInfo - boost::none, // upsert - wallClockTime, // wall clock time - stmtId, // statement id - prevOpTime, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {repl::DurableOplogEntry(opTime, // optime + boost::none, // hash + opType, // opType + nss, // namespace + uuid, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + object, // o + object2, // o2 + sessionInfo, // sessionInfo + boost::none, // upsert + wallClockTime, // wall clock time + stmtId, // statement id + prevOpTime, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } OplogEntry makeCommandOplogEntry(OpTime opTime, diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index e4a43936c08..d8a6f78e237 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -84,24 +84,24 @@ BSONObj concatenate(BSONObj a, const BSONObj& b) { BSONObj makeNoopOplogEntry(OpTime opTime) { auto oplogEntry = - repl::OplogEntry(opTime, // optime - boost::none, // hash - OpTypeEnum ::kNoop, // opType - NamespaceString("test.t"), // namespace - boost::none, // uuid - boost::none, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - BSONObj(), // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + repl::DurableOplogEntry(opTime, // optime + boost::none, // hash + OpTypeEnum ::kNoop, // opType + NamespaceString("test.t"), // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + BSONObj(), // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none); // _id return oplogEntry.toBSON(); } diff --git a/src/mongo/db/repl/oplog_test.cpp b/src/mongo/db/repl/oplog_test.cpp index 20edd9e5014..a7f1cc0eeb5 100644 --- a/src/mongo/db/repl/oplog_test.cpp +++ b/src/mongo/db/repl/oplog_test.cpp @@ -118,10 +118,10 @@ TEST_F(OplogTest, LogOpReturnsOpTimeOnSuccessfulInsertIntoOplogCollection) { ASSERT_EQUALS(opTime, oplogEntry.getOpTime()) << "OpTime returned from logOp() did not match that in the oplog entry written to the " "oplog: " - << oplogEntry.toBSON(); + << oplogEntry.toBSONForLogging(); ASSERT(OpTypeEnum::kNoop == oplogEntry.getOpType()) << "Expected 'n' op type but found '" << OpType_serializer(oplogEntry.getOpType()) - << "' instead: " << oplogEntry.toBSON(); + << "' instead: " << oplogEntry.toBSONForLogging(); ASSERT_BSONOBJ_EQ(msgObj, oplogEntry.getObject()); // Ensure that the msg optime returned is the same as the last optime in the ReplClientInfo. @@ -134,8 +134,8 @@ TEST_F(OplogTest, LogOpReturnsOpTimeOnSuccessfulInsertIntoOplogCollection) { void _checkOplogEntry(const OplogEntry& oplogEntry, const OpTime& expectedOpTime, const NamespaceString& expectedNss) { - ASSERT_EQUALS(expectedOpTime, oplogEntry.getOpTime()) << oplogEntry.toBSON(); - ASSERT_EQUALS(expectedNss, oplogEntry.getNss()) << oplogEntry.toBSON(); + ASSERT_EQUALS(expectedOpTime, oplogEntry.getOpTime()) << oplogEntry.toBSONForLogging(); + ASSERT_EQUALS(expectedNss, oplogEntry.getNss()) << oplogEntry.toBSONForLogging(); } void _checkOplogEntry(const OplogEntry& oplogEntry, const std::pair<OpTime, NamespaceString>& expectedOpTimeAndNss) { @@ -388,10 +388,10 @@ TEST_F(OplogTest, MigrationIdAddedToOplog) { ASSERT_EQUALS(opTime, oplogEntry.getOpTime()) << "OpTime returned from logOp() did not match that in the oplog entry written to the " "oplog: " - << oplogEntry.toBSON(); + << oplogEntry.toBSONForLogging(); ASSERT(OpTypeEnum::kNoop == oplogEntry.getOpType()) << "Expected 'n' op type but found '" << OpType_serializer(oplogEntry.getOpType()) - << "' instead: " << oplogEntry.toBSON(); + << "' instead: " << oplogEntry.toBSONForLogging(); ASSERT_BSONOBJ_EQ(msgObj, oplogEntry.getObject()); ASSERT_EQ(migrationUuid, oplogEntry.getFromTenantMigration()); } diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index d42793e6101..a7674cf2499 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -97,7 +97,7 @@ public: "opIndex"_attr = i, "batchSize"_attr = batch.size(), "numBatches"_attr = _numBatches, - "oplogEntry"_attr = redact(entry.getRaw())); + "oplogEntry"_attr = redact(entry.toBSONForLogging())); } } } @@ -685,7 +685,8 @@ void ReplicationRecoveryImpl::_truncateOplogTo(OperationContext* opCtx, invariant(truncateAfterRecordId <= RecordId(truncateAfterTimestamp.asULL()), str::stream() << "Should have found a oplog entry timestamp lte to " << truncateAfterTimestamp.toString() << ", but instead found " - << truncateAfterOplogEntry.toString() << " with timestamp " + << redact(truncateAfterOplogEntry.toBSONForLogging()) + << " with timestamp " << Timestamp(truncateAfterRecordId.repr()).toString()); // Truncate the oplog AFTER the oplog entry found to be <= truncateAfterTimestamp. diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp index 78e78e0d9cb..c8fdbfb4154 100644 --- a/src/mongo/db/repl/replication_recovery_test.cpp +++ b/src/mongo/db/repl/replication_recovery_test.cpp @@ -249,24 +249,25 @@ repl::OplogEntry _makeOplogEntry(repl::OpTime opTime, boost::optional<BSONObj> object2 = boost::none, OperationSessionInfo sessionInfo = {}, Date_t wallTime = Date_t()) { - return repl::OplogEntry(opTime, // optime - boost::none, // hash - opType, // opType - testNs, // namespace - boost::none, // uuid - boost::none, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - object, // o - object2, // o2 - sessionInfo, // sessionInfo - boost::none, // isUpsert - wallTime, // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return { + repl::DurableOplogEntry(opTime, // optime + boost::none, // hash + opType, // opType + testNs, // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + object, // o + object2, // o2 + sessionInfo, // sessionInfo + boost::none, // isUpsert + wallTime, // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } /** @@ -302,7 +303,7 @@ TimestampedBSONObj _makeInsertOplogEntry(int t) { OpTypeEnum::kInsert, // op type _makeInsertDocument(t), // o boost::none); // o2 - return {entry.toBSON(), Timestamp(t)}; + return {entry.getEntry().toBSON(), Timestamp(t)}; } /** @@ -849,35 +850,41 @@ TEST_F(ReplicationRecoveryTest, RecoveryAppliesUpdatesIdempotently) { ASSERT_OK(getStorageInterface()->insertDocument( opCtx, oplogNs, - {_makeUpdateOplogEntry(ts, BSON("_id" << 1), BSON("$set" << BSON("a" << 7))).toBSON(), + {_makeUpdateOplogEntry(ts, BSON("_id" << 1), BSON("$set" << BSON("a" << 7))) + .getEntry() + .toBSON(), Timestamp(ts, ts)}, OpTime::kUninitializedTerm)); ts++; ASSERT_OK(getStorageInterface()->insertDocument( opCtx, oplogNs, - {_makeDeleteOplogEntry(ts, BSON("_id" << 1)).toBSON(), Timestamp(ts, ts)}, + {_makeDeleteOplogEntry(ts, BSON("_id" << 1)).getEntry().toBSON(), Timestamp(ts, ts)}, OpTime::kUninitializedTerm)); // Test that updates and deletes on a document succeed. ts++; ASSERT_OK(getStorageInterface()->insertDocument( opCtx, oplogNs, - {_makeUpdateOplogEntry(ts, BSON("_id" << 2), BSON("$set" << BSON("a" << 7))).toBSON(), + {_makeUpdateOplogEntry(ts, BSON("_id" << 2), BSON("$set" << BSON("a" << 7))) + .getEntry() + .toBSON(), Timestamp(ts, ts)}, OpTime::kUninitializedTerm)); ts++; ASSERT_OK(getStorageInterface()->insertDocument( opCtx, oplogNs, - {_makeDeleteOplogEntry(ts, BSON("_id" << 2)).toBSON(), Timestamp(ts, ts)}, + {_makeDeleteOplogEntry(ts, BSON("_id" << 2)).getEntry().toBSON(), Timestamp(ts, ts)}, OpTime::kUninitializedTerm)); // Test that updates on a document succeed. ts++; ASSERT_OK(getStorageInterface()->insertDocument( opCtx, oplogNs, - {_makeUpdateOplogEntry(ts, BSON("_id" << 3), BSON("$set" << BSON("a" << 7))).toBSON(), + {_makeUpdateOplogEntry(ts, BSON("_id" << 3), BSON("$set" << BSON("a" << 7))) + .getEntry() + .toBSON(), Timestamp(ts, ts)}, OpTime::kUninitializedTerm)); @@ -900,7 +907,9 @@ DEATH_TEST_F(ReplicationRecoveryTest, RecoveryFailsWithBadOp, "terminate() calle ASSERT_OK(getStorageInterface()->insertDocument( opCtx, oplogNs, - {_makeUpdateOplogEntry(2, BSON("bad_op" << 1), BSON("$set" << BSON("a" << 7))).toBSON(), + {_makeUpdateOplogEntry(2, BSON("bad_op" << 1), BSON("$set" << BSON("a" << 7))) + .getEntry() + .toBSON(), Timestamp(2, 2)}, OpTime::kUninitializedTerm)); @@ -927,7 +936,7 @@ TEST_F(ReplicationRecoveryTest, CorrectlyUpdatesConfigTransactions) { Date_t::now()); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {insertOp.toBSON(), Timestamp(2, 0)}, 1)); + opCtx, oplogNs, {insertOp.getEntry().toBSON(), Timestamp(2, 0)}, 1)); auto lastDate = Date_t::now(); auto insertOp2 = _makeOplogEntry({Timestamp(3, 0), 1}, @@ -938,7 +947,7 @@ TEST_F(ReplicationRecoveryTest, CorrectlyUpdatesConfigTransactions) { lastDate); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {insertOp2.toBSON(), Timestamp(3, 0)}, 1)); + opCtx, oplogNs, {insertOp2.getEntry().toBSON(), Timestamp(3, 0)}, 1)); recovery.recoverFromOplog(opCtx, boost::none); @@ -986,7 +995,7 @@ TEST_F(ReplicationRecoveryTest, PrepareTransactionOplogEntryCorrectlyUpdatesConf lastDate); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {prepareOp.toBSON(), Timestamp(2, 0)}, 1)); + opCtx, oplogNs, {prepareOp.getEntry().toBSON(), Timestamp(2, 0)}, 1)); recovery.recoverFromOplog(opCtx, boost::none); @@ -1034,7 +1043,7 @@ TEST_F(ReplicationRecoveryTest, AbortTransactionOplogEntryCorrectlyUpdatesConfig prepareDate); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {prepareOp.toBSON(), Timestamp(2, 0)}, 1)); + opCtx, oplogNs, {prepareOp.getEntry().toBSON(), Timestamp(2, 0)}, 1)); const auto abortDate = Date_t::now(); const auto abortOp = _makeTransactionOplogEntry({Timestamp(3, 0), 1}, @@ -1046,7 +1055,7 @@ TEST_F(ReplicationRecoveryTest, AbortTransactionOplogEntryCorrectlyUpdatesConfig abortDate); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {abortOp.toBSON(), Timestamp(3, 0)}, 1)); + opCtx, oplogNs, {abortOp.getEntry().toBSON(), Timestamp(3, 0)}, 1)); recovery.recoverFromOplog(opCtx, boost::none); @@ -1097,7 +1106,7 @@ DEATH_TEST_REGEX_F(ReplicationRecoveryTest, lastDate); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {prepareOp.toBSON(), Timestamp(2, 0)}, 1)); + opCtx, oplogNs, {prepareOp.getEntry().toBSON(), Timestamp(2, 0)}, 1)); serverGlobalParams.enableMajorityReadConcern = false; @@ -1134,7 +1143,7 @@ TEST_F(ReplicationRecoveryTest, CommitTransactionOplogEntryCorrectlyUpdatesConfi prepareDate); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {prepareOp.toBSON(), Timestamp(2, 0)}, 1)); + opCtx, oplogNs, {prepareOp.getEntry().toBSON(), Timestamp(2, 0)}, 1)); const auto commitDate = Date_t::now(); const auto commitOp = _makeTransactionOplogEntry( @@ -1147,7 +1156,7 @@ TEST_F(ReplicationRecoveryTest, CommitTransactionOplogEntryCorrectlyUpdatesConfi commitDate); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {commitOp.toBSON(), Timestamp(3, 0)}, 1)); + opCtx, oplogNs, {commitOp.getEntry().toBSON(), Timestamp(3, 0)}, 1)); recovery.recoverFromOplog(opCtx, boost::none); @@ -1209,7 +1218,7 @@ TEST_F(ReplicationRecoveryTest, prepareDate); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {prepareOp.toBSON(), Timestamp(2, 0)}, 1)); + opCtx, oplogNs, {prepareOp.getEntry().toBSON(), Timestamp(2, 0)}, 1)); // Add an operation here so that we can have the appliedThrough time be in-between the commit // timestamp and the commitTransaction oplog entry. @@ -1221,7 +1230,7 @@ TEST_F(ReplicationRecoveryTest, Date_t::now()); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {insertOp.toBSON(), Timestamp(2, 2)}, 1)); + opCtx, oplogNs, {insertOp.getEntry().toBSON(), Timestamp(2, 2)}, 1)); const auto commitDate = Date_t::now(); const auto commitOp = _makeTransactionOplogEntry( @@ -1234,7 +1243,7 @@ TEST_F(ReplicationRecoveryTest, commitDate); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {commitOp.toBSON(), Timestamp(3, 0)}, 1)); + opCtx, oplogNs, {commitOp.getEntry().toBSON(), Timestamp(3, 0)}, 1)); recovery.recoverFromOplog(opCtx, boost::none); @@ -1387,7 +1396,7 @@ TEST_F(ReplicationRecoveryTest, RecoverFromOplogUpToReconstructsPreparedTransact sessionInfo, lastDate); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {prepareOp.toBSON(), Timestamp(3, 3)}, 1)); + opCtx, oplogNs, {prepareOp.getEntry().toBSON(), Timestamp(3, 3)}, 1)); } recovery.recoverFromOplogUpTo(opCtx, Timestamp(3, 3)); @@ -1429,7 +1438,7 @@ TEST_F(ReplicationRecoveryTest, sessionInfo, lastDate); ASSERT_OK(getStorageInterface()->insertDocument( - opCtx, oplogNs, {prepareOp.toBSON(), Timestamp(1, 1)}, 1)); + opCtx, oplogNs, {prepareOp.getEntry().toBSON(), Timestamp(1, 1)}, 1)); const BSONObj doc = BSON("_id" << sessionId.toBSON() << "txnNum" << static_cast<long long>(1) diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index 54931298957..0489468eddd 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -102,7 +102,7 @@ boost::optional<long long> _parseDroppedCollectionCount(const OplogEntry& oplogE LOGV2_WARNING(21634, "Unable to get collection count from oplog entry without the o2 field", "type"_attr = desc, - "oplogEntry"_attr = redact(oplogEntry.toBSON())); + "oplogEntry"_attr = redact(oplogEntry.toBSONForLogging())); return boost::none; } @@ -114,7 +114,7 @@ boost::optional<long long> _parseDroppedCollectionCount(const OplogEntry& oplogE "Failed to parse oplog entry for collection count", "type"_attr = desc, "error"_attr = status, - "oplogEntry"_attr = redact(oplogEntry.toBSON())); + "oplogEntry"_attr = redact(oplogEntry.toBSONForLogging())); return boost::none; } @@ -123,7 +123,7 @@ boost::optional<long long> _parseDroppedCollectionCount(const OplogEntry& oplogE "Invalid collection count found in oplog entry", "type"_attr = desc, "count"_attr = count, - "oplogEntry"_attr = redact(oplogEntry.toBSON())); + "oplogEntry"_attr = redact(oplogEntry.toBSONForLogging())); return boost::none; } @@ -132,7 +132,7 @@ boost::optional<long long> _parseDroppedCollectionCount(const OplogEntry& oplogE "Parsed collection count of oplog entry", "count"_attr = count, "type"_attr = desc, - "oplogEntry"_attr = redact(oplogEntry.toBSON())); + "oplogEntry"_attr = redact(oplogEntry.toBSONForLogging())); return count; } @@ -822,7 +822,7 @@ Status RollbackImpl::_processRollbackOp(OperationContext* opCtx, const OplogEntr const auto uuid = oplogEntry.getUuid(); invariant(uuid, str::stream() << "Oplog entry to roll back is unexpectedly missing a UUID: " - << redact(oplogEntry.toBSON())); + << redact(oplogEntry.toBSONForLogging())); const auto idElem = oplogEntry.getIdElement(); if (!idElem.eoo()) { // We call BSONElement::wrap() on each _id element to create a new BSONObj with an owned @@ -844,7 +844,7 @@ Status RollbackImpl::_processRollbackOp(OperationContext* opCtx, const OplogEntr LOGV2_WARNING(21641, "Shard identity document rollback detected. oplog op: {oplogEntry}", "Shard identity document rollback detected", - "oplogEntry"_attr = redact(oplogEntry.toBSON())); + "oplogEntry"_attr = redact(oplogEntry.toBSONForLogging())); } else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer && opNss == VersionType::ConfigNS) { // Check if the creation of the config server config version document is being rolled @@ -853,7 +853,7 @@ Status RollbackImpl::_processRollbackOp(OperationContext* opCtx, const OplogEntr LOGV2_WARNING(21642, "Config version document rollback detected. oplog op: {oplogEntry}", "Config version document rollback detected", - "oplogEntry"_attr = redact(oplogEntry.toBSON())); + "oplogEntry"_attr = redact(oplogEntry.toBSONForLogging())); } // Rolling back an insert must decrement the count by 1. @@ -877,7 +877,7 @@ Status RollbackImpl::_processRollbackOp(OperationContext* opCtx, const OplogEntr invariant(UUID::parse(catalogEntry["md"]["options"]["uuid"]), str::stream() << "Oplog entry to roll back is unexpectedly missing " "import collection UUID: " - << redact(oplogEntry.toBSON())); + << redact(oplogEntry.toBSONForLogging())); // If we roll back an import, then we do not need to change the size of that uuid. _countDiffs.erase(importTargetUUID); _pendingDrops.erase(importTargetUUID); @@ -893,7 +893,7 @@ Status RollbackImpl::_processRollbackOp(OperationContext* opCtx, const OplogEntr const auto uuid = oplogEntry.getUuid().get(); invariant(_countDiffs.find(uuid) == _countDiffs.end(), str::stream() << "Unexpected existing count diff for " << uuid.toString() - << " op: " << redact(oplogEntry.toBSON())); + << " op: " << redact(oplogEntry.toBSONForLogging())); if (auto countResult = _parseDroppedCollectionCount(oplogEntry)) { PendingDropInfo info; info.count = *countResult; @@ -917,11 +917,11 @@ Status RollbackImpl::_processRollbackOp(OperationContext* opCtx, const OplogEntr UUID::parse(oplogEntry.getObject()[kDropTargetFieldName]), str::stream() << "Oplog entry to roll back is unexpectedly missing dropTarget UUID: " - << redact(oplogEntry.toBSON())); + << redact(oplogEntry.toBSONForLogging())); invariant(_countDiffs.find(dropTargetUUID) == _countDiffs.end(), str::stream() << "Unexpected existing count diff for " << dropTargetUUID.toString() - << " op: " << redact(oplogEntry.toBSON())); + << " op: " << redact(oplogEntry.toBSONForLogging())); if (auto countResult = _parseDroppedCollectionCount(oplogEntry)) { PendingDropInfo info; info.count = *countResult; diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp index ab66cbbdf4d..8d09e456e50 100644 --- a/src/mongo/db/repl/rollback_impl_test.cpp +++ b/src/mongo/db/repl/rollback_impl_test.cpp @@ -1433,24 +1433,24 @@ RollbackImplTest::_setUpUnpreparedTransactionForCountTest(UUID collId) { insertOp2Obj = insertOp2Obj.removeField("wall"); auto partialApplyOpsObj = BSON("applyOps" << BSON_ARRAY(insertOp2Obj) << "partialTxn" << true); - OplogEntry partialApplyOpsOplogEntry(partialApplyOpsOpTime, // opTime - 1LL, // hash - OpTypeEnum::kCommand, // opType - adminCmdNss, // nss - boost::none, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - partialApplyOpsObj, // oField - boost::none, // o2Field - sessionInfo, // sessionInfo - boost::none, // isUpsert - Date_t(), // wallClockTime - boost::none, // statementId - OpTime(), // prevWriteOpTimeInTransaction - boost::none, // preImageOpTime - boost::none, // postImageOpTime - boost::none, // ShardId of resharding recipient - boost::none); // _id + DurableOplogEntry partialApplyOpsOplogEntry(partialApplyOpsOpTime, // opTime + 1LL, // hash + OpTypeEnum::kCommand, // opType + adminCmdNss, // nss + boost::none, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + partialApplyOpsObj, // oField + boost::none, // o2Field + sessionInfo, // sessionInfo + boost::none, // isUpsert + Date_t(), // wallClockTime + boost::none, // statementId + OpTime(), // prevWriteOpTimeInTransaction + boost::none, // preImageOpTime + boost::none, // postImageOpTime + boost::none, // ShardId of resharding recipient + boost::none); // _id ASSERT_OK(_insertOplogEntry(partialApplyOpsOplogEntry.toBSON())); ops.push_back(std::make_pair(partialApplyOpsOplogEntry.toBSON(), insertOp2.second)); @@ -1466,24 +1466,25 @@ RollbackImplTest::_setUpUnpreparedTransactionForCountTest(UUID collId) { insertOp3Obj = insertOp3Obj.removeField("wall"); auto commitApplyOpsObj = BSON("applyOps" << BSON_ARRAY(insertOp3Obj) << "count" << 1); - OplogEntry commitApplyOpsOplogEntry(commitApplyOpsOpTime, // opTime - 1LL, // hash - OpTypeEnum::kCommand, // opType - adminCmdNss, // nss - boost::none, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - commitApplyOpsObj, // oField - boost::none, // o2Field - sessionInfo, // sessionInfo - boost::none, // isUpsert - Date_t(), // wallClockTime - boost::none, // statementId - partialApplyOpsOpTime, // prevWriteOpTimeInTransaction - boost::none, // preImageOpTime - boost::none, // postImageOpTime - boost::none, // ShardId of resharding recipient - boost::none); // _id + DurableOplogEntry commitApplyOpsOplogEntry( + commitApplyOpsOpTime, // opTime + 1LL, // hash + OpTypeEnum::kCommand, // opType + adminCmdNss, // nss + boost::none, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + commitApplyOpsObj, // oField + boost::none, // o2Field + sessionInfo, // sessionInfo + boost::none, // isUpsert + Date_t(), // wallClockTime + boost::none, // statementId + partialApplyOpsOpTime, // prevWriteOpTimeInTransaction + boost::none, // preImageOpTime + boost::none, // postImageOpTime + boost::none, // ShardId of resharding recipient + boost::none); // _id ASSERT_OK(_insertOplogEntry(commitApplyOpsOplogEntry.toBSON())); ops.push_back(std::make_pair(commitApplyOpsOplogEntry.toBSON(), insertOp3.second)); diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 59f515c2076..79a99bb34d7 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -276,7 +276,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(OperationContext* o 2, "Updating rollback FixUpInfo for nested applyOps oplog entry: {oplogEntry}", "Updating rollback FixUpInfo for nested applyOps oplog entry", - "oplogEntry"_attr = redact(oplogEntry.toBSON())); + "oplogEntry"_attr = redact(oplogEntry.toBSONForLogging())); } // Extract the op's collection namespace and UUID. @@ -288,13 +288,13 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(OperationContext* o if (oplogEntry.getNss().isEmpty()) { throw RSFatalException(str::stream() << "Local op on rollback has no ns: " - << redact(oplogEntry.toBSON())); + << redact(oplogEntry.toBSONForLogging())); } auto obj = oplogEntry.getOperationToApply(); if (obj.isEmpty()) { throw RSFatalException(str::stream() << "Local op on rollback has no object field: " - << redact(oplogEntry.toBSON())); + << redact(oplogEntry.toBSONForLogging())); } // If the operation being rolled back has a txnNumber, then the corresponding entry in the @@ -320,7 +320,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(OperationContext* o throw RSFatalException( str::stream() << NamespaceString::kSessionTransactionsTableNamespace.ns() << " does not have a UUID, but local op has a transaction number: " - << redact(oplogEntry.toBSON())); + << redact(oplogEntry.toBSONForLogging())); } if (oplogEntry.isPartialTransaction()) { // If this is a transaction which did not commit, we need do nothing more than @@ -397,7 +397,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(OperationContext* o "Missing index name in dropIndexes operation on rollback, " "document: {oplogEntry}", "Missing index name in dropIndexes operation on rollback", - "oplogEntry"_attr = redact(oplogEntry.toBSON())); + "oplogEntry"_attr = redact(oplogEntry.toBSONForLogging())); throw RSFatalException( "Missing index name in dropIndexes operation on rollback."); } @@ -438,7 +438,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(OperationContext* o "Missing index name in createIndexes operation on rollback, " "document: {oplogEntry}", "Missing index name in createIndexes operation on rollback", - "oplogEntry"_attr = redact(oplogEntry.toBSON())); + "oplogEntry"_attr = redact(oplogEntry.toBSONForLogging())); throw RSFatalException( "Missing index name in createIndexes operation on rollback."); } @@ -791,7 +791,7 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(OperationContext* o LOGV2_FATAL_CONTINUE(21737, message, "namespace"_attr = nss.ns(), - "oplogEntry"_attr = redact(oplogEntry.toBSON())); + "oplogEntry"_attr = redact(oplogEntry.toBSONForLogging())); throw RSFatalException(str::stream() << message << ". ns: " << nss.ns()); } fixUpInfo.docsToRefetch.insert(doc); diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp index a9f418d619c..1acf972b467 100644 --- a/src/mongo/db/repl/session_update_tracker.cpp +++ b/src/mongo/db/repl/session_update_tracker.cpp @@ -53,24 +53,24 @@ OplogEntry createOplogEntryForTransactionTableUpdate(repl::OpTime opTime, const BSONObj& updateBSON, const BSONObj& o2Field, Date_t wallClockTime) { - return repl::OplogEntry(opTime, - boost::none, // hash - repl::OpTypeEnum::kUpdate, - NamespaceString::kSessionTransactionsTableNamespace, - boost::none, // uuid - false, // fromMigrate - repl::OplogEntry::kOplogVersion, - updateBSON, - o2Field, - {}, // sessionInfo - true, // upsert - wallClockTime, - boost::none, // statementId - boost::none, // prevWriteOpTime - boost::none, // preImageOpTime - boost::none, // postImageOpTime - boost::none, // destinedRecipient - boost::none); // _id + return {repl::DurableOplogEntry(opTime, + boost::none, // hash + repl::OpTypeEnum::kUpdate, + NamespaceString::kSessionTransactionsTableNamespace, + boost::none, // uuid + false, // fromMigrate + repl::OplogEntry::kOplogVersion, + updateBSON, + o2Field, + {}, // sessionInfo + true, // upsert + wallClockTime, + boost::none, // statementId + boost::none, // prevWriteOpTime + boost::none, // preImageOpTime + boost::none, // postImageOpTime + boost::none, // destinedRecipient + boost::none)}; // _id } /** @@ -201,8 +201,8 @@ void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) { "sessionInfo_getTxnNumber"_attr = *sessionInfo.getTxnNumber(), "existingSessionInfo_getTxnNumber"_attr = *existingSessionInfo.getTxnNumber(), - "newEntry"_attr = redact(entry.toString()), - "existingEntry"_attr = redact(iter->second.toString())); + "newEntry"_attr = redact(entry.toBSONForLogging()), + "existingEntry"_attr = redact(iter->second.toBSONForLogging())); } std::vector<OplogEntry> SessionUpdateTracker::_flush(const OplogEntry& entry) { diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp index 37e42fcc4fe..1382b8b73c7 100644 --- a/src/mongo/db/repl/sync_source_resolver_test.cpp +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -311,24 +311,24 @@ void _scheduleFirstOplogEntryFetcherResponse(executor::NetworkInterfaceMock* net * Generates oplog entries with the given optime. */ BSONObj _makeOplogEntry(Timestamp ts, long long term) { - return OplogEntry(OpTime(ts, term), // optime - boost::none, // hash - OpTypeEnum::kNoop, // op type - NamespaceString("a.a"), // namespace - boost::none, // uuid - boost::none, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - BSONObj(), // o - boost::none, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none) // _id + return DurableOplogEntry(OpTime(ts, term), // optime + boost::none, // hash + OpTypeEnum::kNoop, // op type + NamespaceString("a.a"), // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + BSONObj(), // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none) // _id .toBSON(); } diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index f8d39c19b8c..db7a6e67170 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -77,24 +77,24 @@ OplogEntry makeOplogEntry(OpTime opTime, OptionalCollectionUUID uuid, BSONObj o, boost::optional<BSONObj> o2) { - return OplogEntry(opTime, // optime - boost::none, // hash - opType, // opType - nss, // namespace - uuid, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - o, // o - o2, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return {DurableOplogEntry(opTime, // optime + boost::none, // hash + opType, // opType + nss, // namespace + uuid, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + o, // o + o2, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } /** @@ -264,6 +264,7 @@ protected: boost::none /* uuid */, BSONObj() /* o */, boost::none /* o2 */) + .getEntry() .toBSON()); } @@ -983,7 +984,8 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierFails) { BSON("_id" << "bad insert"), boost::none /* o2 */); - oplogFetcher->receiveBatch(1LL, {oplogEntry.toBSON()}, injectedEntryOpTime.getTimestamp()); + oplogFetcher->receiveBatch( + 1LL, {oplogEntry.getEntry().toBSON()}, injectedEntryOpTime.getTimestamp()); // Wait for task completion failure. ASSERT_NOT_OK(instance->getDataSyncCompletionFuture().getNoThrow()); @@ -1090,7 +1092,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTok UUID::gen() /* uuid */, BSON("doc" << 2) /* o */, boost::none /* o2 */); - oplogFetcher->receiveBatch(17, {oplogEntry1.toBSON()}, resumeToken1); + oplogFetcher->receiveBatch(17, {oplogEntry1.getEntry().toBSON()}, resumeToken1); const Timestamp oplogEntryTS2 = Timestamp(6, 2); const Timestamp resumeToken2 = Timestamp(7, 3); @@ -1100,7 +1102,7 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTok UUID::gen() /* uuid */, BSON("doc" << 3) /* o */, boost::none /* o2 */); - oplogFetcher->receiveBatch(17, {oplogEntry2.toBSON()}, resumeToken2); + oplogFetcher->receiveBatch(17, {oplogEntry2.getEntry().toBSON()}, resumeToken2); // Receive an empty batch. oplogFetcher->receiveBatch(17, {}, resumeToken2); @@ -1112,14 +1114,14 @@ TEST_F(TenantMigrationRecipientServiceTest, TenantMigrationRecipientAddResumeTok BSONObj insertDoc; ASSERT_TRUE(oplogBuffer->tryPop(opCtx.get(), &insertDoc)); LOGV2(5124601, "Insert oplog entry", "entry"_attr = insertDoc); - ASSERT_BSONOBJ_EQ(insertDoc, oplogEntry1.toBSON()); + ASSERT_BSONOBJ_EQ(insertDoc, oplogEntry1.getEntry().toBSON()); } { BSONObj insertDoc; ASSERT_TRUE(oplogBuffer->tryPop(opCtx.get(), &insertDoc)); LOGV2(5124602, "Insert oplog entry", "entry"_attr = insertDoc); - ASSERT_BSONOBJ_EQ(insertDoc, oplogEntry2.toBSON()); + ASSERT_BSONOBJ_EQ(insertDoc, oplogEntry2.getEntry().toBSON()); } { diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 883071df97a..f94e25416c3 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -470,7 +470,7 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver, opCtx.get(), entry.getNss(), entry.getUuid(), - entry.toBSON(), + entry.getEntry().toBSON(), BSONObj(), // We link the no-ops together by recipient op time the same way the actual ops // were linked together by donor op time. This is to allow retryable writes @@ -539,7 +539,7 @@ Status TenantOplogApplier::_applyOplogEntryOrGroupedInserts( "Index operations are not currently supported in tenant migration", "tenant"_attr = _tenantId, "migrationUuid"_attr = _migrationUuid, - "op"_attr = redact(op.toBSON())); + "op"_attr = redact(op.toBSONForLogging())); return Status::OK(); } @@ -559,7 +559,7 @@ Status TenantOplogApplier::_applyOplogEntryOrGroupedInserts( "tenant"_attr = _tenantId, "migrationUuid"_attr = _migrationUuid, "error"_attr = status, - "op"_attr = redact(op.toBSON())); + "op"_attr = redact(op.toBSONForLogging())); return status; } diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp index 02e654a2d01..558fdd61390 100644 --- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp @@ -160,7 +160,7 @@ public: } void assertNoOpMatches(const OplogEntry& op, const MutableOplogEntry& noOp) { - ASSERT_BSONOBJ_EQ(op.toBSON(), noOp.getObject()); + ASSERT_BSONOBJ_EQ(op.getEntry().toBSON(), noOp.getObject()); ASSERT_EQ(op.getNss(), noOp.getNss()); ASSERT_EQ(op.getUuid(), noOp.getUuid()); ASSERT_EQ(_migrationUuid, noOp.getFromTenantMigration()); @@ -169,7 +169,7 @@ public: void pushOps(const std::vector<OplogEntry>& ops) { std::vector<BSONObj> bsonOps; for (const auto& op : ops) { - bsonOps.push_back(op.toBSON()); + bsonOps.push_back(op.getEntry().toBSON()); } _oplogBuffer.push(nullptr, bsonOps.begin(), bsonOps.end()); } diff --git a/src/mongo/db/repl/tenant_oplog_batcher.cpp b/src/mongo/db/repl/tenant_oplog_batcher.cpp index 9a254053588..c564d4b0119 100644 --- a/src/mongo/db/repl/tenant_oplog_batcher.cpp +++ b/src/mongo/db/repl/tenant_oplog_batcher.cpp @@ -56,7 +56,7 @@ void TenantOplogBatcher::_pushEntry(OperationContext* opCtx, OplogEntry&& op) { uassert(4885606, str::stream() << "Prepared transactions are not supported for tenant migration." - << redact(op.toBSON()), + << redact(op.toBSONForLogging()), !op.isPreparedCommit() && (op.getCommandType() != OplogEntry::CommandType::kApplyOps || !op.shouldPrepare())); if (op.isTerminalApplyOps()) { @@ -65,7 +65,7 @@ void TenantOplogBatcher::_pushEntry(OperationContext* opCtx, // This applies to both multi-document transactions and atomic applyOps. auto expansionsIndex = batch->expansions.size(); auto& curExpansion = batch->expansions.emplace_back(); - auto lastOpInTransactionBson = op.toBSON(); + auto lastOpInTransactionBson = op.getEntry().toBSON(); repl::ApplyOps::extractOperationsTo(op, lastOpInTransactionBson, &curExpansion); auto oplogPrevTsOption = op.getPrevWriteOpTimeInTransaction(); if (oplogPrevTsOption && !oplogPrevTsOption->isNull()) { diff --git a/src/mongo/db/repl/tenant_oplog_batcher_test.cpp b/src/mongo/db/repl/tenant_oplog_batcher_test.cpp index a2305561f4a..18ca8741d6a 100644 --- a/src/mongo/db/repl/tenant_oplog_batcher_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_batcher_test.cpp @@ -77,7 +77,7 @@ std::string toString(const std::vector<TenantOplogEntry>& ops) { StringBuilder sb; sb << "["; for (const auto& op : ops) { - sb << " " << op.entry.toString() << "(" << op.expansionsEntry << ")"; + sb << " " << op.entry.toStringForLogging() << "(" << op.expansionsEntry << ")"; } sb << " ]"; return sb.str(); @@ -113,8 +113,8 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherGroupsCrudOps) { // We just started, no batch should be available. ASSERT(!batchFuture.isReady()); std::vector<BSONObj> srcOps; - srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo")).toBSON()); - srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).toBSON()); + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "foo")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).getEntry().toBSON()); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); auto batch = batchFuture.get(); @@ -122,9 +122,9 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherGroupsCrudOps) { ASSERT_EQUALS(srcOps.size(), batch.ops.size()) << toString(batch); ASSERT(batch.expansions.empty()); - ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.getEntry().toBSON()); ASSERT_EQUALS(-1, batch.ops[0].expansionsEntry); - ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.getEntry().toBSON()); ASSERT_EQUALS(-1, batch.ops[1].expansionsEntry); batcher->join(); @@ -136,7 +136,7 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedApplyOps) { auto batchFuture = batcher->getNextBatch(bigBatchLimits); std::vector<BSONObj> srcOps; - srcOps.push_back(makeApplyOpsOplogEntry(1, true).toBSON()); + srcOps.push_back(makeApplyOpsOplogEntry(1, true).getEntry().toBSON()); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); ASSERT_THROWS(batchFuture.get(), AssertionException); @@ -150,8 +150,9 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedCommit) { auto batchFuture = batcher->getNextBatch(bigBatchLimits); std::vector<BSONObj> srcOps; - srcOps.push_back( - makeCommitTransactionOplogEntry(1, dbName, true /* prepared*/, 1 /* count */).toBSON()); + srcOps.push_back(makeCommitTransactionOplogEntry(1, dbName, true /* prepared*/, 1 /* count */) + .getEntry() + .toBSON()); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); ASSERT_THROWS(batchFuture.get(), AssertionException); @@ -161,9 +162,10 @@ TEST_F(TenantOplogBatcherTest, OplogBatcherFailsOnPreparedCommit) { // We internally add the 'b' field during applyOps expansion; we need to remove it when we check to // see that the expansion matches the expected test values input. -static DurableReplOperation& stripB(DurableReplOperation& withB) { - withB.setUpsert(boost::none); - return withB; +static DurableReplOperation stripB(const DurableReplOperation& withB) { + DurableReplOperation withoutB(withB); + withoutB.setUpsert(boost::none); + return withoutB; } TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsUnpreparedApplyOpsOpWithOtherOps) { @@ -171,8 +173,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsUnpreparedApplyOpsOpWith std::vector<BSONObj> srcOps; innerOps.push_back(makeInsertOplogEntry(10, NamespaceString(dbName, "foo"))); innerOps.push_back(makeInsertOplogEntry(11, NamespaceString(dbName, "foo"))); - srcOps.push_back(makeApplyOpsOplogEntry(1, false, innerOps).toBSON()); - srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).toBSON()); + srcOps.push_back(makeApplyOpsOplogEntry(1, false, innerOps).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).getEntry().toBSON()); auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); ASSERT_OK(batcher->startup()); @@ -184,13 +186,13 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsUnpreparedApplyOpsOpWith ASSERT_EQUALS(2, batch.ops.size()) << toString(batch); ASSERT_EQUALS(1, batch.expansions.size()); ASSERT_EQUALS(2, batch.expansions[0].size()); - ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.getEntry().toBSON()); ASSERT_EQUALS(0, batch.ops[0].expansionsEntry); ASSERT_BSONOBJ_EQ(innerOps[0].getDurableReplOperation().toBSON(), stripB(batch.expansions[0][0].getDurableReplOperation()).toBSON()); ASSERT_BSONOBJ_EQ(innerOps[1].getDurableReplOperation().toBSON(), stripB(batch.expansions[0][1].getDurableReplOperation()).toBSON()); - ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.getEntry().toBSON()); ASSERT_EQUALS(-1, batch.ops[1].expansionsEntry); batcher->join(); @@ -204,8 +206,8 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsMultipleTransactions) { innerOps1.push_back(makeInsertOplogEntry(11, NamespaceString(dbName, "foo"))); innerOps2.push_back(makeInsertOplogEntry(20, NamespaceString(dbName, "foo"))); innerOps2.push_back(makeInsertOplogEntry(21, NamespaceString(dbName, "foo"))); - srcOps.push_back(makeApplyOpsOplogEntry(1, false, innerOps1).toBSON()); - srcOps.push_back(makeApplyOpsOplogEntry(2, false, innerOps2).toBSON()); + srcOps.push_back(makeApplyOpsOplogEntry(1, false, innerOps1).getEntry().toBSON()); + srcOps.push_back(makeApplyOpsOplogEntry(2, false, innerOps2).getEntry().toBSON()); auto batcher = std::make_shared<TenantOplogBatcher>("tenant", &_oplogBuffer, _executor); ASSERT_OK(batcher->startup()); @@ -218,7 +220,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsMultipleTransactions) { ASSERT_EQUALS(2, batch.expansions.size()); // First transaction. - ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.getEntry().toBSON()); ASSERT_EQUALS(0, batch.ops[0].expansionsEntry); ASSERT_EQUALS(2, batch.expansions[0].size()); ASSERT_BSONOBJ_EQ(innerOps1[0].getDurableReplOperation().toBSON(), @@ -227,7 +229,7 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsMultipleTransactions) { stripB(batch.expansions[0][1].getDurableReplOperation()).toBSON()); // Second transaction. - ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.getEntry().toBSON()); ASSERT_EQUALS(1, batch.ops[1].expansionsEntry); ASSERT_EQUALS(2, batch.expansions[1].size()); ASSERT_BSONOBJ_EQ(innerOps2[0].getDurableReplOperation().toBSON(), @@ -240,11 +242,11 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchGroupsMultipleTransactions) { TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForNumberOfOperations) { std::vector<BSONObj> srcOps; - srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")).toBSON()); - srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).toBSON()); - srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")).toBSON()); - srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar")).toBSON()); - srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).toBSON()); + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(4, NamespaceString(dbName, "bar")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).getEntry().toBSON()); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); // Set batch limits so that each batch contains a maximum of 'BatchLimit::ops'. @@ -257,25 +259,25 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForNumberOfOp // First batch: [insert, insert, insert] auto batch = batchFuture.get(); ASSERT_EQUALS(3U, batch.ops.size()) << toString(batch); - ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.toBSON()); - ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.toBSON()); - ASSERT_BSONOBJ_EQ(srcOps[2], batch.ops[2].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.getEntry().toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.getEntry().toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[2], batch.ops[2].entry.getEntry().toBSON()); // Second batch: [insert, insert] batchFuture = batcher->getNextBatch(limits); batch = batchFuture.get(); ASSERT_EQUALS(2U, batch.ops.size()) << toString(batch); - ASSERT_BSONOBJ_EQ(srcOps[3], batch.ops[0].entry.toBSON()); - ASSERT_BSONOBJ_EQ(srcOps[4], batch.ops[1].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[3], batch.ops[0].entry.getEntry().toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[4], batch.ops[1].entry.getEntry().toBSON()); batcher->shutdown(); batcher->join(); } TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForSizeOfOperations) { std::vector<BSONObj> srcOps; - srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")).toBSON()); - srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).toBSON()); - srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")).toBSON()); + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")).getEntry().toBSON()); + srcOps.push_back(makeInsertOplogEntry(3, NamespaceString(dbName, "bar")).getEntry().toBSON()); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); // Set batch limits so that only the first two operations can fit into the first batch. @@ -288,21 +290,21 @@ TEST_F(TenantOplogBatcherTest, GetNextApplierBatchChecksBatchLimitsForSizeOfOper // First batch: [insert, insert] auto batch = batchFuture.get(); ASSERT_EQUALS(2U, batch.ops.size()) << toString(batch); - ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.toBSON()); - ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.getEntry().toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.getEntry().toBSON()); // Second batch: [insert] batchFuture = batcher->getNextBatch(limits); batch = batchFuture.get(); ASSERT_EQUALS(1U, batch.ops.size()) << toString(batch); - ASSERT_BSONOBJ_EQ(srcOps[2], batch.ops[0].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[2], batch.ops[0].entry.getEntry().toBSON()); batcher->shutdown(); batcher->join(); } TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded) { std::vector<BSONObj> srcOps; - srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")).toBSON()); + srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")).getEntry().toBSON()); std::vector<OplogEntry> innerOps1; innerOps1.push_back(makeInsertOplogEntry(11, NamespaceString(dbName, "bar"))); innerOps1.push_back(makeInsertOplogEntry(12, NamespaceString(dbName, "bar"))); @@ -317,12 +319,12 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded) std::vector<OplogEntry> multiEntryTransaction = makeMultiEntryTransactionOplogEntries( 2, dbName, /* prepared */ false, {innerOps1, innerOps2, innerOps3}); for (auto entry : multiEntryTransaction) { - srcOps.push_back(entry.toBSON()); + srcOps.push_back(entry.getEntry().toBSON()); } // Push one extra operation to ensure that the last oplog entry of a large transaction // is processed by itself. - srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).toBSON()); + srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")).getEntry().toBSON()); _oplogBuffer.push(nullptr, srcOps.cbegin(), srcOps.cend()); @@ -333,11 +335,11 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded) // First batch: [insert, applyops, applyops]. auto batch = batchFuture.get(); ASSERT_EQUALS(3U, batch.ops.size()) << toString(batch); - ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[0], batch.ops[0].entry.getEntry().toBSON()); ASSERT_EQ(-1, batch.ops[0].expansionsEntry); - ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[1], batch.ops[1].entry.getEntry().toBSON()); ASSERT_EQ(-1, batch.ops[1].expansionsEntry); - ASSERT_BSONOBJ_EQ(srcOps[2], batch.ops[2].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[2], batch.ops[2].entry.getEntry().toBSON()); ASSERT_EQ(-1, batch.ops[2].expansionsEntry); // Partial applyops are not expanded. ASSERT(batch.expansions.empty()); @@ -349,7 +351,7 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded) batch = batchFuture.get(); ASSERT_EQUALS(1U, batch.expansions.size()) << toString(batch); ASSERT_EQUALS(6U, batch.expansions[0].size()) << toString(batch); - ASSERT_BSONOBJ_EQ(srcOps[3], batch.ops[0].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[3], batch.ops[0].entry.getEntry().toBSON()); ASSERT_EQ(0, batch.ops[0].expansionsEntry); ASSERT_BSONOBJ_EQ(innerOps1[0].getDurableReplOperation().toBSON(), @@ -371,7 +373,7 @@ TEST_F(TenantOplogBatcherTest, LargeTransactionProcessedIndividuallyAndExpanded) batch = batchFuture.get(); ASSERT_EQUALS(1U, batch.ops.size()) << toString(batch); ASSERT(batch.expansions.empty()); - ASSERT_BSONOBJ_EQ(srcOps[4], batch.ops[0].entry.toBSON()); + ASSERT_BSONOBJ_EQ(srcOps[4], batch.ops[0].entry.getEntry().toBSON()); batcher->shutdown(); batcher->join(); diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index a8350066cbe..c193b91fbbb 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -88,7 +88,7 @@ Status _applyOperationsForTransaction(OperationContext* opCtx, "Error applying operation in transaction. {error}- oplog entry: {oplogEntry}", "Error applying operation in transaction", "error"_attr = redact(ex), - "oplogEntry"_attr = redact(op.toBSON())); + "oplogEntry"_attr = redact(op.toBSONForLogging())); return exceptionToStatus(); } LOGV2_DEBUG(21846, @@ -99,7 +99,7 @@ Status _applyOperationsForTransaction(OperationContext* opCtx, "Encountered but ignoring error while applying operations for transaction " "because we are either in initial sync or recovering mode", "error"_attr = redact(ex), - "oplogEntry"_attr = redact(op.toBSON()), + "oplogEntry"_attr = redact(op.toBSONForLogging()), "oplogApplicationMode"_attr = repl::OplogApplication::modeToString(oplogApplicationMode)); } @@ -297,7 +297,7 @@ std::pair<std::vector<OplogEntry>, bool> _readTransactionOperationsFromOplogChai // The non-DurableReplOperation fields of the extracted transaction operations will match those // of the lastEntryInTxn. For a prepared commit, this will include the commit oplog entry's // 'ts' field, which is what we want. - auto lastEntryInTxnObj = lastEntryInTxn.toBSON(); + auto lastEntryInTxnObj = lastEntryInTxn.getEntry().toBSON(); // First retrieve and transform the ops from the oplog, which will be retrieved in reverse // order. diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index c3fb43073f7..e4102a89a0a 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -1092,7 +1092,7 @@ boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigra } auto newOpTime = result.oplog->getOpTime(); - auto oplogDoc = result.oplog->toBSON(); + auto oplogDoc = result.oplog->getEntry().toBSON(); // Use the builder size instead of accumulating the document sizes directly so that we // take into consideration the overhead of BSONArray indices. diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index 2752759ddc9..37ed803b97c 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -212,12 +212,12 @@ Status ReshardingOplogApplicationRules::ReshardingOplogApplicationRules::applyCo return Status(ErrorCodes::OplogOperationUnsupported, str::stream() << "Received drop command for resharding source collection " - << redact(op.toBSON())); + << redact(op.toBSONForLogging())); } return Status(ErrorCodes::OplogOperationUnsupported, str::stream() << "Command not supported during resharding: " - << redact(op.toBSON())); + << redact(op.toBSONForLogging())); } // TODO SERVER-49907 implement applyOps write rule @@ -264,7 +264,8 @@ void ReshardingOplogApplicationRules::_applyInsert_inlock( // If the 'o' field does not have an _id, the oplog entry is corrupted. auto idField = oField["_id"]; uassert(ErrorCodes::NoSuchKey, - str::stream() << "Failed to apply insert due to missing _id: " << redact(op.toBSON()), + str::stream() << "Failed to apply insert due to missing _id: " + << redact(op.toBSONForLogging()), !idField.eoo()); BSONObj idQuery = idField.wrap(); @@ -360,7 +361,8 @@ void ReshardingOplogApplicationRules::_applyUpdate_inlock( // If the 'o2' field does not have an _id, the oplog entry is corrupted. auto idField = o2Field["_id"]; uassert(ErrorCodes::NoSuchKey, - str::stream() << "Failed to apply update due to missing _id: " << redact(op.toBSON()), + str::stream() << "Failed to apply update due to missing _id: " + << redact(op.toBSONForLogging()), !idField.eoo()); BSONObj idQuery = idField.wrap(); @@ -445,7 +447,8 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock( // If the 'o' field does not have an _id, the oplog entry is corrupted. auto idField = oField["_id"]; uassert(ErrorCodes::NoSuchKey, - str::stream() << "Failed to apply delete due to missing _id: " << redact(op.toBSON()), + str::stream() << "Failed to apply delete due to missing _id: " + << redact(op.toBSONForLogging()), !idField.eoo()); BSONObj idQuery = idField.wrap(); diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index a7d25a3ea5a..ad17443b6ca 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -114,7 +114,7 @@ Status insertOplogAndUpdateConfigForRetryable(OperationContext* opCtx, // TODO: handle pre/post image - auto rawOplogBSON = oplog.toBSON(); + auto rawOplogBSON = oplog.getEntry().toBSON(); auto noOpOplog = uassertStatusOK(repl::MutableOplogEntry::parse(rawOplogBSON)); noOpOplog.setObject2(rawOplogBSON); noOpOplog.setNss({}); @@ -407,25 +407,25 @@ Future<void> ReshardingOplogApplier::_applyBatch(OperationContext* opCtx) { } repl::OplogEntry convertToNoOpWithReshardingTag(const repl::OplogEntry& oplog) { - return repl::OplogEntry(oplog.getOpTime(), - oplog.getHash(), - repl::OpTypeEnum::kNoop, - oplog.getNss(), - boost::none /* uuid */, - oplog.getFromMigrate(), - oplog.getVersion(), - kReshardingOplogTag, - // Set the o2 field with the original oplog. - oplog.toBSON(), - oplog.getOperationSessionInfo(), - oplog.getUpsert(), - oplog.getWallClockTime(), - oplog.getStatementId(), - oplog.getPrevWriteOpTimeInTransaction(), - oplog.getPreImageOpTime(), - oplog.getPostImageOpTime(), - oplog.getDestinedRecipient(), - oplog.get_id()); + return {repl::DurableOplogEntry(oplog.getOpTime(), + oplog.getHash(), + repl::OpTypeEnum::kNoop, + oplog.getNss(), + boost::none /* uuid */, + oplog.getFromMigrate(), + oplog.getVersion(), + kReshardingOplogTag, + // Set the o2 field with the original oplog. + oplog.getEntry().toBSON(), + oplog.getOperationSessionInfo(), + oplog.getUpsert(), + oplog.getWallClockTime(), + oplog.getStatementId(), + oplog.getPrevWriteOpTimeInTransaction(), + oplog.getPreImageOpTime(), + oplog.getPostImageOpTime(), + oplog.getDestinedRecipient(), + oplog.get_id())}; } void addDerivedOpsToWriterVector(std::vector<std::vector<const repl::OplogEntry*>>* writerVectors, @@ -434,7 +434,7 @@ void addDerivedOpsToWriterVector(std::vector<std::vector<const repl::OplogEntry* invariant(op.getObject().woCompare(kReshardingOplogTag) == 0); uassert(4990403, "expected resharding derived oplog to have session id: {}"_format( - op.toBSON().toString()), + redact(op.toBSONForLogging()).toString()), op.getSessionId()); LogicalSessionIdHash hasher; @@ -478,8 +478,9 @@ std::vector<std::vector<const repl::OplogEntry*>> ReshardingOplogApplier::_fillW } else { uasserted(4990401, str::stream() << "retryable oplog applier for " << _sourceId.toBSON() - << " encountered out of order txnNum, saw " << op.toBSON() - << " after " << retryableOpList.ops.front()->toBSON()); + << " encountered out of order txnNum, saw " + << redact(op.toBSONForLogging()) << " after " + << redact(retryableOpList.ops.front()->toBSONForLogging())); } } } @@ -537,7 +538,7 @@ Status ReshardingOplogApplier::_applyOplogEntryOrGroupedInserts( if (opType == repl::OpTypeEnum::kNoop) { return Status::OK(); } else if (resharding::gUseReshardingOplogApplicationRules) { - if (repl::OplogEntry::isCrudOpType(opType)) { + if (repl::DurableOplogEntry::isCrudOpType(opType)) { return _applicationRules.applyOperation(opCtx, entryOrGroupedInserts); } else if (opType == repl::OpTypeEnum::kCommand) { return _applicationRules.applyCommand(opCtx, entryOrGroupedInserts); @@ -557,31 +558,32 @@ Status ReshardingOplogApplier::_applyOplogEntryOrGroupedInserts( void ReshardingOplogApplier::_preProcessAndPushOpsToBuffer(repl::OplogEntry oplog) { uassert(5012002, str::stream() << "trying to apply oplog not belonging to ns " << _nsBeingResharded - << " during resharding: " << oplog.toBSON(), + << " during resharding: " << redact(oplog.toBSONForLogging()), _nsBeingResharded == oplog.getNss()); uassert(5012005, str::stream() << "trying to apply oplog with a different UUID from " - << _uuidBeingResharded << " during resharding: " << oplog.toBSON(), + << _uuidBeingResharded + << " during resharding: " << redact(oplog.toBSONForLogging()), _uuidBeingResharded == oplog.getUuid()); - auto newOplog = repl::OplogEntry(oplog.getOpTime(), - oplog.getHash(), - oplog.getOpType(), - _outputNs, - boost::none /* uuid */, - oplog.getFromMigrate(), - oplog.getVersion(), - oplog.getObject(), - oplog.getObject2(), - oplog.getOperationSessionInfo(), - oplog.getUpsert(), - oplog.getWallClockTime(), - oplog.getStatementId(), - oplog.getPrevWriteOpTimeInTransaction(), - oplog.getPreImageOpTime(), - oplog.getPostImageOpTime(), - oplog.getDestinedRecipient(), - oplog.get_id()); + repl::OplogEntry newOplog(repl::DurableOplogEntry(oplog.getOpTime(), + oplog.getHash(), + oplog.getOpType(), + _outputNs, + boost::none /* uuid */, + oplog.getFromMigrate(), + oplog.getVersion(), + oplog.getObject(), + oplog.getObject2(), + oplog.getOperationSessionInfo(), + oplog.getUpsert(), + oplog.getWallClockTime(), + oplog.getStatementId(), + oplog.getPrevWriteOpTimeInTransaction(), + oplog.getPreImageOpTime(), + oplog.getPostImageOpTime(), + oplog.getDestinedRecipient(), + oplog.get_id())); _currentBatchToApply.push_back(std::move(newOplog)); } diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index 2c16798e975..13987c47b0d 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -199,24 +199,24 @@ public: const OperationSessionInfo& sessionInfo, const boost::optional<StmtId>& statementId) { ReshardingDonorOplogId id(opTime.getTimestamp(), opTime.getTimestamp()); - return repl::OplogEntry(opTime, - boost::none /* hash */, - opType, - kCrudNs, - kCrudUUID, - false /* fromMigrate */, - 0 /* version */, - obj1, - obj2, - sessionInfo, - boost::none /* upsert */, - {} /* date */, - statementId, - boost::none /* prevWrite */, - boost::none /* preImage */, - boost::none /* postImage */, - kMyShardId, - Value(id.toBSON())); + return {repl::DurableOplogEntry(opTime, + boost::none /* hash */, + opType, + kCrudNs, + kCrudUUID, + false /* fromMigrate */, + 0 /* version */, + obj1, + obj2, + sessionInfo, + boost::none /* upsert */, + {} /* date */, + statementId, + boost::none /* prevWrite */, + boost::none /* preImage */, + boost::none /* postImage */, + kMyShardId, + Value(id.toBSON()))}; } void setReshardingOplogApplicationServerParameterTrue() { @@ -1833,7 +1833,7 @@ public: } auto operation = - repl::OplogEntry::makeInsertOperation(crudNs(), crudUUID(), BSON("x" << 20)); + repl::DurableOplogEntry::makeInsertOperation(crudNs(), crudUUID(), BSON("x" << 20)); txnParticipant.addTransactionOperation(innerOpCtx, operation); wuow.commit(); diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 748e641322a..28cac823d42 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -91,24 +91,25 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::optional<StmtId> stmtId, boost::optional<repl::OpTime> preImageOpTime = boost::none, boost::optional<repl::OpTime> postImageOpTime = boost::none) { - return repl::OplogEntry(opTime, // optime - 0, // hash - opType, // opType - kNs, // namespace - boost::none, // uuid - boost::none, // fromMigrate - 0, // version - object, // o - object2, // o2 - sessionInfo, // sessionInfo - boost::none, // isUpsert - wallClockTime, // wall clock time - stmtId, // statement id - boost::none, // optime of previous write within same transaction - preImageOpTime, // pre-image optime - postImageOpTime, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return { + repl::DurableOplogEntry(opTime, // optime + 0, // hash + opType, // opType + kNs, // namespace + boost::none, // uuid + boost::none, // fromMigrate + 0, // version + object, // o + object2, // o2 + sessionInfo, // sessionInfo + boost::none, // isUpsert + wallClockTime, // wall clock time + stmtId, // statement id + boost::none, // optime of previous write within same transaction + preImageOpTime, // pre-image optime + postImageOpTime, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } repl::OplogEntry extractInnerOplog(const repl::OplogEntry& oplog) { @@ -149,7 +150,7 @@ public: BSONArrayBuilder arrBuilder(builder.subarrayStart("oplog")); for (const auto& oplog : oplogList) { - arrBuilder.append(oplog.toBSON()); + arrBuilder.append(oplog.getEntry().toBSON()); } arrBuilder.doneFast(); @@ -614,21 +615,21 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) Date_t(), // wall clock time 45); // statement id - auto oplog1 = makeOplogEntry(OpTime(Timestamp(1100, 2), 1), // optime - OpTypeEnum::kNoop, // op type - BSONObj(), // o - origInnerOplog1.toBSON(), // o2 - sessionInfo, // session info - Date_t::now(), // wall clock time - 23); // statement id - - auto oplog2 = makeOplogEntry(OpTime(Timestamp(1080, 2), 1), // optime - OpTypeEnum::kNoop, // op type - BSONObj(), // o - origInnerOplog2.toBSON(), // o2 - sessionInfo, // session info - Date_t::now(), // wall clock time - 45); // statement id + auto oplog1 = makeOplogEntry(OpTime(Timestamp(1100, 2), 1), // optime + OpTypeEnum::kNoop, // op type + BSONObj(), // o + origInnerOplog1.getEntry().toBSON(), // o2 + sessionInfo, // session info + Date_t::now(), // wall clock time + 23); // statement id + + auto oplog2 = makeOplogEntry(OpTime(Timestamp(1080, 2), 1), // optime + OpTypeEnum::kNoop, // op type + BSONObj(), // o + origInnerOplog2.getEntry().toBSON(), // o2 + sessionInfo, // session info + Date_t::now(), // wall clock time + 45); // statement id returnOplog({oplog1, oplog2}); diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 565a42cbaa9..7708ad1c963 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -85,24 +85,25 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, const OperationSessionInfo& sessionInfo, Date_t wallClockTime, const boost::optional<StmtId>& statementId) { - return repl::OplogEntry(opTime, // optime - hash, // hash - opType, // op type - {}, // namespace - boost::none, // uuid - boost::none, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - oField, // o - o2Field, // o2 - sessionInfo, // session info - boost::none, // upsert - wallClockTime, // wall clock time - statementId, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return { + repl::DurableOplogEntry(opTime, // optime + hash, // hash + opType, // op type + {}, // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + oField, // o + o2Field, // o2 + sessionInfo, // session info + boost::none, // upsert + wallClockTime, // wall clock time + statementId, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } /** diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index f8ef41a25d9..349dfac2015 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -64,7 +64,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, repl::OpTime prevWriteOpTimeInTransaction, boost::optional<repl::OpTime> preImageOpTime, boost::optional<repl::OpTime> postImageOpTime) { - return repl::OplogEntry( + return {repl::DurableOplogEntry( opTime, // optime 0, // hash opType, // opType @@ -82,7 +82,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, preImageOpTime, // pre-image optime postImageOpTime, // post-image optime boost::none, // ShardId of resharding recipient - boost::none); // _id + boost::none)}; // _id } repl::OplogEntry makeOplogEntry(repl::OpTime opTime, @@ -150,7 +150,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry2.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } @@ -159,7 +159,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry1.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); } ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); @@ -237,7 +237,8 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(firstExpectedOplog.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(firstExpectedOplog.getEntry().toBSON(), + nextOplogResult.oplog->getEntry().toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } @@ -245,7 +246,8 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { ASSERT_TRUE(migrationSource.hasMoreOplog()); auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); - ASSERT_BSONOBJ_EQ(secondExpectedOplog.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(secondExpectedOplog.getEntry().toBSON(), + nextOplogResult.oplog->getEntry().toBSON()); } }; @@ -333,7 +335,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(oplog.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); migrationSource.fetchNextOplog(opCtx()); } @@ -391,7 +393,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry1.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -452,7 +454,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry1.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry1.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } @@ -460,7 +462,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { ASSERT_TRUE(migrationSource.hasMoreOplog()); auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); - ASSERT_BSONOBJ_EQ(entry2.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry2.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } @@ -468,7 +470,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { ASSERT_TRUE(migrationSource.hasMoreOplog()); auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); - ASSERT_BSONOBJ_EQ(entry3.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry3.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); } ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); @@ -509,7 +511,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -533,7 +535,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); - ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -557,7 +559,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); - ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -594,7 +596,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); } @@ -651,7 +653,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) { auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(entry.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); } ASSERT_OK(repl::ReplicationProcess::get(opCtx())->incrementRollbackID(opCtx())); @@ -792,7 +794,8 @@ TEST_F(SessionCatalogMigrationSourceTest, auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(insertOplog.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(insertOplog.getEntry().toBSON(), + nextOplogResult.oplog->getEntry().toBSON()); }; // Function to verify the oplog entry corresponding to the transaction. @@ -935,7 +938,7 @@ TEST_F(SessionCatalogMigrationSourceTest, auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(oplog.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); migrationSource.fetchNextOplog(opCtx()); } @@ -1093,7 +1096,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite auto nextOplogResult = migrationSource.getLastFetchedOplog(); ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); // Cannot compare directly because of SERVER-31356 - ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_BSONOBJ_EQ(oplog.getEntry().toBSON(), nextOplogResult.oplog->getEntry().toBSON()); migrationSource.fetchNextOplog(opCtx()); } diff --git a/src/mongo/db/transaction_history_iterator_test.cpp b/src/mongo/db/transaction_history_iterator_test.cpp index f165b216418..f6d97a50229 100644 --- a/src/mongo/db/transaction_history_iterator_test.cpp +++ b/src/mongo/db/transaction_history_iterator_test.cpp @@ -60,7 +60,7 @@ namespace { repl::OplogEntry makeOplogEntry(repl::OpTime opTime, BSONObj docToInsert, boost::optional<repl::OpTime> prevWriteOpTimeInTransaction) { - return repl::OplogEntry( + return {repl::DurableOplogEntry( opTime, // optime 0, // hash repl::OpTypeEnum::kInsert, // opType @@ -78,7 +78,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::none, // pre-image optime boost::none, // post-image optime boost::none, // ShardId of resharding recipient - boost::none); // _id + boost::none)}; // _id } } // namespace diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index f7b2423c48a..1d43b5c1679 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -1253,7 +1253,8 @@ void TransactionParticipant::Participant::addTransactionOperation( invariant(p().autoCommit && !*p().autoCommit && o().activeTxnNumber != kUninitializedTxnNumber); invariant(opCtx->lockState()->inAWriteUnitOfWork()); p().transactionOperations.push_back(operation); - p().transactionOperationBytes += repl::OplogEntry::getDurableReplOperationSize(operation); + p().transactionOperationBytes += + repl::DurableOplogEntry::getDurableReplOperationSize(operation); if (!operation.getPreImage().isEmpty()) { p().transactionOperationBytes += operation.getPreImage().objsize(); ++p().numberOfPreImagesToWrite; diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index 91f7ef81605..28335181dcf 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -67,7 +67,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, Date_t wallClockTime, boost::optional<StmtId> stmtId, boost::optional<repl::OpTime> prevWriteOpTimeInTransaction) { - return repl::OplogEntry( + return {repl::DurableOplogEntry( opTime, // optime 0, // hash opType, // opType @@ -85,7 +85,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::none, // pre-image optime boost::none, // post-image optime boost::none, // ShardId of resharding recipient - boost::none); // _id + boost::none)}; // _id } class OpObserverMock : public OpObserverNoop { diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 28b57e27dac..f5147ddfb76 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -76,7 +76,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, Date_t wallClockTime, boost::optional<StmtId> stmtId, boost::optional<repl::OpTime> prevWriteOpTimeInTransaction) { - return repl::OplogEntry( + return repl::DurableOplogEntry( opTime, // optime 0, // hash opType, // opType @@ -476,7 +476,8 @@ TEST_F(TxnParticipantTest, SameTransactionPreservesStoredStatements) { // We must have stashed transaction resources to re-open the transaction. txnParticipant.unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); + auto operation = + repl::DurableOplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); txnParticipant.addTransactionOperation(opCtx(), operation); ASSERT_BSONOBJ_EQ(operation.toBSON(), txnParticipant.getTransactionOperationsForTest()[0].toBSON()); @@ -498,7 +499,8 @@ TEST_F(TxnParticipantTest, AbortClearsStoredStatements) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); + auto operation = + repl::DurableOplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); txnParticipant.addTransactionOperation(opCtx(), operation); ASSERT_BSONOBJ_EQ(operation.toBSON(), txnParticipant.getTransactionOperationsForTest()[0].toBSON()); @@ -585,8 +587,8 @@ TEST_F(TxnParticipantTest, PrepareFailsOnTemporaryCollection) { txnParticipant.unstashTransactionResources(opCtx(), "insert"); - auto operation = - repl::OplogEntry::makeInsertOperation(tempCollNss, tempCollUUID, BSON("TestValue" << 0)); + auto operation = repl::DurableOplogEntry::makeInsertOperation( + tempCollNss, tempCollUUID, BSON("TestValue" << 0)); txnParticipant.addTransactionOperation(opCtx(), operation); ASSERT_THROWS_CODE(txnParticipant.prepareTransaction(opCtx(), {}), @@ -1235,7 +1237,8 @@ TEST_F(TxnParticipantTest, CannotInsertInPreparedTransaction) { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); + auto operation = + repl::DurableOplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); txnParticipant.addTransactionOperation(opCtx(), operation); txnParticipant.prepareTransaction(opCtx(), {}); @@ -1273,7 +1276,7 @@ TEST_F(TxnParticipantTest, TransactionExceedsSizeParameter) { // Two 1MB operations should succeed; three 1MB operations should fail. constexpr size_t kBigDataSize = 1 * 1024 * 1024; std::unique_ptr<uint8_t[]> bigData(new uint8_t[kBigDataSize]()); - auto operation = repl::OplogEntry::makeInsertOperation( + auto operation = repl::DurableOplogEntry::makeInsertOperation( kNss, _uuid, BSON("_id" << 0 << "data" << BSONBinData(bigData.get(), kBigDataSize, BinDataGeneral))); @@ -1420,7 +1423,8 @@ protected: ASSERT(txnParticipant.transactionIsOpen()); txnParticipant.unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); + auto operation = + repl::DurableOplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); txnParticipant.addTransactionOperation(opCtx(), operation); txnParticipant.prepareTransaction(opCtx(), {}); @@ -3581,7 +3585,8 @@ TEST_F(TransactionsMetricsTest, LogTransactionInfoAfterSlowCommit) { setupAdditiveMetrics(metricValue, opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); + auto operation = + repl::DurableOplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); txnParticipant.addTransactionOperation(opCtx(), operation); const auto originalSlowMS = serverGlobalParams.slowMS; @@ -4009,7 +4014,8 @@ TEST_F(TxnParticipantTest, RollbackResetsInMemoryStateOfPreparedTransaction) { // Perform an insert as a part of a transaction so that we have a transaction operation. txnParticipant.unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); + auto operation = + repl::DurableOplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); txnParticipant.addTransactionOperation(opCtx(), operation); ASSERT_BSONOBJ_EQ(operation.toBSON(), txnParticipant.getTransactionOperationsForTest()[0].toBSON()); @@ -4192,7 +4198,8 @@ TEST_F(TxnParticipantTest, ASSERT_TRUE(txnParticipant.getResponseMetadata().getReadOnly()); // Simulate an insert. - auto operation = repl::OplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); + auto operation = + repl::DurableOplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); txnParticipant.addTransactionOperation(opCtx(), operation); ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly()); } diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index 6266eeb7336..b1c49cd5195 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -65,24 +65,25 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, NamespaceString nss, BSONObj object, boost::optional<BSONObj> object2) { - return repl::OplogEntry(opTime, // optime - 0, // hash - opType, // opType - nss, // namespace - boost::none, // uuid - boost::none, // fromMigrate - OplogEntry::kOplogVersion, // version - object, // o - object2, // o2 - {}, // sessionInfo - boost::none, // upsert - Date_t(), // wall clock time - boost::none, // statement id - boost::none, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none, // ShardId of resharding recipient - boost::none); // _id + return { + repl::DurableOplogEntry(opTime, // optime + 0, // hash + opType, // opType + nss, // namespace + boost::none, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + object, // o + object2, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none)}; // _id } BSONObj f(const char* s) { diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 8df1072c25e..7cdc62217cc 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -1262,11 +1262,11 @@ public: ASSERT(Helpers::getLast( _opCtx, NamespaceString::kRsOplogNamespace.toString().c_str(), result)); repl::OplogEntry op(result); - ASSERT(op.getOpType() == repl::OpTypeEnum::kCommand) << op.toBSON(); + ASSERT(op.getOpType() == repl::OpTypeEnum::kCommand) << op.toBSONForLogging(); // The next logOp() call will get 'futureTs', which will be the timestamp at which we do // the write. Thus we expect the write to appear at 'futureTs' and not before. - ASSERT_EQ(op.getTimestamp(), futureTs) << op.toBSON(); - ASSERT_EQ(op.getNss().ns(), nss.getCommandNS().ns()) << op.toBSON(); + ASSERT_EQ(op.getTimestamp(), futureTs) << op.toBSONForLogging(); + ASSERT_EQ(op.getNss().ns(), nss.getCommandNS().ns()) << op.toBSONForLogging(); ASSERT_BSONOBJ_EQ(op.getObject(), BSON("create" << nss.coll())); assertNamespaceInIdents(nss, pastTs, false); @@ -3365,7 +3365,7 @@ public: // The logOp() call for createCollection should have timestamp 'futureTs', which will also // be the timestamp at which we do the write which creates the collection. Thus we expect // the collection to appear at 'futureTs' and not before. - ASSERT_EQ(op.getTimestamp(), futureTs) << op.toBSON(); + ASSERT_EQ(op.getTimestamp(), futureTs) << op.toBSONForLogging(); // The index build emits three oplog entries. Timestamp indexStartTs; |