author    Lingzhi Deng <lingzhi.deng@mongodb.com>    2020-02-05 16:16:59 -0500
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-02-06 19:34:38 +0000
commit    d6a8ae5bcb0edefc2312dcfa2f6196b74711aa89 (patch)
tree      cd5c6ee25a050bd8980960faf5274d680cf35968 /src
parent    b0827ec75609c03f835d81417d7776e3015289a0 (diff)
download  mongo-d6a8ae5bcb0edefc2312dcfa2f6196b74711aa89.tar.gz
SERVER-45931: Handle DBClientConnection connect and reconnect in NewOplogFetcher
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/client/dbclient_connection.cpp             |   5
-rw-r--r--  src/mongo/client/dbclient_connection.h               |   7
-rw-r--r--  src/mongo/db/repl/SConscript                         |   1
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.cpp                  |  43
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.h                    |   7
-rw-r--r--  src/mongo/db/repl/oplog_fetcher_test.cpp             | 188
-rw-r--r--  src/mongo/dbtests/mock/mock_dbclient_connection.cpp  |  75
-rw-r--r--  src/mongo/dbtests/mock/mock_dbclient_connection.h    |  11
-rw-r--r--  src/mongo/dbtests/mock_dbclient_conn_test.cpp        | 247
9 files changed, 488 insertions, 96 deletions
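
For orientation before the per-file diffs: the core of this change is the new NewOplogFetcher::_connect() helper (see the oplog_fetcher.cpp hunk below), which connects and authenticates in a loop and resets the restart counter once a connection succeeds. The stand-alone sketch below shows the same retry-until-exhausted pattern; RestartBudget, tryConnectOnce, and the Status struct here are illustrative stand-ins, not symbols from this patch.

    // Illustrative sketch of the connect-with-retry pattern used by
    // NewOplogFetcher::_connect(). All names below are hypothetical stand-ins.
    #include <functional>
    #include <iostream>
    #include <string>

    struct Status {
        bool ok;
        std::string reason;
    };

    // Plays the role of OplogFetcherRestartDecision: counts consecutive failures.
    class RestartBudget {
    public:
        explicit RestartBudget(int maxRestarts) : _maxRestarts(maxRestarts) {}
        bool shouldContinue(const Status& s) {
            std::cout << "attempt failed: " << s.reason << "\n";
            return ++_numRestarts <= _maxRestarts;
        }
        void fetchSuccessful() { _numRestarts = 0; }  // reset the budget on success
    private:
        int _maxRestarts;
        int _numRestarts = 0;
    };

    Status connectWithRetry(const std::function<Status()>& tryConnectOnce, RestartBudget& budget) {
        Status status{true, ""};
        do {
            status = tryConnectOnce();
            if (status.ok) {
                budget.fetchSuccessful();  // a successful connection resets the restart count
                return status;
            }
        } while (budget.shouldContinue(status));
        return status;  // error out after exhausting the allowed retry attempts
    }

    int main() {
        int attempts = 0;
        RestartBudget budget(1 /* one retry allowed */);
        auto status = connectWithRetry(
            [&] {
                // Fail the first attempt, succeed on the retry.
                return ++attempts < 2 ? Status{false, "HostUnreachable"} : Status{true, ""};
            },
            budget);
        std::cout << (status.ok ? "connected" : "gave up") << "\n";
        return 0;
    }

The real implementation also re-runs authentication on every attempt, always starting from a fresh connection.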
diff --git a/src/mongo/client/dbclient_connection.cpp b/src/mongo/client/dbclient_connection.cpp
index cc22ee388df..d43c788affa 100644
--- a/src/mongo/client/dbclient_connection.cpp
+++ b/src/mongo/client/dbclient_connection.cpp
@@ -454,6 +454,11 @@ void DBClientConnection::setTags(transport::Session::TagMask tags) {
_session->setTags(tags);
}
+void DBClientConnection::shutdown() {
+ stdx::lock_guard<Latch> lk(_sessionMutex);
+ _markFailed(kEndSession);
+}
+
void DBClientConnection::shutdownAndDisallowReconnect() {
stdx::lock_guard<Latch> lk(_sessionMutex);
_stayFailed.store(true);
diff --git a/src/mongo/client/dbclient_connection.h b/src/mongo/client/dbclient_connection.h
index ca8ffc9f5f9..9dccef0ea42 100644
--- a/src/mongo/client/dbclient_connection.h
+++ b/src/mongo/client/dbclient_connection.h
@@ -190,6 +190,13 @@ public:
void setTags(transport::Session::TagMask tag);
+
+ /**
+ * Disconnects the client and interrupts operations if they are currently blocked waiting for
+ * the network. If autoreconnect is on, the connection will be re-established on the next network operation.
+ */
+ virtual void shutdown();
+
/**
* Causes an error to be reported the next time the connection is used. Will interrupt
* operations if they are currently blocked waiting for the network.
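
The new virtual shutdown() differs from the existing shutdownAndDisallowReconnect() only in whether a later operation may reconnect. A minimal sketch of that distinction, assuming a simplified MiniConn type (not the real DBClientConnection):

    // Sketch of the difference between the two shutdown variants.
    // MiniConn is an illustrative stand-in, not DBClientConnection.
    #include <iostream>

    class MiniConn {
    public:
        explicit MiniConn(bool autoReconnect) : _autoReconnect(autoReconnect) {}

        // shutdown(): marks the session failed; a later operation may reconnect
        // if autoreconnect is enabled.
        void shutdown() { _failed = true; }

        // shutdownAndDisallowReconnect(): also forbids any future reconnect.
        void shutdownAndDisallowReconnect() {
            _failed = true;
            _stayFailed = true;
        }

        // What a network operation would do before sending a request.
        bool checkConnection() {
            if (!_failed)
                return true;
            if (_stayFailed || !_autoReconnect)
                return false;  // permanently failed or reconnect not allowed
            _failed = false;   // simulate a successful reconnect
            return true;
        }

    private:
        bool _autoReconnect;
        bool _failed = false;
        bool _stayFailed = false;
    };

    int main() {
        MiniConn c(true /* autoReconnect */);
        c.shutdown();
        std::cout << "after shutdown: " << c.checkConnection() << "\n";  // prints 1
        c.shutdownAndDisallowReconnect();
        std::cout << "after disallow: " << c.checkConnection() << "\n";  // prints 0
    }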
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 96a37e1c59d..6efb849fc51 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -890,6 +890,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'repl_server_parameters',
+ 'replication_auth',
'$BUILD_DIR/mongo/db/matcher/expressions',
'$BUILD_DIR/mongo/db/commands/server_status_core',
],
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 1398dfd6025..7689d70bed2 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -57,6 +57,7 @@ MONGO_FAIL_POINT_DEFINE(stopReplProducerOnDocument);
MONGO_FAIL_POINT_DEFINE(setSmallOplogGetMoreMaxTimeMS);
MONGO_FAIL_POINT_DEFINE(logAfterOplogFetcherConnCreated);
MONGO_FAIL_POINT_DEFINE(hangAfterOplogFetcherCallbackScheduled);
+MONGO_FAIL_POINT_DEFINE(hangBeforeOplogFetcherRetries);
// TODO SERVER-45574: Define the failpoint in this file instead.
extern FailPoint hangBeforeStartingOplogFetcher;
@@ -733,8 +734,6 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call
{
stdx::lock_guard<Latch> lock(_mutex);
_conn = _createClientFn();
-
- // TODO SERVER-45931: Connect and authenticate.
}
if (MONGO_unlikely(logAfterOplogFetcherConnCreated.shouldFail())) {
@@ -745,6 +744,13 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call
hangAfterOplogFetcherCallbackScheduled.pauseWhileSet();
+ auto connectStatus = _connect();
+ // Error out if we failed to connect after exhausting the allowed retry attempts.
+ if (!connectStatus.isOK()) {
+ _finishCallback(connectStatus);
+ return;
+ }
+
_createNewCursor(true /* initialFind */);
while (true) {
@@ -767,6 +773,7 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call
// Recreate a cursor if we have enough retries left.
if (_oplogFetcherRestartDecision->shouldContinue(this, brStatus)) {
+ hangBeforeOplogFetcherRetries.pauseWhileSet();
_createNewCursor(false /* initialFind */);
continue;
} else {
@@ -797,6 +804,32 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call
}
}
+Status NewOplogFetcher::_connect() {
+ Status connectStatus = Status::OK();
+ do {
+ connectStatus = [&] {
+ try {
+ // Always try to start from scratch with a new connection if either the connection or the
+ // authentication failed on the previous attempt.
+ uassertStatusOK(_conn->connect(_source, "OplogFetcher"));
+ uassertStatusOK(replAuthenticate(_conn.get())
+ .withContext(str::stream()
+ << "OplogFetcher failed to authenticate to "
+ << _source));
+ // Reset any state needed to track restarts on successful connection.
+ _oplogFetcherRestartDecision->fetchSuccessful(this);
+ return Status::OK();
+ } catch (const DBException& e) {
+ hangBeforeOplogFetcherRetries.pauseWhileSet();
+ return e.toStatus();
+ }
+ }();
+ } while (!connectStatus.isOK() &&
+ _oplogFetcherRestartDecision->shouldContinue(this, connectStatus));
+
+ return connectStatus;
+}
+
BSONObj NewOplogFetcher::_makeFindQuery(long long findTimeout) const {
BSONObjBuilder queryBob;
@@ -875,6 +908,12 @@ StatusWith<NewOplogFetcher::Documents> NewOplogFetcher::_getNextBatch() {
batch.emplace_back(_cursor->nextSafe());
}
} catch (const DBException& ex) {
+ if (_cursor->connectionHasPendingReplies()) {
+ // Close the connection because it cannot be reused: more data for the exhaust stream is
+ // still on the way from the server, so we have to reconnect. The DBClientConnection will
+ // autoreconnect on the next network operation.
+ _conn->shutdown();
+ }
return ex.toStatus("Error while getting the next batch in the oplog fetcher");
}
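
In the catch block above, the fetcher closes the connection whenever the exhaust cursor still has replies pending, since that connection cannot carry a new request; the next operation then relies on autoreconnect. A rough stand-alone sketch of that decision, using simplified stand-in types rather than DBClientCursor/DBClientConnection:

    // Sketch of the "cannot reuse a connection mid-exhaust-stream" decision made
    // in _getNextBatch(). Cursor and Conn are illustrative stand-ins.
    #include <iostream>
    #include <stdexcept>

    struct Conn {
        bool failed = false;
        void shutdown() { failed = true; }  // the next operation may autoreconnect
    };

    struct Cursor {
        bool pendingExhaustReplies = false;
        bool connectionHasPendingReplies() const { return pendingExhaustReplies; }
    };

    // Returns false (an error status in the real code) and closes the connection
    // if the server is still streaming replies for the exhaust cursor.
    bool handleBatchError(Cursor& cursor, Conn& conn, const std::exception& ex) {
        if (cursor.connectionHasPendingReplies()) {
            // More exhaust data is on the way; this connection cannot carry a new
            // request, so close it and rely on autoreconnect later.
            conn.shutdown();
        }
        std::cout << "batch error: " << ex.what() << "\n";
        return false;
    }

    int main() {
        Conn conn;
        Cursor cursor;
        cursor.pendingExhaustReplies = true;
        handleBatchError(cursor, conn, std::runtime_error("FailedToParse"));
        std::cout << "connection closed: " << std::boolalpha << conn.failed << "\n";
    }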
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 942edcda9eb..0ee841bd6ba 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -474,6 +474,13 @@ private:
void _runQuery(const executor::TaskExecutor::CallbackArgs& callbackData) noexcept;
/**
+ * Establishes the initial connection to the sync source and authenticates the connection for
+ * replication. This will also retry on connection failures until it exhausts the allowed retry
+ * attempts.
+ */
+ Status _connect();
+
+ /**
* Executes a `find` query on the sync source's oplog and establishes a tailable, awaitData,
* exhaust cursor.
*
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 8920cc28415..8562b7fbfde 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -1149,6 +1149,12 @@ void processSingleExhaustResponse(DBClientConnection* conn,
}
}
+void simulateNetworkDisconnect(DBClientConnection* conn) {
+ auto* mockConn = dynamic_cast<MockDBClientConnection*>(conn);
+ ASSERT_TRUE(blockedOnNetworkSoon(mockConn));
+ mockConn->shutdown();
+}
+
class NewOplogFetcherTest : public executor::ThreadPoolExecutorTest,
public ScopedGlobalServiceContextForTest {
protected:
@@ -1193,6 +1199,8 @@ protected:
// The last OpTime fetched by the oplog fetcher.
OpTime lastFetched;
+
+ std::unique_ptr<MockRemoteDBServer> _mockServer;
};
const OpTime NewOplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(124, 1), 2);
@@ -1218,6 +1226,9 @@ void NewOplogFetcherTest::setUp() {
lastEnqueuedDocumentsInfo = info;
return Status::OK();
};
+
+ auto target = HostAndPort{"localhost:12346"};
+ _mockServer = std::make_unique<MockRemoteDBServer>(target.toString());
}
std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcher() {
@@ -1261,8 +1272,11 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDiffer
fn,
defaultBatchSize,
NewOplogFetcher::StartingPoint::kSkipFirstDoc);
- oplogFetcher->setCreateClientFn_forTest(
- []() { return std::unique_ptr<DBClientConnection>(new MockDBClientConnection()); });
+ oplogFetcher->setCreateClientFn_forTest([this]() {
+ const auto autoReconnect = true;
+ return std::unique_ptr<DBClientConnection>(
+ new MockDBClientConnection(_mockServer.get(), autoReconnect));
+ });
return oplogFetcher;
}
@@ -1429,6 +1443,176 @@ TEST_F(NewOplogFetcherTest,
ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus());
}
+TEST_F(NewOplogFetcherTest, OplogFetcherReturnsHostUnreachableOnConnectionFailures) {
+ // Test that the OplogFetcher returns HostUnreachable when it cannot establish the initial connection.
+ ShutdownState shutdownState;
+
+ // Shutdown the mock remote server before the OplogFetcher tries to connect.
+ _mockServer->shutdown();
+
+ // This will also ensure that _runQuery was scheduled before returning.
+ auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState));
+
+ oplogFetcher->join();
+
+ // This is the error code for connection failures.
+ ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus());
+}
+
+TEST_F(NewOplogFetcherTest, OplogFetcherRetriesConnectionButFails) {
+ // Test that the OplogFetcher retries the connection after the initial attempt fails, but still
+ // returns HostUnreachable once the retry also fails.
+ ShutdownState shutdownState;
+
+ // Shutdown the mock remote server before the OplogFetcher tries to connect.
+ _mockServer->shutdown();
+
+ // Create an OplogFetcher with 1 retry attempt. This will also ensure that _runQuery was
+ // scheduled before returning.
+ auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1);
+
+ oplogFetcher->join();
+
+ // This is the error code for connection failures.
+ ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus());
+}
+
+TEST_F(NewOplogFetcherTest, OplogFetcherResetsNumRestartsOnSuccessfulConnection) {
+ // Test that OplogFetcher resets the number of restarts after a successful connection on a
+ // retry.
+ ShutdownState shutdownState;
+
+ // Shutdown the mock remote server before the OplogFetcher tries to connect.
+ _mockServer->shutdown();
+
+ // Hang OplogFetcher before it retries the connection.
+ auto beforeRetryingConnection = globalFailPointRegistry().find("hangBeforeOplogFetcherRetries");
+ auto timesEntered = beforeRetryingConnection->setMode(FailPoint::alwaysOn);
+
+ // Create an OplogFetcher with 1 retry attempt. This will also ensure that _runQuery was
+ // scheduled before returning.
+ auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1);
+
+ // Wait until the first connect attempt fails but before it retries.
+ beforeRetryingConnection->waitForTimesEntered(timesEntered + 1);
+
+ // Bring up the mock server so that the connection retry can succeed.
+ _mockServer->reboot();
+
+ // Disable the failpoint to allow reconnection.
+ beforeRetryingConnection->setMode(FailPoint::off);
+
+ // After the connection succeeds, the number of restarts is reset back to 0 so that the
+ // OplogFetcher can tolerate another failure before giving up. The simulated disconnect below
+ // causes the first attempt to create a cursor to fail; the new cursor then blocks on call()
+ // while retrying to initialize. This also verifies that the OplogFetcher reconnects correctly.
+ simulateNetworkDisconnect(oplogFetcher->getDBClientConnection_forTest());
+
+ CursorId cursorId = 0LL;
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+
+ // Allow the cursor re-initialization to succeed. The OplogFetcher will then shut down with an
+ // OK status after receiving this batch because the cursor id is 0.
+ processSingleRequestResponse(oplogFetcher->getDBClientConnection_forTest(),
+ makeFirstBatch(cursorId, {firstEntry}, metadataObj));
+
+ oplogFetcher->join();
+
+ ASSERT_OK(shutdownState.getStatus());
+}
+
+TEST_F(NewOplogFetcherTest, OplogFetcherCanAutoReconnect) {
+ // Test that the OplogFetcher can autoreconnect after a broken connection.
+ ShutdownState shutdownState;
+
+ // Create an oplog fetcher with one retry.
+ auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1);
+
+ auto conn = oplogFetcher->getDBClientConnection_forTest();
+ // Simulate a disconnect for the first find command.
+ simulateNetworkDisconnect(conn);
+
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()});
+ auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+
+ // Simulate closing the cursor; the OplogFetcher should then exit with an OK status.
+ processSingleRequestResponse(conn, makeFirstBatch(0LL, {firstEntry}, metadataObj));
+
+ oplogFetcher->join();
+
+ ASSERT_OK(shutdownState.getStatus());
+}
+
+TEST_F(NewOplogFetcherTest, OplogFetcherAutoReconnectsButFails) {
+ // Test that the OplogFetcher fails an autoreconnect after a broken connection.
+ ShutdownState shutdownState;
+
+ // Create an oplog fetcher with one retry.
+ auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1);
+
+ auto conn = oplogFetcher->getDBClientConnection_forTest();
+ // Shut down the mock server and simulate a disconnect for the first find command. The
+ // OplogFetcher should then retry with AutoReconnect.
+ _mockServer->shutdown();
+ simulateNetworkDisconnect(conn);
+
+ oplogFetcher->join();
+
+ // AutoReconnect should also fail because the server is shut down.
+ ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus());
+}
+
+TEST_F(NewOplogFetcherTest, DisconnectsOnErrorsDuringExhaustStream) {
+ // Test that the connection disconnects if we get errors after successfully receiving a batch
+ // from the exhaust stream.
+ ShutdownState shutdownState;
+
+ // Create an oplog fetcher with one retry.
+ auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1);
+
+ CursorId cursorId = 22LL;
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()});
+ auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+
+ auto conn = oplogFetcher->getDBClientConnection_forTest();
+ // First batch for the initial find command.
+ processSingleRequestResponse(conn, makeFirstBatch(cursorId, {firstEntry}, metadataObj), true);
+
+ auto beforeRecreatingCursor = globalFailPointRegistry().find("hangBeforeOplogFetcherRetries");
+ auto timesEntered = beforeRecreatingCursor->setMode(FailPoint::alwaysOn);
+
+ // Temporarily override the metadata reader to introduce a failure after successfully receiving
+ // a batch from the first getMore, at which point the exhaust stream is established.
+ conn->setReplyMetadataReader(
+ [&](OperationContext* opCtx, const BSONObj& metadataObj, StringData target) {
+ return Status(ErrorCodes::FailedToParse, "Fake error");
+ });
+ processSingleRequestResponse(
+ conn, makeSubsequentBatch(cursorId, {secondEntry}, metadataObj, true /* moreToCome */));
+
+ beforeRecreatingCursor->waitForTimesEntered(timesEntered + 1);
+
+ // Test that the connection is disconnected because we cannot use the same connection to
+ // recreate the cursor while more data is on the way from the server for the exhaust stream.
+ ASSERT_TRUE(conn->isFailed());
+
+ // Unset the metadata reader.
+ conn->setReplyMetadataReader(rpc::ReplyMetadataReader());
+
+ // Allow retry and autoreconnect.
+ beforeRecreatingCursor->setMode(FailPoint::off);
+
+ // Simulate closing the cursor; the OplogFetcher should then exit with an OK status.
+ processSingleRequestResponse(conn, makeFirstBatch(0LL, {firstEntry}, metadataObj));
+
+ oplogFetcher->join();
+
+ ASSERT_OK(shutdownState.getStatus());
+}
+
TEST_F(NewOplogFetcherTest,
FindQueryContainsTermIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) {
// Test that the correct maxTimeMS is set if this is the initial 'find' query.
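
Several of the new tests above synchronize with the fetcher through the hangBeforeOplogFetcherRetries failpoint: enable it, wait for the code under test to enter it, change the environment (for example, reboot the mock server), then disable it. The toy failpoint below sketches that hang/wait/release handshake; it is a simplified stand-in, not mongo::FailPoint.

    // Toy version of the failpoint handshake used by the new tests
    // (setMode(alwaysOn), waitForTimesEntered, setMode(off)).
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    class ToyFailPoint {
    public:
        void setAlwaysOn() {
            std::lock_guard<std::mutex> lk(_m);
            _on = true;
        }
        void setOff() {
            {
                std::lock_guard<std::mutex> lk(_m);
                _on = false;
            }
            _cv.notify_all();
        }
        // Called by the code under test: blocks while the failpoint is enabled.
        void pauseWhileSet() {
            std::unique_lock<std::mutex> lk(_m);
            ++_timesEntered;
            _enteredCv.notify_all();
            _cv.wait(lk, [&] { return !_on; });
        }
        // Called by the test: waits until the code under test has hit the failpoint.
        void waitForTimesEntered(int n) {
            std::unique_lock<std::mutex> lk(_m);
            _enteredCv.wait(lk, [&] { return _timesEntered >= n; });
        }
    private:
        std::mutex _m;
        std::condition_variable _cv;
        std::condition_variable _enteredCv;
        bool _on = false;
        int _timesEntered = 0;
    };

    int main() {
        ToyFailPoint fp;
        fp.setAlwaysOn();
        std::thread worker([&] {
            fp.pauseWhileSet();  // the retry path hangs here
            std::cout << "worker resumed\n";
        });
        fp.waitForTimesEntered(1);  // the test waits for the hang
        std::cout << "worker is parked at the failpoint\n";
        fp.setOff();  // the test releases the worker
        worker.join();
    }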
diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
index b14296f5acd..3bb3a845d3e 100644
--- a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
+++ b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
@@ -43,22 +43,12 @@ using std::string;
using std::vector;
namespace mongo {
-MockDBClientConnection::MockDBClientConnection()
- : _remoteServer(nullptr),
- _isFailed(false),
- _sockCreationTime(mongo::curTimeMicros64()),
- _autoReconnect(false) {
- _setServerRPCProtocols(rpc::supports::kAll);
- _callIter = _mockCallResponses.begin();
- _recvIter = _mockRecvResponses.begin();
-}
-
MockDBClientConnection::MockDBClientConnection(MockRemoteDBServer* remoteServer, bool autoReconnect)
- : _remoteServerInstanceID(remoteServer->getInstanceID()),
+ : DBClientConnection(autoReconnect),
+ _remoteServerInstanceID(remoteServer->getInstanceID()),
_remoteServer(remoteServer),
- _isFailed(false),
- _sockCreationTime(mongo::curTimeMicros64()),
- _autoReconnect(autoReconnect) {
+ _sockCreationTime(mongo::curTimeMicros64()) {
+ _setServerRPCProtocols(rpc::supports::kAll);
_callIter = _mockCallResponses.begin();
_recvIter = _mockRecvResponses.begin();
}
@@ -75,6 +65,7 @@ bool MockDBClientConnection::connect(const char* hostName,
return true;
}
+ _failed.store(true);
errmsg.assign("cannot connect to " + _remoteServer->getServerAddress());
return false;
}
@@ -101,7 +92,7 @@ std::pair<rpc::UniqueReply, DBClientBase*> MockDBClientConnection::runCommandWit
}
return {std::move(reply), this};
} catch (const mongo::DBException&) {
- _isFailed = true;
+ _failed.store(true);
throw;
}
} // namespace mongo
@@ -172,7 +163,7 @@ std::unique_ptr<mongo::DBClientCursor> MockDBClientConnection::query(
this, BSONArray(resultsInCursor), provideResumeToken, batchSize));
return cursor;
} catch (const mongo::DBException&) {
- _isFailed = true;
+ _failed.store(true);
throw;
}
@@ -184,10 +175,6 @@ mongo::ConnectionString::ConnectionType MockDBClientConnection::type() const {
return mongo::ConnectionString::CUSTOM;
}
-bool MockDBClientConnection::isFailed() const {
- return _isFailed;
-}
-
string MockDBClientConnection::getServerAddress() const {
return _remoteServer ? _remoteServer->getServerAddress() : "localhost:27017";
}
@@ -227,9 +214,7 @@ void MockDBClientConnection::remove(const string& ns, Query query, int flags) {
}
void MockDBClientConnection::killCursor(const NamespaceString& ns, long long cursorID) {
- // Unimplemented if there is a remote server. Without a remote server, there is nothing that
- // needs to be done.
- invariant(!_remoteServer);
+ // It is not worth the bother of killing the cursor in the mock.
}
bool MockDBClientConnection::call(mongo::Message& toSend,
@@ -255,15 +240,26 @@ bool MockDBClientConnection::call(mongo::Message& toSend,
return true;
}
}
+
+ auto killSessionOnDisconnect = makeGuard([this] { shutdown(); });
+
stdx::unique_lock lk(_netMutex);
+ checkConnection();
+ if (!isStillConnected() || !_remoteServer->isRunning()) {
+ uasserted(ErrorCodes::SocketException, "Broken pipe in call");
+ }
_lastSentMessage = toSend;
_mockCallResponsesCV.wait(lk, [&] {
_blockedOnNetwork = (_callIter == _mockCallResponses.end());
- return !_blockedOnNetwork || !isStillConnected();
+ return !_blockedOnNetwork || !isStillConnected() || !_remoteServer->isRunning();
});
- uassert(ErrorCodes::HostUnreachable, "Socket was shut down while in call", isStillConnected());
+ uassert(ErrorCodes::HostUnreachable,
+ "Socket was shut down while in call",
+ isStillConnected() && _remoteServer->isRunning());
+
+ killSessionOnDisconnect.dismiss();
const auto& swResponse = *_callIter;
_callIter++;
@@ -272,14 +268,23 @@ bool MockDBClientConnection::call(mongo::Message& toSend,
}
Status MockDBClientConnection::recv(mongo::Message& m, int lastRequestId) {
+ auto killSessionOnDisconnect = makeGuard([this] { shutdown(); });
+
stdx::unique_lock lk(_netMutex);
+ if (!isStillConnected() || !_remoteServer->isRunning()) {
+ return Status(ErrorCodes::SocketException, "Broken pipe in recv");
+ }
_mockRecvResponsesCV.wait(lk, [&] {
_blockedOnNetwork = (_recvIter == _mockRecvResponses.end());
- return !_blockedOnNetwork || !isStillConnected();
+ return !_blockedOnNetwork || !isStillConnected() || !_remoteServer->isRunning();
});
- uassert(ErrorCodes::HostUnreachable, "Socket was shut down while in recv", isStillConnected());
+ if (!isStillConnected() || !_remoteServer->isRunning()) {
+ return Status(ErrorCodes::HostUnreachable, "Socket was shut down while in recv");
+ }
+
+ killSessionOnDisconnect.dismiss();
const auto& swResponse = *_recvIter;
_recvIter++;
@@ -287,7 +292,15 @@ Status MockDBClientConnection::recv(mongo::Message& m, int lastRequestId) {
return Status::OK();
}
+void MockDBClientConnection::shutdown() {
+ stdx::lock_guard lk(_netMutex);
+ DBClientConnection::shutdown();
+ _mockCallResponsesCV.notify_all();
+ _mockRecvResponsesCV.notify_all();
+}
+
void MockDBClientConnection::shutdownAndDisallowReconnect() {
+ stdx::lock_guard lk(_netMutex);
DBClientConnection::shutdownAndDisallowReconnect();
_mockCallResponsesCV.notify_all();
_mockRecvResponsesCV.notify_all();
@@ -323,9 +336,13 @@ bool MockDBClientConnection::lazySupported() const {
}
void MockDBClientConnection::checkConnection() {
- if (_isFailed && _autoReconnect && _remoteServer->isRunning()) {
+ if (_failed.load()) {
+ uassert(ErrorCodes::SocketException, toString(), autoReconnect);
+ uassert(ErrorCodes::HostUnreachable,
+ "cannot connect to " + _remoteServer->getServerAddress(),
+ _remoteServer->isRunning());
_remoteServerInstanceID = _remoteServer->getInstanceID();
- _isFailed = false;
+ _failed.store(false);
}
}
} // namespace mongo
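
The reworked call()/recv() above block on a condition variable until a response is queued or the session goes away, and the new shutdown() override wakes those waiters so a test can interrupt a blocked network call. A rough self-contained sketch of that unblock-on-shutdown wait (BlockingMock is illustrative, not the real MockDBClientConnection):

    // Sketch of the "block until a response arrives or the connection is shut
    // down" wait used by the mock's call()/recv().
    #include <condition_variable>
    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <optional>
    #include <string>
    #include <thread>

    class BlockingMock {
    public:
        // Simulates call(): waits for the next queued response, or fails if the
        // connection was shut down while waiting.
        std::optional<std::string> call() {
            std::unique_lock<std::mutex> lk(_m);
            _cv.wait(lk, [&] { return !_responses.empty() || _shutdown; });
            if (_shutdown)
                return std::nullopt;  // "Socket was shut down while in call"
            auto r = _responses.front();
            _responses.pop_front();
            return r;
        }
        void pushResponse(std::string r) {
            {
                std::lock_guard<std::mutex> lk(_m);
                _responses.push_back(std::move(r));
            }
            _cv.notify_all();
        }
        // Simulates shutdown(): marks the session dead and wakes any blocked call().
        void shutdown() {
            {
                std::lock_guard<std::mutex> lk(_m);
                _shutdown = true;
            }
            _cv.notify_all();
        }
    private:
        std::mutex _m;
        std::condition_variable _cv;
        std::deque<std::string> _responses;
        bool _shutdown = false;
    };

    int main() {
        BlockingMock conn;
        std::thread client([&] {
            auto r = conn.call();
            std::cout << (r ? "got: " + *r : "call failed: connection shut down") << "\n";
        });
        conn.shutdown();  // unblock the waiter with a failure, like the new tests do
        client.join();
    }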
diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.h b/src/mongo/dbtests/mock/mock_dbclient_connection.h
index ff96854ffe1..5a8d9fa727e 100644
--- a/src/mongo/dbtests/mock/mock_dbclient_connection.h
+++ b/src/mongo/dbtests/mock/mock_dbclient_connection.h
@@ -100,11 +100,6 @@ public:
MockDBClientConnection(MockRemoteDBServer* remoteServer, bool autoReconnect = false);
virtual ~MockDBClientConnection();
- /**
- * Create a mock connection that only supports call() and recv().
- */
- MockDBClientConnection();
-
//
// DBClientBase methods
//
@@ -115,7 +110,7 @@ public:
Status connect(const HostAndPort& host, StringData applicationName) override {
std::string errmsg;
if (!connect(host.toString().c_str(), applicationName, errmsg)) {
- return {ErrorCodes::HostNotFound, errmsg};
+ return {ErrorCodes::HostUnreachable, errmsg};
}
return Status::OK();
}
@@ -145,6 +140,7 @@ public:
std::string* actualServer) override;
Status recv(mongo::Message& m, int lastRequestId) override;
+ void shutdown() override;
void shutdownAndDisallowReconnect() override;
// Methods to simulate network responses.
@@ -157,7 +153,6 @@ public:
//
mongo::ConnectionString::ConnectionType type() const override;
- bool isFailed() const override;
std::string getServerAddress() const override;
std::string toString() const override;
@@ -197,9 +192,7 @@ private:
MockRemoteDBServer::InstanceID _remoteServerInstanceID;
MockRemoteDBServer* _remoteServer;
- bool _isFailed;
uint64_t _sockCreationTime;
- bool _autoReconnect;
boost::optional<OpMsgRequest> _lastCursorMessage;
Mutex _netMutex = MONGO_MAKE_LATCH("MockDBClientConnection");
diff --git a/src/mongo/dbtests/mock_dbclient_conn_test.cpp b/src/mongo/dbtests/mock_dbclient_conn_test.cpp
index 0cb8eb27263..971ab740b4b 100644
--- a/src/mongo/dbtests/mock_dbclient_conn_test.cpp
+++ b/src/mongo/dbtests/mock_dbclient_conn_test.cpp
@@ -625,10 +625,16 @@ TEST(MockDBClientConnTest, Delay) {
ASSERT_EQUALS(1U, server.getCmdCount());
}
+const auto docObj = [](int i) { return BSON("_id" << i); };
+const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); };
+const long long cursorId = 123;
+const bool moreToCome = true;
+const NamespaceString nss("test", "coll");
+
TEST(MockDBClientConnTest, SimulateCallAndRecvResponses) {
- MockDBClientConnection conn;
+ MockRemoteDBServer server("test");
+ MockDBClientConnection conn(&server);
- const NamespaceString nss("test", "coll");
mongo::DBClientCursor cursor(&conn,
mongo::NamespaceStringOrUUID(nss),
Query().obj,
@@ -639,11 +645,6 @@ TEST(MockDBClientConnTest, SimulateCallAndRecvResponses) {
0);
cursor.setBatchSize(2);
- const auto docObj = [](int i) { return BSON("_id" << i); };
- const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); };
- const long long cursorId = 123;
- const bool moreToCome = true;
-
// Two batches from the initial find and getMore command.
MockDBClientConnection::Responses callResponses = {
MockDBClientConnection::mockFindResponse(
@@ -703,9 +704,9 @@ TEST(MockDBClientConnTest, SimulateCallAndRecvResponses) {
}
TEST(MockDBClientConnTest, SimulateCallErrors) {
- MockDBClientConnection conn;
+ MockRemoteDBServer server("test");
+ MockDBClientConnection conn(&server);
- const NamespaceString nss("test", "coll");
mongo::DBClientCursor cursor(
&conn, mongo::NamespaceStringOrUUID(nss), Query().obj, 0, 0, nullptr, 0, 0);
@@ -729,10 +730,31 @@ TEST(MockDBClientConnTest, SimulateCallErrors) {
ASSERT_TRUE(cursor.isDead());
}
+void runUntilExhaustRecv(MockDBClientConnection* conn, mongo::DBClientCursor* cursor) {
+ cursor->setBatchSize(1);
+
+ // Two batches from the initial find and getMore command.
+ MockDBClientConnection::Responses callResponses = {
+ MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1)),
+ MockDBClientConnection::mockGetMoreResponse(
+ nss, cursorId, {docObj(2)}, metadata(2), moreToCome)};
+ conn->setCallResponses(callResponses);
+
+ // First batch from the initial find command.
+ ASSERT_TRUE(cursor->init());
+ ASSERT_BSONOBJ_EQ(docObj(1), cursor->next());
+ ASSERT_FALSE(cursor->moreInCurrentBatch());
+
+ // Second batch from the first getMore command.
+ ASSERT_TRUE(cursor->more());
+ ASSERT_BSONOBJ_EQ(docObj(2), cursor->next());
+ ASSERT_FALSE(cursor->moreInCurrentBatch());
+}
+
TEST(MockDBClientConnTest, SimulateRecvErrors) {
- MockDBClientConnection conn;
+ MockRemoteDBServer server("test");
+ MockDBClientConnection conn(&server);
- const NamespaceString nss("test", "coll");
mongo::DBClientCursor cursor(&conn,
mongo::NamespaceStringOrUUID(nss),
Query().obj,
@@ -741,29 +763,8 @@ TEST(MockDBClientConnTest, SimulateRecvErrors) {
nullptr,
mongo::QueryOption_Exhaust,
0);
- cursor.setBatchSize(1);
-
- const auto docObj = [](int i) { return BSON("_id" << i); };
- const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); };
- const long long cursorId = 123;
- const bool moreToCome = true;
-
- // Two batches from the initial find and getMore command.
- MockDBClientConnection::Responses callResponses = {
- MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1)),
- MockDBClientConnection::mockGetMoreResponse(
- nss, cursorId, {docObj(2)}, metadata(2), moreToCome)};
- conn.setCallResponses(callResponses);
- // First batch from the initial find command.
- ASSERT_TRUE(cursor.init());
- ASSERT_BSONOBJ_EQ(docObj(1), cursor.next());
- ASSERT_FALSE(cursor.moreInCurrentBatch());
-
- // Second batch from the first getMore command.
- ASSERT_TRUE(cursor.more());
- ASSERT_BSONOBJ_EQ(docObj(2), cursor.next());
- ASSERT_FALSE(cursor.moreInCurrentBatch());
+ runUntilExhaustRecv(&conn, &cursor);
// Test network exception and error response from exhaust stream.
MockDBClientConnection::Responses recvResponses = {
@@ -787,10 +788,21 @@ TEST(MockDBClientConnTest, SimulateRecvErrors) {
ASSERT_TRUE(cursor.isDead());
}
+bool blockedOnNetworkSoon(MockDBClientConnection* conn) {
+ // Wait up to 10 seconds.
+ for (auto i = 0; i < 100; i++) {
+ if (conn->isBlockedOnNetwork()) {
+ return true;
+ }
+ mongo::sleepmillis(100);
+ }
+ return false;
+}
+
TEST(MockDBClientConnTest, BlockingNetwork) {
- MockDBClientConnection conn;
+ MockRemoteDBServer server("test");
+ MockDBClientConnection conn(&server);
- const NamespaceString nss("test", "coll");
mongo::DBClientCursor cursor(&conn,
mongo::NamespaceStringOrUUID(nss),
Query().obj,
@@ -801,11 +813,6 @@ TEST(MockDBClientConnTest, BlockingNetwork) {
0);
cursor.setBatchSize(1);
- const auto docObj = [](int i) { return BSON("_id" << i); };
- const auto metadata = [](int i) { return BSON("$fakeMetaData" << i); };
- const long long cursorId = 123;
- const bool moreToCome = true;
-
mongo::stdx::thread cursorThread([&] {
// First batch from the initial find command.
ASSERT_TRUE(cursor.init());
@@ -824,19 +831,8 @@ TEST(MockDBClientConnTest, BlockingNetwork) {
ASSERT_TRUE(cursor.isDead());
});
- const auto blockedOnNetworkSoon = [&]() {
- // Wait up to 10 seconds.
- for (auto i = 0; i < 100; i++) {
- if (conn.isBlockedOnNetwork()) {
- return true;
- }
- mongo::sleepmillis(100);
- }
- return false;
- };
-
// Cursor should be blocked on the first find command.
- ASSERT_TRUE(blockedOnNetworkSoon());
+ ASSERT_TRUE(blockedOnNetworkSoon(&conn));
auto m = conn.getLastSentMessage();
auto msg = mongo::OpMsg::parse(m);
ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find");
@@ -845,7 +841,7 @@ TEST(MockDBClientConnTest, BlockingNetwork) {
{MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1))});
// Cursor should be blocked on the getMore command.
- ASSERT_TRUE(blockedOnNetworkSoon());
+ ASSERT_TRUE(blockedOnNetworkSoon(&conn));
m = conn.getLastSentMessage();
msg = mongo::OpMsg::parse(m);
ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "getMore");
@@ -854,11 +850,154 @@ TEST(MockDBClientConnTest, BlockingNetwork) {
nss, cursorId, {docObj(2)}, metadata(2), moreToCome)});
// Cursor should be blocked on the exhaust stream.
- ASSERT_TRUE(blockedOnNetworkSoon());
+ ASSERT_TRUE(blockedOnNetworkSoon(&conn));
// Set the response for the exhaust stream and unblock network recv().
conn.setRecvResponses(
{MockDBClientConnection::mockGetMoreResponse(nss, 0, {docObj(3)}, metadata(3))});
cursorThread.join();
}
+
+TEST(MockDBClientConnTest, ShutdownServerBeforeCall) {
+ MockRemoteDBServer server("test");
+ MockDBClientConnection conn(&server);
+
+ ASSERT_OK(conn.connect(mongo::HostAndPort("localhost", 12345), mongo::StringData()));
+ mongo::DBClientCursor cursor(&conn,
+ mongo::NamespaceStringOrUUID(nss),
+ Query().obj,
+ 0,
+ 0,
+ nullptr,
+ mongo::QueryOption_Exhaust,
+ 0);
+
+ // Shut down server before call.
+ server.shutdown();
+
+ // Connection is broken before call.
+ ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::SocketException);
+
+ // Reboot the mock server; cursor.init() will still fail because the connection does not
+ // support autoreconnect.
+ server.reboot();
+ ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::SocketException);
+}
+
+TEST(MockDBClientConnTest, ShutdownServerAfterCall) {
+ MockRemoteDBServer server("test");
+ MockDBClientConnection conn(&server);
+
+ mongo::DBClientCursor cursor(&conn,
+ mongo::NamespaceStringOrUUID(nss),
+ Query().obj,
+ 0,
+ 0,
+ nullptr,
+ mongo::QueryOption_Exhaust,
+ 0);
+
+ mongo::stdx::thread cursorThread([&] {
+ ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::HostUnreachable);
+ });
+
+ // Cursor should be blocked on the first find command.
+ ASSERT_TRUE(blockedOnNetworkSoon(&conn));
+
+ // Shutting down the server doesn't interrupt the blocking network call, so we shut down the
+ // connection as well to simulate shutdown of the server.
+ server.shutdown();
+ conn.shutdown();
+
+ cursorThread.join();
+}
+
+TEST(MockDBClientConnTest, ConnectionAutoReconnect) {
+ const bool autoReconnect = true;
+ MockRemoteDBServer server("test");
+ MockDBClientConnection conn(&server, autoReconnect);
+
+ ASSERT_OK(conn.connect(mongo::HostAndPort("localhost", 12345), mongo::StringData()));
+ mongo::DBClientCursor cursor(&conn,
+ mongo::NamespaceStringOrUUID(nss),
+ Query().obj,
+ 0,
+ 0,
+ nullptr,
+ mongo::QueryOption_Exhaust,
+ 0);
+
+ server.shutdown();
+
+ // Connection is broken before call.
+ ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::SocketException);
+
+ // AutoReconnect fails because the server is down.
+ ASSERT_THROWS_CODE(cursor.init(), mongo::DBException, mongo::ErrorCodes::HostUnreachable);
+
+ // Reboot the mock server and the cursor.init() will reconnect and succeed.
+ server.reboot();
+
+ conn.setCallResponses(
+ {MockDBClientConnection::mockFindResponse(nss, cursorId, {docObj(1)}, metadata(1))});
+ ASSERT_TRUE(cursor.init());
+ ASSERT_BSONOBJ_EQ(docObj(1), cursor.next());
+ ASSERT_FALSE(cursor.moreInCurrentBatch());
+}
+
+TEST(MockDBClientConnTest, ShutdownServerBeforeRecv) {
+ const bool autoReconnect = true;
+ MockRemoteDBServer server("test");
+ MockDBClientConnection conn(&server, autoReconnect);
+
+ mongo::DBClientCursor cursor(&conn,
+ mongo::NamespaceStringOrUUID(nss),
+ Query().obj,
+ 0,
+ 0,
+ nullptr,
+ mongo::QueryOption_Exhaust,
+ 0);
+
+ runUntilExhaustRecv(&conn, &cursor);
+
+ server.shutdown();
+
+ // cursor.more() will call recv on the exhaust stream.
+ ASSERT_THROWS_CODE(cursor.more(), mongo::DBException, mongo::ErrorCodes::SocketException);
+
+ server.reboot();
+ // Reconnect is not possible for exhaust recv.
+ ASSERT_THROWS_CODE(cursor.more(), mongo::DBException, mongo::ErrorCodes::SocketException);
+}
+
+TEST(MockDBClientConnTest, ShutdownServerAfterRecv) {
+ MockRemoteDBServer server("test");
+ MockDBClientConnection conn(&server);
+
+ mongo::DBClientCursor cursor(&conn,
+ mongo::NamespaceStringOrUUID(nss),
+ Query().obj,
+ 0,
+ 0,
+ nullptr,
+ mongo::QueryOption_Exhaust,
+ 0);
+
+ runUntilExhaustRecv(&conn, &cursor);
+
+ mongo::stdx::thread cursorThread([&] {
+ ASSERT_THROWS_CODE(cursor.more(), mongo::DBException, mongo::ErrorCodes::HostUnreachable);
+ });
+
+ // Cursor should be blocked on the recv.
+ ASSERT_TRUE(blockedOnNetworkSoon(&conn));
+
+ // Shutting down the server doesn't interrupt the blocking network call, so we shut down the
+ // connection as well to simulate shutdown of the server.
+ server.shutdown();
+ conn.shutdown();
+
+ cursorThread.join();
+}
} // namespace mongo_test