summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/data_replicator_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/data_replicator_test.cpp')
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp387
1 files changed, 208 insertions, 179 deletions
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 300100d4726..43e42f7cc5e 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -40,18 +40,18 @@
#include "mongo/db/repl/data_replicator_external_state_mock.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/optime.h"
-#include "mongo/db/repl/update_position_args.h"
-#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/db/repl/reporter.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
-#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/db/repl/sync_source_resolver.h"
+#include "mongo/db/repl/sync_source_selector.h"
+#include "mongo/db/repl/update_position_args.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/util/fail_point_service.h"
#include "mongo/util/concurrency/thread_name.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -105,8 +105,9 @@ public:
* clear/reset state
*/
void reset() {
- _rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&)
- -> Status { return Status::OK(); };
+ _rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&) -> Status {
+ return Status::OK();
+ };
_setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; };
_myLastOpTime = OpTime();
_memberState = MemberState::RS_UNKNOWN;
@@ -198,7 +199,7 @@ protected:
options.prepareReplSetUpdatePositionCommandFn =
[](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle)
- -> StatusWith<BSONObj> { return BSON(UpdatePositionArgs::kCommandFieldName << 1); };
+ -> StatusWith<BSONObj> { return BSON(UpdatePositionArgs::kCommandFieldName << 1); };
options.getMyLastOptime = [this]() { return _myLastOpTime; };
options.setMyLastOptime = [this](const OpTime& opTime) { _setMyLastOptime(opTime); };
options.setFollowerMode = [this](const MemberState& state) {
@@ -209,13 +210,17 @@ protected:
options.syncSourceSelector = this;
options.getReplSetConfig = []() {
ReplicaSetConfig config;
- ASSERT_OK(
- config.initialize(BSON("_id"
- << "myset"
- << "version" << 1 << "protocolVersion" << 1 << "members"
- << BSON_ARRAY(BSON("_id" << 0 << "host"
- << "localhost:12345")) << "settings"
- << BSON("electionTimeoutMillis" << 10000))));
+ ASSERT_OK(config.initialize(BSON("_id"
+ << "myset"
+ << "version"
+ << 1
+ << "protocolVersion"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "localhost:12345"))
+ << "settings"
+ << BSON("electionTimeoutMillis" << 10000))));
return config;
};
@@ -333,10 +338,9 @@ protected:
_storage.beginCollectionFn = _beginCollectionFn;
_storage.insertDocumentsFn = _insertCollectionFn;
- _storage.insertMissingDocFn =
- [&](OperationContext* txn, const NamespaceString& nss, const BSONObj& doc) {
- return Status::OK();
- };
+ _storage.insertMissingDocFn = [&](OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& doc) { return Status::OK(); };
dr->_setInitialSyncStorageInterface(&_storage);
_isbr.reset(new InitialSyncBackgroundRunner(dr));
@@ -366,11 +370,15 @@ protected:
const long long cursorId = cmdElem.numberLong();
if (isGetMore && cursorId == 1LL) {
// process getmore requests from the oplog fetcher
- auto respBSON = fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs'"
- " , nextBatch:[{ts:Timestamp(" << ++c
- << ",1), h:1, ns:'test.a', v:" << OplogEntry::kOplogVersion
- << ", op:'u', o2:{_id:" << c << "}, o:{$set:{a:1}}}]}}");
+ auto respBSON =
+ fromjson(str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs'"
+ " , nextBatch:[{ts:Timestamp("
+ << ++c
+ << ",1), h:1, ns:'test.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'u', o2:{_id:"
+ << c
+ << "}, o:{$set:{a:1}}}]}}");
net->scheduleResponse(
noi,
net->now(),
@@ -446,47 +454,50 @@ TEST_F(InitialSyncTest, Complete) {
*
*/
- const std::vector<BSONObj> responses = {
- // get rollback id
- fromjson(str::stream() << "{ok: 1, rbid:1}"),
- // get latest oplog ts
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}"),
- // oplog fetcher find
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}"),
- // Clone Start
- // listDatabases
- fromjson("{ok:1, databases:[{name:'a'}]}"),
- // listCollections for "a"
- fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}"),
- // listIndexes:a
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:" << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"),
- // find:a
- fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}"),
- // Clone Done
- // get latest oplog ts
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, c:1}}]}}"),
- // Applier starts ...
- // check for rollback
- fromjson(str::stream() << "{ok: 1, rbid:1}"),
- };
+ const std::vector<BSONObj> responses =
+ {
+ // get rollback id
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
+ // get latest oplog ts
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
+ // oplog fetcher find
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
+ // Clone Start
+ // listDatabases
+ fromjson("{ok:1, databases:[{name:'a'}]}"),
+ // listCollections for "a"
+ fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
+ "{name:'a', options:{}} "
+ "]}}"),
+ // listIndexes:a
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
+ "{v:"
+ << OplogEntry::kOplogVersion
+ << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"),
+ // find:a
+ fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
+ "{_id:1, a:1} "
+ "]}}"),
+ // Clone Done
+ // get latest oplog ts
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(2,2), h:1, ns:'b.c', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, c:1}}]}}"),
+ // Applier starts ...
+ // check for rollback
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
+ };
// Initial sync flag should not be set before starting.
ASSERT_FALSE(StorageInterface::get(getGlobalServiceContext())
@@ -516,58 +527,61 @@ TEST_F(InitialSyncTest, Complete) {
TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) {
DataReplicatorOptions opts;
int applyCounter{0};
- getExternalState()->multiApplyFn =
- [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
- -> StatusWith<OpTime> {
- if (++applyCounter == 1) {
- return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc.");
- }
- return ops.back().getOpTime();
- };
-
- const std::vector<BSONObj> responses = {
- // get rollback id
- fromjson(str::stream() << "{ok: 1, rbid:1}"),
- // get latest oplog ts
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}"),
- // oplog fetcher find
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
- << ", op:'u', o2:{_id:1}, o:{$set:{a:1}}}]}}"),
- // Clone Start
- // listDatabases
- fromjson("{ok:1, databases:[{name:'a'}]}"),
- // listCollections for "a"
- fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}"),
- // listIndexes:a
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:" << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"),
- // find:a -- empty
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[]}}"),
- // Clone Done
- // get latest oplog ts
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, c:1}}]}}"),
- // Applier starts ...
- // missing doc fetch -- find:a {_id:1}
- fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}"),
- // check for rollback
- fromjson(str::stream() << "{ok: 1, rbid:1}"),
+ getExternalState()->multiApplyFn = [&](OperationContext*,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> {
+ if (++applyCounter == 1) {
+ return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc.");
+ }
+ return ops.back().getOpTime();
};
+
+ const std::vector<BSONObj> responses =
+ {
+ // get rollback id
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
+ // get latest oplog ts
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
+ // oplog fetcher find
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'u', o2:{_id:1}, o:{$set:{a:1}}}]}}"),
+ // Clone Start
+ // listDatabases
+ fromjson("{ok:1, databases:[{name:'a'}]}"),
+ // listCollections for "a"
+ fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
+ "{name:'a', options:{}} "
+ "]}}"),
+ // listIndexes:a
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
+ "{v:"
+ << OplogEntry::kOplogVersion
+ << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"),
+ // find:a -- empty
+ fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[]}}"),
+ // Clone Done
+ // get latest oplog ts
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(2,2), h:1, ns:'b.c', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, c:1}}]}}"),
+ // Applier starts ...
+ // missing doc fetch -- find:a {_id:1}
+ fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
+ "{_id:1, a:1} "
+ "]}}"),
+ // check for rollback
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
+ };
startSync();
setResponses(responses);
playResponses(true);
@@ -581,7 +595,9 @@ TEST_F(InitialSyncTest, Failpoint) {
BSONObj configObj = BSON("_id"
<< "mySet"
- << "version" << 1 << "members"
+ << "version"
+ << 1
+ << "members"
<< BSON_ARRAY(BSON("_id" << 1 << "host"
<< "node1:12345")
<< BSON("_id" << 2 << "host"
@@ -611,12 +627,14 @@ TEST_F(InitialSyncTest, FailsOnClone) {
// get latest oplog ts
fromjson(
str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
<< ", op:'i', o:{_id:1, a:1}}]}}"),
// oplog fetcher find
fromjson(
str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
<< ", op:'i', o:{_id:1, a:1}}]}}"),
// Clone Start
// listDatabases
@@ -631,47 +649,50 @@ TEST_F(InitialSyncTest, FailsOnClone) {
}
TEST_F(InitialSyncTest, FailOnRollback) {
- const std::vector<BSONObj> responses = {
- // get rollback id
- fromjson(str::stream() << "{ok: 1, rbid:1}"),
- // get latest oplog ts
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}"),
- // oplog fetcher find
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}"),
- // Clone Start
- // listDatabases
- fromjson("{ok:1, databases:[{name:'a'}]}"),
- // listCollections for "a"
- fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}"),
- // listIndexes:a
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:" << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"),
- // find:a
- fromjson(
- "{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}"),
- // Clone Done
- // get latest oplog ts
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, c:1}}]}}"),
- // Applier starts ...
- // check for rollback
- fromjson(str::stream() << "{ok: 1, rbid:2}"),
- };
+ const std::vector<BSONObj> responses =
+ {
+ // get rollback id
+ fromjson(str::stream() << "{ok: 1, rbid:1}"),
+ // get latest oplog ts
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
+ // oplog fetcher find
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:1, ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}"),
+ // Clone Start
+ // listDatabases
+ fromjson("{ok:1, databases:[{name:'a'}]}"),
+ // listCollections for "a"
+ fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
+ "{name:'a', options:{}} "
+ "]}}"),
+ // listIndexes:a
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
+ "{v:"
+ << OplogEntry::kOplogVersion
+ << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"),
+ // find:a
+ fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
+ "{_id:1, a:1} "
+ "]}}"),
+ // Clone Done
+ // get latest oplog ts
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(2,2), h:1, ns:'b.c', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, c:1}}]}}"),
+ // Applier starts ...
+ // check for rollback
+ fromjson(str::stream() << "{ok: 1, rbid:2}"),
+ };
startSync();
setResponses({responses});
@@ -984,26 +1005,30 @@ TEST_F(SteadyStateTest, RollbackTwoSyncSourcesSecondRollbackSucceeds) {
TEST_F(SteadyStateTest, PauseDataReplicator) {
auto lastOperationApplied = BSON("op"
<< "a"
- << "v" << OplogEntry::kOplogVersion << "ts"
+ << "v"
+ << OplogEntry::kOplogVersion
+ << "ts"
<< Timestamp(Seconds(123), 0));
auto operationToApply = BSON("op"
<< "a"
- << "v" << OplogEntry::kOplogVersion << "ts"
+ << "v"
+ << OplogEntry::kOplogVersion
+ << "ts"
<< Timestamp(Seconds(456), 0));
stdx::mutex mutex;
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
- getExternalState()->multiApplyFn =
- [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
- -> StatusWith<OpTime> {
- stdx::lock_guard<stdx::mutex> lock(mutex);
- operationApplied = ops.back().raw;
- barrier.countDownAndWait();
- return ops.back().getOpTime();
- };
+ getExternalState()->multiApplyFn = [&](OperationContext*,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ operationApplied = ops.back().raw;
+ barrier.countDownAndWait();
+ return ops.back().getOpTime();
+ };
DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime;
_setMyLastOptime = [&](const OpTime& opTime) {
oldSetMyLastOptime(opTime);
@@ -1076,26 +1101,30 @@ TEST_F(SteadyStateTest, PauseDataReplicator) {
TEST_F(SteadyStateTest, ApplyOneOperation) {
auto lastOperationApplied = BSON("op"
<< "a"
- << "v" << OplogEntry::kOplogVersion << "ts"
+ << "v"
+ << OplogEntry::kOplogVersion
+ << "ts"
<< Timestamp(Seconds(123), 0));
auto operationToApply = BSON("op"
<< "a"
- << "v" << OplogEntry::kOplogVersion << "ts"
+ << "v"
+ << OplogEntry::kOplogVersion
+ << "ts"
<< Timestamp(Seconds(456), 0));
stdx::mutex mutex;
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
- getExternalState()->multiApplyFn =
- [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
- -> StatusWith<OpTime> {
- stdx::lock_guard<stdx::mutex> lock(mutex);
- operationApplied = ops.back().raw;
- barrier.countDownAndWait();
- return ops.back().getOpTime();
- };
+ getExternalState()->multiApplyFn = [&](OperationContext*,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ operationApplied = ops.back().raw;
+ barrier.countDownAndWait();
+ return ops.back().getOpTime();
+ };
DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime;
_setMyLastOptime = [&](const OpTime& opTime) {
oldSetMyLastOptime(opTime);