diff options
Diffstat (limited to 'src/mongo/db/repl/data_replicator_test.cpp')
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 387 |
1 files changed, 208 insertions, 179 deletions
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 300100d4726..43e42f7cc5e 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -40,18 +40,18 @@ #include "mongo/db/repl/data_replicator_external_state_mock.h" #include "mongo/db/repl/member_state.h" #include "mongo/db/repl/optime.h" -#include "mongo/db/repl/update_position_args.h" -#include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/db/repl/reporter.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" -#include "mongo/db/repl/sync_source_selector.h" #include "mongo/db/repl/sync_source_resolver.h" +#include "mongo/db/repl/sync_source_selector.h" +#include "mongo/db/repl/update_position_args.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/mutex.h" -#include "mongo/util/fail_point_service.h" #include "mongo/util/concurrency/thread_name.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -105,8 +105,9 @@ public: * clear/reset state */ void reset() { - _rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&) - -> Status { return Status::OK(); }; + _rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&) -> Status { + return Status::OK(); + }; _setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; }; _myLastOpTime = OpTime(); _memberState = MemberState::RS_UNKNOWN; @@ -198,7 +199,7 @@ protected: options.prepareReplSetUpdatePositionCommandFn = [](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) - -> StatusWith<BSONObj> { return BSON(UpdatePositionArgs::kCommandFieldName << 1); }; + -> StatusWith<BSONObj> { return BSON(UpdatePositionArgs::kCommandFieldName << 1); }; options.getMyLastOptime = [this]() { return _myLastOpTime; }; options.setMyLastOptime = [this](const OpTime& opTime) { _setMyLastOptime(opTime); }; options.setFollowerMode = [this](const MemberState& state) { @@ -209,13 +210,17 @@ protected: options.syncSourceSelector = this; options.getReplSetConfig = []() { ReplicaSetConfig config; - ASSERT_OK( - config.initialize(BSON("_id" - << "myset" - << "version" << 1 << "protocolVersion" << 1 << "members" - << BSON_ARRAY(BSON("_id" << 0 << "host" - << "localhost:12345")) << "settings" - << BSON("electionTimeoutMillis" << 10000)))); + ASSERT_OK(config.initialize(BSON("_id" + << "myset" + << "version" + << 1 + << "protocolVersion" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "localhost:12345")) + << "settings" + << BSON("electionTimeoutMillis" << 10000)))); return config; }; @@ -333,10 +338,9 @@ protected: _storage.beginCollectionFn = _beginCollectionFn; _storage.insertDocumentsFn = _insertCollectionFn; - _storage.insertMissingDocFn = - [&](OperationContext* txn, const NamespaceString& nss, const BSONObj& doc) { - return Status::OK(); - }; + _storage.insertMissingDocFn = [&](OperationContext* txn, + const NamespaceString& nss, + const BSONObj& doc) { return Status::OK(); }; dr->_setInitialSyncStorageInterface(&_storage); _isbr.reset(new InitialSyncBackgroundRunner(dr)); @@ -366,11 +370,15 @@ protected: const long long cursorId = cmdElem.numberLong(); if (isGetMore && cursorId == 1LL) { // process getmore requests from the oplog fetcher - auto respBSON = fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs'" - " , nextBatch:[{ts:Timestamp(" << ++c - << ",1), h:1, ns:'test.a', v:" << OplogEntry::kOplogVersion - << ", op:'u', o2:{_id:" << c << "}, o:{$set:{a:1}}}]}}"); + auto respBSON = + fromjson(str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs'" + " , nextBatch:[{ts:Timestamp(" + << ++c + << ",1), h:1, ns:'test.a', v:" + << OplogEntry::kOplogVersion + << ", op:'u', o2:{_id:" + << c + << "}, o:{$set:{a:1}}}]}}"); net->scheduleResponse( noi, net->now(), @@ -446,47 +454,50 @@ TEST_F(InitialSyncTest, Complete) { * */ - const std::vector<BSONObj> responses = { - // get rollback id - fromjson(str::stream() << "{ok: 1, rbid:1}"), - // get latest oplog ts - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}"), - // oplog fetcher find - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}"), - // Clone Start - // listDatabases - fromjson("{ok:1, databases:[{name:'a'}]}"), - // listCollections for "a" - fromjson( - "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}"), - // listIndexes:a - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" - "{v:" << OplogEntry::kOplogVersion - << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"), - // find:a - fromjson( - "{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}"), - // Clone Done - // get latest oplog ts - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, c:1}}]}}"), - // Applier starts ... - // check for rollback - fromjson(str::stream() << "{ok: 1, rbid:1}"), - }; + const std::vector<BSONObj> responses = + { + // get rollback id + fromjson(str::stream() << "{ok: 1, rbid:1}"), + // get latest oplog ts + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" + << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, a:1}}]}}"), + // oplog fetcher find + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" + << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, a:1}}]}}"), + // Clone Start + // listDatabases + fromjson("{ok:1, databases:[{name:'a'}]}"), + // listCollections for "a" + fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" + "{name:'a', options:{}} " + "]}}"), + // listIndexes:a + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" + "{v:" + << OplogEntry::kOplogVersion + << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"), + // find:a + fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" + "{_id:1, a:1} " + "]}}"), + // Clone Done + // get latest oplog ts + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" + << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, c:1}}]}}"), + // Applier starts ... + // check for rollback + fromjson(str::stream() << "{ok: 1, rbid:1}"), + }; // Initial sync flag should not be set before starting. ASSERT_FALSE(StorageInterface::get(getGlobalServiceContext()) @@ -516,58 +527,61 @@ TEST_F(InitialSyncTest, Complete) { TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) { DataReplicatorOptions opts; int applyCounter{0}; - getExternalState()->multiApplyFn = - [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) - -> StatusWith<OpTime> { - if (++applyCounter == 1) { - return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc."); - } - return ops.back().getOpTime(); - }; - - const std::vector<BSONObj> responses = { - // get rollback id - fromjson(str::stream() << "{ok: 1, rbid:1}"), - // get latest oplog ts - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}"), - // oplog fetcher find - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion - << ", op:'u', o2:{_id:1}, o:{$set:{a:1}}}]}}"), - // Clone Start - // listDatabases - fromjson("{ok:1, databases:[{name:'a'}]}"), - // listCollections for "a" - fromjson( - "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}"), - // listIndexes:a - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" - "{v:" << OplogEntry::kOplogVersion - << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"), - // find:a -- empty - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[]}}"), - // Clone Done - // get latest oplog ts - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, c:1}}]}}"), - // Applier starts ... - // missing doc fetch -- find:a {_id:1} - fromjson( - "{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}"), - // check for rollback - fromjson(str::stream() << "{ok: 1, rbid:1}"), + getExternalState()->multiApplyFn = [&](OperationContext*, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> { + if (++applyCounter == 1) { + return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc."); + } + return ops.back().getOpTime(); }; + + const std::vector<BSONObj> responses = + { + // get rollback id + fromjson(str::stream() << "{ok: 1, rbid:1}"), + // get latest oplog ts + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" + << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, a:1}}]}}"), + // oplog fetcher find + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" + << OplogEntry::kOplogVersion + << ", op:'u', o2:{_id:1}, o:{$set:{a:1}}}]}}"), + // Clone Start + // listDatabases + fromjson("{ok:1, databases:[{name:'a'}]}"), + // listCollections for "a" + fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" + "{name:'a', options:{}} " + "]}}"), + // listIndexes:a + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" + "{v:" + << OplogEntry::kOplogVersion + << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"), + // find:a -- empty + fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[]}}"), + // Clone Done + // get latest oplog ts + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" + << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, c:1}}]}}"), + // Applier starts ... + // missing doc fetch -- find:a {_id:1} + fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" + "{_id:1, a:1} " + "]}}"), + // check for rollback + fromjson(str::stream() << "{ok: 1, rbid:1}"), + }; startSync(); setResponses(responses); playResponses(true); @@ -581,7 +595,9 @@ TEST_F(InitialSyncTest, Failpoint) { BSONObj configObj = BSON("_id" << "mySet" - << "version" << 1 << "members" + << "version" + << 1 + << "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345") << BSON("_id" << 2 << "host" @@ -611,12 +627,14 @@ TEST_F(InitialSyncTest, FailsOnClone) { // get latest oplog ts fromjson( str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" + << OplogEntry::kOplogVersion << ", op:'i', o:{_id:1, a:1}}]}}"), // oplog fetcher find fromjson( str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" + << OplogEntry::kOplogVersion << ", op:'i', o:{_id:1, a:1}}]}}"), // Clone Start // listDatabases @@ -631,47 +649,50 @@ TEST_F(InitialSyncTest, FailsOnClone) { } TEST_F(InitialSyncTest, FailOnRollback) { - const std::vector<BSONObj> responses = { - // get rollback id - fromjson(str::stream() << "{ok: 1, rbid:1}"), - // get latest oplog ts - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}"), - // oplog fetcher find - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}"), - // Clone Start - // listDatabases - fromjson("{ok:1, databases:[{name:'a'}]}"), - // listCollections for "a" - fromjson( - "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}"), - // listIndexes:a - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" - "{v:" << OplogEntry::kOplogVersion - << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"), - // find:a - fromjson( - "{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}"), - // Clone Done - // get latest oplog ts - fromjson( - str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, c:1}}]}}"), - // Applier starts ... - // check for rollback - fromjson(str::stream() << "{ok: 1, rbid:2}"), - }; + const std::vector<BSONObj> responses = + { + // get rollback id + fromjson(str::stream() << "{ok: 1, rbid:1}"), + // get latest oplog ts + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" + << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, a:1}}]}}"), + // oplog fetcher find + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:1, ns:'a.a', v:" + << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, a:1}}]}}"), + // Clone Start + // listDatabases + fromjson("{ok:1, databases:[{name:'a'}]}"), + // listCollections for "a" + fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" + "{name:'a', options:{}} " + "]}}"), + // listIndexes:a + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" + "{v:" + << OplogEntry::kOplogVersion + << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}"), + // find:a + fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" + "{_id:1, a:1} " + "]}}"), + // Clone Done + // get latest oplog ts + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(2,2), h:1, ns:'b.c', v:" + << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, c:1}}]}}"), + // Applier starts ... + // check for rollback + fromjson(str::stream() << "{ok: 1, rbid:2}"), + }; startSync(); setResponses({responses}); @@ -984,26 +1005,30 @@ TEST_F(SteadyStateTest, RollbackTwoSyncSourcesSecondRollbackSucceeds) { TEST_F(SteadyStateTest, PauseDataReplicator) { auto lastOperationApplied = BSON("op" << "a" - << "v" << OplogEntry::kOplogVersion << "ts" + << "v" + << OplogEntry::kOplogVersion + << "ts" << Timestamp(Seconds(123), 0)); auto operationToApply = BSON("op" << "a" - << "v" << OplogEntry::kOplogVersion << "ts" + << "v" + << OplogEntry::kOplogVersion + << "ts" << Timestamp(Seconds(456), 0)); stdx::mutex mutex; unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; BSONObj operationApplied; - getExternalState()->multiApplyFn = - [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) - -> StatusWith<OpTime> { - stdx::lock_guard<stdx::mutex> lock(mutex); - operationApplied = ops.back().raw; - barrier.countDownAndWait(); - return ops.back().getOpTime(); - }; + getExternalState()->multiApplyFn = [&](OperationContext*, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> { + stdx::lock_guard<stdx::mutex> lock(mutex); + operationApplied = ops.back().raw; + barrier.countDownAndWait(); + return ops.back().getOpTime(); + }; DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime; _setMyLastOptime = [&](const OpTime& opTime) { oldSetMyLastOptime(opTime); @@ -1076,26 +1101,30 @@ TEST_F(SteadyStateTest, PauseDataReplicator) { TEST_F(SteadyStateTest, ApplyOneOperation) { auto lastOperationApplied = BSON("op" << "a" - << "v" << OplogEntry::kOplogVersion << "ts" + << "v" + << OplogEntry::kOplogVersion + << "ts" << Timestamp(Seconds(123), 0)); auto operationToApply = BSON("op" << "a" - << "v" << OplogEntry::kOplogVersion << "ts" + << "v" + << OplogEntry::kOplogVersion + << "ts" << Timestamp(Seconds(456), 0)); stdx::mutex mutex; unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; BSONObj operationApplied; - getExternalState()->multiApplyFn = - [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) - -> StatusWith<OpTime> { - stdx::lock_guard<stdx::mutex> lock(mutex); - operationApplied = ops.back().raw; - barrier.countDownAndWait(); - return ops.back().getOpTime(); - }; + getExternalState()->multiApplyFn = [&](OperationContext*, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> { + stdx::lock_guard<stdx::mutex> lock(mutex); + operationApplied = ops.back().raw; + barrier.countDownAndWait(); + return ops.back().getOpTime(); + }; DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime; _setMyLastOptime = [&](const OpTime& opTime) { oldSetMyLastOptime(opTime); |