From f51dce58fcda7341cc6d72279c23425205f958a2 Mon Sep 17 00:00:00 2001 From: Lingzhi Deng Date: Tue, 21 Jan 2020 17:46:44 -0500 Subject: SERVER-45431: Create new test fixture for OplogFetcher --- src/mongo/db/repl/oplog_fetcher.cpp | 13 +- src/mongo/db/repl/oplog_fetcher.h | 19 ++ src/mongo/db/repl/oplog_fetcher_test.cpp | 253 +++++++++++++++++++++- src/mongo/dbtests/mock/mock_dbclient_connection.h | 4 +- 4 files changed, 283 insertions(+), 6 deletions(-) diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 7c34f302284..c4595103905 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -601,13 +601,14 @@ NewOplogFetcher::NewOplogFetcher( _onShutdownCallbackFn(onShutdownCallbackFn), _lastFetched(lastFetched), _metadataObj(makeMetadataObject()), + _createClientFn( + [] { return std::make_unique(true /* autoReconnect */); }), _requireFresherSyncSource(requireFresherSyncSource), _dataReplicatorExternalState(dataReplicatorExternalState), _enqueueDocumentsFn(enqueueDocumentsFn), _awaitDataTimeout(calculateAwaitDataTimeout(config)), _batchSize(batchSize), _startingPoint(startingPoint) { - invariant(config.isInitialized()); invariant(!_lastFetched.isNull()); invariant(onShutdownCallbackFn); @@ -651,6 +652,16 @@ Milliseconds NewOplogFetcher::getAwaitDataTimeout_forTest() const { return _awaitDataTimeout; } +void NewOplogFetcher::setCreateClientFn_forTest(const CreateClientFn& createClientFn) { + stdx::lock_guard lock(_mutex); + _createClientFn = createClientFn; +} + +DBClientConnection* NewOplogFetcher::getDBClientConnection_forTest() const { + stdx::lock_guard lock(_mutex); + return _conn.get(); +} + OpTime NewOplogFetcher::_getLastOpTimeFetched() const { stdx::lock_guard lock(_mutex); return _lastFetched; diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index fd3ecadba95..5ecda88adb1 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -410,6 +410,22 @@ public: */ Milliseconds getAwaitDataTimeout_forTest() const; + /** + * Type of function to create a database client connection. Used for testing only. + */ + using CreateClientFn = std::function()>; + + /** + * Overrides how the OplogFetcher creates the client. Used for testing only. + */ + void setCreateClientFn_forTest(const CreateClientFn& createClientFn); + + /** + * Get a raw pointer to the client connection. It is the caller's responsibility to not reuse + * this pointer beyond the lifetime of the underlying client. Used for testing only. + */ + DBClientConnection* getDBClientConnection_forTest() const; + private: // =============== AbstractAsyncComponent overrides ================ @@ -533,6 +549,9 @@ private: // via its shutdownAndDisallowReconnect function. std::unique_ptr _conn; + // Used to create the DBClientConnection for the oplog fetcher. + CreateClientFn _createClientFn; + // The tailable, awaitData, exhaust cursor used to fetch oplog entries from the sync source. // When an error is encountered, depending on the result of OplogFetcherRestartDecision's // shouldContinue function, a new cursor will be created or the oplog fetcher will shut down. diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index ddbae311c66..7644fd5d2e0 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/repl/data_replicator_external_state_mock.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/task_executor_mock.h" +#include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" @@ -1026,17 +1027,126 @@ TEST_F(OplogFetcherTest, ASSERT_EQUALS(OpTime(), info.lastDocument); } +BSONObj makeNoopOplogEntry(OpTime opTime) { + auto oplogEntry = + repl::OplogEntry(opTime, // optime + boost::none, // hash + OpTypeEnum ::kNoop, // opType + NamespaceString("test.t"), // namespace + boost::none, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + BSONObj(), // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none); // post-image optime + return oplogEntry.toBSON(); +} + +BSONObj makeNoopOplogEntry(Seconds seconds) { + return makeNoopOplogEntry({{seconds, 0}, 1LL}); +} + +BSONObj makeOplogBatchMetadata(boost::optional replMetadata, + boost::optional oqMetadata) { + BSONObjBuilder bob; + if (replMetadata) { + ASSERT_OK(replMetadata->writeToMetadata(&bob)); + } + if (oqMetadata) { + ASSERT_OK(oqMetadata->writeToMetadata(&bob)); + } + return bob.obj(); +} + +Message makeFirstBatch(CursorId cursorId, + const NewOplogFetcher::Documents& oplogEntries, + const BSONObj& metadata) { + return MockDBClientConnection::mockFindResponse( + NamespaceString::kRsOplogNamespace, cursorId, oplogEntries, metadata); +} + +Message makeSubsequentBatch(CursorId cursorId, + const NewOplogFetcher::Documents& oplogEntries, + const BSONObj& metadata, + bool moreToCome) { + return MockDBClientConnection::mockGetMoreResponse( + NamespaceString::kRsOplogNamespace, cursorId, oplogEntries, metadata, moreToCome); +} + +bool blockedOnNetworkSoon(MockDBClientConnection* conn) { + // Wait up to 10 seconds. + for (auto i = 0; i < 100; i++) { + if (conn->isBlockedOnNetwork()) { + return true; + } + mongo::sleepmillis(100); + } + return false; +} + +// Simulate a response to a single outgoing client request and return the client request. Use this +// function to simulate responses to client find/getMore requests. +Message processSingleRequestResponse(DBClientConnection* conn, + const Message& response, + bool expectReadyNetworkOperationsAfterProcessing = false) { + auto* mockConn = dynamic_cast(conn); + ASSERT_TRUE(blockedOnNetworkSoon(mockConn)); + auto request = mockConn->getLastSentMessage(); + mockConn->setCallResponses({response}); + if (expectReadyNetworkOperationsAfterProcessing) { + ASSERT_TRUE(blockedOnNetworkSoon(mockConn)); + } + return request; +} + +// Simulate a response to a single network recv() call. Use this function to simulate responses to +// exhaust stream where a client expects to receive responses without sending out new requests. +void processSingleExhaustResponse(DBClientConnection* conn, + const Message& response, + bool expectReadyNetworkOperationsAfterProcessing = false) { + auto* mockConn = dynamic_cast(conn); + ASSERT_TRUE(blockedOnNetworkSoon(mockConn)); + mockConn->setRecvResponses({response}); + if (expectReadyNetworkOperationsAfterProcessing) { + ASSERT_TRUE(blockedOnNetworkSoon(mockConn)); + } +} + + class NewOplogFetcherTest : public executor::ThreadPoolExecutorTest { -public: - void setUp() override; +protected: + static const OpTime remoteNewerOpTime; + static const OpTime staleOpTime; + static const Date_t staleWallTime; + static const int rbid = 2; + static const int primaryIndex = 2; + static const int syncSourceIndex = 2; + static const rpc::OplogQueryMetadata staleOqMetadata; // 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use. const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10; + void setUp() override; + std::unique_ptr makeOplogFetcher(); std::unique_ptr makeOplogFetcherWithDifferentExecutor( executor::TaskExecutor* executor, NewOplogFetcher::OnShutdownCallbackFn fn); + std::unique_ptr processSingleBatch(const Message& response, + bool requireFresherSyncSource = true); + + /** + * Tests checkSyncSource result handling. + */ + void testSyncSourceChecking(boost::optional replMetadata, + boost::optional oqMetadata); + std::unique_ptr dataReplicatorExternalState; NewOplogFetcher::Documents lastEnqueuedDocuments; @@ -1047,6 +1157,12 @@ public: OpTime lastFetched; }; +const OpTime NewOplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(124, 1), 2); +const OpTime NewOplogFetcherTest::staleOpTime = OpTime(Timestamp(1, 1), 0); +const Date_t NewOplogFetcherTest::staleWallTime = Date_t() + Seconds(staleOpTime.getSecs()); +const rpc::OplogQueryMetadata NewOplogFetcherTest::staleOqMetadata = rpc::OplogQueryMetadata( + {staleOpTime, staleWallTime}, staleOpTime, rbid, primaryIndex, syncSourceIndex); + void NewOplogFetcherTest::setUp() { executor::ThreadPoolExecutorTest::setUp(); launchExecutorThread(); @@ -1072,7 +1188,7 @@ std::unique_ptr NewOplogFetcherTest::makeOplogFetcher() { std::unique_ptr NewOplogFetcherTest::makeOplogFetcherWithDifferentExecutor( executor::TaskExecutor* executor, NewOplogFetcher::OnShutdownCallbackFn fn) { - return std::make_unique( + auto oplogFetcher = std::make_unique( executor, lastFetched, source, @@ -1085,6 +1201,64 @@ std::unique_ptr NewOplogFetcherTest::makeOplogFetcherWithDiffer fn, defaultBatchSize, NewOplogFetcher::StartingPoint::kSkipFirstDoc); + oplogFetcher->setCreateClientFn_forTest( + []() { return std::unique_ptr(new MockDBClientConnection()); }); + return oplogFetcher; +} + +std::unique_ptr NewOplogFetcherTest::processSingleBatch( + const Message& response, bool requireFresherSyncSource) { + auto shutdownState = std::make_unique(); + NewOplogFetcher oplogFetcher( + &getExecutor(), + lastFetched, + source, + _createConfig(), + std::make_unique(0), + rbid, + requireFresherSyncSource, + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + std::ref(*shutdownState), + defaultBatchSize, + NewOplogFetcher::StartingPoint::kSkipFirstDoc); + oplogFetcher.setCreateClientFn_forTest( + []() { return std::unique_ptr(new MockDBClientConnection()); }); + + ASSERT_FALSE(oplogFetcher.isActive()); + ASSERT_OK(oplogFetcher.startup()); + ASSERT_TRUE(oplogFetcher.isActive()); + + auto m = processSingleRequestResponse(oplogFetcher.getDBClientConnection_forTest(), response); + auto msg = mongo::OpMsg::parse(m); + ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find"); + ASSERT_TRUE(msg.body.getBoolField("tailable")); + ASSERT_TRUE(msg.body.getBoolField("oplogReplay")); + ASSERT_TRUE(msg.body.getBoolField("awaitData")); + ASSERT_EQUALS(60000, msg.body.getIntField("maxTimeMS")); + // TODO SERVER-45468: Test the find command and the metadata sent. + + oplogFetcher.shutdown(); + oplogFetcher.join(); + + return shutdownState; +} + +void NewOplogFetcherTest::testSyncSourceChecking( + boost::optional replMetadata, + boost::optional oqMetadata) { + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); + auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); + + auto metadataObj = makeOplogBatchMetadata(replMetadata, oqMetadata); + + dataReplicatorExternalState->shouldStopFetchingResult = true; + + auto shutdownState = + processSingleBatch(makeFirstBatch(0, {firstEntry, secondEntry, thirdEntry}, metadataObj)); + + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus()); } TEST_F(NewOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) { @@ -1234,4 +1408,77 @@ TEST_F(NewOplogFetcherTest, AwaitDataTimeoutSmallerWhenFailPointSet) { ASSERT_EQUALS(Milliseconds(50), timeout); failPoint->setMode(FailPoint::off); } + +TEST_F( + NewOplogFetcherTest, + NoDataAvailableAfterFirstTwoBatchesShouldCauseTheOplogFetcherToShutDownWithSuccessfulStatus) { + // TODO SERVER-45468: Enable this test. + return; + + ShutdownState shutdownState; + + NewOplogFetcher oplogFetcher( + &getExecutor(), + lastFetched, + source, + _createConfig(), + std::make_unique(0), + rbid, + true, + dataReplicatorExternalState.get(), + enqueueDocumentsFn, + std::ref(shutdownState), + defaultBatchSize, + NewOplogFetcher::StartingPoint::kSkipFirstDoc); + oplogFetcher.setCreateClientFn_forTest( + []() { return std::unique_ptr(new MockDBClientConnection()); }); + + ASSERT_EQUALS(OplogFetcher::State::kPreStart, oplogFetcher.getState_forTest()); + + ASSERT_OK(oplogFetcher.startup()); + ASSERT_EQUALS(OplogFetcher::State::kRunning, oplogFetcher.getState_forTest()); + + CursorId cursorId = 22LL; + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); + + auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + + processSingleRequestResponse(oplogFetcher.getDBClientConnection_forTest(), + makeFirstBatch(cursorId, {firstEntry, secondEntry}, metadataObj), + true); + + ASSERT_EQUALS(1U, lastEnqueuedDocuments.size()); + ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]); + + auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); + auto fourthEntry = makeNoopOplogEntry({{Seconds(1200), 0}, lastFetched.getTerm()}); + + // Set cursor ID to 0 in getMore response to indicate no more data available. + const auto moreToCome = false; + auto m = processSingleRequestResponse( + oplogFetcher.getDBClientConnection_forTest(), + makeSubsequentBatch(0, {thirdEntry, fourthEntry}, metadataObj, moreToCome), + false); + auto msg = mongo::OpMsg::parse(m); + + ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "getMore"); + ASSERT_EQUALS(NamespaceString::kRsOplogNamespace.coll(), msg.body["collection"].String()); + ASSERT_EQUALS(int(durationCount(oplogFetcher.getAwaitDataTimeout_forTest())), + msg.body.getIntField("maxTimeMS")); + + ASSERT_EQUALS(2U, lastEnqueuedDocuments.size()); + ASSERT_BSONOBJ_EQ(thirdEntry, lastEnqueuedDocuments[0]); + ASSERT_BSONOBJ_EQ(fourthEntry, lastEnqueuedDocuments[1]); + + oplogFetcher.join(); + ASSERT_EQUALS(OplogFetcher::State::kComplete, oplogFetcher.getState_forTest()); + + ASSERT_OK(shutdownState.getStatus()); + + ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, msg.body["term"].numberLong()); + ASSERT_EQUALS(dataReplicatorExternalState->lastCommittedOpTime, + unittest::assertGet( + OpTime::parseFromOplogEntry(msg.body["lastKnownCommittedOpTime"].Obj()))); +} } // namespace diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.h b/src/mongo/dbtests/mock/mock_dbclient_connection.h index f931766af7d..1976ce70d4d 100644 --- a/src/mongo/dbtests/mock/mock_dbclient_connection.h +++ b/src/mongo/dbtests/mock/mock_dbclient_connection.h @@ -49,7 +49,7 @@ public: */ static Message mockFindResponse(NamespaceString nss, long long cursorId, - std::vector firstBatch, + const std::vector& firstBatch, const BSONObj& metadata) { auto cursorRes = CursorResponse(nss, cursorId, firstBatch); BSONObjBuilder bob(cursorRes.toBSON(CursorResponse::ResponseType::InitialResponse)); @@ -62,7 +62,7 @@ public: */ static Message mockGetMoreResponse(NamespaceString nss, long long cursorId, - std::vector batch, + const std::vector& batch, const BSONObj& metadata, bool moreToCome = false) { auto cursorRes = CursorResponse(nss, cursorId, batch); -- cgit v1.2.1