summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2016-03-08 08:02:52 -0500
committermatt dannenberg <matt.dannenberg@10gen.com>2016-03-15 04:49:59 -0400
commitbd26c0dbb66ec1ea46b7fc891dde6f7f8a166d50 (patch)
tree551ba0f60961b7e2be156928071ac5576d2e9a58
parent5f856fc375b1a5554895f98f9873c1382008f564 (diff)
downloadmongo-bd26c0dbb66ec1ea46b7fc891dde6f7f8a166d50.tar.gz
SERVER-22858 change Applier to use OplogEntry instead of BSONObj
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/applier.cpp24
-rw-r--r--src/mongo/db/repl/applier.h5
-rw-r--r--src/mongo/db/repl/applier_test.cpp113
-rw-r--r--src/mongo/db/repl/data_replicator.cpp6
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp14
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp8
-rw-r--r--src/mongo/db/repl/oplog_entry.h9
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp2
-rw-r--r--src/mongo/db/repl/sync_tail.cpp2
10 files changed, 101 insertions, 83 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 5d38684ffdf..3c99b2d5f4c 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -673,6 +673,7 @@ env.Library(
target='applier',
source=[
'applier.cpp',
+ 'oplog_entry.cpp',
],
LIBDEPS=[
'replication_executor',
diff --git a/src/mongo/db/repl/applier.cpp b/src/mongo/db/repl/applier.cpp
index 9000e0ebf65..ef7e37debde 100644
--- a/src/mongo/db/repl/applier.cpp
+++ b/src/mongo/db/repl/applier.cpp
@@ -52,11 +52,11 @@ Applier::Applier(ReplicationExecutor* executor,
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
uassert(ErrorCodes::FailedToParse,
- str::stream() << "last operation missing 'ts' field: " << operations.back(),
- operations.back().hasField("ts"));
+ str::stream() << "last operation missing 'ts' field: " << operations.back().raw,
+ operations.back().raw.hasField("ts"));
uassert(ErrorCodes::TypeMismatch,
- str::stream() << "'ts' in last operation not a timestamp: " << operations.back(),
- BSONType::bsonTimestamp == operations.back().getField("ts").type());
+ str::stream() << "'ts' in last operation not a timestamp: " << operations.back().raw,
+ BSONType::bsonTimestamp == operations.back().raw.getField("ts").type());
uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation);
uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
}
@@ -152,7 +152,7 @@ void Applier::_callback(const ReplicationExecutor::CallbackArgs& cbd) {
return;
}
}
- _finishCallback(_operations.back().getField("ts").timestamp(), Operations());
+ _finishCallback(_operations.back().raw.getField("ts").timestamp(), Operations());
}
void Applier::_finishCallback(const StatusWith<Timestamp>& result, const Operations& operations) {
@@ -184,16 +184,16 @@ StatusWith<std::pair<std::unique_ptr<Applier>, Applier::Operations>> applyUntilA
const PauseDataReplicatorFn& pauseDataReplicator,
const Applier::CallbackFn& onCompletion) {
try {
- auto comp = [](const BSONObj& left, const BSONObj& right) {
+ auto comp = [](const OplogEntry& left, const OplogEntry& right) {
uassert(ErrorCodes::FailedToParse,
- str::stream() << "Operation missing 'ts' field': " << left,
- left.hasField("ts"));
+ str::stream() << "Operation missing 'ts' field': " << left.raw,
+ left.raw.hasField("ts"));
uassert(ErrorCodes::FailedToParse,
- str::stream() << "Operation missing 'ts' field': " << right,
- right.hasField("ts"));
- return left["ts"].timestamp() < right["ts"].timestamp();
+ str::stream() << "Operation missing 'ts' field': " << right.raw,
+ right.raw.hasField("ts"));
+ return left.raw["ts"].timestamp() < right.raw["ts"].timestamp();
};
- auto wrapped = BSON("ts" << lastTimestampToApply);
+ auto wrapped = OplogEntry(BSON("ts" << lastTimestampToApply));
auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp);
bool found = i != operations.cend() && !comp(wrapped, *i);
auto j = found ? i + 1 : i;
diff --git a/src/mongo/db/repl/applier.h b/src/mongo/db/repl/applier.h
index a8e48c0edb9..30f20a94024 100644
--- a/src/mongo/db/repl/applier.h
+++ b/src/mongo/db/repl/applier.h
@@ -37,6 +37,7 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
@@ -52,7 +53,7 @@ public:
/**
* Operations sorted by timestamp in ascending order.
*/
- using Operations = std::vector<BSONObj>;
+ using Operations = std::vector<OplogEntry>;
/**
* Callback function to report final status of applying operations along with
@@ -67,7 +68,7 @@ public:
* would have the same outcome as calling SyncTail::syncApply() ('convertUpdatesToUpserts'
* value will be embedded in the function implementation).
*/
- using ApplyOperationFn = stdx::function<Status(OperationContext*, const BSONObj&)>;
+ using ApplyOperationFn = stdx::function<Status(OperationContext*, const OplogEntry&)>;
/**
* Creates Applier in inactive state.
diff --git a/src/mongo/db/repl/applier_test.cpp b/src/mongo/db/repl/applier_test.cpp
index 63522b8f403..2b49a92dc19 100644
--- a/src/mongo/db/repl/applier_test.cpp
+++ b/src/mongo/db/repl/applier_test.cpp
@@ -67,9 +67,9 @@ protected:
void ApplierTest::setUp() {
ReplicationExecutorTest::setUp();
launchExecutorThread();
- auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); };
+ auto apply = [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); };
_applier.reset(new Applier(&getReplExecutor(),
- {BSON("ts" << Timestamp(Seconds(123), 0))},
+ {OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))},
apply,
[this](const StatusWith<Timestamp>&, const Operations&) {
if (_barrier.get()) {
@@ -89,8 +89,8 @@ Applier* ApplierTest::getApplier() const {
}
TEST_F(ApplierTest, InvalidConstruction) {
- const Operations operations{BSON("ts" << Timestamp(Seconds(123), 0))};
- auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); };
+ const Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+ auto apply = [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); };
auto callback = [](const StatusWith<Timestamp>& status, const Operations& operations) {};
// Null executor.
@@ -102,12 +102,12 @@ TEST_F(ApplierTest, InvalidConstruction) {
Applier(&getReplExecutor(), {}, apply, callback), UserException, ErrorCodes::BadValue);
// Last operation missing timestamp field.
- ASSERT_THROWS_CODE(Applier(&getReplExecutor(), {BSONObj()}, apply, callback),
+ ASSERT_THROWS_CODE(Applier(&getReplExecutor(), {OplogEntry(BSONObj())}, apply, callback),
UserException,
ErrorCodes::FailedToParse);
// "ts" field in last operation not a timestamp.
- ASSERT_THROWS_CODE(Applier(&getReplExecutor(), {BSON("ts" << 99)}, apply, callback),
+ ASSERT_THROWS_CODE(Applier(&getReplExecutor(), {OplogEntry(BSON("ts" << 99))}, apply, callback),
UserException,
ErrorCodes::TypeMismatch);
@@ -174,14 +174,14 @@ TEST_F(ApplierTest, CancelBeforeStartingDBWork) {
getReplExecutor().scheduleDBWork([&](const CallbackData& cbd) {
barrier.countDownAndWait(); // generation 0
});
- const BSONObj operation = BSON("ts" << Timestamp(Seconds(123), 0));
+ const OplogEntry operation(BSON("ts" << Timestamp(Seconds(123), 0)));
stdx::mutex mutex;
StatusWith<Timestamp> result = getDetectableErrorStatus();
Applier::Operations operations;
_applier.reset(
new Applier(&getReplExecutor(),
{operation},
- [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); },
+ [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); },
[&](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {
stdx::lock_guard<stdx::mutex> lock(mutex);
result = theResult;
@@ -213,14 +213,14 @@ TEST_F(ApplierTest, DestroyBeforeStartingDBWork) {
// Give the main thread a head start in invoking the applier destructor.
sleepmillis(1);
});
- const BSONObj operation = BSON("ts" << Timestamp(Seconds(123), 0));
+ const OplogEntry operation(BSON("ts" << Timestamp(Seconds(123), 0)));
stdx::mutex mutex;
StatusWith<Timestamp> result = getDetectableErrorStatus();
Applier::Operations operations;
_applier.reset(
new Applier(&getReplExecutor(),
{operation},
- [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); },
+ [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); },
[&](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {
stdx::lock_guard<stdx::mutex> lock(mutex);
result = theResult;
@@ -254,8 +254,8 @@ TEST_F(ApplierTest, WaitForCompletion) {
Applier::Operations operations;
_applier.reset(
new Applier(&getReplExecutor(),
- {BSON("ts" << timestamp)},
- [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); },
+ {OplogEntry(BSON("ts" << timestamp))},
+ [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); },
[&](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {
stdx::lock_guard<stdx::mutex> lock(mutex);
result = theResult;
@@ -280,8 +280,8 @@ TEST_F(ApplierTest, DestroyShouldBlockUntilInactive) {
Applier::Operations operations;
_applier.reset(
new Applier(&getReplExecutor(),
- {BSON("ts" << timestamp)},
- [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); },
+ {OplogEntry(BSON("ts" << timestamp))},
+ [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); },
[&](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {
stdx::lock_guard<stdx::mutex> lock(mutex);
result = theResult;
@@ -302,15 +302,15 @@ TEST_F(ApplierTest, DestroyShouldBlockUntilInactive) {
TEST_F(ApplierTest, ApplyOperationSuccessful) {
// Bogus operations codes.
Applier::Operations operationsToApply{
- BSON("op"
- << "a"
- << "ts" << Timestamp(Seconds(123), 0)),
- BSON("op"
- << "b"
- << "ts" << Timestamp(Seconds(456), 0)),
- BSON("op"
- << "c"
- << "ts" << Timestamp(Seconds(789), 0)),
+ OplogEntry(BSON("op"
+ << "a"
+ << "ts" << Timestamp(Seconds(123), 0))),
+ OplogEntry(BSON("op"
+ << "b"
+ << "ts" << Timestamp(Seconds(456), 0))),
+ OplogEntry(BSON("op"
+ << "c"
+ << "ts" << Timestamp(Seconds(789), 0))),
};
stdx::mutex mutex;
StatusWith<Timestamp> result = getDetectableErrorStatus();
@@ -318,7 +318,7 @@ TEST_F(ApplierTest, ApplyOperationSuccessful) {
bool isLockBatchWriter = false;
Applier::Operations operationsApplied;
Applier::Operations operationsOnCompletion;
- auto apply = [&](OperationContext* txn, const BSONObj& operation) {
+ auto apply = [&](OperationContext* txn, const OplogEntry& operation) {
stdx::lock_guard<stdx::mutex> lock(mutex);
areWritesReplicationOnOperationContext = txn->writesAreReplicated();
isLockBatchWriter = txn->lockState()->isBatchWriter();
@@ -343,28 +343,28 @@ TEST_F(ApplierTest, ApplyOperationSuccessful) {
ASSERT_EQUALS(operationsToApply[1], operationsApplied[1]);
ASSERT_EQUALS(operationsToApply[2], operationsApplied[2]);
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(operationsToApply[2]["ts"].timestamp(), result.getValue());
+ ASSERT_EQUALS(operationsToApply[2].raw["ts"].timestamp(), result.getValue());
ASSERT_TRUE(operationsOnCompletion.empty());
}
void ApplierTest::_testApplyOperationFailed(size_t opIndex, stdx::function<Status()> fail) {
// Bogus operations codes.
Applier::Operations operationsToApply{
- BSON("op"
- << "a"
- << "ts" << Timestamp(Seconds(123), 0)),
- BSON("op"
- << "b"
- << "ts" << Timestamp(Seconds(456), 0)),
- BSON("op"
- << "c"
- << "ts" << Timestamp(Seconds(789), 0)),
+ OplogEntry(BSON("op"
+ << "a"
+ << "ts" << Timestamp(Seconds(123), 0))),
+ OplogEntry(BSON("op"
+ << "b"
+ << "ts" << Timestamp(Seconds(456), 0))),
+ OplogEntry(BSON("op"
+ << "c"
+ << "ts" << Timestamp(Seconds(789), 0))),
};
stdx::mutex mutex;
StatusWith<Timestamp> result = getDetectableErrorStatus();
Applier::Operations operationsApplied;
Applier::Operations operationsOnCompletion;
- auto apply = [&](OperationContext* txn, const BSONObj& operation) {
+ auto apply = [&](OperationContext* txn, const OplogEntry& operation) {
stdx::lock_guard<stdx::mutex> lock(mutex);
if (operationsApplied.size() == opIndex) {
return fail();
@@ -443,7 +443,7 @@ TEST_F(ApplyUntilAndPauseTest, EmptyOperations) {
auto result = applyUntilAndPause(
&getReplExecutor(),
{},
- [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); },
+ [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); },
Timestamp(Seconds(123), 0),
[] {},
[](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {});
@@ -454,9 +454,10 @@ TEST_F(ApplyUntilAndPauseTest, NoOperationsInRange) {
auto result = applyUntilAndPause(
&getReplExecutor(),
{
- BSON("ts" << Timestamp(Seconds(456), 0)), BSON("ts" << Timestamp(Seconds(789), 0)),
+ OplogEntry(BSON("ts" << Timestamp(Seconds(456), 0))),
+ OplogEntry(BSON("ts" << Timestamp(Seconds(789), 0))),
},
- [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); },
+ [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); },
Timestamp(Seconds(123), 0),
[] {},
[](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {});
@@ -466,8 +467,8 @@ TEST_F(ApplyUntilAndPauseTest, NoOperationsInRange) {
TEST_F(ApplyUntilAndPauseTest, OperationMissingTimestampField) {
auto result = applyUntilAndPause(
&getReplExecutor(),
- {BSONObj()},
- [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); },
+ {OplogEntry(BSONObj())},
+ [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); },
Timestamp(Seconds(123), 0),
[] {},
[](const StatusWith<Timestamp>& theResult, const Operations& theOperations) {});
@@ -476,12 +477,12 @@ TEST_F(ApplyUntilAndPauseTest, OperationMissingTimestampField) {
TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperation) {
Timestamp ts(Seconds(123), 0);
- const Operations operationsToApply{BSON("ts" << ts)};
+ const Operations operationsToApply{OplogEntry(BSON("ts" << ts))};
stdx::mutex mutex;
StatusWith<Timestamp> completionResult = getDetectableErrorStatus();
bool pauseCalled = false;
Applier::Operations operationsOnCompletion;
- auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); };
+ auto apply = [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); };
auto pause = [&] {
stdx::lock_guard<stdx::mutex> lock(mutex);
pauseCalled = true;
@@ -513,12 +514,12 @@ TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperation) {
TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationTimestampNotInOperations) {
Timestamp ts(Seconds(123), 0);
- const Operations operationsToApply{BSON("ts" << ts)};
+ const Operations operationsToApply{OplogEntry(BSON("ts" << ts))};
stdx::mutex mutex;
StatusWith<Timestamp> completionResult = getDetectableErrorStatus();
bool pauseCalled = false;
Applier::Operations operationsOnCompletion;
- auto apply = [](OperationContext* txn, const BSONObj& operation) { return Status::OK(); };
+ auto apply = [](OperationContext* txn, const OplogEntry& operation) { return Status::OK(); };
auto pause = [&] {
stdx::lock_guard<stdx::mutex> lock(mutex);
pauseCalled = true;
@@ -551,12 +552,12 @@ TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationTimestampNotInOp
TEST_F(ApplyUntilAndPauseTest, ApplyUntilAndPauseSingleOperationAppliedFailed) {
Timestamp ts(Seconds(123), 0);
- const Operations operationsToApply{BSON("ts" << ts)};
+ const Operations operationsToApply{OplogEntry(BSON("ts" << ts))};
stdx::mutex mutex;
StatusWith<Timestamp> completionResult = getDetectableErrorStatus();
bool pauseCalled = false;
Applier::Operations operationsOnCompletion;
- auto apply = [](OperationContext* txn, const BSONObj& operation) {
+ auto apply = [](OperationContext* txn, const OplogEntry& operation) {
return Status(ErrorCodes::OperationFailed, "");
};
auto pause = [&] {
@@ -591,22 +592,22 @@ void _testApplyUntilAndPauseDiscardOperations(ReplicationExecutor* executor,
const Timestamp& ts,
bool expectedPauseCalled) {
Applier::Operations operationsToApply{
- BSON("op"
- << "a"
- << "ts" << Timestamp(Seconds(123), 0)),
- BSON("op"
- << "b"
- << "ts" << Timestamp(Seconds(456), 0)),
- BSON("op"
- << "c"
- << "ts" << Timestamp(Seconds(789), 0)),
+ OplogEntry(BSON("op"
+ << "a"
+ << "ts" << Timestamp(Seconds(123), 0))),
+ OplogEntry(BSON("op"
+ << "b"
+ << "ts" << Timestamp(Seconds(456), 0))),
+ OplogEntry(BSON("op"
+ << "c"
+ << "ts" << Timestamp(Seconds(789), 0))),
};
stdx::mutex mutex;
StatusWith<Timestamp> completionResult = ApplyUntilAndPauseTest::getDetectableErrorStatus();
bool pauseCalled = false;
Applier::Operations operationsApplied;
Applier::Operations operationsOnCompletion;
- auto apply = [&](OperationContext* txn, const BSONObj& operation) {
+ auto apply = [&](OperationContext* txn, const OplogEntry& operation) {
stdx::lock_guard<stdx::mutex> lock(mutex);
operationsApplied.push_back(operation);
return Status::OK();
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index a01ac6e1b5f..3554a86476f 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -1031,7 +1031,7 @@ Operations DataReplicator::_getNextApplierBatch_inlock() {
Operations ops;
BSONObj op;
while (_oplogBuffer.tryPop(op)) {
- ops.push_back(op);
+ ops.push_back(OplogEntry(op));
}
return ops;
}
@@ -1080,9 +1080,9 @@ void DataReplicator::_handleFailedApplyBatch(const TimestampStatus& ts, const Op
void DataReplicator::_scheduleApplyAfterFetch(const Operations& ops) {
++_initialSyncState->fetchedMissingDocs;
// TODO: check collection.isCapped, like SyncTail::getMissingDoc
- const BSONObj failedOplogEntry = *ops.begin();
+ const BSONObj failedOplogEntry = ops.begin()->raw;
const BSONElement missingIdElem = failedOplogEntry.getFieldDotted("o2._id");
- const NamespaceString nss(ops.begin()->getField("ns").str());
+ const NamespaceString nss(ops.begin()->ns);
const BSONObj query = BSON("find" << nss.coll() << "filter" << missingIdElem.wrap());
_tmpFetcher.reset(new QueryFetcher(_exec,
_syncSource,
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index e0a4005acd5..27dd810716c 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -102,7 +102,7 @@ public:
* clear/reset state
*/
void reset() {
- _applierFn = [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); };
+ _applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); };
_rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&)
-> Status { return Status::OK(); };
_setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; };
@@ -181,7 +181,7 @@ protected:
launchExecutorThread();
DataReplicatorOptions options;
options.initialSyncRetryWait = Milliseconds(0);
- options.applierFn = [this](OperationContext* txn, const BSONObj& operation) {
+ options.applierFn = [this](OperationContext* txn, const OplogEntry& operation) {
return _applierFn(txn, operation);
};
options.rollbackFn = [this](OperationContext* txn,
@@ -455,7 +455,7 @@ TEST_F(InitialSyncTest, Complete) {
TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) {
DataReplicatorOptions opts;
int applyCounter{0};
- _applierFn = [&](OperationContext* txn, const BSONObj& op) {
+ _applierFn = [&](OperationContext* txn, const OplogEntry& op) {
if (++applyCounter == 1) {
return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc.");
}
@@ -869,9 +869,9 @@ TEST_F(SteadyStateTest, PauseDataReplicator) {
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
- _applierFn = [&](OperationContext* txn, const BSONObj& op) {
+ _applierFn = [&](OperationContext* txn, const OplogEntry& op) {
stdx::lock_guard<stdx::mutex> lock(mutex);
- operationApplied = op;
+ operationApplied = op.raw;
barrier.countDownAndWait();
return Status::OK();
};
@@ -950,9 +950,9 @@ TEST_F(SteadyStateTest, ApplyOneOperation) {
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
- _applierFn = [&](OperationContext* txn, const BSONObj& op) {
+ _applierFn = [&](OperationContext* txn, const OplogEntry& op) {
stdx::lock_guard<stdx::mutex> lock(mutex);
- operationApplied = op;
+ operationApplied = op.raw;
barrier.countDownAndWait();
return Status::OK();
};
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 1b612d6ca1a..5c9922b3873 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -52,5 +52,13 @@ OplogEntry::OplogEntry(const BSONObj& rawInput) : raw(rawInput.getOwned()) {
}
}
+std::string OplogEntry::toString() const {
+ return raw.toString();
+}
+
+std::ostream& operator<<(std::ostream& s, const OplogEntry& o) {
+ return s << o.toString();
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index b86851b34d3..4172c9659d5 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -47,6 +47,9 @@ struct OplogEntry {
// This member is not parsed from the BSON and is instead populated by fillWriterVectors.
bool isForCappedCollection = false;
+ std::string toString() const;
+
+
BSONObj raw; // Owned.
StringData ns = "";
@@ -57,5 +60,11 @@ struct OplogEntry {
BSONElement o2;
};
+std::ostream& operator<<(std::ostream& s, const OplogEntry& o);
+
+inline bool operator==(const OplogEntry& lhs, const OplogEntry& rhs) {
+ return lhs.raw == rhs.raw;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index aa64f41ac7e..32f0fabb098 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -181,7 +181,7 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings&
DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCoord) {
DataReplicatorOptions options;
- options.applierFn = [](OperationContext*, const BSONObj&) -> Status { return Status::OK(); };
+ options.applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); };
options.rollbackFn =
[](OperationContext*, const OpTime&, const HostAndPort&) { return Status::OK(); };
options.prepareReplSetUpdatePositionCommandFn =
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index cd1e9eace81..95cd8624fea 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -1014,8 +1014,6 @@ static void initializeWriterThread() {
// This free function is used by the writer threads to apply each op
void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
- using OplogEntry = OplogEntry;
-
std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end());
std::vector<OplogEntry*> oplogEntryPointers(oplogEntries.size());
for (size_t i = 0; i < oplogEntries.size(); i++) {