diff options
author | Eugene Ostroukhov <eostroukhov@google.com> | 2018-05-21 16:59:04 -0700 |
---|---|---|
committer | Anna Henningsen <anna@addaleax.net> | 2018-07-13 23:42:50 +0200 |
commit | 39977db7c015e94d33885249f50e62fa8b1f1bb9 (patch) | |
tree | fb330cb04106d7a030b1599549f5503c3ea086ea /src/inspector_io.cc | |
parent | 9374a83d6983710844c5436f32c14242ba600a20 (diff) | |
download | node-new-39977db7c015e94d33885249f50e62fa8b1f1bb9.tar.gz |
inspector: split main thread interface from transport
Workers debugging will require interfacing between the "main" inspector
and per-worker isolate inspectors. This is consistent with what WS
interface does. This change is a refactoring change and does not change
the functionality.
PR-URL: https://github.com/nodejs/node/pull/21182
Fixes: https://github.com/nodejs/node/issues/21725
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'src/inspector_io.cc')
-rw-r--r-- | src/inspector_io.cc | 549 |
1 files changed, 214 insertions, 335 deletions
diff --git a/src/inspector_io.cc b/src/inspector_io.cc index 78ecce7398..41fea546a8 100644 --- a/src/inspector_io.cc +++ b/src/inspector_io.cc @@ -1,6 +1,7 @@ #include "inspector_io.h" #include "inspector_socket_server.h" +#include "inspector/main_thread_interface.h" #include "inspector/node_string.h" #include "env-inl.h" #include "debug_utils.h" @@ -11,23 +12,16 @@ #include "util.h" #include "zlib.h" -#include <sstream> -#include <unicode/unistr.h> - +#include <deque> #include <string.h> #include <vector> - namespace node { namespace inspector { namespace { -using AsyncAndAgent = std::pair<uv_async_t, Agent*>; using v8_inspector::StringBuffer; using v8_inspector::StringView; -template <typename Transport> -using TransportAndIo = std::pair<Transport*, InspectorIo*>; - std::string ScriptPath(uv_loop_t* loop, const std::string& script_name) { std::string script_path; @@ -64,45 +58,151 @@ std::string GenerateID() { return uuid; } -void HandleSyncCloseCb(uv_handle_t* handle) { - *static_cast<bool*>(handle->data) = true; -} +class RequestToServer { + public: + RequestToServer(TransportAction action, + int session_id, + std::unique_ptr<v8_inspector::StringBuffer> message) + : action_(action), + session_id_(session_id), + message_(std::move(message)) {} + + void Dispatch(InspectorSocketServer* server) const { + switch (action_) { + case TransportAction::kKill: + server->TerminateConnections(); + // Fallthrough + case TransportAction::kStop: + server->Stop(); + break; + case TransportAction::kSendMessage: + server->Send( + session_id_, + protocol::StringUtil::StringViewToUtf8(message_->string())); + break; + } + } -void CloseAsyncAndLoop(uv_async_t* async) { - bool is_closed = false; - async->data = &is_closed; - uv_close(reinterpret_cast<uv_handle_t*>(async), HandleSyncCloseCb); - while (!is_closed) - uv_run(async->loop, UV_RUN_ONCE); - async->data = nullptr; - CheckedUvLoopClose(async->loop); -} + private: + TransportAction action_; + int session_id_; + std::unique_ptr<v8_inspector::StringBuffer> message_; +}; -// Delete main_thread_req_ on async handle close -void ReleasePairOnAsyncClose(uv_handle_t* async) { - std::unique_ptr<AsyncAndAgent> pair(node::ContainerOf(&AsyncAndAgent::first, - reinterpret_cast<uv_async_t*>(async))); - // Unique_ptr goes out of scope here and pointer is deleted. -} +class RequestQueueData { + public: + using MessageQueue = std::deque<RequestToServer>; + + explicit RequestQueueData(uv_loop_t* loop) + : handle_(std::make_shared<RequestQueue>(this)) { + int err = uv_async_init(loop, &async_, [](uv_async_t* async) { + RequestQueueData* wrapper = + node::ContainerOf(&RequestQueueData::async_, async); + wrapper->DoDispatch(); + }); + CHECK_EQ(0, err); + } + + static void CloseAndFree(RequestQueueData* queue); + + void Post(int session_id, + TransportAction action, + std::unique_ptr<StringBuffer> message) { + Mutex::ScopedLock scoped_lock(state_lock_); + bool notify = messages_.empty(); + messages_.emplace_back(action, session_id, std::move(message)); + if (notify) { + CHECK_EQ(0, uv_async_send(&async_)); + incoming_message_cond_.Broadcast(scoped_lock); + } + } + + void Wait() { + Mutex::ScopedLock scoped_lock(state_lock_); + if (messages_.empty()) { + incoming_message_cond_.Wait(scoped_lock); + } + } + + void SetServer(InspectorSocketServer* server) { + server_ = server; + } + std::shared_ptr<RequestQueue> handle() { + return handle_; + } + + private: + ~RequestQueueData() = default; + + MessageQueue GetMessages() { + Mutex::ScopedLock scoped_lock(state_lock_); + MessageQueue messages; + messages_.swap(messages); + return messages; + } + + void DoDispatch() { + if (server_ == nullptr) + return; + for (const auto& request : GetMessages()) { + request.Dispatch(server_); + } + } + + std::shared_ptr<RequestQueue> handle_; + uv_async_t async_; + InspectorSocketServer* server_ = nullptr; + MessageQueue messages_; + Mutex state_lock_; // Locked before mutating the queue. + ConditionVariable incoming_message_cond_; +}; } // namespace -std::unique_ptr<StringBuffer> Utf8ToStringView(const std::string& message) { - icu::UnicodeString utf16 = - icu::UnicodeString::fromUTF8(icu::StringPiece(message.data(), - message.length())); - StringView view(reinterpret_cast<const uint16_t*>(utf16.getBuffer()), - utf16.length()); - return StringBuffer::create(view); -} +class RequestQueue { + public: + explicit RequestQueue(RequestQueueData* data) : data_(data) {} + + void Reset() { + Mutex::ScopedLock scoped_lock(lock_); + data_ = nullptr; + } + void Post(int session_id, + TransportAction action, + std::unique_ptr<StringBuffer> message) { + Mutex::ScopedLock scoped_lock(lock_); + if (data_ != nullptr) + data_->Post(session_id, action, std::move(message)); + } + + void SetServer(InspectorSocketServer* server) { + Mutex::ScopedLock scoped_lock(lock_); + if (data_ != nullptr) + data_->SetServer(server); + } + + bool Expired() { + Mutex::ScopedLock scoped_lock(lock_); + return data_ == nullptr; + } + + private: + RequestQueueData* data_; + Mutex lock_; +}; class IoSessionDelegate : public InspectorSessionDelegate { public: - explicit IoSessionDelegate(InspectorIo* io, int id) : io_(io), id_(id) { } - void SendMessageToFrontend(const v8_inspector::StringView& message) override; + explicit IoSessionDelegate(std::shared_ptr<RequestQueue> queue, int id) + : request_queue_(queue), id_(id) { } + void SendMessageToFrontend(const v8_inspector::StringView& message) override { + request_queue_->Post(id_, TransportAction::kSendMessage, + StringBuffer::create(message)); + } + private: - InspectorIo* io_; + std::shared_ptr<RequestQueue> request_queue_; int id_; }; @@ -110,361 +210,133 @@ class IoSessionDelegate : public InspectorSessionDelegate { // mostly session start, message received, and session end. class InspectorIoDelegate: public node::inspector::SocketServerDelegate { public: - InspectorIoDelegate(InspectorIo* io, const std::string& target_id, + InspectorIoDelegate(std::shared_ptr<RequestQueueData> queue, + std::shared_ptr<MainThreadHandle> main_threade, + const std::string& target_id, const std::string& script_path, - const std::string& script_name, bool wait); + const std::string& script_name); ~InspectorIoDelegate() { - io_->ServerDone(); } - // Calls PostIncomingMessage() with appropriate InspectorAction: - // kStartSession + void StartSession(int session_id, const std::string& target_id) override; - // kSendMessage void MessageReceived(int session_id, const std::string& message) override; - // kEndSession void EndSession(int session_id) override; std::vector<std::string> GetTargetIds() override; std::string GetTargetTitle(const std::string& id) override; std::string GetTargetUrl(const std::string& id) override; - void AssignServer(InspectorSocketServer* server) override { - server_ = server; + request_queue_->SetServer(server); } private: - InspectorIo* io_; - int session_id_; + std::shared_ptr<RequestQueueData> request_queue_; + std::shared_ptr<MainThreadHandle> main_thread_; + std::unordered_map<int, std::unique_ptr<InspectorSession>> sessions_; const std::string script_name_; const std::string script_path_; const std::string target_id_; - bool waiting_; - InspectorSocketServer* server_; }; -void InterruptCallback(v8::Isolate*, void* agent) { - InspectorIo* io = static_cast<Agent*>(agent)->io(); - if (io != nullptr) - io->DispatchMessages(); -} - -class DispatchMessagesTask : public v8::Task { - public: - explicit DispatchMessagesTask(Agent* agent) : agent_(agent) {} - - void Run() override { - InspectorIo* io = agent_->io(); - if (io != nullptr) - io->DispatchMessages(); +// static +std::unique_ptr<InspectorIo> InspectorIo::Start( + std::shared_ptr<MainThreadHandle> main_thread, + const std::string& path, + const DebugOptions& options) { + auto io = std::unique_ptr<InspectorIo>( + new InspectorIo(main_thread, path, options)); + if (io->request_queue_->Expired()) { // Thread is not running + return nullptr; } - - private: - Agent* agent_; -}; - -InspectorIo::InspectorIo(Environment* env, const std::string& path, - const DebugOptions& options, bool wait_for_connect) - : options_(options), thread_(), state_(State::kNew), - parent_env_(env), thread_req_(), - platform_(parent_env_->isolate_data()->platform()), - dispatching_messages_(false), script_name_(path), - wait_for_connect_(wait_for_connect), port_(-1), - id_(GenerateID()) { - main_thread_req_ = new AsyncAndAgent({uv_async_t(), env->inspector_agent()}); - CHECK_EQ(0, uv_async_init(env->event_loop(), &main_thread_req_->first, - InspectorIo::MainThreadReqAsyncCb)); - uv_unref(reinterpret_cast<uv_handle_t*>(&main_thread_req_->first)); - CHECK_EQ(0, uv_sem_init(&thread_start_sem_, 0)); + return io; } -InspectorIo::~InspectorIo() { - uv_sem_destroy(&thread_start_sem_); - uv_close(reinterpret_cast<uv_handle_t*>(&main_thread_req_->first), - ReleasePairOnAsyncClose); -} - -bool InspectorIo::Start() { - CHECK_EQ(state_, State::kNew); +InspectorIo::InspectorIo(std::shared_ptr<MainThreadHandle> main_thread, + const std::string& path, + const DebugOptions& options) + : main_thread_(main_thread), options_(options), + thread_(), script_name_(path), id_(GenerateID()) { + Mutex::ScopedLock scoped_lock(thread_start_lock_); CHECK_EQ(uv_thread_create(&thread_, InspectorIo::ThreadMain, this), 0); - uv_sem_wait(&thread_start_sem_); - - if (state_ == State::kError) { - return false; - } - state_ = State::kAccepting; - if (wait_for_connect_) { - DispatchMessages(); - } - return true; + thread_start_condition_.Wait(scoped_lock); } -void InspectorIo::Stop() { - CHECK_IMPLIES(sessions_.empty(), state_ == State::kAccepting); - Write(TransportAction::kKill, 0, StringView()); +InspectorIo::~InspectorIo() { + request_queue_->Post(0, TransportAction::kKill, nullptr); int err = uv_thread_join(&thread_); CHECK_EQ(err, 0); - state_ = State::kShutDown; - DispatchMessages(); } -bool InspectorIo::IsStarted() { - return platform_ != nullptr; -} - -void InspectorIo::WaitForDisconnect() { - if (state_ == State::kAccepting) - state_ = State::kDone; - if (!sessions_.empty()) { - state_ = State::kShutDown; - Write(TransportAction::kStop, 0, StringView()); - fprintf(stderr, "Waiting for the debugger to disconnect...\n"); - fflush(stderr); - } +void InspectorIo::StopAcceptingNewConnections() { + request_queue_->Post(0, TransportAction::kStop, nullptr); } // static void InspectorIo::ThreadMain(void* io) { - static_cast<InspectorIo*>(io)->ThreadMain<InspectorSocketServer>(); -} - -// static -template <typename Transport> -void InspectorIo::IoThreadAsyncCb(uv_async_t* async) { - TransportAndIo<Transport>* transport_and_io = - static_cast<TransportAndIo<Transport>*>(async->data); - if (transport_and_io == nullptr) { - return; - } - Transport* transport = transport_and_io->first; - InspectorIo* io = transport_and_io->second; - MessageQueue<TransportAction> outgoing_message_queue; - io->SwapBehindLock(&io->outgoing_message_queue_, &outgoing_message_queue); - for (const auto& outgoing : outgoing_message_queue) { - int session_id = std::get<1>(outgoing); - switch (std::get<0>(outgoing)) { - case TransportAction::kKill: - transport->TerminateConnections(); - // Fallthrough - case TransportAction::kStop: - transport->Stop(); - break; - case TransportAction::kSendMessage: - transport->Send(session_id, - protocol::StringUtil::StringViewToUtf8( - std::get<2>(outgoing)->string())); - break; - case TransportAction::kAcceptSession: - transport->AcceptSession(session_id); - break; - case TransportAction::kDeclineSession: - transport->DeclineSession(session_id); - break; - } - } + static_cast<InspectorIo*>(io)->ThreadMain(); } -template <typename Transport> void InspectorIo::ThreadMain() { uv_loop_t loop; loop.data = nullptr; int err = uv_loop_init(&loop); CHECK_EQ(err, 0); - thread_req_.data = nullptr; - err = uv_async_init(&loop, &thread_req_, IoThreadAsyncCb<Transport>); - CHECK_EQ(err, 0); + std::shared_ptr<RequestQueueData> queue(new RequestQueueData(&loop), + RequestQueueData::CloseAndFree); std::string script_path = ScriptPath(&loop, script_name_); - auto delegate = std::unique_ptr<InspectorIoDelegate>( - new InspectorIoDelegate(this, id_, script_path, script_name_, - wait_for_connect_)); - Transport server(std::move(delegate), &loop, options_.host_name(), - options_.port()); - TransportAndIo<Transport> queue_transport(&server, this); - thread_req_.data = &queue_transport; - if (!server.Start()) { - state_ = State::kError; // Safe, main thread is waiting on semaphore - CloseAsyncAndLoop(&thread_req_); - uv_sem_post(&thread_start_sem_); - return; - } - port_ = server.Port(); // Safe, main thread is waiting on semaphore. - if (!wait_for_connect_) { - uv_sem_post(&thread_start_sem_); + std::unique_ptr<InspectorIoDelegate> delegate( + new InspectorIoDelegate(queue, main_thread_, id_, + script_path, script_name_)); + InspectorSocketServer server(std::move(delegate), &loop, + options_.host_name(), options_.port()); + request_queue_ = queue->handle(); + // Its lifetime is now that of the server delegate + queue.reset(); + { + Mutex::ScopedLock scoped_lock(thread_start_lock_); + if (server.Start()) { + port_ = server.Port(); + } + thread_start_condition_.Broadcast(scoped_lock); } uv_run(&loop, UV_RUN_DEFAULT); - thread_req_.data = nullptr; CheckedUvLoopClose(&loop); } -template <typename ActionType> -bool InspectorIo::AppendMessage(MessageQueue<ActionType>* queue, - ActionType action, int session_id, - std::unique_ptr<StringBuffer> buffer) { - Mutex::ScopedLock scoped_lock(state_lock_); - bool trigger_pumping = queue->empty(); - queue->push_back(std::make_tuple(action, session_id, std::move(buffer))); - return trigger_pumping; -} - -template <typename ActionType> -void InspectorIo::SwapBehindLock(MessageQueue<ActionType>* vector1, - MessageQueue<ActionType>* vector2) { - Mutex::ScopedLock scoped_lock(state_lock_); - vector1->swap(*vector2); -} - -void InspectorIo::PostIncomingMessage(InspectorAction action, int session_id, - const std::string& message) { - Debug(parent_env_, DebugCategory::INSPECTOR_SERVER, - ">>> %s\n", message.c_str()); - if (AppendMessage(&incoming_message_queue_, action, session_id, - Utf8ToStringView(message))) { - Agent* agent = main_thread_req_->second; - v8::Isolate* isolate = parent_env_->isolate(); - platform_->CallOnForegroundThread(isolate, - new DispatchMessagesTask(agent)); - isolate->RequestInterrupt(InterruptCallback, agent); - CHECK_EQ(0, uv_async_send(&main_thread_req_->first)); - } - Mutex::ScopedLock scoped_lock(state_lock_); - incoming_message_cond_.Broadcast(scoped_lock); -} - std::vector<std::string> InspectorIo::GetTargetIds() const { return { id_ }; } -TransportAction InspectorIo::Attach(int session_id) { - Agent* agent = parent_env_->inspector_agent(); - fprintf(stderr, "Debugger attached.\n"); - sessions_[session_id] = agent->Connect(std::unique_ptr<IoSessionDelegate>( - new IoSessionDelegate(this, session_id))); - return TransportAction::kAcceptSession; -} - -void InspectorIo::DispatchMessages() { - if (dispatching_messages_) - return; - dispatching_messages_ = true; - bool had_messages = false; - do { - if (dispatching_message_queue_.empty()) - SwapBehindLock(&incoming_message_queue_, &dispatching_message_queue_); - had_messages = !dispatching_message_queue_.empty(); - while (!dispatching_message_queue_.empty()) { - MessageQueue<InspectorAction>::value_type task; - std::swap(dispatching_message_queue_.front(), task); - dispatching_message_queue_.pop_front(); - int id = std::get<1>(task); - StringView message = std::get<2>(task)->string(); - switch (std::get<0>(task)) { - case InspectorAction::kStartSession: - Write(Attach(id), id, StringView()); - break; - case InspectorAction::kStartSessionUnconditionally: - Attach(id); - break; - case InspectorAction::kEndSession: - sessions_.erase(id); - if (!sessions_.empty()) - continue; - if (state_ == State::kShutDown) { - state_ = State::kDone; - } else { - state_ = State::kAccepting; - } - break; - case InspectorAction::kSendMessage: - auto session = sessions_.find(id); - if (session != sessions_.end() && session->second) { - session->second->Dispatch(message); - } - break; - } - } - } while (had_messages); - dispatching_messages_ = false; -} - -// static -void InspectorIo::MainThreadReqAsyncCb(uv_async_t* req) { - AsyncAndAgent* pair = node::ContainerOf(&AsyncAndAgent::first, req); - // Note that this may be called after io was closed or even after a new - // one was created and ran. - InspectorIo* io = pair->second->io(); - if (io != nullptr) - io->DispatchMessages(); -} - -void InspectorIo::Write(TransportAction action, int session_id, - const StringView& inspector_message) { - std::string message_str = - protocol::StringUtil::StringViewToUtf8(inspector_message); - Debug(parent_env_, DebugCategory::INSPECTOR_SERVER, - "<<< %s\n", message_str.c_str()); - AppendMessage(&outgoing_message_queue_, action, session_id, - StringBuffer::create(inspector_message)); - int err = uv_async_send(&thread_req_); - CHECK_EQ(0, err); -} - -bool InspectorIo::WaitForFrontendEvent() { - // We allow DispatchMessages reentry as we enter the pause. This is important - // to support debugging the code invoked by an inspector call, such - // as Runtime.evaluate - dispatching_messages_ = false; - Mutex::ScopedLock scoped_lock(state_lock_); - if (sessions_.empty()) - return false; - if (dispatching_message_queue_.empty() && incoming_message_queue_.empty()) { - incoming_message_cond_.Wait(scoped_lock); - } - return true; -} - -InspectorIoDelegate::InspectorIoDelegate(InspectorIo* io, - const std::string& target_id, - const std::string& script_path, - const std::string& script_name, - bool wait) - : io_(io), - session_id_(0), - script_name_(script_name), - script_path_(script_path), - target_id_(target_id), - waiting_(wait), - server_(nullptr) { } - +InspectorIoDelegate::InspectorIoDelegate( + std::shared_ptr<RequestQueueData> queue, + std::shared_ptr<MainThreadHandle> main_thread, + const std::string& target_id, + const std::string& script_path, + const std::string& script_name) + : request_queue_(queue), main_thread_(main_thread), + script_name_(script_name), script_path_(script_path), + target_id_(target_id) {} void InspectorIoDelegate::StartSession(int session_id, const std::string& target_id) { - session_id_ = session_id; - InspectorAction action = InspectorAction::kStartSession; - if (waiting_) { - action = InspectorAction::kStartSessionUnconditionally; - server_->AcceptSession(session_id); + auto session = main_thread_->Connect( + std::unique_ptr<InspectorSessionDelegate>( + new IoSessionDelegate(request_queue_->handle(), session_id)), true); + if (session) { + sessions_[session_id] = std::move(session); + fprintf(stderr, "Debugger attached.\n"); } - io_->PostIncomingMessage(action, session_id, ""); } void InspectorIoDelegate::MessageReceived(int session_id, const std::string& message) { - // TODO(pfeldman): Instead of blocking execution while debugger - // engages, node should wait for the run callback from the remote client - // and initiate its startup. This is a change to node.cc that should be - // upstreamed separately. - if (waiting_) { - if (message.find("\"Runtime.runIfWaitingForDebugger\"") != - std::string::npos) { - waiting_ = false; - io_->ResumeStartup(); - } - } - io_->PostIncomingMessage(InspectorAction::kSendMessage, session_id, - message); + auto session = sessions_.find(session_id); + if (session != sessions_.end()) + session->second->Dispatch(Utf8ToStringView(message)->string()); } void InspectorIoDelegate::EndSession(int session_id) { - io_->PostIncomingMessage(InspectorAction::kEndSession, session_id, ""); + sessions_.erase(session_id); } std::vector<std::string> InspectorIoDelegate::GetTargetIds() { @@ -479,10 +351,17 @@ std::string InspectorIoDelegate::GetTargetUrl(const std::string& id) { return "file://" + script_path_; } -void IoSessionDelegate::SendMessageToFrontend( - const v8_inspector::StringView& message) { - io_->Write(TransportAction::kSendMessage, id_, message); +// static +void RequestQueueData::CloseAndFree(RequestQueueData* queue) { + queue->handle_->Reset(); + queue->handle_.reset(); + uv_close(reinterpret_cast<uv_handle_t*>(&queue->async_), + [](uv_handle_t* handle) { + uv_async_t* async = reinterpret_cast<uv_async_t*>(handle); + RequestQueueData* wrapper = + node::ContainerOf(&RequestQueueData::async_, async); + delete wrapper; + }); } - } // namespace inspector } // namespace node |