diff options
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/applier.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/repl/applier.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/applier_test.cpp | 113 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 2 |
10 files changed, 101 insertions, 83 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 5d38684ffdf..3c99b2d5f4c 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -673,6 +673,7 @@ env.Library( target='applier', source=[ 'applier.cpp', + 'oplog_entry.cpp', ], LIBDEPS=[ 'replication_executor', diff --git a/src/mongo/db/repl/applier.cpp b/src/mongo/db/repl/applier.cpp index 9000e0ebf65..ef7e37debde 100644 --- a/src/mongo/db/repl/applier.cpp +++ b/src/mongo/db/repl/applier.cpp @@ -52,11 +52,11 @@ Applier::Applier(ReplicationExecutor* executor, uassert(ErrorCodes::BadValue, "null replication executor", executor); uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty()); uassert(ErrorCodes::FailedToParse, - str::stream() << "last operation missing 'ts' field: " << operations.back(), - operations.back().hasField("ts")); + str::stream() << "last operation missing 'ts' field: " << operations.back().raw, + operations.back().raw.hasField("ts")); uassert(ErrorCodes::TypeMismatch, - str::stream() << "'ts' in last operation not a timestamp: " << operations.back(), - BSONType::bsonTimestamp == operations.back().getField("ts").type()); + str::stream() << "'ts' in last operation not a timestamp: " << operations.back().raw, + BSONType::bsonTimestamp == operations.back().raw.getField("ts").type()); uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation); uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); } @@ -152,7 +152,7 @@ void Applier::_callback(const ReplicationExecutor::CallbackArgs& cbd) { return; } } - _finishCallback(_operations.back().getField("ts").timestamp(), Operations()); + _finishCallback(_operations.back().raw.getField("ts").timestamp(), Operations()); } void Applier::_finishCallback(const StatusWith<Timestamp>& result, const Operations& operations) { @@ -184,16 +184,16 @@ StatusWith<std::pair<std::unique_ptr<Applier>, Applier::Operations>> applyUntilA const PauseDataReplicatorFn& pauseDataReplicator, const Applier::CallbackFn& onCompletion) { try { - auto comp = [](const BSONObj& left, const BSONObj& right) { + auto comp = [](const OplogEntry& left, const OplogEntry& right) { uassert(ErrorCodes::FailedToParse, - str::stream() << "Operation missing 'ts' field': " << left, - left.hasField("ts")); + str::stream() << "Operation missing 'ts' field': " << left.raw, + left.raw.hasField("ts")); uassert(ErrorCodes::FailedToParse, - str::stream() << "Operation missing 'ts' field': " << right, - right.hasField("ts")); - return left["ts"].timestamp() < right["ts"].timestamp(); + str::stream() << "Operation missing 'ts' field': " << right.raw, + right.raw.hasField("ts")); + return left.raw["ts"].timestamp() < right.raw["ts"].timestamp(); }; - auto wrapped = BSON("ts" << lastTimestampToApply); + auto wrapped = OplogEntry(BSON("ts" << lastTimestampToApply)); auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp); bool found = i != operations.cend() && !comp(wrapped, *i); auto j = found ? i + 1 : i; diff --git a/src/mongo/db/repl/applier.h b/src/mongo/db/repl/applier.h index a8e48c0edb9..30f20a94024 100644 --- a/src/mongo/db/repl/applier.h +++ b/src/mongo/db/repl/applier.h @@ -37,6 +37,7 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" @@ -52,7 +53,7 @@ public: /** * Operations sorted by timestamp in ascending order. */ - using Operations = std::vector<BSONObj>; + using Operations = std::vector<OplogEntry>; /** * Callback function to report final status of applying operations along with @@ -67,7 +68,7 @@ public: * would have the same outcome as calling SyncTail::syncApply() ('convertUpdatesToUpserts' * value will be embedded in the function implementation). */ - using ApplyOperationFn = stdx::function<Status(OperationContext*, const BSONObj&)>; + using ApplyOperationFn = stdx::function<Status(OperationContext*, const OplogEntry&)>; /** * Creates Applier in inactive state. diff --git a/src/mongo/db/repl/applier_test.cpp b/src/mongo/db/repl/applier_test.cpp index 63522b8f403..2b49a92dc19 100644 --- a/src/mongo/db/repl/applier_test.cpp +++ b/src/mongo/db/repl/applier_test.cpp @@ -67,9 +67,9 @@ protected: void ApplierTest::setUp() { ReplicationExecutorTest::setUp(); launchExecutorThread(); - auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }; + auto apply = [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); }; _applier.reset(new Applier(&getReplExecutor(), - {BSON("ts" << Timestamp(Seconds(123), 0))}, + {OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))}, apply, [this](const StatusWith<Timestamp>&, const Operations&) { if (_barrier.get()) { @@ -89,8 +89,8 @@ Applier* ApplierTest::getApplier() const { } TEST_F(ApplierTest, InvalidConstruction) { - const Operations operations{BSON("ts" << Timestamp(Seconds(123), 0))}; - auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }; + const Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))}; + auto apply = [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); }; auto callback = [](const StatusWith<Timestamp>& status, const Operations& operations) {}; // Null executor. @@ -102,12 +102,12 @@ TEST_F(ApplierTest, InvalidConstruction) { Applier(&getReplExecutor(), {}, apply, callback), UserException, ErrorCodes::BadValue); // Last operation missing timestamp field. - ASSERT_THROWS_CODE(Applier(&getReplExecutor(), {BSONObj()}, apply, callback), + ASSERT_THROWS_CODE(Applier(&getReplExecutor(), {OplogEntry(BSONObj())}, apply, callback), UserException, ErrorCodes::FailedToParse); // "ts" field in last operation not a timestamp. - ASSERT_THROWS_CODE(Applier(&getReplExecutor(), {BSON("ts" << 99)}, apply, callback), + ASSERT_THROWS_CODE(Applier(&getReplExecutor(), {OplogEntry(BSON("ts" << 99))}, apply, callback), UserException, ErrorCodes::TypeMismatch); @@ -174,14 +174,14 @@ TEST_F(ApplierTest, CancelBeforeStartingDBWork) { getReplExecutor().scheduleDBWork([&](const CallbackData& cbd) { barrier.countDownAndWait(); // generation 0 }); - const BSONObj operation = BSON("ts" << Timestamp(Seconds(123), 0)); + const OplogEntry operation(BSON("ts" << Timestamp(Seconds(123), 0))); stdx::mutex mutex; StatusWith<Timestamp> result = getDetectableErrorStatus(); Applier::Operations operations; _applier.reset( new Applier(&getReplExecutor(), {operation}, - [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, + [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); }, [&](const StatusWith<Timestamp>& theResult, const Operations& theOperations) { stdx::lock_guard<stdx::mutex> lock(mutex); result = theResult; @@ -213,14 +213,14 @@ TEST_F(ApplierTest, DestroyBeforeStartingDBWork) { // Give the main thread a head start in invoking the applier destructor. sleepmillis(1); }); - const BSONObj operation = BSON("ts" << Timestamp(Seconds(123), 0)); + const OplogEntry operation(BSON("ts" << Timestamp(Seconds(123), 0))); stdx::mutex mutex; StatusWith<Timestamp> result = getDetectableErrorStatus(); Applier::Operations operations; _applier.reset( new Applier(&getReplExecutor(), {operation}, - [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, + [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); }, [&](const StatusWith<Timestamp>& theResult, const Operations& theOperations) { stdx::lock_guard<stdx::mutex> lock(mutex); result = theResult; @@ -254,8 +254,8 @@ TEST_F(ApplierTest, WaitForCompletion) { Applier::Operations operations; _applier.reset( new Applier(&getReplExecutor(), - {BSON("ts" << timestamp)}, - [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, + {OplogEntry(BSON("ts" << timestamp))}, + [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); }, [&](const StatusWith<Timestamp>& theResult, const Operations& theOperations) { stdx::lock_guard<stdx::mutex> lock(mutex); result = theResult; @@ -280,8 +280,8 @@ TEST_F(ApplierTest, DestroyShouldBlockUntilInactive) { Applier::Operations operations; _applier.reset( new Applier(&getReplExecutor(), - {BSON("ts" << timestamp)}, - [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, + {OplogEntry(BSON("ts" << timestamp))}, + [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); }, [&](const StatusWith<Timestamp>& theResult, const Operations& theOperations) { stdx::lock_guard<stdx::mutex> lock(mutex); result = theResult; @@ -302,15 +302,15 @@ TEST_F(ApplierTest, DestroyShouldBlockUntilInactive) { TEST_F(ApplierTest, ApplyOperationSuccessful) { // Bogus operations codes. Applier::Operations operationsToApply{ - BSON("op" - << "a" - << "ts" << Timestamp(Seconds(123), 0)), - BSON("op" - << "b" - << "ts" << Timestamp(Seconds(456), 0)), - BSON("op" - << "c" - << "ts" << Timestamp(Seconds(789), 0)), + OplogEntry(BSON("op" + << "a" + << "ts" << Timestamp(Seconds(123), 0))), + OplogEntry(BSON("op" + << "b" + << "ts" << Timestamp(Seconds(456), 0))), + OplogEntry(BSON("op" + << "c" + << "ts" << Timestamp(Seconds(789), 0))), }; stdx::mutex mutex; StatusWith<Timestamp> result = getDetectableErrorStatus(); @@ -318,7 +318,7 @@ TEST_F(ApplierTest, ApplyOperationSuccessful) { bool isLockBatchWriter = false; Applier::Operations operationsApplied; Applier::Operations operationsOnCompletion; - auto apply = [&](OperationContext* txn, const BSONObj& operation) { + auto apply = [&](OperationContext* txn, const OplogEntry& operation) { stdx::lock_guard<stdx::mutex> lock(mutex); areWritesReplicationOnOperationContext = txn->writesAreReplicated(); isLockBatchWriter = txn->lockState()->isBatchWriter(); @@ -343,28 +343,28 @@ TEST_F(ApplierTest, ApplyOperationSuccessful) { ASSERT_EQUALS(operationsToApply[1], operationsApplied[1]); ASSERT_EQUALS(operationsToApply[2], operationsApplied[2]); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(operationsToApply[2]["ts"].timestamp(), result.getValue()); + ASSERT_EQUALS(operationsToApply[2].raw["ts"].timestamp(), result.getValue()); ASSERT_TRUE(operationsOnCompletion.empty()); } void ApplierTest::_testApplyOperationFailed(size_t opIndex, stdx::function<Status()> fail) { // Bogus operations codes. Applier::Operations operationsToApply{ - BSON("op" - << "a" - << "ts" << Timestamp(Seconds(123), 0)), - BSON("op" - << "b" - << "ts" << Timestamp(Seconds(456), 0)), - BSON("op" - << "c" - << "ts" << Timestamp(Seconds(789), 0)), + OplogEntry(BSON("op" + << "a" + << "ts" << Timestamp(Seconds(123), 0))), + OplogEntry(BSON("op" + << "b" + << "ts" << Timestamp(Seconds(456), 0))), + OplogEntry(BSON("op" + << "c" + << "ts" << Timestamp(Seconds(789), 0))), }; stdx::mutex mutex; StatusWith<Timestamp> result = getDetectableErrorStatus(); Applier::Operations operationsApplied; Applier::Operations operationsOnCompletion; - auto apply = [&](OperationContext* txn, const BSONObj& operation) { + auto apply = [&](OperationContext* txn, const OplogEntry& operation) { stdx::lock_guard<stdx::mutex> lock(mutex); if (operationsApplied.size() == opIndex) { return fail(); @@ -443,7 +443,7 @@ TEST_F(ApplyUntilAndPauseTest, EmptyOperations) { auto result = applyUntilAndPause( &getReplExecutor(), {}, - [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, + [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); }, Timestamp(Seconds(123), 0), [] {}, [](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {}); @@ -454,9 +454,10 @@ TEST_F(ApplyUntilAndPauseTest, NoOperationsInRange) { auto result = applyUntilAndPause( &getReplExecutor(), { - BSON("ts" << Timestamp(Seconds(456), 0)), BSON("ts" << Timestamp(Seconds(789), 0)), + OplogEntry(BSON("ts" << Timestamp(Seconds(456), 0))), + OplogEntry(BSON("ts" << Timestamp(Seconds(789), 0))), }, - [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, + [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); }, Timestamp(Seconds(123), 0), [] {}, [](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {}); @@ -466,8 +467,8 @@ TEST_F(ApplyUntilAndPauseTest, NoOperationsInRange) { TEST_F(ApplyUntilAndPauseTest, OperationMissingTimestampField) { auto result = applyUntilAndPause( &getReplExecutor(), - {BSONObj()}, - [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }, + {OplogEntry(BSONObj())}, + [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); }, Timestamp(Seconds(123), 0), [] {}, [](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {}); @@ -476,12 +477,12 @@ TEST_F(ApplyUntilAndPauseTest, OperationMissingTimestampField) { TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperation) { Timestamp ts(Seconds(123), 0); - const Operations operationsToApply{BSON("ts" << ts)}; + const Operations operationsToApply{OplogEntry(BSON("ts" << ts))}; stdx::mutex mutex; StatusWith<Timestamp> completionResult = getDetectableErrorStatus(); bool pauseCalled = false; Applier::Operations operationsOnCompletion; - auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }; + auto apply = [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); }; auto pause = [&] { stdx::lock_guard<stdx::mutex> lock(mutex); pauseCalled = true; @@ -513,12 +514,12 @@ TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperation) { TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationTimestampNotInOperations) { Timestamp ts(Seconds(123), 0); - const Operations operationsToApply{BSON("ts" << ts)}; + const Operations operationsToApply{OplogEntry(BSON("ts" << ts))}; stdx::mutex mutex; StatusWith<Timestamp> completionResult = getDetectableErrorStatus(); bool pauseCalled = false; Applier::Operations operationsOnCompletion; - auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); }; + auto apply = [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); }; auto pause = [&] { stdx::lock_guard<stdx::mutex> lock(mutex); pauseCalled = true; @@ -551,12 +552,12 @@ TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationTimestampNotInOp TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationAppliedFailed) { Timestamp ts(Seconds(123), 0); - const Operations operationsToApply{BSON("ts" << ts)}; + const Operations operationsToApply{OplogEntry(BSON("ts" << ts))}; stdx::mutex mutex; StatusWith<Timestamp> completionResult = getDetectableErrorStatus(); bool pauseCalled = false; Applier::Operations operationsOnCompletion; - auto apply = [](OperationContext* txn, const BSONObj& operation) { + auto apply = [](OperationContext* txn, const OplogEntry& operation) { return Status(ErrorCodes::OperationFailed, ""); }; auto pause = [&] { @@ -591,22 +592,22 @@ void _testApplyUntilAndPauseDiscardOperations(ReplicationExecutor* executor, const Timestamp& ts, bool expectedPauseCalled) { Applier::Operations operationsToApply{ - BSON("op" - << "a" - << "ts" << Timestamp(Seconds(123), 0)), - BSON("op" - << "b" - << "ts" << Timestamp(Seconds(456), 0)), - BSON("op" - << "c" - << "ts" << Timestamp(Seconds(789), 0)), + OplogEntry(BSON("op" + << "a" + << "ts" << Timestamp(Seconds(123), 0))), + OplogEntry(BSON("op" + << "b" + << "ts" << Timestamp(Seconds(456), 0))), + OplogEntry(BSON("op" + << "c" + << "ts" << Timestamp(Seconds(789), 0))), }; stdx::mutex mutex; StatusWith<Timestamp> completionResult = ApplyUntilAndPauseTest::getDetectableErrorStatus(); bool pauseCalled = false; Applier::Operations operationsApplied; Applier::Operations operationsOnCompletion; - auto apply = [&](OperationContext* txn, const BSONObj& operation) { + auto apply = [&](OperationContext* txn, const OplogEntry& operation) { stdx::lock_guard<stdx::mutex> lock(mutex); operationsApplied.push_back(operation); return Status::OK(); diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index a01ac6e1b5f..3554a86476f 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -1031,7 +1031,7 @@ Operations DataReplicator::_getNextApplierBatch_inlock() { Operations ops; BSONObj op; while (_oplogBuffer.tryPop(op)) { - ops.push_back(op); + ops.push_back(OplogEntry(op)); } return ops; } @@ -1080,9 +1080,9 @@ void DataReplicator::_handleFailedApplyBatch(const TimestampStatus& ts, const Op void DataReplicator::_scheduleApplyAfterFetch(const Operations& ops) { ++_initialSyncState->fetchedMissingDocs; // TODO: check collection.isCapped, like SyncTail::getMissingDoc - const BSONObj failedOplogEntry = *ops.begin(); + const BSONObj failedOplogEntry = ops.begin()->raw; const BSONElement missingIdElem = failedOplogEntry.getFieldDotted("o2._id"); - const NamespaceString nss(ops.begin()->getField("ns").str()); + const NamespaceString nss(ops.begin()->ns); const BSONObj query = BSON("find" << nss.coll() << "filter" << missingIdElem.wrap()); _tmpFetcher.reset(new QueryFetcher(_exec, _syncSource, diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index e0a4005acd5..27dd810716c 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -102,7 +102,7 @@ public: * clear/reset state */ void reset() { - _applierFn = [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); }; + _applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); }; _rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&) -> Status { return Status::OK(); }; _setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; }; @@ -181,7 +181,7 @@ protected: launchExecutorThread(); DataReplicatorOptions options; options.initialSyncRetryWait = Milliseconds(0); - options.applierFn = [this](OperationContext* txn, const BSONObj& operation) { + options.applierFn = [this](OperationContext* txn, const OplogEntry& operation) { return _applierFn(txn, operation); }; options.rollbackFn = [this](OperationContext* txn, @@ -455,7 +455,7 @@ TEST_F(InitialSyncTest, Complete) { TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) { DataReplicatorOptions opts; int applyCounter{0}; - _applierFn = [&](OperationContext* txn, const BSONObj& op) { + _applierFn = [&](OperationContext* txn, const OplogEntry& op) { if (++applyCounter == 1) { return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc."); } @@ -869,9 +869,9 @@ TEST_F(SteadyStateTest, PauseDataReplicator) { unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; BSONObj operationApplied; - _applierFn = [&](OperationContext* txn, const BSONObj& op) { + _applierFn = [&](OperationContext* txn, const OplogEntry& op) { stdx::lock_guard<stdx::mutex> lock(mutex); - operationApplied = op; + operationApplied = op.raw; barrier.countDownAndWait(); return Status::OK(); }; @@ -950,9 +950,9 @@ TEST_F(SteadyStateTest, ApplyOneOperation) { unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; BSONObj operationApplied; - _applierFn = [&](OperationContext* txn, const BSONObj& op) { + _applierFn = [&](OperationContext* txn, const OplogEntry& op) { stdx::lock_guard<stdx::mutex> lock(mutex); - operationApplied = op; + operationApplied = op.raw; barrier.countDownAndWait(); return Status::OK(); }; diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 1b612d6ca1a..5c9922b3873 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -52,5 +52,13 @@ OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) { } } +std::string OplogEntry::toString() const { + return raw.toString(); +} + +std::ostream& operator<<(std::ostream& s, const OplogEntry& o) { + return s << o.toString(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index b86851b34d3..4172c9659d5 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -47,6 +47,9 @@ struct OplogEntry { // This member is not parsed from the BSON and is instead populated by fillWriterVectors. bool isForCappedCollection = false; + std::string toString() const; + + BSONObj raw; // Owned. StringData ns = ""; @@ -57,5 +60,11 @@ struct OplogEntry { BSONElement o2; }; +std::ostream& operator<<(std::ostream& s, const OplogEntry& o); + +inline bool operator==(const OplogEntry& lhs, const OplogEntry& rhs) { + return lhs.raw == rhs.raw; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index aa64f41ac7e..32f0fabb098 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -181,7 +181,7 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCoord) { DataReplicatorOptions options; - options.applierFn = [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); }; + options.applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); }; options.rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&) { return Status::OK(); }; options.prepareReplSetUpdatePositionCommandFn = diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index cd1e9eace81..95cd8624fea 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -1014,8 +1014,6 @@ static void initializeWriterThread() { // This free function is used by the writer threads to apply each op void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { - using OplogEntry = OplogEntry; - std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end()); std::vector<OplogEntry*> oplogEntryPointers(oplogEntries.size()); for (size_t i = 0; i < oplogEntries.size(); i++) { |