summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-07-27 22:17:21 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-28 23:31:54 +0000
commit165dfb50f7693cadb80e988d08222700b1cc6b5c (patch)
tree4f8512dcb1c9637feee9cbf7693d46185190cdc1 /src/mongo
parent48a125c2ae350d0e915c9aa3e6212dda190bacb5 (diff)
downloadmongo-165dfb50f7693cadb80e988d08222700b1cc6b5c.tar.gz
SERVER-49106 Add poll function to the ServiceExecutor API
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/transport/SConscript2
-rw-r--r--src/mongo/transport/mock_session.h22
-rw-r--r--src/mongo/transport/service_executor.h10
-rw-r--r--src/mongo/transport/service_executor_fixed.cpp8
-rw-r--r--src/mongo/transport/service_executor_fixed.h3
-rw-r--r--src/mongo/transport/service_executor_noop.h64
-rw-r--r--src/mongo/transport/service_executor_reserved.cpp7
-rw-r--r--src/mongo/transport/service_executor_reserved.h3
-rw-r--r--src/mongo/transport/service_executor_synchronous.cpp8
-rw-r--r--src/mongo/transport/service_executor_synchronous.h3
-rw-r--r--src/mongo/transport/service_executor_test.cpp24
-rw-r--r--src/mongo/transport/service_executor_utils.cpp (renamed from src/mongo/transport/service_entry_point_utils.cpp)15
-rw-r--r--src/mongo/transport/service_executor_utils.h (renamed from src/mongo/transport/service_entry_point_utils.h)12
-rw-r--r--src/mongo/transport/service_state_machine_test.cpp6
-rw-r--r--src/mongo/transport/session.h5
-rw-r--r--src/mongo/transport/session_asio.h8
16 files changed, 132 insertions, 68 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 813401c3c94..1f7f6a16107 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -8,7 +8,6 @@ env.Library(
target='transport_layer_common',
source=[
'ismaster_metrics.cpp',
- 'service_entry_point_utils.cpp',
'session.cpp',
'transport_layer.cpp',
],
@@ -89,6 +88,7 @@ tlEnv.Library(
'service_executor_fixed.cpp',
'service_executor_reserved.cpp',
'service_executor_synchronous.cpp',
+ 'service_executor_utils.cpp',
env.Idlc('service_executor.idl')[0],
],
LIBDEPS=[
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h
index d88a0af263e..72d05aa3e66 100644
--- a/src/mongo/transport/mock_session.h
+++ b/src/mongo/transport/mock_session.h
@@ -29,8 +29,11 @@
#pragma once
+#include <list>
+
#include "mongo/base/checked_cast.h"
#include "mongo/config.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer_mock.h"
#include "mongo/util/net/hostandport.h"
@@ -100,6 +103,22 @@ public:
return Future<Message>::makeReady(sourceMessage());
}
+ Future<void> waitForData() override {
+ auto fp = makePromiseFuture<void>();
+ stdx::lock_guard<Latch> lk(_waitForDataMutex);
+ _waitForDataQueue.emplace_back(std::move(fp.promise));
+ return std::move(fp.future);
+ }
+
+ void signalAvailableData() {
+ stdx::lock_guard<Latch> lk(_waitForDataMutex);
+ if (_waitForDataQueue.size() == 0)
+ return;
+ Promise<void> promise = std::move(_waitForDataQueue.front());
+ _waitForDataQueue.pop_front();
+ promise.emplaceValue();
+ }
+
Status sinkMessage(Message message) override {
if (!_tl || _tl->inShutdown()) {
return TransportLayer::ShutdownStatus;
@@ -154,6 +173,9 @@ protected:
HostAndPort _local;
SockAddr _remoteAddr;
SockAddr _localAddr;
+
+ mutable Mutex _waitForDataMutex = MONGO_MAKE_LATCH("MockSession::_waitForDataMutex");
+ std::list<Promise<void>> _waitForDataQueue;
};
} // namespace transport
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index 859969efa66..b702198e6b5 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -45,6 +45,8 @@ class ServiceContext;
namespace transport {
+class Session;
+
/*
* This is the interface for all ServiceExecutors.
*/
@@ -95,6 +97,14 @@ public:
}
/*
+ * Awaits the availability of incoming data for the specified session. On success, it will
+ * schedule the callback on current executor. Otherwise, it will invoke the callback with a
+ * non-okay status on the caller thread.
+ */
+ virtual void runOnDataAvailable(Session* session,
+ OutOfLineExecutor::Task onCompletionCallback) = 0;
+
+ /*
* Stops and joins the ServiceExecutor. Any outstanding tasks will not be executed, and any
* associated callbacks waiting on I/O may get called with an error code.
*
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index ac3653126e0..abdd99ca586 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -34,6 +34,8 @@
#include "mongo/base/error_codes.h"
#include "mongo/logv2/log.h"
#include "mongo/transport/service_executor_gen.h"
+#include "mongo/transport/session.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
namespace mongo {
@@ -149,6 +151,12 @@ Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) {
return Status::OK();
}
+void ServiceExecutorFixed::runOnDataAvailable(Session* session,
+ OutOfLineExecutor::Task onCompletionCallback) {
+ invariant(session);
+ session->waitForData().thenRunOn(shared_from_this()).getAsync(std::move(onCompletionCallback));
+}
+
void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const {
*bob << kExecutorLabel << kExecutorName << kThreadsRunning
<< static_cast<int>(_numRunningExecutorThreads.load());
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index 127c3e5d24d..b7a90d43447 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -58,6 +58,9 @@ public:
Status shutdown(Milliseconds timeout) override;
Status scheduleTask(Task task, ScheduleFlags flags) override;
+ void runOnDataAvailable(Session* session,
+ OutOfLineExecutor::Task onCompletionCallback) override;
+
Mode transportMode() const override {
return Mode::kSynchronous;
}
diff --git a/src/mongo/transport/service_executor_noop.h b/src/mongo/transport/service_executor_noop.h
deleted file mode 100644
index b12de2247f3..00000000000
--- a/src/mongo/transport/service_executor_noop.h
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/base/status.h"
-#include "mongo/transport/service_executor.h"
-
-namespace mongo {
-namespace transport {
-
-/**
- * The noop service executor provides the necessary interface for some unittests. Doesn't actually
- * execute any work
- */
-class ServiceExecutorNoop final : public ServiceExecutor {
-public:
- explicit ServiceExecutorNoop(ServiceContext* ctx) {}
-
- Status start() override {
- return Status::OK();
- }
- Status shutdown(Milliseconds timeout) override {
- return Status::OK();
- }
- Status schedule(Task task, ScheduleFlags flags) override {
- return Status::OK();
- }
-
- Mode transportMode() const override {
- return Mode::kSynchronous;
- }
-
- void appendStats(BSONObjBuilder* bob) const override {}
-};
-
-} // namespace transport
-} // namespace mongo
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
index c03070a43e9..cae9653e60f 100644
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -35,8 +35,8 @@
#include "mongo/logv2/log.h"
#include "mongo/stdx/thread.h"
-#include "mongo/transport/service_entry_point_utils.h"
#include "mongo/transport/service_executor_gen.h"
+#include "mongo/transport/service_executor_utils.h"
#include "mongo/util/processinfo.h"
namespace mongo {
@@ -200,5 +200,10 @@ void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
<< static_cast<int>(_numStartingThreads);
}
+void ServiceExecutorReserved::runOnDataAvailable(Session* session,
+ OutOfLineExecutor::Task onCompletionCallback) {
+ scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this);
+}
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
index 4d2aabbce36..e3acf6febd9 100644
--- a/src/mongo/transport/service_executor_reserved.h
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -62,6 +62,9 @@ public:
return Mode::kSynchronous;
}
+ void runOnDataAvailable(Session* session,
+ OutOfLineExecutor::Task onCompletionCallback) override;
+
void appendStats(BSONObjBuilder* bob) const override;
private:
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index cab4c158433..d034e208954 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -35,8 +35,8 @@
#include "mongo/logv2/log.h"
#include "mongo/stdx/thread.h"
-#include "mongo/transport/service_entry_point_utils.h"
#include "mongo/transport/service_executor_gen.h"
+#include "mongo/transport/service_executor_utils.h"
#include "mongo/util/processinfo.h"
namespace mongo {
@@ -141,5 +141,11 @@ void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const {
<< static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
}
+void ServiceExecutorSynchronous::runOnDataAvailable(Session* session,
+ OutOfLineExecutor::Task onCompletionCallback) {
+ scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this);
+}
+
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index 19a651f9f09..940382b53e8 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -57,6 +57,9 @@ public:
return Mode::kSynchronous;
}
+ void runOnDataAvailable(Session* session,
+ OutOfLineExecutor::Task onCompletionCallback) override;
+
void appendStats(BSONObjBuilder* bob) const override;
private:
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index eb1927da8aa..59bed8cc568 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -37,10 +37,12 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
+#include "mongo/transport/mock_session.h"
#include "mongo/transport/service_executor_fixed.h"
#include "mongo/transport/service_executor_gen.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/transport_layer.h"
+#include "mongo/transport/transport_layer_mock.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
@@ -343,5 +345,27 @@ TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) {
schedulerThread->join();
}
+TEST_F(ServiceExecutorFixedFixture, RunTaskAfterWaitingForData) {
+ auto tl = std::make_unique<TransportLayerMock>();
+ auto session = tl->createSession();
+
+ auto executor = startAndGetServiceExecutor();
+
+ const auto mainThreadId = stdx::this_thread::get_id();
+ AtomicWord<bool> ranOnDataAvailable{false};
+ auto barrier = std::make_shared<unittest::Barrier>(2);
+ executor->runOnDataAvailable(
+ session.get(), [&ranOnDataAvailable, mainThreadId, barrier](Status) mutable -> void {
+ ranOnDataAvailable.store(true);
+ ASSERT(stdx::this_thread::get_id() != mainThreadId);
+ barrier->countDownAndWait();
+ });
+
+ ASSERT(!ranOnDataAvailable.load());
+ reinterpret_cast<MockSession*>(session.get())->signalAvailableData();
+ barrier->countDownAndWait();
+ ASSERT(ranOnDataAvailable.load());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_utils.cpp b/src/mongo/transport/service_executor_utils.cpp
index f0c95a28dba..450babfb36c 100644
--- a/src/mongo/transport/service_entry_point_utils.cpp
+++ b/src/mongo/transport/service_executor_utils.cpp
@@ -31,7 +31,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/transport/service_entry_point_utils.h"
+#include "mongo/transport/service_executor_utils.h"
#include <fmt/format.h>
#include <functional>
@@ -39,6 +39,7 @@
#include "mongo/logv2/log.h"
#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_executor.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/thread_safety_context.h"
@@ -128,4 +129,16 @@ Status launchServiceWorkerThread(unique_function<void()> task) noexcept {
return Status::OK();
}
+void scheduleCallbackOnDataAvailable(transport::Session* session,
+ unique_function<void(Status)> callback,
+ transport::ServiceExecutor* executor) noexcept {
+ invariant(session);
+ try {
+ session->waitForData().get();
+ executor->schedule(std::move(callback));
+ } catch (DBException& e) {
+ callback(e.toStatus());
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_utils.h b/src/mongo/transport/service_executor_utils.h
index f6f83d6d37e..3dc0c4ab11f 100644
--- a/src/mongo/transport/service_entry_point_utils.h
+++ b/src/mongo/transport/service_executor_utils.h
@@ -36,6 +36,18 @@
namespace mongo {
+namespace transport {
+class ServiceExecutor;
+}
+
Status launchServiceWorkerThread(unique_function<void()> task) noexcept;
+/* The default implementation for "ServiceExecutor::runOnDataAvailable()", which blocks the caller
+ * thread until data is available for reading. On success, it schedules "callback" on "executor".
+ * Other implementations (e.g., "ServiceExecutorFixed") may provide asynchronous variants.
+ */
+void scheduleCallbackOnDataAvailable(transport::Session* session,
+ unique_function<void(Status)> callback,
+ transport::ServiceExecutor* executor) noexcept;
+
} // namespace mongo
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index 0189093a34e..b5aceac7ea5 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -44,6 +44,7 @@
#include "mongo/transport/mock_session.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor.h"
+#include "mongo/transport/service_executor_utils.h"
#include "mongo/transport/service_state_machine.h"
#include "mongo/transport/transport_layer_mock.h"
#include "mongo/unittest/unittest.h"
@@ -266,6 +267,11 @@ public:
return Mode::kSynchronous;
}
+ void runOnDataAvailable(Session* session,
+ OutOfLineExecutor::Task onCompletionCallback) override {
+ scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this);
+ }
+
void appendStats(BSONObjBuilder* bob) const override {}
void setScheduleHook(ScheduleHook hook) {
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index 8f51cdc80e3..39595d8ed37 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -116,6 +116,11 @@ public:
virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) = 0;
/**
+ * Asynchronously waits for the availability of incoming data.
+ */
+ virtual Future<void> waitForData() = 0;
+
+ /**
* Sink (send) a Message to the remote host for this Session.
*
* Async version will keep the buffer alive until the operation completes.
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 315cef28975..e0e9701857b 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -167,6 +167,14 @@ public:
return sourceMessageImpl(baton);
}
+ Future<void> waitForData() override {
+#ifdef MONGO_CONFIG_SSL
+ if (_sslSocket)
+ return asio::async_read(*_sslSocket, asio::null_buffers(), UseFuture{}).ignoreValue();
+#endif
+ return asio::async_read(_socket, asio::null_buffers(), UseFuture{}).ignoreValue();
+ }
+
Status sinkMessage(Message message) override {
ensureSync();