summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2020-01-21 17:46:44 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-01-23 00:29:00 +0000
commitf51dce58fcda7341cc6d72279c23425205f958a2 (patch)
tree6e478f07e43c79d766766320060bfc9d42bd4e3a
parent759e930c88081aa0fb86e34a3ce7b2ed190c806e (diff)
downloadmongo-f51dce58fcda7341cc6d72279c23425205f958a2.tar.gz
SERVER-45431: Create new test fixture for OplogFetcher
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp13
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h19
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp253
-rw-r--r--src/mongo/dbtests/mock/mock_dbclient_connection.h4
4 files changed, 283 insertions, 6 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 7c34f302284..c4595103905 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -601,13 +601,14 @@ NewOplogFetcher::NewOplogFetcher(
_onShutdownCallbackFn(onShutdownCallbackFn),
_lastFetched(lastFetched),
_metadataObj(makeMetadataObject()),
+ _createClientFn(
+ [] { return std::make_unique<DBClientConnection>(true /* autoReconnect */); }),
_requireFresherSyncSource(requireFresherSyncSource),
_dataReplicatorExternalState(dataReplicatorExternalState),
_enqueueDocumentsFn(enqueueDocumentsFn),
_awaitDataTimeout(calculateAwaitDataTimeout(config)),
_batchSize(batchSize),
_startingPoint(startingPoint) {
-
invariant(config.isInitialized());
invariant(!_lastFetched.isNull());
invariant(onShutdownCallbackFn);
@@ -651,6 +652,16 @@ Milliseconds NewOplogFetcher::getAwaitDataTimeout_forTest() const {
return _awaitDataTimeout;
}
+void NewOplogFetcher::setCreateClientFn_forTest(const CreateClientFn& createClientFn) {
+ stdx::lock_guard lock(_mutex);
+ _createClientFn = createClientFn;
+}
+
+DBClientConnection* NewOplogFetcher::getDBClientConnection_forTest() const {
+ stdx::lock_guard lock(_mutex);
+ return _conn.get();
+}
+
OpTime NewOplogFetcher::_getLastOpTimeFetched() const {
stdx::lock_guard<Latch> lock(_mutex);
return _lastFetched;
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index fd3ecadba95..5ecda88adb1 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -410,6 +410,22 @@ public:
*/
Milliseconds getAwaitDataTimeout_forTest() const;
+ /**
+ * Type of function to create a database client connection. Used for testing only.
+ */
+ using CreateClientFn = std::function<std::unique_ptr<DBClientConnection>()>;
+
+ /**
+ * Overrides how the OplogFetcher creates the client. Used for testing only.
+ */
+ void setCreateClientFn_forTest(const CreateClientFn& createClientFn);
+
+ /**
+ * Get a raw pointer to the client connection. It is the caller's responsibility to not reuse
+ * this pointer beyond the lifetime of the underlying client. Used for testing only.
+ */
+ DBClientConnection* getDBClientConnection_forTest() const;
+
private:
// =============== AbstractAsyncComponent overrides ================
@@ -533,6 +549,9 @@ private:
// via its shutdownAndDisallowReconnect function.
std::unique_ptr<DBClientConnection> _conn;
+ // Used to create the DBClientConnection for the oplog fetcher.
+ CreateClientFn _createClientFn;
+
// The tailable, awaitData, exhaust cursor used to fetch oplog entries from the sync source.
// When an error is encountered, depending on the result of OplogFetcherRestartDecision's
// shouldContinue function, a new cursor will be created or the oplog fetcher will shut down.
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index ddbae311c66..7644fd5d2e0 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/repl/data_replicator_external_state_mock.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/task_executor_mock.h"
+#include "mongo/dbtests/mock/mock_dbclient_connection.h"
#include "mongo/rpc/metadata.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
@@ -1026,17 +1027,126 @@ TEST_F(OplogFetcherTest,
ASSERT_EQUALS(OpTime(), info.lastDocument);
}
+BSONObj makeNoopOplogEntry(OpTime opTime) {
+ auto oplogEntry =
+ repl::OplogEntry(opTime, // optime
+ boost::none, // hash
+ OpTypeEnum ::kNoop, // opType
+ NamespaceString("test.t"), // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ repl::OplogEntry::kOplogVersion, // version
+ BSONObj(), // o
+ boost::none, // o2
+ {}, // sessionInfo
+ boost::none, // upsert
+ Date_t(), // wall clock time
+ boost::none, // statement id
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
+ return oplogEntry.toBSON();
+}
+
+BSONObj makeNoopOplogEntry(Seconds seconds) {
+ return makeNoopOplogEntry({{seconds, 0}, 1LL});
+}
+
+BSONObj makeOplogBatchMetadata(boost::optional<const rpc::ReplSetMetadata&> replMetadata,
+ boost::optional<const rpc::OplogQueryMetadata&> oqMetadata) {
+ BSONObjBuilder bob;
+ if (replMetadata) {
+ ASSERT_OK(replMetadata->writeToMetadata(&bob));
+ }
+ if (oqMetadata) {
+ ASSERT_OK(oqMetadata->writeToMetadata(&bob));
+ }
+ return bob.obj();
+}
+
+Message makeFirstBatch(CursorId cursorId,
+ const NewOplogFetcher::Documents& oplogEntries,
+ const BSONObj& metadata) {
+ return MockDBClientConnection::mockFindResponse(
+ NamespaceString::kRsOplogNamespace, cursorId, oplogEntries, metadata);
+}
+
+Message makeSubsequentBatch(CursorId cursorId,
+ const NewOplogFetcher::Documents& oplogEntries,
+ const BSONObj& metadata,
+ bool moreToCome) {
+ return MockDBClientConnection::mockGetMoreResponse(
+ NamespaceString::kRsOplogNamespace, cursorId, oplogEntries, metadata, moreToCome);
+}
+
+bool blockedOnNetworkSoon(MockDBClientConnection* conn) {
+ // Wait up to 10 seconds.
+ for (auto i = 0; i < 100; i++) {
+ if (conn->isBlockedOnNetwork()) {
+ return true;
+ }
+ mongo::sleepmillis(100);
+ }
+ return false;
+}
+
+// Simulate a response to a single outgoing client request and return the client request. Use this
+// function to simulate responses to client find/getMore requests.
+Message processSingleRequestResponse(DBClientConnection* conn,
+ const Message& response,
+ bool expectReadyNetworkOperationsAfterProcessing = false) {
+ auto* mockConn = dynamic_cast<MockDBClientConnection*>(conn);
+ ASSERT_TRUE(blockedOnNetworkSoon(mockConn));
+ auto request = mockConn->getLastSentMessage();
+ mockConn->setCallResponses({response});
+ if (expectReadyNetworkOperationsAfterProcessing) {
+ ASSERT_TRUE(blockedOnNetworkSoon(mockConn));
+ }
+ return request;
+}
+
+// Simulate a response to a single network recv() call. Use this function to simulate responses to
+// exhaust stream where a client expects to receive responses without sending out new requests.
+void processSingleExhaustResponse(DBClientConnection* conn,
+ const Message& response,
+ bool expectReadyNetworkOperationsAfterProcessing = false) {
+ auto* mockConn = dynamic_cast<MockDBClientConnection*>(conn);
+ ASSERT_TRUE(blockedOnNetworkSoon(mockConn));
+ mockConn->setRecvResponses({response});
+ if (expectReadyNetworkOperationsAfterProcessing) {
+ ASSERT_TRUE(blockedOnNetworkSoon(mockConn));
+ }
+}
+
+
class NewOplogFetcherTest : public executor::ThreadPoolExecutorTest {
-public:
- void setUp() override;
+protected:
+ static const OpTime remoteNewerOpTime;
+ static const OpTime staleOpTime;
+ static const Date_t staleWallTime;
+ static const int rbid = 2;
+ static const int primaryIndex = 2;
+ static const int syncSourceIndex = 2;
+ static const rpc::OplogQueryMetadata staleOqMetadata;
// 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use.
const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10;
+ void setUp() override;
+
std::unique_ptr<NewOplogFetcher> makeOplogFetcher();
std::unique_ptr<NewOplogFetcher> makeOplogFetcherWithDifferentExecutor(
executor::TaskExecutor* executor, NewOplogFetcher::OnShutdownCallbackFn fn);
+ std::unique_ptr<ShutdownState> processSingleBatch(const Message& response,
+ bool requireFresherSyncSource = true);
+
+ /**
+ * Tests checkSyncSource result handling.
+ */
+ void testSyncSourceChecking(boost::optional<const rpc::ReplSetMetadata&> replMetadata,
+ boost::optional<const rpc::OplogQueryMetadata&> oqMetadata);
+
std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState;
NewOplogFetcher::Documents lastEnqueuedDocuments;
@@ -1047,6 +1157,12 @@ public:
OpTime lastFetched;
};
+const OpTime NewOplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(124, 1), 2);
+const OpTime NewOplogFetcherTest::staleOpTime = OpTime(Timestamp(1, 1), 0);
+const Date_t NewOplogFetcherTest::staleWallTime = Date_t() + Seconds(staleOpTime.getSecs());
+const rpc::OplogQueryMetadata NewOplogFetcherTest::staleOqMetadata = rpc::OplogQueryMetadata(
+ {staleOpTime, staleWallTime}, staleOpTime, rbid, primaryIndex, syncSourceIndex);
+
void NewOplogFetcherTest::setUp() {
executor::ThreadPoolExecutorTest::setUp();
launchExecutorThread();
@@ -1072,7 +1188,7 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcher() {
std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDifferentExecutor(
executor::TaskExecutor* executor, NewOplogFetcher::OnShutdownCallbackFn fn) {
- return std::make_unique<NewOplogFetcher>(
+ auto oplogFetcher = std::make_unique<NewOplogFetcher>(
executor,
lastFetched,
source,
@@ -1085,6 +1201,64 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDiffer
fn,
defaultBatchSize,
NewOplogFetcher::StartingPoint::kSkipFirstDoc);
+ oplogFetcher->setCreateClientFn_forTest(
+ []() { return std::unique_ptr<DBClientConnection>(new MockDBClientConnection()); });
+ return oplogFetcher;
+}
+
+std::unique_ptr<ShutdownState> NewOplogFetcherTest::processSingleBatch(
+ const Message& response, bool requireFresherSyncSource) {
+ auto shutdownState = std::make_unique<ShutdownState>();
+ NewOplogFetcher oplogFetcher(
+ &getExecutor(),
+ lastFetched,
+ source,
+ _createConfig(),
+ std::make_unique<NewOplogFetcher::OplogFetcherRestartDecisionDefault>(0),
+ rbid,
+ requireFresherSyncSource,
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ std::ref(*shutdownState),
+ defaultBatchSize,
+ NewOplogFetcher::StartingPoint::kSkipFirstDoc);
+ oplogFetcher.setCreateClientFn_forTest(
+ []() { return std::unique_ptr<DBClientConnection>(new MockDBClientConnection()); });
+
+ ASSERT_FALSE(oplogFetcher.isActive());
+ ASSERT_OK(oplogFetcher.startup());
+ ASSERT_TRUE(oplogFetcher.isActive());
+
+ auto m = processSingleRequestResponse(oplogFetcher.getDBClientConnection_forTest(), response);
+ auto msg = mongo::OpMsg::parse(m);
+ ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find");
+ ASSERT_TRUE(msg.body.getBoolField("tailable"));
+ ASSERT_TRUE(msg.body.getBoolField("oplogReplay"));
+ ASSERT_TRUE(msg.body.getBoolField("awaitData"));
+ ASSERT_EQUALS(60000, msg.body.getIntField("maxTimeMS"));
+ // TODO SERVER-45468: Test the find command and the metadata sent.
+
+ oplogFetcher.shutdown();
+ oplogFetcher.join();
+
+ return shutdownState;
+}
+
+void NewOplogFetcherTest::testSyncSourceChecking(
+ boost::optional<const rpc::ReplSetMetadata&> replMetadata,
+ boost::optional<const rpc::OplogQueryMetadata&> oqMetadata) {
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
+ auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()});
+
+ auto metadataObj = makeOplogBatchMetadata(replMetadata, oqMetadata);
+
+ dataReplicatorExternalState->shouldStopFetchingResult = true;
+
+ auto shutdownState =
+ processSingleBatch(makeFirstBatch(0, {firstEntry, secondEntry, thirdEntry}, metadataObj));
+
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus());
}
TEST_F(NewOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarting) {
@@ -1234,4 +1408,77 @@ TEST_F(NewOplogFetcherTest, AwaitDataTimeoutSmallerWhenFailPointSet) {
ASSERT_EQUALS(Milliseconds(50), timeout);
failPoint->setMode(FailPoint::off);
}
+
+TEST_F(
+ NewOplogFetcherTest,
+ NoDataAvailableAfterFirstTwoBatchesShouldCauseTheOplogFetcherToShutDownWithSuccessfulStatus) {
+ // TODO SERVER-45468: Enable this test.
+ return;
+
+ ShutdownState shutdownState;
+
+ NewOplogFetcher oplogFetcher(
+ &getExecutor(),
+ lastFetched,
+ source,
+ _createConfig(),
+ std::make_unique<NewOplogFetcher::OplogFetcherRestartDecisionDefault>(0),
+ rbid,
+ true,
+ dataReplicatorExternalState.get(),
+ enqueueDocumentsFn,
+ std::ref(shutdownState),
+ defaultBatchSize,
+ NewOplogFetcher::StartingPoint::kSkipFirstDoc);
+ oplogFetcher.setCreateClientFn_forTest(
+ []() { return std::unique_ptr<DBClientConnection>(new MockDBClientConnection()); });
+
+ ASSERT_EQUALS(OplogFetcher::State::kPreStart, oplogFetcher.getState_forTest());
+
+ ASSERT_OK(oplogFetcher.startup());
+ ASSERT_EQUALS(OplogFetcher::State::kRunning, oplogFetcher.getState_forTest());
+
+ CursorId cursorId = 22LL;
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
+
+ auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+
+ processSingleRequestResponse(oplogFetcher.getDBClientConnection_forTest(),
+ makeFirstBatch(cursorId, {firstEntry, secondEntry}, metadataObj),
+ true);
+
+ ASSERT_EQUALS(1U, lastEnqueuedDocuments.size());
+ ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]);
+
+ auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()});
+ auto fourthEntry = makeNoopOplogEntry({{Seconds(1200), 0}, lastFetched.getTerm()});
+
+ // Set cursor ID to 0 in getMore response to indicate no more data available.
+ const auto moreToCome = false;
+ auto m = processSingleRequestResponse(
+ oplogFetcher.getDBClientConnection_forTest(),
+ makeSubsequentBatch(0, {thirdEntry, fourthEntry}, metadataObj, moreToCome),
+ false);
+ auto msg = mongo::OpMsg::parse(m);
+
+ ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "getMore");
+ ASSERT_EQUALS(NamespaceString::kRsOplogNamespace.coll(), msg.body["collection"].String());
+ ASSERT_EQUALS(int(durationCount<Milliseconds>(oplogFetcher.getAwaitDataTimeout_forTest())),
+ msg.body.getIntField("maxTimeMS"));
+
+ ASSERT_EQUALS(2U, lastEnqueuedDocuments.size());
+ ASSERT_BSONOBJ_EQ(thirdEntry, lastEnqueuedDocuments[0]);
+ ASSERT_BSONOBJ_EQ(fourthEntry, lastEnqueuedDocuments[1]);
+
+ oplogFetcher.join();
+ ASSERT_EQUALS(OplogFetcher::State::kComplete, oplogFetcher.getState_forTest());
+
+ ASSERT_OK(shutdownState.getStatus());
+
+ ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, msg.body["term"].numberLong());
+ ASSERT_EQUALS(dataReplicatorExternalState->lastCommittedOpTime,
+ unittest::assertGet(
+ OpTime::parseFromOplogEntry(msg.body["lastKnownCommittedOpTime"].Obj())));
+}
} // namespace
diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.h b/src/mongo/dbtests/mock/mock_dbclient_connection.h
index f931766af7d..1976ce70d4d 100644
--- a/src/mongo/dbtests/mock/mock_dbclient_connection.h
+++ b/src/mongo/dbtests/mock/mock_dbclient_connection.h
@@ -49,7 +49,7 @@ public:
*/
static Message mockFindResponse(NamespaceString nss,
long long cursorId,
- std::vector<BSONObj> firstBatch,
+ const std::vector<BSONObj>& firstBatch,
const BSONObj& metadata) {
auto cursorRes = CursorResponse(nss, cursorId, firstBatch);
BSONObjBuilder bob(cursorRes.toBSON(CursorResponse::ResponseType::InitialResponse));
@@ -62,7 +62,7 @@ public:
*/
static Message mockGetMoreResponse(NamespaceString nss,
long long cursorId,
- std::vector<BSONObj> batch,
+ const std::vector<BSONObj>& batch,
const BSONObj& metadata,
bool moreToCome = false) {
auto cursorRes = CursorResponse(nss, cursorId, batch);