From 821b4198bcf3862b3cb06be8115e6fca0b902cef Mon Sep 17 00:00:00 2001 From: Lingzhi Deng Date: Thu, 16 Jan 2020 10:31:03 -0500 Subject: SERVER-45603: Enhance MockDBClientConnection for OplogFetcher unit testing --- .../dbtests/mock/mock_dbclient_connection.cpp | 71 +++++- src/mongo/dbtests/mock/mock_dbclient_connection.h | 87 +++++++- src/mongo/dbtests/mock_dbclient_conn_test.cpp | 237 +++++++++++++++++++++ 3 files changed, 384 insertions(+), 11 deletions(-) diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp index 79afb8861f8..0b86fd40912 100644 --- a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp +++ b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp @@ -43,6 +43,14 @@ using std::string; using std::vector; namespace mongo { +MockDBClientConnection::MockDBClientConnection() + : _remoteServer(nullptr), + _isFailed(false), + _sockCreationTime(mongo::curTimeMicros64()), + _autoReconnect(false) { + _setServerRPCProtocols(rpc::supports::kAll); +} + MockDBClientConnection::MockDBClientConnection(MockRemoteDBServer* remoteServer, bool autoReconnect) : _remoteServerInstanceID(remoteServer->getInstanceID()), _remoteServer(remoteServer), @@ -55,6 +63,7 @@ MockDBClientConnection::~MockDBClientConnection() {} bool MockDBClientConnection::connect(const char* hostName, StringData applicationName, std::string& errmsg) { + invariant(_remoteServer); if (_remoteServer->isRunning()) { _remoteServerInstanceID = _remoteServer->getInstanceID(); _setServerRPCProtocols(rpc::supports::kAll); @@ -72,6 +81,7 @@ std::pair MockDBClientConnection::runCommandWit try { _lastCursorMessage = boost::none; + invariant(_remoteServer); auto reply = _remoteServer->runCommand(_remoteServerInstanceID, request); auto status = getStatusFromCommandResult(reply->getCommandReply()); // The real DBClientBase always throws HostUnreachable on network error, so we do the @@ -103,6 +113,7 @@ std::unique_ptr MockDBClientConnection::query( checkConnection(); try { + invariant(_remoteServer); mongo::BSONArray result(_remoteServer->query(_remoteServerInstanceID, nsOrUuid, query, @@ -173,11 +184,11 @@ bool MockDBClientConnection::isFailed() const { } string MockDBClientConnection::getServerAddress() const { - return _remoteServer->getServerAddress(); + return _remoteServer ? _remoteServer->getServerAddress() : "localhost:27017"; } string MockDBClientConnection::toString() const { - return _remoteServer->toString(); + return _remoteServer ? _remoteServer->toString() : "localhost:27017"; } unsigned long long MockDBClientConnection::query( @@ -195,6 +206,7 @@ uint64_t MockDBClientConnection::getSockCreationMicroSec() const { } void MockDBClientConnection::insert(const string& ns, BSONObj obj, int flags) { + invariant(_remoteServer); _remoteServer->insert(ns, obj, flags); } @@ -205,11 +217,12 @@ void MockDBClientConnection::insert(const string& ns, const vector& obj } void MockDBClientConnection::remove(const string& ns, Query query, int flags) { + invariant(_remoteServer); _remoteServer->remove(ns, query, flags); } void MockDBClientConnection::killCursor(const NamespaceString& ns, long long cursorID) { - verify(false); // unimplemented + invariant(false); // unimplemented } bool MockDBClientConnection::call(mongo::Message& toSend, @@ -235,16 +248,60 @@ bool MockDBClientConnection::call(mongo::Message& toSend, return true; } } - verify(false); // unimplemented otherwise - return false; + stdx::unique_lock lk(_netMutex); + + _lastSentMessage = toSend; + _mockCallResponsesCV.wait(lk, [&] { + _blockedOnNetwork = (_callIter == _mockCallResponses.end()); + return !_blockedOnNetwork; + }); + + const auto& swResponse = *_callIter; + _callIter++; + response = uassertStatusOK(swResponse); + return true; +} + +Status MockDBClientConnection::recv(mongo::Message& m, int lastRequestId) { + stdx::unique_lock lk(_netMutex); + + _mockRecvResponsesCV.wait(lk, [&] { + _blockedOnNetwork = (_recvIter == _mockRecvResponses.end()); + return !_blockedOnNetwork; + }); + + const auto& swResponse = *_recvIter; + _recvIter++; + m = uassertStatusOK(swResponse); + return Status::OK(); +} + +void MockDBClientConnection::setCallResponses(Responses responses) { + stdx::lock_guard lk(_netMutex); + _mockCallResponses = std::move(responses); + _callIter = _mockCallResponses.begin(); + if (_blockedOnNetwork && !_mockCallResponses.empty()) { + _blockedOnNetwork = false; + _mockCallResponsesCV.notify_all(); + } +} + +void MockDBClientConnection::setRecvResponses(Responses responses) { + stdx::lock_guard lk(_netMutex); + _mockRecvResponses = std::move(responses); + _recvIter = _mockRecvResponses.begin(); + if (_blockedOnNetwork && !_mockRecvResponses.empty()) { + _blockedOnNetwork = false; + _mockRecvResponsesCV.notify_all(); + } } void MockDBClientConnection::say(mongo::Message& toSend, bool isRetry, string* actualServer) { - verify(false); // unimplemented + invariant(false); // unimplemented } bool MockDBClientConnection::lazySupported() const { - verify(false); // unimplemented + invariant(false); // unimplemented return false; } diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.h b/src/mongo/dbtests/mock/mock_dbclient_connection.h index 5395f6068af..f931766af7d 100644 --- a/src/mongo/dbtests/mock/mock_dbclient_connection.h +++ b/src/mongo/dbtests/mock/mock_dbclient_connection.h @@ -33,6 +33,7 @@ #include #include "mongo/client/dbclient_connection.h" +#include "mongo/db/query/cursor_response.h" #include "mongo/dbtests/mock/mock_remote_db_server.h" namespace mongo { @@ -43,6 +44,49 @@ namespace mongo { */ class MockDBClientConnection : public mongo::DBClientConnection { public: + /** + * An OP_MSG response to a 'find' command. + */ + static Message mockFindResponse(NamespaceString nss, + long long cursorId, + std::vector firstBatch, + const BSONObj& metadata) { + auto cursorRes = CursorResponse(nss, cursorId, firstBatch); + BSONObjBuilder bob(cursorRes.toBSON(CursorResponse::ResponseType::InitialResponse)); + bob.appendElementsUnique(metadata); + return OpMsg{bob.obj()}.serialize(); + } + + /** + * An OP_MSG response to a 'getMore' command. + */ + static Message mockGetMoreResponse(NamespaceString nss, + long long cursorId, + std::vector batch, + const BSONObj& metadata, + bool moreToCome = false) { + auto cursorRes = CursorResponse(nss, cursorId, batch); + BSONObjBuilder bob(cursorRes.toBSON(CursorResponse::ResponseType::SubsequentResponse)); + bob.appendElementsUnique(metadata); + auto m = OpMsg{bob.obj()}.serialize(); + if (moreToCome) { + OpMsg::setFlag(&m, OpMsg::kMoreToCome); + } + return m; + } + + /** + * A generic non-ok OP_MSG command response. + */ + static Message mockErrorResponse(ErrorCodes::Error err) { + OpMsgBuilder builder; + BSONObjBuilder bodyBob; + bodyBob.append("ok", 0); + bodyBob.append("code", err); + builder.setBody(bodyBob.done()); + return builder.finish(); + } + /** * Create a mock connection to a mock server. * @@ -56,6 +100,11 @@ public: MockDBClientConnection(MockRemoteDBServer* remoteServer, bool autoReconnect = false); virtual ~MockDBClientConnection(); + /** + * Create a mock connection that only supports call() and recv(). + */ + MockDBClientConnection(); + // // DBClientBase methods // @@ -90,6 +139,17 @@ public: void remove(const std::string& ns, Query query, int flags = 0) override; + bool call(mongo::Message& toSend, + mongo::Message& response, + bool assertOk, + std::string* actualServer) override; + Status recv(mongo::Message& m, int lastRequestId) override; + + // Methods to simulate network responses. + using Responses = std::vector>; + void setCallResponses(Responses responses); + void setRecvResponses(Responses responses); + // // Getters // @@ -100,6 +160,16 @@ public: std::string getServerAddress() const override; std::string toString() const override; + Message getLastSentMessage() { + stdx::lock_guard lk(_netMutex); + return _lastSentMessage; + } + + bool isBlockedOnNetwork() { + stdx::lock_guard lk(_netMutex); + return _blockedOnNetwork; + } + // // Unsupported methods (defined to get rid of virtual function was hidden error) // @@ -116,10 +186,6 @@ public: // void killCursor(const NamespaceString& ns, long long cursorID) override; - bool call(mongo::Message& toSend, - mongo::Message& response, - bool assertOk, - std::string* actualServer) override; void say(mongo::Message& toSend, bool isRetry = false, std::string* actualServer = nullptr) override; @@ -134,5 +200,18 @@ private: uint64_t _sockCreationTime; bool _autoReconnect; boost::optional _lastCursorMessage; + + Mutex _netMutex = MONGO_MAKE_LATCH("MockDBClientConnection"); + + stdx::condition_variable _mockCallResponsesCV; + Responses _mockCallResponses; + Responses::iterator _callIter; + + stdx::condition_variable _mockRecvResponsesCV; + Responses _mockRecvResponses; + Responses::iterator _recvIter; + + Message _lastSentMessage; + bool _blockedOnNetwork = false; }; } // namespace mongo diff --git a/src/mongo/dbtests/mock_dbclient_conn_test.cpp b/src/mongo/dbtests/mock_dbclient_conn_test.cpp index 936dd270888..0cb8eb27263 100644 --- a/src/mongo/dbtests/mock_dbclient_conn_test.cpp +++ b/src/mongo/dbtests/mock_dbclient_conn_test.cpp @@ -624,4 +624,241 @@ TEST(MockDBClientConnTest, Delay) { ASSERT_EQUALS(1U, server.getQueryCount()); ASSERT_EQUALS(1U, server.getCmdCount()); } + +TEST(MockDBClientConnTest, SimulateCallAndRecvResponses) { + MockDBClientConnection conn; + + const NamespaceString nss("test", "coll"); + mongo::DBClientCursor cursor(&conn, + mongo::NamespaceStringOrUUID(nss), + Query().obj, + 0, + 0, + nullptr, + mongo::QueryOption_Exhaust, + 0); + cursor.setBatchSize(2); + + const auto docObj = [](int i) { return BSON("_id" << i); }; + const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); }; + const long long cursorId = 123; + const bool moreToCome = true; + + // Two batches from the initial find and getMore command. + MockDBClientConnection::Responses callResponses = { + MockDBClientConnection::mockFindResponse( + nss, cursorId, {docObj(1), docObj(2)}, metadata(1)), + MockDBClientConnection::mockGetMoreResponse( + nss, cursorId, {docObj(3), docObj(4)}, metadata(2), moreToCome)}; + conn.setCallResponses(callResponses); + + // Two more batches from the exhaust stream. + MockDBClientConnection::Responses recvResponses = { + MockDBClientConnection::mockGetMoreResponse( + nss, cursorId, {docObj(5), docObj(6)}, metadata(3), moreToCome), + // Terminal getMore responses with cursorId 0 and no kMoreToCome flag. + MockDBClientConnection::mockGetMoreResponse(nss, 0, {docObj(7), docObj(8)}, metadata(4))}; + conn.setRecvResponses(recvResponses); + + int numMetaRead = 0; + conn.setReplyMetadataReader( + [&](mongo::OperationContext* opCtx, const BSONObj& metadataObj, mongo::StringData target) { + numMetaRead++; + // Verify metadata for each batch. + ASSERT(metadataObj.hasField("$fakeMetaData")); + ASSERT_EQ(numMetaRead, metadataObj["$fakeMetaData"].number()); + return mongo::Status::OK(); + }); + + // First batch from the initial find command. + ASSERT_TRUE(cursor.init()); + ASSERT_BSONOBJ_EQ(docObj(1), cursor.next()); + ASSERT_BSONOBJ_EQ(docObj(2), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); + + // Second batch from the first getMore command. + ASSERT_TRUE(cursor.more()); + ASSERT_BSONOBJ_EQ(docObj(3), cursor.next()); + ASSERT_BSONOBJ_EQ(docObj(4), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); + + // Third batch from the exhaust stream. + ASSERT_TRUE(cursor.more()); + ASSERT_BSONOBJ_EQ(docObj(5), cursor.next()); + ASSERT_BSONOBJ_EQ(docObj(6), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); + + // Last batch from the exhaust stream. + ASSERT_TRUE(cursor.more()); + ASSERT_BSONOBJ_EQ(docObj(7), cursor.next()); + ASSERT_BSONOBJ_EQ(docObj(8), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); + + // No more batches. + ASSERT_FALSE(cursor.more()); + ASSERT_TRUE(cursor.isDead()); + + // Test that metadata reader is called four times for the four batches. + ASSERT_EQ(4, numMetaRead); +} + +TEST(MockDBClientConnTest, SimulateCallErrors) { + MockDBClientConnection conn; + + const NamespaceString nss("test", "coll"); + mongo::DBClientCursor cursor( + &conn, mongo::NamespaceStringOrUUID(nss), Query().obj, 0, 0, nullptr, 0, 0); + + // Test network exception and error response for the initial find. + MockDBClientConnection::Responses callResponses = { + // Network exception during call(). + mongo::Status{mongo::ErrorCodes::NetworkTimeout, "Fake socket timeout"}, + // Error response from the find command. + MockDBClientConnection::mockErrorResponse(mongo::ErrorCodes::Interrupted)}; + conn.setCallResponses(callResponses); + + // Throw call exception. + ASSERT_THROWS_CODE_AND_WHAT(cursor.init(), + mongo::DBException, + mongo::ErrorCodes::NetworkTimeout, + "Fake socket timeout"); + ASSERT_TRUE(cursor.isDead()); + + // Throw exception on non-OK response. + ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::Interrupted); + ASSERT_TRUE(cursor.isDead()); +} + +TEST(MockDBClientConnTest, SimulateRecvErrors) { + MockDBClientConnection conn; + + const NamespaceString nss("test", "coll"); + mongo::DBClientCursor cursor(&conn, + mongo::NamespaceStringOrUUID(nss), + Query().obj, + 0, + 0, + nullptr, + mongo::QueryOption_Exhaust, + 0); + cursor.setBatchSize(1); + + const auto docObj = [](int i) { return BSON("_id" << i); }; + const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); }; + const long long cursorId = 123; + const bool moreToCome = true; + + // Two batches from the initial find and getMore command. + MockDBClientConnection::Responses callResponses = { + MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1)), + MockDBClientConnection::mockGetMoreResponse( + nss, cursorId, {docObj(2)}, metadata(2), moreToCome)}; + conn.setCallResponses(callResponses); + + // First batch from the initial find command. + ASSERT_TRUE(cursor.init()); + ASSERT_BSONOBJ_EQ(docObj(1), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); + + // Second batch from the first getMore command. + ASSERT_TRUE(cursor.more()); + ASSERT_BSONOBJ_EQ(docObj(2), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); + + // Test network exception and error response from exhaust stream. + MockDBClientConnection::Responses recvResponses = { + // Network exception during recv(). + mongo::Status{mongo::ErrorCodes::NetworkTimeout, "Fake socket timeout"}, + // Error response from the exhaust cursor. + MockDBClientConnection::mockErrorResponse(mongo::ErrorCodes::Interrupted)}; + conn.setRecvResponses(recvResponses); + + // The first recv() call gets a network exception. + ASSERT_THROWS_CODE_AND_WHAT(cursor.more(), + mongo::DBException, + mongo::ErrorCodes::NetworkTimeout, + "Fake socket timeout"); + // Cursor is still valid on network exceptions. + ASSERT_FALSE(cursor.isDead()); + + // Throw exception on non-OK response. + ASSERT_THROWS_CODE(cursor.more(), mongo::DBException, mongo::ErrorCodes::Interrupted); + // Cursor is dead on command errors. + ASSERT_TRUE(cursor.isDead()); +} + +TEST(MockDBClientConnTest, BlockingNetwork) { + MockDBClientConnection conn; + + const NamespaceString nss("test", "coll"); + mongo::DBClientCursor cursor(&conn, + mongo::NamespaceStringOrUUID(nss), + Query().obj, + 0, + 0, + nullptr, + mongo::QueryOption_Exhaust, + 0); + cursor.setBatchSize(1); + + const auto docObj = [](int i) { return BSON("_id" << i); }; + const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); }; + const long long cursorId = 123; + const bool moreToCome = true; + + mongo::stdx::thread cursorThread([&] { + // First batch from the initial find command. + ASSERT_TRUE(cursor.init()); + ASSERT_BSONOBJ_EQ(docObj(1), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); + + // Second batch from the first getMore command. + ASSERT_TRUE(cursor.more()); + ASSERT_BSONOBJ_EQ(docObj(2), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); + + // Last batch from the exhaust stream. + ASSERT_TRUE(cursor.more()); + ASSERT_BSONOBJ_EQ(docObj(3), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); + ASSERT_TRUE(cursor.isDead()); + }); + + const auto blockedOnNetworkSoon = [&]() { + // Wait up to 10 seconds. + for (auto i = 0; i < 100; i++) { + if (conn.isBlockedOnNetwork()) { + return true; + } + mongo::sleepmillis(100); + } + return false; + }; + + // Cursor should be blocked on the first find command. + ASSERT_TRUE(blockedOnNetworkSoon()); + auto m = conn.getLastSentMessage(); + auto msg = mongo::OpMsg::parse(m); + ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find"); + // Set the response for the find command and unblock network call(). + conn.setCallResponses( + {MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1))}); + + // Cursor should be blocked on the getMore command. + ASSERT_TRUE(blockedOnNetworkSoon()); + m = conn.getLastSentMessage(); + msg = mongo::OpMsg::parse(m); + ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "getMore"); + // Set the response for the getMore command and unblock network call(). + conn.setCallResponses({MockDBClientConnection::mockGetMoreResponse( + nss, cursorId, {docObj(2)}, metadata(2), moreToCome)}); + + // Cursor should be blocked on the exhaust stream. + ASSERT_TRUE(blockedOnNetworkSoon()); + // Set the response for the exhaust stream and unblock network recv(). + conn.setRecvResponses( + {MockDBClientConnection::mockGetMoreResponse(nss, 0, {docObj(3)}, metadata(3))}); + + cursorThread.join(); +} } // namespace mongo_test -- cgit v1.2.1