summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2020-01-16 10:31:03 -0500
committerA. Jesse Jiryu Davis <jesse@mongodb.com>2020-01-27 15:40:36 -0500
commit821b4198bcf3862b3cb06be8115e6fca0b902cef (patch)
treee746fb589ac8a0f3a3920da8766de76de2c3e588
parentd9642eba4b37bf4062278da065071a172dab7eef (diff)
downloadmongo-821b4198bcf3862b3cb06be8115e6fca0b902cef.tar.gz
SERVER-45603: Enhance MockDBClientConnection for OplogFetcher unit testing
-rw-r--r--src/mongo/dbtests/mock/mock_dbclient_connection.cpp71
-rw-r--r--src/mongo/dbtests/mock/mock_dbclient_connection.h87
-rw-r--r--src/mongo/dbtests/mock_dbclient_conn_test.cpp237
3 files changed, 384 insertions, 11 deletions
diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
index 79afb8861f8..0b86fd40912 100644
--- a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
+++ b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
@@ -43,6 +43,14 @@ using std::string;
using std::vector;
namespace mongo {
+MockDBClientConnection::MockDBClientConnection()
+ : _remoteServer(nullptr),
+ _isFailed(false),
+ _sockCreationTime(mongo::curTimeMicros64()),
+ _autoReconnect(false) {
+ _setServerRPCProtocols(rpc::supports::kAll);
+}
+
MockDBClientConnection::MockDBClientConnection(MockRemoteDBServer* remoteServer, bool autoReconnect)
: _remoteServerInstanceID(remoteServer->getInstanceID()),
_remoteServer(remoteServer),
@@ -55,6 +63,7 @@ MockDBClientConnection::~MockDBClientConnection() {}
bool MockDBClientConnection::connect(const char* hostName,
StringData applicationName,
std::string& errmsg) {
+ invariant(_remoteServer);
if (_remoteServer->isRunning()) {
_remoteServerInstanceID = _remoteServer->getInstanceID();
_setServerRPCProtocols(rpc::supports::kAll);
@@ -72,6 +81,7 @@ std::pair<rpc::UniqueReply, DBClientBase*> MockDBClientConnection::runCommandWit
try {
_lastCursorMessage = boost::none;
+ invariant(_remoteServer);
auto reply = _remoteServer->runCommand(_remoteServerInstanceID, request);
auto status = getStatusFromCommandResult(reply->getCommandReply());
// The real DBClientBase always throws HostUnreachable on network error, so we do the
@@ -103,6 +113,7 @@ std::unique_ptr<mongo::DBClientCursor> MockDBClientConnection::query(
checkConnection();
try {
+ invariant(_remoteServer);
mongo::BSONArray result(_remoteServer->query(_remoteServerInstanceID,
nsOrUuid,
query,
@@ -173,11 +184,11 @@ bool MockDBClientConnection::isFailed() const {
}
string MockDBClientConnection::getServerAddress() const {
- return _remoteServer->getServerAddress();
+ return _remoteServer ? _remoteServer->getServerAddress() : "localhost:27017";
}
string MockDBClientConnection::toString() const {
- return _remoteServer->toString();
+ return _remoteServer ? _remoteServer->toString() : "localhost:27017";
}
unsigned long long MockDBClientConnection::query(
@@ -195,6 +206,7 @@ uint64_t MockDBClientConnection::getSockCreationMicroSec() const {
}
void MockDBClientConnection::insert(const string& ns, BSONObj obj, int flags) {
+ invariant(_remoteServer);
_remoteServer->insert(ns, obj, flags);
}
@@ -205,11 +217,12 @@ void MockDBClientConnection::insert(const string& ns, const vector<BSONObj>& obj
}
void MockDBClientConnection::remove(const string& ns, Query query, int flags) {
+ invariant(_remoteServer);
_remoteServer->remove(ns, query, flags);
}
void MockDBClientConnection::killCursor(const NamespaceString& ns, long long cursorID) {
- verify(false); // unimplemented
+ invariant(false); // unimplemented
}
bool MockDBClientConnection::call(mongo::Message& toSend,
@@ -235,16 +248,60 @@ bool MockDBClientConnection::call(mongo::Message& toSend,
return true;
}
}
- verify(false); // unimplemented otherwise
- return false;
+ stdx::unique_lock lk(_netMutex);
+
+ _lastSentMessage = toSend;
+ _mockCallResponsesCV.wait(lk, [&] {
+ _blockedOnNetwork = (_callIter == _mockCallResponses.end());
+ return !_blockedOnNetwork;
+ });
+
+ const auto& swResponse = *_callIter;
+ _callIter++;
+ response = uassertStatusOK(swResponse);
+ return true;
+}
+
+Status MockDBClientConnection::recv(mongo::Message& m, int lastRequestId) {
+ stdx::unique_lock lk(_netMutex);
+
+ _mockRecvResponsesCV.wait(lk, [&] {
+ _blockedOnNetwork = (_recvIter == _mockRecvResponses.end());
+ return !_blockedOnNetwork;
+ });
+
+ const auto& swResponse = *_recvIter;
+ _recvIter++;
+ m = uassertStatusOK(swResponse);
+ return Status::OK();
+}
+
+void MockDBClientConnection::setCallResponses(Responses responses) {
+ stdx::lock_guard lk(_netMutex);
+ _mockCallResponses = std::move(responses);
+ _callIter = _mockCallResponses.begin();
+ if (_blockedOnNetwork && !_mockCallResponses.empty()) {
+ _blockedOnNetwork = false;
+ _mockCallResponsesCV.notify_all();
+ }
+}
+
+void MockDBClientConnection::setRecvResponses(Responses responses) {
+ stdx::lock_guard lk(_netMutex);
+ _mockRecvResponses = std::move(responses);
+ _recvIter = _mockRecvResponses.begin();
+ if (_blockedOnNetwork && !_mockRecvResponses.empty()) {
+ _blockedOnNetwork = false;
+ _mockRecvResponsesCV.notify_all();
+ }
}
void MockDBClientConnection::say(mongo::Message& toSend, bool isRetry, string* actualServer) {
- verify(false); // unimplemented
+ invariant(false); // unimplemented
}
bool MockDBClientConnection::lazySupported() const {
- verify(false); // unimplemented
+ invariant(false); // unimplemented
return false;
}
diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.h b/src/mongo/dbtests/mock/mock_dbclient_connection.h
index 5395f6068af..f931766af7d 100644
--- a/src/mongo/dbtests/mock/mock_dbclient_connection.h
+++ b/src/mongo/dbtests/mock/mock_dbclient_connection.h
@@ -33,6 +33,7 @@
#include <vector>
#include "mongo/client/dbclient_connection.h"
+#include "mongo/db/query/cursor_response.h"
#include "mongo/dbtests/mock/mock_remote_db_server.h"
namespace mongo {
@@ -44,6 +45,49 @@ namespace mongo {
class MockDBClientConnection : public mongo::DBClientConnection {
public:
/**
+ * An OP_MSG response to a 'find' command.
+ */
+ static Message mockFindResponse(NamespaceString nss,
+ long long cursorId,
+ std::vector<BSONObj> firstBatch,
+ const BSONObj& metadata) {
+ auto cursorRes = CursorResponse(nss, cursorId, firstBatch);
+ BSONObjBuilder bob(cursorRes.toBSON(CursorResponse::ResponseType::InitialResponse));
+ bob.appendElementsUnique(metadata);
+ return OpMsg{bob.obj()}.serialize();
+ }
+
+ /**
+ * An OP_MSG response to a 'getMore' command.
+ */
+ static Message mockGetMoreResponse(NamespaceString nss,
+ long long cursorId,
+ std::vector<BSONObj> batch,
+ const BSONObj& metadata,
+ bool moreToCome = false) {
+ auto cursorRes = CursorResponse(nss, cursorId, batch);
+ BSONObjBuilder bob(cursorRes.toBSON(CursorResponse::ResponseType::SubsequentResponse));
+ bob.appendElementsUnique(metadata);
+ auto m = OpMsg{bob.obj()}.serialize();
+ if (moreToCome) {
+ OpMsg::setFlag(&m, OpMsg::kMoreToCome);
+ }
+ return m;
+ }
+
+ /**
+ * A generic non-ok OP_MSG command response.
+ */
+ static Message mockErrorResponse(ErrorCodes::Error err) {
+ OpMsgBuilder builder;
+ BSONObjBuilder bodyBob;
+ bodyBob.append("ok", 0);
+ bodyBob.append("code", err);
+ builder.setBody(bodyBob.done());
+ return builder.finish();
+ }
+
+ /**
* Create a mock connection to a mock server.
*
* @param remoteServer the remote server to connect to. The caller is
@@ -56,6 +100,11 @@ public:
MockDBClientConnection(MockRemoteDBServer* remoteServer, bool autoReconnect = false);
virtual ~MockDBClientConnection();
+ /**
+ * Create a mock connection that only supports call() and recv().
+ */
+ MockDBClientConnection();
+
//
// DBClientBase methods
//
@@ -90,6 +139,17 @@ public:
void remove(const std::string& ns, Query query, int flags = 0) override;
+ bool call(mongo::Message& toSend,
+ mongo::Message& response,
+ bool assertOk,
+ std::string* actualServer) override;
+ Status recv(mongo::Message& m, int lastRequestId) override;
+
+ // Methods to simulate network responses.
+ using Responses = std::vector<StatusWith<mongo::Message>>;
+ void setCallResponses(Responses responses);
+ void setRecvResponses(Responses responses);
+
//
// Getters
//
@@ -100,6 +160,16 @@ public:
std::string getServerAddress() const override;
std::string toString() const override;
+ Message getLastSentMessage() {
+ stdx::lock_guard lk(_netMutex);
+ return _lastSentMessage;
+ }
+
+ bool isBlockedOnNetwork() {
+ stdx::lock_guard lk(_netMutex);
+ return _blockedOnNetwork;
+ }
+
//
// Unsupported methods (defined to get rid of virtual function was hidden error)
//
@@ -116,10 +186,6 @@ public:
//
void killCursor(const NamespaceString& ns, long long cursorID) override;
- bool call(mongo::Message& toSend,
- mongo::Message& response,
- bool assertOk,
- std::string* actualServer) override;
void say(mongo::Message& toSend,
bool isRetry = false,
std::string* actualServer = nullptr) override;
@@ -134,5 +200,18 @@ private:
uint64_t _sockCreationTime;
bool _autoReconnect;
boost::optional<OpMsgRequest> _lastCursorMessage;
+
+ Mutex _netMutex = MONGO_MAKE_LATCH("MockDBClientConnection");
+
+ stdx::condition_variable _mockCallResponsesCV;
+ Responses _mockCallResponses;
+ Responses::iterator _callIter;
+
+ stdx::condition_variable _mockRecvResponsesCV;
+ Responses _mockRecvResponses;
+ Responses::iterator _recvIter;
+
+ Message _lastSentMessage;
+ bool _blockedOnNetwork = false;
};
} // namespace mongo
diff --git a/src/mongo/dbtests/mock_dbclient_conn_test.cpp b/src/mongo/dbtests/mock_dbclient_conn_test.cpp
index 936dd270888..0cb8eb27263 100644
--- a/src/mongo/dbtests/mock_dbclient_conn_test.cpp
+++ b/src/mongo/dbtests/mock_dbclient_conn_test.cpp
@@ -624,4 +624,241 @@ TEST(MockDBClientConnTest, Delay) {
ASSERT_EQUALS(1U, server.getQueryCount());
ASSERT_EQUALS(1U, server.getCmdCount());
}
+
+TEST(MockDBClientConnTest, SimulateCallAndRecvResponses) {
+ MockDBClientConnection conn;
+
+ const NamespaceString nss("test", "coll");
+ mongo::DBClientCursor cursor(&conn,
+ mongo::NamespaceStringOrUUID(nss),
+ Query().obj,
+ 0,
+ 0,
+ nullptr,
+ mongo::QueryOption_Exhaust,
+ 0);
+ cursor.setBatchSize(2);
+
+ const auto docObj = [](int i) { return BSON("_id" << i); };
+ const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); };
+ const long long cursorId = 123;
+ const bool moreToCome = true;
+
+ // Two batches from the initial find and getMore command.
+ MockDBClientConnection::Responses callResponses = {
+ MockDBClientConnection::mockFindResponse(
+ nss, cursorId, {docObj(1), docObj(2)}, metadata(1)),
+ MockDBClientConnection::mockGetMoreResponse(
+ nss, cursorId, {docObj(3), docObj(4)}, metadata(2), moreToCome)};
+ conn.setCallResponses(callResponses);
+
+ // Two more batches from the exhaust stream.
+ MockDBClientConnection::Responses recvResponses = {
+ MockDBClientConnection::mockGetMoreResponse(
+ nss, cursorId, {docObj(5), docObj(6)}, metadata(3), moreToCome),
+ // Terminal getMore responses with cursorId 0 and no kMoreToCome flag.
+ MockDBClientConnection::mockGetMoreResponse(nss, 0, {docObj(7), docObj(8)}, metadata(4))};
+ conn.setRecvResponses(recvResponses);
+
+ int numMetaRead = 0;
+ conn.setReplyMetadataReader(
+ [&](mongo::OperationContext* opCtx, const BSONObj& metadataObj, mongo::StringData target) {
+ numMetaRead++;
+ // Verify metadata for each batch.
+ ASSERT(metadataObj.hasField("$fakeMetaData"));
+ ASSERT_EQ(numMetaRead, metadataObj["$fakeMetaData"].number());
+ return mongo::Status::OK();
+ });
+
+ // First batch from the initial find command.
+ ASSERT_TRUE(cursor.init());
+ ASSERT_BSONOBJ_EQ(docObj(1), cursor.next());
+ ASSERT_BSONOBJ_EQ(docObj(2), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+
+ // Second batch from the first getMore command.
+ ASSERT_TRUE(cursor.more());
+ ASSERT_BSONOBJ_EQ(docObj(3), cursor.next());
+ ASSERT_BSONOBJ_EQ(docObj(4), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+
+ // Third batch from the exhaust stream.
+ ASSERT_TRUE(cursor.more());
+ ASSERT_BSONOBJ_EQ(docObj(5), cursor.next());
+ ASSERT_BSONOBJ_EQ(docObj(6), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+
+ // Last batch from the exhaust stream.
+ ASSERT_TRUE(cursor.more());
+ ASSERT_BSONOBJ_EQ(docObj(7), cursor.next());
+ ASSERT_BSONOBJ_EQ(docObj(8), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+
+ // No more batches.
+ ASSERT_FALSE(cursor.more());
+ ASSERT_TRUE(cursor.isDead());
+
+ // Test that metadata reader is called four times for the four batches.
+ ASSERT_EQ(4, numMetaRead);
+}
+
+TEST(MockDBClientConnTest, SimulateCallErrors) {
+ MockDBClientConnection conn;
+
+ const NamespaceString nss("test", "coll");
+ mongo::DBClientCursor cursor(
+ &conn, mongo::NamespaceStringOrUUID(nss), Query().obj, 0, 0, nullptr, 0, 0);
+
+ // Test network exception and error response for the initial find.
+ MockDBClientConnection::Responses callResponses = {
+ // Network exception during call().
+ mongo::Status{mongo::ErrorCodes::NetworkTimeout, "Fake socket timeout"},
+ // Error response from the find command.
+ MockDBClientConnection::mockErrorResponse(mongo::ErrorCodes::Interrupted)};
+ conn.setCallResponses(callResponses);
+
+ // Throw call exception.
+ ASSERT_THROWS_CODE_AND_WHAT(cursor.init(),
+ mongo::DBException,
+ mongo::ErrorCodes::NetworkTimeout,
+ "Fake socket timeout");
+ ASSERT_TRUE(cursor.isDead());
+
+ // Throw exception on non-OK response.
+ ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::Interrupted);
+ ASSERT_TRUE(cursor.isDead());
+}
+
+TEST(MockDBClientConnTest, SimulateRecvErrors) {
+ MockDBClientConnection conn;
+
+ const NamespaceString nss("test", "coll");
+ mongo::DBClientCursor cursor(&conn,
+ mongo::NamespaceStringOrUUID(nss),
+ Query().obj,
+ 0,
+ 0,
+ nullptr,
+ mongo::QueryOption_Exhaust,
+ 0);
+ cursor.setBatchSize(1);
+
+ const auto docObj = [](int i) { return BSON("_id" << i); };
+ const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); };
+ const long long cursorId = 123;
+ const bool moreToCome = true;
+
+ // Two batches from the initial find and getMore command.
+ MockDBClientConnection::Responses callResponses = {
+ MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1)),
+ MockDBClientConnection::mockGetMoreResponse(
+ nss, cursorId, {docObj(2)}, metadata(2), moreToCome)};
+ conn.setCallResponses(callResponses);
+
+ // First batch from the initial find command.
+ ASSERT_TRUE(cursor.init());
+ ASSERT_BSONOBJ_EQ(docObj(1), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+
+ // Second batch from the first getMore command.
+ ASSERT_TRUE(cursor.more());
+ ASSERT_BSONOBJ_EQ(docObj(2), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+
+ // Test network exception and error response from exhaust stream.
+ MockDBClientConnection::Responses recvResponses = {
+ // Network exception during recv().
+ mongo::Status{mongo::ErrorCodes::NetworkTimeout, "Fake socket timeout"},
+ // Error response from the exhaust cursor.
+ MockDBClientConnection::mockErrorResponse(mongo::ErrorCodes::Interrupted)};
+ conn.setRecvResponses(recvResponses);
+
+ // The first recv() call gets a network exception.
+ ASSERT_THROWS_CODE_AND_WHAT(cursor.more(),
+ mongo::DBException,
+ mongo::ErrorCodes::NetworkTimeout,
+ "Fake socket timeout");
+ // Cursor is still valid on network exceptions.
+ ASSERT_FALSE(cursor.isDead());
+
+ // Throw exception on non-OK response.
+ ASSERT_THROWS_CODE(cursor.more(), mongo::DBException, mongo::ErrorCodes::Interrupted);
+ // Cursor is dead on command errors.
+ ASSERT_TRUE(cursor.isDead());
+}
+
+TEST(MockDBClientConnTest, BlockingNetwork) {
+ MockDBClientConnection conn;
+
+ const NamespaceString nss("test", "coll");
+ mongo::DBClientCursor cursor(&conn,
+ mongo::NamespaceStringOrUUID(nss),
+ Query().obj,
+ 0,
+ 0,
+ nullptr,
+ mongo::QueryOption_Exhaust,
+ 0);
+ cursor.setBatchSize(1);
+
+ const auto docObj = [](int i) { return BSON("_id" << i); };
+ const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); };
+ const long long cursorId = 123;
+ const bool moreToCome = true;
+
+ mongo::stdx::thread cursorThread([&] {
+ // First batch from the initial find command.
+ ASSERT_TRUE(cursor.init());
+ ASSERT_BSONOBJ_EQ(docObj(1), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+
+ // Second batch from the first getMore command.
+ ASSERT_TRUE(cursor.more());
+ ASSERT_BSONOBJ_EQ(docObj(2), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+
+ // Last batch from the exhaust stream.
+ ASSERT_TRUE(cursor.more());
+ ASSERT_BSONOBJ_EQ(docObj(3), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+ ASSERT_TRUE(cursor.isDead());
+ });
+
+ const auto blockedOnNetworkSoon = [&]() {
+ // Wait up to 10 seconds.
+ for (auto i = 0; i < 100; i++) {
+ if (conn.isBlockedOnNetwork()) {
+ return true;
+ }
+ mongo::sleepmillis(100);
+ }
+ return false;
+ };
+
+ // Cursor should be blocked on the first find command.
+ ASSERT_TRUE(blockedOnNetworkSoon());
+ auto m = conn.getLastSentMessage();
+ auto msg = mongo::OpMsg::parse(m);
+ ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find");
+ // Set the response for the find command and unblock network call().
+ conn.setCallResponses(
+ {MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1))});
+
+ // Cursor should be blocked on the getMore command.
+ ASSERT_TRUE(blockedOnNetworkSoon());
+ m = conn.getLastSentMessage();
+ msg = mongo::OpMsg::parse(m);
+ ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "getMore");
+ // Set the response for the getMore command and unblock network call().
+ conn.setCallResponses({MockDBClientConnection::mockGetMoreResponse(
+ nss, cursorId, {docObj(2)}, metadata(2), moreToCome)});
+
+ // Cursor should be blocked on the exhaust stream.
+ ASSERT_TRUE(blockedOnNetworkSoon());
+ // Set the response for the exhaust stream and unblock network recv().
+ conn.setRecvResponses(
+ {MockDBClientConnection::mockGetMoreResponse(nss, 0, {docObj(3)}, metadata(3))});
+
+ cursorThread.join();
+}
} // namespace mongo_test