diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2020-02-05 16:16:59 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-06 19:34:38 +0000 |
commit | d6a8ae5bcb0edefc2312dcfa2f6196b74711aa89 (patch) | |
tree | cd5c6ee25a050bd8980960faf5274d680cf35968 /src | |
parent | b0827ec75609c03f835d81417d7776e3015289a0 (diff) | |
download | mongo-d6a8ae5bcb0edefc2312dcfa2f6196b74711aa89.tar.gz |
SERVER-45931: Handle DBClientConnection connect and reconnect in NewOplogFetcher
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/client/dbclient_connection.cpp | 5 | ||||
-rw-r--r-- | src/mongo/client/dbclient_connection.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 188 | ||||
-rw-r--r-- | src/mongo/dbtests/mock/mock_dbclient_connection.cpp | 75 | ||||
-rw-r--r-- | src/mongo/dbtests/mock/mock_dbclient_connection.h | 11 | ||||
-rw-r--r-- | src/mongo/dbtests/mock_dbclient_conn_test.cpp | 247 |
9 files changed, 488 insertions, 96 deletions
diff --git a/src/mongo/client/dbclient_connection.cpp b/src/mongo/client/dbclient_connection.cpp index cc22ee388df..d43c788affa 100644 --- a/src/mongo/client/dbclient_connection.cpp +++ b/src/mongo/client/dbclient_connection.cpp @@ -454,6 +454,11 @@ void DBClientConnection::setTags(transport::Session::TagMask tags) { _session->setTags(tags); } +void DBClientConnection::shutdown() { + stdx::lock_guard<Latch> lk(_sessionMutex); + _markFailed(kEndSession); +} + void DBClientConnection::shutdownAndDisallowReconnect() { stdx::lock_guard<Latch> lk(_sessionMutex); _stayFailed.store(true); diff --git a/src/mongo/client/dbclient_connection.h b/src/mongo/client/dbclient_connection.h index ca8ffc9f5f9..9dccef0ea42 100644 --- a/src/mongo/client/dbclient_connection.h +++ b/src/mongo/client/dbclient_connection.h @@ -190,6 +190,13 @@ public: void setTags(transport::Session::TagMask tag); + + /** + * Disconnects the client and interrupts operations if they are currently blocked waiting for + * the network. If autoreconnect is on, the connection is re-established on its next use. + */ + virtual void shutdown(); + /** * Causes an error to be reported the next time the connection is used. Will interrupt * operations if they are currently blocked waiting for the network. 
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 96a37e1c59d..6efb849fc51 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -890,6 +890,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ 'repl_server_parameters', + 'replication_auth', '$BUILD_DIR/mongo/db/matcher/expressions', '$BUILD_DIR/mongo/db/commands/server_status_core', ], diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 1398dfd6025..7689d70bed2 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -57,6 +57,7 @@ MONGO_FAIL_POINT_DEFINE(stopReplProducerOnDocument); MONGO_FAIL_POINT_DEFINE(setSmallOplogGetMoreMaxTimeMS); MONGO_FAIL_POINT_DEFINE(logAfterOplogFetcherConnCreated); MONGO_FAIL_POINT_DEFINE(hangAfterOplogFetcherCallbackScheduled); +MONGO_FAIL_POINT_DEFINE(hangBeforeOplogFetcherRetries); // TODO SERVER-45574: Define the failpoint in this file instead. extern FailPoint hangBeforeStartingOplogFetcher; @@ -733,8 +734,6 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call { stdx::lock_guard<Latch> lock(_mutex); _conn = _createClientFn(); - - // TODO SERVER-45931: Connect and authenticate. } if (MONGO_unlikely(logAfterOplogFetcherConnCreated.shouldFail())) { @@ -745,6 +744,13 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call hangAfterOplogFetcherCallbackScheduled.pauseWhileSet(); + auto connectStatus = _connect(); + // Error out if we failed to connect after exhausting the allowed retry attempts. + if (!connectStatus.isOK()) { + _finishCallback(connectStatus); + return; + } + _createNewCursor(true /* initialFind */); while (true) { @@ -767,6 +773,7 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call // Recreate a cursor if we have enough retries left. 
if (_oplogFetcherRestartDecision->shouldContinue(this, brStatus)) { + hangBeforeOplogFetcherRetries.pauseWhileSet(); _createNewCursor(false /* initialFind */); continue; } else { @@ -797,6 +804,32 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call } } +Status NewOplogFetcher::_connect() { + Status connectStatus = Status::OK(); + do { + connectStatus = [&] { + try { + // Always try to start from scratch with a new connection if either the connection + // or the authentication fails from the previous attempt. + uassertStatusOK(_conn->connect(_source, "OplogFetcher")); + uassertStatusOK(replAuthenticate(_conn.get()) + .withContext(str::stream() + << "OplogFecther failed to authenticate to " + << _source)); + // Reset any state needed to track restarts on successful connection. + _oplogFetcherRestartDecision->fetchSuccessful(this); + return Status::OK(); + } catch (const DBException& e) { + hangBeforeOplogFetcherRetries.pauseWhileSet(); + return e.toStatus(); + } + }(); + } while (!connectStatus.isOK() && + _oplogFetcherRestartDecision->shouldContinue(this, connectStatus)); + + return connectStatus; +} + BSONObj NewOplogFetcher::_makeFindQuery(long long findTimeout) const { BSONObjBuilder queryBob; @@ -875,6 +908,12 @@ StatusWith<NewOplogFetcher::Documents> NewOplogFetcher::_getNextBatch() { batch.emplace_back(_cursor->nextSafe()); } } catch (const DBException& ex) { + if (_cursor->connectionHasPendingReplies()) { + // Close the connection because the connection cannot be used anymore as more data is on + // the way from the server for the exhaust stream. Thus, we have to reconnect. The + // DBClientConnection does autoreconnect on the next network operation. 
+ _conn->shutdown(); + } return ex.toStatus("Error while getting the next batch in the oplog fetcher"); } diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index 942edcda9eb..0ee841bd6ba 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -474,6 +474,13 @@ private: void _runQuery(const executor::TaskExecutor::CallbackArgs& callbackData) noexcept; /** + * Establishes the initial connection to the sync source and authenticates the connection for + * replication. This will also retry on connection failures until it exhausts the allowed retry + * attempts. + */ + Status _connect(); + + /** * Executes a `find` query on the sync source's oplog and establishes a tailable, awaitData, * exhaust cursor. * diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 8920cc28415..8562b7fbfde 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -1149,6 +1149,12 @@ void processSingleExhaustResponse(DBClientConnection* conn, } } +void simulateNetworkDisconnect(DBClientConnection* conn) { + auto* mockConn = dynamic_cast<MockDBClientConnection*>(conn); + ASSERT_TRUE(blockedOnNetworkSoon(mockConn)); + mockConn->shutdown(); +} + class NewOplogFetcherTest : public executor::ThreadPoolExecutorTest, public ScopedGlobalServiceContextForTest { protected: @@ -1193,6 +1199,8 @@ protected: // The last OpTime fetched by the oplog fetcher. 
OpTime lastFetched; + + std::unique_ptr<MockRemoteDBServer> _mockServer; }; const OpTime NewOplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(124, 1), 2); @@ -1218,6 +1226,9 @@ void NewOplogFetcherTest::setUp() { lastEnqueuedDocumentsInfo = info; return Status::OK(); }; + + auto target = HostAndPort{"localhost:12346"}; + _mockServer = std::make_unique<MockRemoteDBServer>(target.toString()); } std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcher() { @@ -1261,8 +1272,11 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDiffer fn, defaultBatchSize, NewOplogFetcher::StartingPoint::kSkipFirstDoc); - oplogFetcher->setCreateClientFn_forTest( - []() { return std::unique_ptr<DBClientConnection>(new MockDBClientConnection()); }); + oplogFetcher->setCreateClientFn_forTest([this]() { + const auto autoReconnect = true; + return std::unique_ptr<DBClientConnection>( + new MockDBClientConnection(_mockServer.get(), autoReconnect)); + }); return oplogFetcher; } @@ -1429,6 +1443,176 @@ TEST_F(NewOplogFetcherTest, ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus()); } +TEST_F(NewOplogFetcherTest, OplogFetcherReturnsHostUnreachableOnConnectionFailures) { + // Test that OplogFetcher fails to establish initial connection, retrying HostUnreachable. + ShutdownState shutdownState; + + // Shutdown the mock remote server before the OplogFetcher tries to connect. + _mockServer->shutdown(); + + // This will also ensure that _runQuery was scheduled before returning. + auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState)); + + oplogFetcher->join(); + + // This is the error code for connection failures. + ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus()); +} + +TEST_F(NewOplogFetcherTest, OplogFetcherRetriesConnectionButFails) { + // Test that OplogFetcher tries but fails after failing the initial connection, retrying + // HostUnreachable. 
+ ShutdownState shutdownState; + + // Shutdown the mock remote server before the OplogFetcher tries to connect. + _mockServer->shutdown(); + + // Create an OplogFetcher with 1 retry attempt. This will also ensure that _runQuery was + // scheduled before returning. + auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1); + + oplogFetcher->join(); + + // This is the error code for connection failures. + ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus()); +} + +TEST_F(NewOplogFetcherTest, OplogFetcherResetsNumRestartsOnSuccessfulConnection) { + // Test that OplogFetcher resets the number of restarts after a successful connection on a + // retry. + ShutdownState shutdownState; + + // Shutdown the mock remote server before the OplogFetcher tries to connect. + _mockServer->shutdown(); + + // Hang OplogFetcher before it retries the connection. + auto beforeRetryingConnection = globalFailPointRegistry().find("hangBeforeOplogFetcherRetries"); + auto timesEntered = beforeRetryingConnection->setMode(FailPoint::alwaysOn); + + // Create an OplogFetcher with 1 retry attempt. This will also ensure that _runQuery was + // scheduled before returning. + auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1); + + // Wait until the first connect attempt fails but before it retries. + beforeRetryingConnection->waitForTimesEntered(timesEntered + 1); + + // Bring up the mock server so that the connection retry can succeed. + _mockServer->reboot(); + + // Disable the failpoint to allow reconnection. + beforeRetryingConnection->setMode(FailPoint::off); + + // After the connection succeeded, the number of restarts should be reset back to 0 so that the + // OplogFetcher can tolerate another failure before failing. This will cause the first attempt + // to create a cursor to fail. After this, the new cursor will be blocked on call() while + // retrying to initialize. 
This also makes sure the OplogFetcher reconnects correctly. + simulateNetworkDisconnect(oplogFetcher->getDBClientConnection_forTest()); + + CursorId cursorId = 0LL; + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + + // Allow the cursor re-initialization to succeed. But the OplogFetcher will shut down with an OK + // status after receiving this batch because the cursor id is 0. + processSingleRequestResponse(oplogFetcher->getDBClientConnection_forTest(), + makeFirstBatch(cursorId, {firstEntry}, metadataObj)); + + oplogFetcher->join(); + + ASSERT_OK(shutdownState.getStatus()); +} + +TEST_F(NewOplogFetcherTest, OplogFetcherCanAutoReconnect) { + // Test that the OplogFetcher can autoreconnect after a broken connection. + ShutdownState shutdownState; + + // Create an oplog fetcher with one retry. + auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1); + + auto conn = oplogFetcher->getDBClientConnection_forTest(); + // Simulate a disconnect for the first find command. + simulateNetworkDisconnect(conn); + + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()}); + auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + + // Simulate closing the cursor and the OplogFetcher should exit with an OK status. + processSingleRequestResponse(conn, makeFirstBatch(0LL, {firstEntry}, metadataObj)); + + oplogFetcher->join(); + + ASSERT_OK(shutdownState.getStatus()); +} + +TEST_F(NewOplogFetcherTest, OplogFetcherAutoReconnectsButFails) { + // Test that the OplogFetcher fails an autoreconnect after a broken connection. + ShutdownState shutdownState; + + // Create an oplog fetcher with one retry. 
+ auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1); + + auto conn = oplogFetcher->getDBClientConnection_forTest(); + // Shut down the mock server and simulate a disconnect for the first find command. And the + // OplogFetcher should retry with AutoReconnect. + _mockServer->shutdown(); + simulateNetworkDisconnect(conn); + + oplogFetcher->join(); + + // AutoReconnect should also fail because the server is shut down. + ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus()); +} + +TEST_F(NewOplogFetcherTest, DisconnectsOnErrorsDuringExhaustStream) { + // Test that the connection disconnects if we get errors after successfully receiving a batch + // from the exhaust stream. + ShutdownState shutdownState; + + // Create an oplog fetcher with one retry. + auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1); + + CursorId cursorId = 22LL; + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()}); + auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + + auto conn = oplogFetcher->getDBClientConnection_forTest(); + // First batch for the initial find command. + processSingleRequestResponse(conn, makeFirstBatch(cursorId, {firstEntry}, metadataObj), true); + + auto beforeRecreatingCursor = globalFailPointRegistry().find("hangBeforeOplogFetcherRetries"); + auto timesEntered = beforeRecreatingCursor->setMode(FailPoint::alwaysOn); + + // Temporarily override the metadata reader to introduce failure after successfully receiving a + // batch from the first getMore. And the exhaust stream is now established. 
+ conn->setReplyMetadataReader( + [&](OperationContext* opCtx, const BSONObj& metadataObj, StringData target) { + return Status(ErrorCodes::FailedToParse, "Fake error"); + }); + processSingleRequestResponse( + conn, makeSubsequentBatch(cursorId, {secondEntry}, metadataObj, true /* moreToCome */)); + + beforeRecreatingCursor->waitForTimesEntered(timesEntered + 1); + + // Test that the connection is disconnected because we cannot use the same connection to + // recreate the cursor as more data is on the way from the server for the exhaust stream. + ASSERT_TRUE(conn->isFailed()); + + // Unset the metadata reader. + conn->setReplyMetadataReader(rpc::ReplyMetadataReader()); + + // Allow retry and autoreconnect. + beforeRecreatingCursor->setMode(FailPoint::off); + + // Simulate closing the cursor and the OplogFetcher should exit with an OK status. + processSingleRequestResponse(conn, makeFirstBatch(0LL, {firstEntry}, metadataObj)); + + oplogFetcher->join(); + + ASSERT_OK(shutdownState.getStatus()); +} + TEST_F(NewOplogFetcherTest, FindQueryContainsTermIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) { // Test that the correct maxTimeMS is set if this is the initial 'find' query. 
diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp index b14296f5acd..3bb3a845d3e 100644 --- a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp +++ b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp @@ -43,22 +43,12 @@ using std::string; using std::vector; namespace mongo { -MockDBClientConnection::MockDBClientConnection() - : _remoteServer(nullptr), - _isFailed(false), - _sockCreationTime(mongo::curTimeMicros64()), - _autoReconnect(false) { - _setServerRPCProtocols(rpc::supports::kAll); - _callIter = _mockCallResponses.begin(); - _recvIter = _mockRecvResponses.begin(); -} - MockDBClientConnection::MockDBClientConnection(MockRemoteDBServer* remoteServer, bool autoReconnect) - : _remoteServerInstanceID(remoteServer->getInstanceID()), + : DBClientConnection(autoReconnect), + _remoteServerInstanceID(remoteServer->getInstanceID()), _remoteServer(remoteServer), - _isFailed(false), - _sockCreationTime(mongo::curTimeMicros64()), - _autoReconnect(autoReconnect) { + _sockCreationTime(mongo::curTimeMicros64()) { + _setServerRPCProtocols(rpc::supports::kAll); _callIter = _mockCallResponses.begin(); _recvIter = _mockRecvResponses.begin(); } @@ -75,6 +65,7 @@ bool MockDBClientConnection::connect(const char* hostName, return true; } + _failed.store(true); errmsg.assign("cannot connect to " + _remoteServer->getServerAddress()); return false; } @@ -101,7 +92,7 @@ std::pair<rpc::UniqueReply, DBClientBase*> MockDBClientConnection::runCommandWit } return {std::move(reply), this}; } catch (const mongo::DBException&) { - _isFailed = true; + _failed.store(true); throw; } } // namespace mongo @@ -172,7 +163,7 @@ std::unique_ptr<mongo::DBClientCursor> MockDBClientConnection::query( this, BSONArray(resultsInCursor), provideResumeToken, batchSize)); return cursor; } catch (const mongo::DBException&) { - _isFailed = true; + _failed.store(true); throw; } @@ -184,10 +175,6 @@ mongo::ConnectionString::ConnectionType 
MockDBClientConnection::type() const { return mongo::ConnectionString::CUSTOM; } -bool MockDBClientConnection::isFailed() const { - return _isFailed; -} - string MockDBClientConnection::getServerAddress() const { return _remoteServer ? _remoteServer->getServerAddress() : "localhost:27017"; } @@ -227,9 +214,7 @@ void MockDBClientConnection::remove(const string& ns, Query query, int flags) { } void MockDBClientConnection::killCursor(const NamespaceString& ns, long long cursorID) { - // Unimplemented if there is a remote server. Without a remote server, there is nothing that - // needs to be done. - invariant(!_remoteServer); + // It is not worth the bother of killing the cursor in the mock. } bool MockDBClientConnection::call(mongo::Message& toSend, @@ -255,15 +240,26 @@ bool MockDBClientConnection::call(mongo::Message& toSend, return true; } } + + auto killSessionOnDisconnect = makeGuard([this] { shutdown(); }); + stdx::unique_lock lk(_netMutex); + checkConnection(); + if (!isStillConnected() || !_remoteServer->isRunning()) { + uasserted(ErrorCodes::SocketException, "Broken pipe in call"); + } _lastSentMessage = toSend; _mockCallResponsesCV.wait(lk, [&] { _blockedOnNetwork = (_callIter == _mockCallResponses.end()); - return !_blockedOnNetwork || !isStillConnected(); + return !_blockedOnNetwork || !isStillConnected() || !_remoteServer->isRunning(); }); - uassert(ErrorCodes::HostUnreachable, "Socket was shut down while in call", isStillConnected()); + uassert(ErrorCodes::HostUnreachable, + "Socket was shut down while in call", + isStillConnected() && _remoteServer->isRunning()); + + killSessionOnDisconnect.dismiss(); const auto& swResponse = *_callIter; _callIter++; @@ -272,14 +268,23 @@ bool MockDBClientConnection::call(mongo::Message& toSend, } Status MockDBClientConnection::recv(mongo::Message& m, int lastRequestId) { + auto killSessionOnDisconnect = makeGuard([this] { shutdown(); }); + stdx::unique_lock lk(_netMutex); + if (!isStillConnected() || 
!_remoteServer->isRunning()) { + return Status(ErrorCodes::SocketException, "Broken pipe in recv"); + } _mockRecvResponsesCV.wait(lk, [&] { _blockedOnNetwork = (_recvIter == _mockRecvResponses.end()); - return !_blockedOnNetwork || !isStillConnected(); + return !_blockedOnNetwork || !isStillConnected() || !_remoteServer->isRunning(); }); - uassert(ErrorCodes::HostUnreachable, "Socket was shut down while in recv", isStillConnected()); + if (!isStillConnected() || !_remoteServer->isRunning()) { + return Status(ErrorCodes::HostUnreachable, "Socket was shut down while in recv"); + } + + killSessionOnDisconnect.dismiss(); const auto& swResponse = *_recvIter; _recvIter++; @@ -287,7 +292,15 @@ Status MockDBClientConnection::recv(mongo::Message& m, int lastRequestId) { return Status::OK(); } +void MockDBClientConnection::shutdown() { + stdx::lock_guard lk(_netMutex); + DBClientConnection::shutdown(); + _mockCallResponsesCV.notify_all(); + _mockRecvResponsesCV.notify_all(); +} + void MockDBClientConnection::shutdownAndDisallowReconnect() { + stdx::lock_guard lk(_netMutex); DBClientConnection::shutdownAndDisallowReconnect(); _mockCallResponsesCV.notify_all(); _mockRecvResponsesCV.notify_all(); @@ -323,9 +336,13 @@ bool MockDBClientConnection::lazySupported() const { } void MockDBClientConnection::checkConnection() { - if (_isFailed && _autoReconnect && _remoteServer->isRunning()) { + if (_failed.load()) { + uassert(ErrorCodes::SocketException, toString(), autoReconnect); + uassert(ErrorCodes::HostUnreachable, + "cannot connect to " + _remoteServer->getServerAddress(), + _remoteServer->isRunning()); _remoteServerInstanceID = _remoteServer->getInstanceID(); - _isFailed = false; + _failed.store(false); } } } // namespace mongo diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.h b/src/mongo/dbtests/mock/mock_dbclient_connection.h index ff96854ffe1..5a8d9fa727e 100644 --- a/src/mongo/dbtests/mock/mock_dbclient_connection.h +++ 
b/src/mongo/dbtests/mock/mock_dbclient_connection.h @@ -100,11 +100,6 @@ public: MockDBClientConnection(MockRemoteDBServer* remoteServer, bool autoReconnect = false); virtual ~MockDBClientConnection(); - /** - * Create a mock connection that only supports call() and recv(). - */ - MockDBClientConnection(); - // // DBClientBase methods // @@ -115,7 +110,7 @@ public: Status connect(const HostAndPort& host, StringData applicationName) override { std::string errmsg; if (!connect(host.toString().c_str(), applicationName, errmsg)) { - return {ErrorCodes::HostNotFound, errmsg}; + return {ErrorCodes::HostUnreachable, errmsg}; } return Status::OK(); } @@ -145,6 +140,7 @@ public: std::string* actualServer) override; Status recv(mongo::Message& m, int lastRequestId) override; + void shutdown() override; void shutdownAndDisallowReconnect() override; // Methods to simulate network responses. @@ -157,7 +153,6 @@ public: // mongo::ConnectionString::ConnectionType type() const override; - bool isFailed() const override; std::string getServerAddress() const override; std::string toString() const override; @@ -197,9 +192,7 @@ private: MockRemoteDBServer::InstanceID _remoteServerInstanceID; MockRemoteDBServer* _remoteServer; - bool _isFailed; uint64_t _sockCreationTime; - bool _autoReconnect; boost::optional<OpMsgRequest> _lastCursorMessage; Mutex _netMutex = MONGO_MAKE_LATCH("MockDBClientConnection"); diff --git a/src/mongo/dbtests/mock_dbclient_conn_test.cpp b/src/mongo/dbtests/mock_dbclient_conn_test.cpp index 0cb8eb27263..971ab740b4b 100644 --- a/src/mongo/dbtests/mock_dbclient_conn_test.cpp +++ b/src/mongo/dbtests/mock_dbclient_conn_test.cpp @@ -625,10 +625,16 @@ TEST(MockDBClientConnTest, Delay) { ASSERT_EQUALS(1U, server.getCmdCount()); } +const auto docObj = [](int i) { return BSON("_id" << i); }; +const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); }; +const long long cursorId = 123; +const bool moreToCome = true; +const NamespaceString nss("test", "coll"); 
+ TEST(MockDBClientConnTest, SimulateCallAndRecvResponses) { - MockDBClientConnection conn; + MockRemoteDBServer server("test"); + MockDBClientConnection conn(&server); - const NamespaceString nss("test", "coll"); mongo::DBClientCursor cursor(&conn, mongo::NamespaceStringOrUUID(nss), Query().obj, @@ -639,11 +645,6 @@ TEST(MockDBClientConnTest, SimulateCallAndRecvResponses) { 0); cursor.setBatchSize(2); - const auto docObj = [](int i) { return BSON("_id" << i); }; - const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); }; - const long long cursorId = 123; - const bool moreToCome = true; - // Two batches from the initial find and getMore command. MockDBClientConnection::Responses callResponses = { MockDBClientConnection::mockFindResponse( @@ -703,9 +704,9 @@ TEST(MockDBClientConnTest, SimulateCallAndRecvResponses) { } TEST(MockDBClientConnTest, SimulateCallErrors) { - MockDBClientConnection conn; + MockRemoteDBServer server("test"); + MockDBClientConnection conn(&server); - const NamespaceString nss("test", "coll"); mongo::DBClientCursor cursor( &conn, mongo::NamespaceStringOrUUID(nss), Query().obj, 0, 0, nullptr, 0, 0); @@ -729,10 +730,31 @@ TEST(MockDBClientConnTest, SimulateCallErrors) { ASSERT_TRUE(cursor.isDead()); } +void runUntilExhaustRecv(MockDBClientConnection* conn, mongo::DBClientCursor* cursor) { + cursor->setBatchSize(1); + + // Two batches from the initial find and getMore command. + MockDBClientConnection::Responses callResponses = { + MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1)), + MockDBClientConnection::mockGetMoreResponse( + nss, cursorId, {docObj(2)}, metadata(2), moreToCome)}; + conn->setCallResponses(callResponses); + + // First batch from the initial find command. + ASSERT_TRUE(cursor->init()); + ASSERT_BSONOBJ_EQ(docObj(1), cursor->next()); + ASSERT_FALSE(cursor->moreInCurrentBatch()); + + // Second batch from the first getMore command. 
+ ASSERT_TRUE(cursor->more()); + ASSERT_BSONOBJ_EQ(docObj(2), cursor->next()); + ASSERT_FALSE(cursor->moreInCurrentBatch()); +} + TEST(MockDBClientConnTest, SimulateRecvErrors) { - MockDBClientConnection conn; + MockRemoteDBServer server("test"); + MockDBClientConnection conn(&server); - const NamespaceString nss("test", "coll"); mongo::DBClientCursor cursor(&conn, mongo::NamespaceStringOrUUID(nss), Query().obj, @@ -741,29 +763,8 @@ TEST(MockDBClientConnTest, SimulateRecvErrors) { nullptr, mongo::QueryOption_Exhaust, 0); - cursor.setBatchSize(1); - - const auto docObj = [](int i) { return BSON("_id" << i); }; - const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); }; - const long long cursorId = 123; - const bool moreToCome = true; - - // Two batches from the initial find and getMore command. - MockDBClientConnection::Responses callResponses = { - MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1)), - MockDBClientConnection::mockGetMoreResponse( - nss, cursorId, {docObj(2)}, metadata(2), moreToCome)}; - conn.setCallResponses(callResponses); - // First batch from the initial find command. - ASSERT_TRUE(cursor.init()); - ASSERT_BSONOBJ_EQ(docObj(1), cursor.next()); - ASSERT_FALSE(cursor.moreInCurrentBatch()); - - // Second batch from the first getMore command. - ASSERT_TRUE(cursor.more()); - ASSERT_BSONOBJ_EQ(docObj(2), cursor.next()); - ASSERT_FALSE(cursor.moreInCurrentBatch()); + runUntilExhaustRecv(&conn, &cursor); // Test network exception and error response from exhaust stream. MockDBClientConnection::Responses recvResponses = { @@ -787,10 +788,21 @@ TEST(MockDBClientConnTest, SimulateRecvErrors) { ASSERT_TRUE(cursor.isDead()); } +bool blockedOnNetworkSoon(MockDBClientConnection* conn) { + // Wait up to 10 seconds. 
+ for (auto i = 0; i < 100; i++) { + if (conn->isBlockedOnNetwork()) { + return true; + } + mongo::sleepmillis(100); + } + return false; +} + TEST(MockDBClientConnTest, BlockingNetwork) { - MockDBClientConnection conn; + MockRemoteDBServer server("test"); + MockDBClientConnection conn(&server); - const NamespaceString nss("test", "coll"); mongo::DBClientCursor cursor(&conn, mongo::NamespaceStringOrUUID(nss), Query().obj, @@ -801,11 +813,6 @@ TEST(MockDBClientConnTest, BlockingNetwork) { 0); cursor.setBatchSize(1); - const auto docObj = [](int i) { return BSON("_id" << i); }; - const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); }; - const long long cursorId = 123; - const bool moreToCome = true; - mongo::stdx::thread cursorThread([&] { // First batch from the initial find command. ASSERT_TRUE(cursor.init()); @@ -824,19 +831,8 @@ TEST(MockDBClientConnTest, BlockingNetwork) { ASSERT_TRUE(cursor.isDead()); }); - const auto blockedOnNetworkSoon = [&]() { - // Wait up to 10 seconds. - for (auto i = 0; i < 100; i++) { - if (conn.isBlockedOnNetwork()) { - return true; - } - mongo::sleepmillis(100); - } - return false; - }; - // Cursor should be blocked on the first find command. - ASSERT_TRUE(blockedOnNetworkSoon()); + ASSERT_TRUE(blockedOnNetworkSoon(&conn)); auto m = conn.getLastSentMessage(); auto msg = mongo::OpMsg::parse(m); ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find"); @@ -845,7 +841,7 @@ TEST(MockDBClientConnTest, BlockingNetwork) { {MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1))}); // Cursor should be blocked on the getMore command. 
- ASSERT_TRUE(blockedOnNetworkSoon()); + ASSERT_TRUE(blockedOnNetworkSoon(&conn)); m = conn.getLastSentMessage(); msg = mongo::OpMsg::parse(m); ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "getMore"); @@ -854,11 +850,154 @@ TEST(MockDBClientConnTest, BlockingNetwork) { nss, cursorId, {docObj(2)}, metadata(2), moreToCome)}); // Cursor should be blocked on the exhaust stream. - ASSERT_TRUE(blockedOnNetworkSoon()); + ASSERT_TRUE(blockedOnNetworkSoon(&conn)); // Set the response for the exhaust stream and unblock network recv(). conn.setRecvResponses( {MockDBClientConnection::mockGetMoreResponse(nss, 0, {docObj(3)}, metadata(3))}); cursorThread.join(); } + +TEST(MockDBClientConnTest, ShutdownServerBeforeCall) { + MockRemoteDBServer server("test"); + MockDBClientConnection conn(&server); + + ASSERT_OK(conn.connect(mongo::HostAndPort("localhost", 12345), mongo::StringData())); + mongo::DBClientCursor cursor(&conn, + mongo::NamespaceStringOrUUID(nss), + Query().obj, + 0, + 0, + nullptr, + mongo::QueryOption_Exhaust, + 0); + + // Shut down server before call. + server.shutdown(); + + // Connection is broken before call. + ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::SocketException); + + // Reboot the mock server but the cursor.init() would still fail because the connection does not + // support autoreconnect. + server.reboot(); + ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::SocketException); +} + +TEST(MockDBClientConnTest, ShutdownServerAfterCall) { + MockRemoteDBServer server("test"); + MockDBClientConnection conn(&server); + + mongo::DBClientCursor cursor(&conn, + mongo::NamespaceStringOrUUID(nss), + Query().obj, + 0, + 0, + nullptr, + mongo::QueryOption_Exhaust, + 0); + + mongo::stdx::thread cursorThread([&] { + ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::HostUnreachable); + }); + + // Cursor should be blocked on the first find command. 
+ ASSERT_TRUE(blockedOnNetworkSoon(&conn)); + + // Shutting down the server doesn't interrupt the blocking network call, so we shut down the + // connection as well to simulate shutdown of the server. + server.shutdown(); + conn.shutdown(); + + cursorThread.join(); +} + +TEST(MockDBClientConnTest, ConnectionAutoReconnect) { + const bool autoReconnect = true; + MockRemoteDBServer server("test"); + MockDBClientConnection conn(&server, autoReconnect); + + ASSERT_OK(conn.connect(mongo::HostAndPort("localhost", 12345), mongo::StringData())); + mongo::DBClientCursor cursor(&conn, + mongo::NamespaceStringOrUUID(nss), + Query().obj, + 0, + 0, + nullptr, + mongo::QueryOption_Exhaust, + 0); + + server.shutdown(); + + // Connection is broken before call. + ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::SocketException); + + // AutoReconnect fails because the server is down. + ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::HostUnreachable); + + // Reboot the mock server and the cursor.init() will reconnect and succeed. + server.reboot(); + + conn.setCallResponses( + {MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1))}); + ASSERT_TRUE(cursor.init()); + ASSERT_BSONOBJ_EQ(docObj(1), cursor.next()); + ASSERT_FALSE(cursor.moreInCurrentBatch()); +} + +TEST(MockDBClientConnTest, ShutdownServerBeforeRecv) { + const bool autoReconnect = true; + MockRemoteDBServer server("test"); + MockDBClientConnection conn(&server, autoReconnect); + + mongo::DBClientCursor cursor(&conn, + mongo::NamespaceStringOrUUID(nss), + Query().obj, + 0, + 0, + nullptr, + mongo::QueryOption_Exhaust, + 0); + + runUntilExhaustRecv(&conn, &cursor); + + server.shutdown(); + + // cursor.more() will call recv on the exhaust stream. + ASSERT_THROWS_CODE(cursor.more(), mongo::DBException, mongo::ErrorCodes::SocketException); + + server.reboot(); + // Reconnect is not possible for exhaust recv. 
+ ASSERT_THROWS_CODE(cursor.more(), mongo::DBException, mongo::ErrorCodes::SocketException); +} + +TEST(MockDBClientConnTest, ShutdownServerAfterRecv) { + MockRemoteDBServer server("test"); + MockDBClientConnection conn(&server); + + mongo::DBClientCursor cursor(&conn, + mongo::NamespaceStringOrUUID(nss), + Query().obj, + 0, + 0, + nullptr, + mongo::QueryOption_Exhaust, + 0); + + runUntilExhaustRecv(&conn, &cursor); + + mongo::stdx::thread cursorThread([&] { + ASSERT_THROWS_CODE(cursor.more(), mongo::DBException, mongo::ErrorCodes::HostUnreachable); + }); + + // Cursor should be blocked on the recv. + ASSERT_TRUE(blockedOnNetworkSoon(&conn)); + + // Shutting down the server doesn't interrupt the blocking network call, so we shut down the + // connection as well to simulate shutdown of the server. + server.shutdown(); + conn.shutdown(); + + cursorThread.join(); +} } // namespace mongo_test |