// Copyright 2016 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/edk/system/node_controller.h" #include #include #include "base/bind.h" #include "base/location.h" #include "base/logging.h" #include "base/macros.h" #include "base/message_loop/message_loop.h" #include "base/metrics/histogram_macros.h" #include "base/process/process_handle.h" #include "base/timer/elapsed_timer.h" #include "crypto/random.h" #include "mojo/edk/embedder/embedder_internal.h" #include "mojo/edk/embedder/platform_channel_pair.h" #include "mojo/edk/system/broker.h" #include "mojo/edk/system/broker_host.h" #include "mojo/edk/system/core.h" #include "mojo/edk/system/ports_message.h" #if defined(OS_MACOSX) && !defined(OS_IOS) #include "mojo/edk/system/mach_port_relay.h" #endif namespace mojo { namespace edk { namespace { template void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); } ports::NodeName GetRandomNodeName() { ports::NodeName name; GenerateRandomName(&name); return name; } void RecordPeerCount(size_t count) { DCHECK_LE(count, static_cast(std::numeric_limits::max())); // 8k is the maximum number of file descriptors allowed in Chrome. UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.ConnectedPeers", static_cast(count), 0 /* min */, 8000 /* max */, 50 /* bucket count */); } void RecordPendingChildCount(size_t count) { DCHECK_LE(count, static_cast(std::numeric_limits::max())); // 8k is the maximum number of file descriptors allowed in Chrome. UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren", static_cast(count), 0 /* min */, 8000 /* max */, 50 /* bucket count */); } // Used by NodeController to watch for shutdown. Since no IO can happen once // the IO thread is killed, the NodeController can cleanly drop all its peers // at that time. class ThreadDestructionObserver : public base::MessageLoop::DestructionObserver { public: static void Create(scoped_refptr task_runner, const base::Closure& callback) { if (task_runner->RunsTasksOnCurrentThread()) { // Owns itself. new ThreadDestructionObserver(callback); } else { task_runner->PostTask(FROM_HERE, base::Bind(&Create, task_runner, callback)); } } private: explicit ThreadDestructionObserver(const base::Closure& callback) : callback_(callback) { base::MessageLoop::current()->AddDestructionObserver(this); } ~ThreadDestructionObserver() override { base::MessageLoop::current()->RemoveDestructionObserver(this); } // base::MessageLoop::DestructionObserver: void WillDestroyCurrentMessageLoop() override { callback_.Run(); delete this; } const base::Closure callback_; DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver); }; } // namespace NodeController::~NodeController() {} NodeController::NodeController(Core* core) : core_(core), name_(GetRandomNodeName()), node_(new ports::Node(name_, this)) { DVLOG(1) << "Initializing node " << name_; } #if defined(OS_MACOSX) && !defined(OS_IOS) void NodeController::CreateMachPortRelay( base::PortProvider* port_provider) { base::AutoLock lock(mach_port_relay_lock_); DCHECK(!mach_port_relay_); mach_port_relay_.reset(new MachPortRelay(port_provider)); } #endif void NodeController::SetIOTaskRunner( scoped_refptr task_runner) { io_task_runner_ = task_runner; ThreadDestructionObserver::Create( io_task_runner_, base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); } void NodeController::ConnectToChild(base::ProcessHandle process_handle, ScopedPlatformHandle platform_handle) { io_task_runner_->PostTask( FROM_HERE, base::Bind(&NodeController::ConnectToChildOnIOThread, base::Unretained(this), process_handle, base::Passed(&platform_handle))); } void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { // TODO(amistry): Consider the need for a broker on Windows. #if defined(OS_POSIX) && !defined(OS_MACOSX) // On posix, use the bootstrap channel for the broker and receive the node's // channel synchronously as the first message from the broker. base::ElapsedTimer timer; broker_.reset(new Broker(std::move(platform_handle))); platform_handle = broker_->GetParentPlatformHandle(); UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime", timer.Elapsed()); #endif io_task_runner_->PostTask( FROM_HERE, base::Bind(&NodeController::ConnectToParentOnIOThread, base::Unretained(this), base::Passed(&platform_handle))); } void NodeController::SetPortObserver( const ports::PortRef& port, const scoped_refptr& observer) { node_->SetUserData(port, observer); } void NodeController::ClosePort(const ports::PortRef& port) { SetPortObserver(port, nullptr); int rv = node_->ClosePort(port); DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name(); AcceptIncomingMessages(); } int NodeController::SendMessage(const ports::PortRef& port, scoped_ptr* message) { ports::ScopedMessage ports_message(message->release()); int rv = node_->SendMessage(port, &ports_message); if (rv != ports::OK) { DCHECK(ports_message); message->reset(static_cast(ports_message.release())); } AcceptIncomingMessages(); return rv; } void NodeController::ReservePort(const std::string& token, const ports::PortRef& port) { DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " << token; base::AutoLock lock(reserved_ports_lock_); auto result = reserved_ports_.insert(std::make_pair(token, port)); DCHECK(result.second); } void NodeController::MergePortIntoParent(const std::string& token, const ports::PortRef& port) { { // This request may be coming from within the process that reserved the // "parent" side (e.g. for Chrome single-process mode), so if this token is // reserved locally, merge locally instead. base::AutoLock lock(reserved_ports_lock_); auto it = reserved_ports_.find(token); if (it != reserved_ports_.end()) { node_->MergePorts(port, name_, it->second.name()); reserved_ports_.erase(it); return; } } scoped_refptr parent; { // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise, // there is a race where the parent can be set, and |pending_port_merges_| // be processed between retrieving |parent| and adding the merge to // |pending_port_merges_|. base::AutoLock lock(pending_port_merges_lock_); parent = GetParentChannel(); if (!parent) { pending_port_merges_.push_back(std::make_pair(token, port)); return; } } parent->RequestPortMerge(port.name(), token); } int NodeController::MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1) { int rv = node_->MergeLocalPorts(port0, port1); AcceptIncomingMessages(); return rv; } scoped_refptr NodeController::CreateSharedBuffer( size_t num_bytes) { #if defined(OS_POSIX) && !defined(OS_MACOSX) // Shared buffer creation failure is fatal, so always use the broker when we // have one. This does mean that a non-root process that has children will use // the broker for shared buffer creation even though that process is // privileged. if (broker_) { return broker_->GetSharedBuffer(num_bytes); } #endif return PlatformSharedBuffer::Create(num_bytes); } void NodeController::RequestShutdown(const base::Closure& callback) { { base::AutoLock lock(shutdown_lock_); shutdown_callback_ = callback; } AttemptShutdownIfRequested(); } void NodeController::ConnectToChildOnIOThread( base::ProcessHandle process_handle, ScopedPlatformHandle platform_handle) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); #if defined(OS_POSIX) && !defined(OS_MACOSX) PlatformChannelPair node_channel; // BrokerHost owns itself. BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); broker_host->SendChannel(node_channel.PassClientHandle()); scoped_refptr channel = NodeChannel::Create( this, node_channel.PassServerHandle(), io_task_runner_); #else scoped_refptr channel = NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); #endif // We set up the child channel with a temporary name so it can be identified // as a pending child if it writes any messages to the channel. We may start // receiving messages from it (though we shouldn't) as soon as Start() is // called below. ports::NodeName token; GenerateRandomName(&token); pending_children_.insert(std::make_pair(token, channel)); RecordPendingChildCount(pending_children_.size()); channel->SetRemoteNodeName(token); channel->SetRemoteProcessHandle(process_handle); channel->Start(); channel->AcceptChild(name_, token); } void NodeController::ConnectToParentOnIOThread( ScopedPlatformHandle platform_handle) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); { base::AutoLock lock(parent_lock_); DCHECK(parent_name_ == ports::kInvalidNodeName); // At this point we don't know the parent's name, so we can't yet insert it // into our |peers_| map. That will happen as soon as we receive an // AcceptChild message from them. bootstrap_parent_channel_ = NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); } bootstrap_parent_channel_->Start(); } scoped_refptr NodeController::GetPeerChannel( const ports::NodeName& name) { base::AutoLock lock(peers_lock_); auto it = peers_.find(name); if (it == peers_.end()) return nullptr; return it->second; } scoped_refptr NodeController::GetParentChannel() { ports::NodeName parent_name; { base::AutoLock lock(parent_lock_); parent_name = parent_name_; } return GetPeerChannel(parent_name); } scoped_refptr NodeController::GetBrokerChannel() { ports::NodeName broker_name; { base::AutoLock lock(broker_lock_); broker_name = broker_name_; } return GetPeerChannel(broker_name); } void NodeController::AddPeer(const ports::NodeName& name, scoped_refptr channel, bool start_channel) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); DCHECK(name != ports::kInvalidNodeName); DCHECK(channel); channel->SetRemoteNodeName(name); OutgoingMessageQueue pending_messages; { base::AutoLock lock(peers_lock_); if (peers_.find(name) != peers_.end()) { // This can happen normally if two nodes race to be introduced to each // other. The losing pipe will be silently closed and introduction should // not be affected. DVLOG(1) << "Ignoring duplicate peer name " << name; return; } auto result = peers_.insert(std::make_pair(name, channel)); DCHECK(result.second); DVLOG(2) << "Accepting new peer " << name << " on node " << name_; RecordPeerCount(peers_.size()); auto it = pending_peer_messages_.find(name); if (it != pending_peer_messages_.end()) { std::swap(pending_messages, it->second); pending_peer_messages_.erase(it); } } if (start_channel) channel->Start(); // Flush any queued message we need to deliver to this node. while (!pending_messages.empty()) { channel->PortsMessage(std::move(pending_messages.front())); pending_messages.pop(); } } void NodeController::DropPeer(const ports::NodeName& name) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); { base::AutoLock lock(peers_lock_); auto it = peers_.find(name); if (it != peers_.end()) { ports::NodeName peer = it->first; peers_.erase(it); DVLOG(1) << "Dropped peer " << peer; } pending_peer_messages_.erase(name); pending_children_.erase(name); RecordPeerCount(peers_.size()); RecordPendingChildCount(pending_children_.size()); } node_->LostConnectionToNode(name); } void NodeController::SendPeerMessage(const ports::NodeName& name, ports::ScopedMessage message) { Channel::MessagePtr channel_message = static_cast(message.get())->TakeChannelMessage(); scoped_refptr peer = GetPeerChannel(name); #if defined(OS_WIN) if (channel_message->has_handles()) { // If we're sending a message with handles we aren't the destination // node's parent or broker (i.e. we don't know its process handle), ask // the broker to relay for us. scoped_refptr broker = GetBrokerChannel(); if (!peer || !peer->HasRemoteProcessHandle()) { if (broker) { broker->RelayPortsMessage(name, std::move(channel_message)); } else { base::AutoLock lock(broker_lock_); pending_relay_messages_[name].emplace(std::move(channel_message)); } return; } } #elif defined(OS_MACOSX) && !defined(OS_IOS) if (channel_message->has_mach_ports()) { // Messages containing Mach ports are always routed through the broker, even // if the broker process is the intended recipient. bool use_broker = false; { base::AutoLock lock(parent_lock_); use_broker = (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName); } if (use_broker) { scoped_refptr broker = GetBrokerChannel(); if (broker) { broker->RelayPortsMessage(name, std::move(channel_message)); } else { base::AutoLock lock(broker_lock_); pending_relay_messages_[name].emplace(std::move(channel_message)); } return; } } #endif // defined(OS_WIN) if (peer) { peer->PortsMessage(std::move(channel_message)); return; } // If we don't know who the peer is, queue the message for delivery. If this // is the first message queued for the peer, we also ask the broker to // introduce us to them. bool needs_introduction = false; { base::AutoLock lock(peers_lock_); auto& queue = pending_peer_messages_[name]; needs_introduction = queue.empty(); queue.emplace(std::move(channel_message)); } if (needs_introduction) { scoped_refptr broker = GetBrokerChannel(); if (!broker) { DVLOG(1) << "Dropping message for unknown peer: " << name; return; } broker->RequestIntroduction(name); } } void NodeController::AcceptIncomingMessages() { for (;;) { // TODO: We may need to be more careful to avoid starving the rest of the // thread here. Revisit this if it turns out to be a problem. One // alternative would be to schedule a task to continue pumping messages // after flushing once. messages_lock_.Acquire(); if (incoming_messages_.empty()) { messages_lock_.Release(); break; } // libstdc++'s deque creates an internal buffer on construction, even when // the size is 0. So avoid creating it until it is necessary. std::queue messages; std::swap(messages, incoming_messages_); messages_lock_.Release(); while (!messages.empty()) { node_->AcceptMessage(std::move(messages.front())); messages.pop(); } } AttemptShutdownIfRequested(); } void NodeController::DropAllPeers() { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); std::vector> all_peers; { base::AutoLock lock(parent_lock_); if (bootstrap_parent_channel_) { // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its // existence to determine whether or not this is the root node. Once // bootstrap_parent_channel_->ShutDown() has been called, // |bootstrap_parent_channel_| is essentially a dead object and it doesn't // matter if it's deleted now or when |this| is deleted. // Note: |bootstrap_parent_channel_| is only modified on the IO thread. all_peers.push_back(bootstrap_parent_channel_); } } { base::AutoLock lock(peers_lock_); for (const auto& peer : peers_) all_peers.push_back(peer.second); for (const auto& peer : pending_children_) all_peers.push_back(peer.second); peers_.clear(); pending_children_.clear(); pending_peer_messages_.clear(); } for (const auto& peer : all_peers) peer->ShutDown(); if (destroy_on_io_thread_shutdown_) delete this; } void NodeController::GenerateRandomPortName(ports::PortName* port_name) { GenerateRandomName(port_name); } void NodeController::AllocMessage(size_t num_header_bytes, ports::ScopedMessage* message) { message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr)); } void NodeController::ForwardMessage(const ports::NodeName& node, ports::ScopedMessage message) { if (node == name_) { // NOTE: We need to avoid re-entering the Node instance within // ForwardMessage. Because ForwardMessage is only ever called // (synchronously) in response to Node's ClosePort, SendMessage, or // AcceptMessage, we flush the queue after calling any of those methods. base::AutoLock lock(messages_lock_); incoming_messages_.emplace(std::move(message)); } else { SendPeerMessage(node, std::move(message)); } } void NodeController::PortStatusChanged(const ports::PortRef& port) { scoped_refptr user_data; node_->GetUserData(port, &user_data); PortObserver* observer = static_cast(user_data.get()); if (observer) { observer->OnPortStatusChanged(); } else { DVLOG(2) << "Ignoring status change for " << port.name() << " because it " << "doesn't have an observer."; } } void NodeController::OnAcceptChild(const ports::NodeName& from_node, const ports::NodeName& parent_name, const ports::NodeName& token) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); scoped_refptr parent; { base::AutoLock lock(parent_lock_); if (!bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) { DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node; DropPeer(from_node); return; } parent_name_ = parent_name; parent = bootstrap_parent_channel_; } parent->SetRemoteNodeName(parent_name); parent->AcceptParent(token, name_); // NOTE: The child does not actually add its parent as a peer until // receiving an AcceptBrokerClient message from the broker. The parent // will request that said message be sent upon receiving AcceptParent. DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; } void NodeController::OnAcceptParent(const ports::NodeName& from_node, const ports::NodeName& token, const ports::NodeName& child_name) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); auto it = pending_children_.find(from_node); if (it == pending_children_.end() || token != from_node) { DLOG(ERROR) << "Received unexpected AcceptParent message from " << from_node; DropPeer(from_node); return; } scoped_refptr channel = it->second; pending_children_.erase(it); DCHECK(channel); DVLOG(1) << "Parent " << name_ << " accepted child " << child_name; AddPeer(child_name, channel, false /* start_channel */); // TODO(rockot/amistry): We could simplify child initialization if we could // synchronously get a new async broker channel from the broker. For now we do // it asynchronously since it's only used to facilitate handle passing, not // handle creation. scoped_refptr broker = GetBrokerChannel(); if (broker) { // Inform the broker of this new child. broker->AddBrokerClient(child_name, channel->CopyRemoteProcessHandle()); } else { // If we have no broker, either we need to wait for one, or we *are* the // broker. scoped_refptr parent = GetParentChannel(); if (!parent) { base::AutoLock lock(parent_lock_); parent = bootstrap_parent_channel_; } if (!parent) { // Yes, we're the broker. We can initialize the child directly. channel->AcceptBrokerClient(name_, ScopedPlatformHandle()); } else { // We aren't the broker, so wait for a broker connection. base::AutoLock lock(broker_lock_); pending_broker_clients_.push(child_name); } } } void NodeController::OnAddBrokerClient(const ports::NodeName& from_node, const ports::NodeName& client_name, base::ProcessHandle process_handle) { #if defined(OS_WIN) // Scoped handle to avoid leaks on error. ScopedPlatformHandle scoped_process_handle = ScopedPlatformHandle(PlatformHandle(process_handle)); #endif scoped_refptr sender = GetPeerChannel(from_node); if (!sender) { DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender."; return; } if (GetPeerChannel(client_name)) { DLOG(ERROR) << "Ignoring AddBrokerClient for known client."; DropPeer(from_node); return; } PlatformChannelPair broker_channel; scoped_refptr client = NodeChannel::Create( this, broker_channel.PassServerHandle(), io_task_runner_); #if defined(OS_WIN) // The broker must have a working handle to the client process in order to // properly copy other handles to and from the client. if (!scoped_process_handle.is_valid()) { DLOG(ERROR) << "Broker rejecting client with invalid process handle."; return; } client->SetRemoteProcessHandle(scoped_process_handle.release().handle); #else client->SetRemoteProcessHandle(process_handle); #endif AddPeer(client_name, client, true /* start_channel */); DVLOG(1) << "Broker " << name_ << " accepting client " << client_name << " from peer " << from_node; sender->BrokerClientAdded(client_name, broker_channel.PassClientHandle()); } void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node, const ports::NodeName& client_name, ScopedPlatformHandle broker_channel) { scoped_refptr client = GetPeerChannel(client_name); if (!client) { DLOG(ERROR) << "BrokerClientAdded for unknown child " << client_name; return; } // This should have come from our own broker. if (GetBrokerChannel() != GetPeerChannel(from_node)) { DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node; return; } DVLOG(1) << "Child " << client_name << " accepted by broker " << from_node; client->AcceptBrokerClient(from_node, std::move(broker_channel)); } void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node, const ports::NodeName& broker_name, ScopedPlatformHandle broker_channel) { // This node should already have a parent in bootstrap mode. ports::NodeName parent_name; scoped_refptr parent; { base::AutoLock lock(parent_lock_); parent_name = parent_name_; parent = bootstrap_parent_channel_; bootstrap_parent_channel_ = nullptr; } DCHECK(parent_name == from_node); DCHECK(parent); std::queue pending_broker_clients; std::unordered_map pending_relay_messages; { base::AutoLock lock(broker_lock_); broker_name_ = broker_name; std::swap(pending_broker_clients, pending_broker_clients_); std::swap(pending_relay_messages, pending_relay_messages_); } DCHECK(broker_name != ports::kInvalidNodeName); // It's now possible to add both the broker and the parent as peers. // Note that the broker and parent may be the same node. scoped_refptr broker; if (broker_name == parent_name) { DCHECK(!broker_channel.is_valid()); broker = parent; } else { DCHECK(broker_channel.is_valid()); broker = NodeChannel::Create(this, std::move(broker_channel), io_task_runner_); AddPeer(broker_name, broker, true /* start_channel */); } AddPeer(parent_name, parent, false /* start_channel */); { // Complete any port merge requests we have waiting for the parent. base::AutoLock lock(pending_port_merges_lock_); for (const auto& request : pending_port_merges_) parent->RequestPortMerge(request.second.name(), request.first); pending_port_merges_.clear(); } // Feed the broker any pending children of our own. while (!pending_broker_clients.empty()) { const ports::NodeName& child_name = pending_broker_clients.front(); auto it = pending_children_.find(child_name); DCHECK(it != pending_children_.end()); broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle()); pending_broker_clients.pop(); } #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) // Have the broker relay any messages we have waiting. for (auto& entry : pending_relay_messages) { const ports::NodeName& destination = entry.first; auto& message_queue = entry.second; while (!message_queue.empty()) { broker->RelayPortsMessage(destination, std::move(message_queue.front())); message_queue.pop(); } } #endif DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; } void NodeController::OnPortsMessage(const ports::NodeName& from_node, Channel::MessagePtr channel_message) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); void* data; size_t num_data_bytes; NodeChannel::GetPortsMessageData( channel_message.get(), &data, &num_data_bytes); if (!num_data_bytes) { DropPeer(from_node); return; } size_t num_header_bytes, num_payload_bytes, num_ports_bytes; if (!ports::Message::Parse(data, num_data_bytes, &num_header_bytes, &num_payload_bytes, &num_ports_bytes)) { DropPeer(from_node); return; } CHECK(channel_message); ports::ScopedMessage message( new PortsMessage(num_header_bytes, num_payload_bytes, num_ports_bytes, std::move(channel_message))); node_->AcceptMessage(std::move(message)); AcceptIncomingMessages(); } void NodeController::OnRequestPortMerge( const ports::NodeName& from_node, const ports::PortName& connector_port_name, const std::string& token) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " << token << " and port " << connector_port_name << "@" << from_node; ports::PortRef local_port; { base::AutoLock lock(reserved_ports_lock_); auto it = reserved_ports_.find(token); if (it == reserved_ports_.end()) { DVLOG(1) << "Ignoring request to connect to port for unknown token " << token; return; } local_port = it->second; } int rv = node_->MergePorts(local_port, from_node, connector_port_name); if (rv != ports::OK) DLOG(ERROR) << "MergePorts failed: " << rv; } void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, const ports::NodeName& name) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); scoped_refptr requestor = GetPeerChannel(from_node); if (from_node == name || name == ports::kInvalidNodeName || !requestor) { DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " << from_node; DropPeer(from_node); return; } scoped_refptr new_friend = GetPeerChannel(name); if (!new_friend) { // We don't know who they're talking about! requestor->Introduce(name, ScopedPlatformHandle()); } else { PlatformChannelPair new_channel; requestor->Introduce(name, new_channel.PassServerHandle()); new_friend->Introduce(from_node, new_channel.PassClientHandle()); } } void NodeController::OnIntroduce(const ports::NodeName& from_node, const ports::NodeName& name, ScopedPlatformHandle channel_handle) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); if (!channel_handle.is_valid()) { DLOG(ERROR) << "Could not be introduced to peer " << name; base::AutoLock lock(peers_lock_); pending_peer_messages_.erase(name); return; } scoped_refptr channel = NodeChannel::Create(this, std::move(channel_handle), io_task_runner_); DVLOG(1) << "Adding new peer " << name << " via parent introduction."; AddPeer(name, channel, true /* start_channel */); } #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, base::ProcessHandle from_process, const ports::NodeName& destination, Channel::MessagePtr message) { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); if (GetBrokerChannel()) { // Only the broker should be asked to relay a message. LOG(ERROR) << "Non-broker refusing to relay message."; DropPeer(from_node); return; } // The parent should always know which process this came from. DCHECK(from_process != base::kNullProcessHandle); #if defined(OS_WIN) // Rewrite the handles to this (the parent) process. If the message is // destined for another child process, the handles will be rewritten to that // process before going out (see NodeChannel::WriteChannelMessage). // // TODO: We could avoid double-duplication. if (!Channel::Message::RewriteHandles(from_process, base::GetCurrentProcessHandle(), message->handles(), message->num_handles())) { DLOG(ERROR) << "Failed to relay one or more handles."; } #else MachPortRelay* relay = GetMachPortRelay(); if (!relay) { LOG(ERROR) << "Receiving Mach ports without a port relay from " << from_node << ". Dropping message."; return; } if (!relay->ExtractPortRights(message.get(), from_process)) { // NodeChannel should ensure that MachPortRelay is ready for the remote // process. At this point, if the port extraction failed, either something // went wrong in the mach stuff, or the remote process died. LOG(ERROR) << "Error on receiving Mach ports " << from_node << ". Dropping message."; return; } #endif // defined(OS_WIN) if (destination == name_) { // Great, we can deliver this message locally. OnPortsMessage(from_node, std::move(message)); return; } scoped_refptr peer = GetPeerChannel(destination); if (peer) peer->PortsMessage(std::move(message)); else DLOG(ERROR) << "Dropping relay message for unknown node " << destination; } #endif void NodeController::OnChannelError(const ports::NodeName& from_node) { if (io_task_runner_->RunsTasksOnCurrentThread()) { DropPeer(from_node); } else { io_task_runner_->PostTask( FROM_HERE, base::Bind(&NodeController::DropPeer, base::Unretained(this), from_node)); } } #if defined(OS_MACOSX) && !defined(OS_IOS) MachPortRelay* NodeController::GetMachPortRelay() { { base::AutoLock lock(parent_lock_); // Return null if we're not the root. if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) return nullptr; } base::AutoLock lock(mach_port_relay_lock_); return mach_port_relay_.get(); } #endif void NodeController::DestroyOnIOThreadShutdown() { destroy_on_io_thread_shutdown_ = true; } void NodeController::AttemptShutdownIfRequested() { base::Closure callback; { base::AutoLock lock(shutdown_lock_); if (shutdown_callback_.is_null()) return; if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) { DVLOG(2) << "Unable to cleanly shut down node " << name_ << "."; return; } callback = shutdown_callback_; shutdown_callback_.Reset(); } DCHECK(!callback.is_null()); callback.Run(); } } // namespace edk } // namespace mojo