// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/edk/system/channel_endpoint.h" #include "base/logging.h" #include "mojo/edk/system/channel.h" #include "mojo/edk/system/message_pipe.h" #include "mojo/edk/system/transport_data.h" namespace mojo { namespace system { ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, unsigned port, MessageInTransitQueue* message_queue) : message_pipe_(message_pipe), port_(port), channel_(nullptr) { DCHECK(message_pipe_.get() || message_queue); DCHECK(port_ == 0 || port_ == 1); if (message_queue) paused_message_queue_.Swap(message_queue); } bool ChannelEndpoint::EnqueueMessage(scoped_ptr message) { DCHECK(message); base::AutoLock locker(lock_); if (!channel_ || !remote_id_.is_valid()) { // We may reach here if we haven't been attached or run yet. // TODO(vtl): We may also reach here if the channel is shut down early for // some reason (with live message pipes on it). We can't check |state_| yet, // until it's protected under lock, but in this case we should return false // (and not enqueue any messages). paused_message_queue_.AddMessage(message.Pass()); return true; } // TODO(vtl): Currently, this only works in the "running" case. DCHECK(remote_id_.is_valid()); return WriteMessageNoLock(message.Pass()); } void ChannelEndpoint::DetachFromMessagePipe() { { base::AutoLock locker(lock_); DCHECK(message_pipe_.get()); message_pipe_ = nullptr; if (!channel_) return; DCHECK(local_id_.is_valid()); // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid // here as well. channel_->DetachEndpoint(this, local_id_, remote_id_); channel_ = nullptr; local_id_ = ChannelEndpointId(); remote_id_ = ChannelEndpointId(); } } void ChannelEndpoint::AttachAndRun(Channel* channel, ChannelEndpointId local_id, ChannelEndpointId remote_id) { DCHECK(channel); DCHECK(local_id.is_valid()); DCHECK(remote_id.is_valid()); base::AutoLock locker(lock_); DCHECK(!channel_); DCHECK(!local_id_.is_valid()); DCHECK(!remote_id_.is_valid()); channel_ = channel; local_id_ = local_id; remote_id_ = remote_id; while (!paused_message_queue_.IsEmpty()) { LOG_IF(WARNING, !WriteMessageNoLock(paused_message_queue_.GetMessage())) << "Failed to write enqueue message to channel"; } if (!message_pipe_.get()) { channel_->DetachEndpoint(this, local_id_, remote_id_); channel_ = nullptr; local_id_ = ChannelEndpointId(); remote_id_ = ChannelEndpointId(); } } bool ChannelEndpoint::OnReadMessage( const MessageInTransit::View& message_view, embedder::ScopedPlatformHandleVectorPtr platform_handles) { scoped_ptr message(new MessageInTransit(message_view)); scoped_refptr message_pipe; unsigned port; { base::AutoLock locker(lock_); DCHECK(channel_); if (!message_pipe_.get()) { // This isn't a failure per se. (It just means that, e.g., the other end // of the message point closed first.) return true; } if (message_view.transport_data_buffer_size() > 0) { DCHECK(message_view.transport_data_buffer()); message->SetDispatchers(TransportData::DeserializeDispatchers( message_view.transport_data_buffer(), message_view.transport_data_buffer_size(), platform_handles.Pass(), channel_)); } // Take a ref, and call |EnqueueMessage()| outside the lock. message_pipe = message_pipe_; port = port_; } MojoResult result = message_pipe->EnqueueMessage( MessagePipe::GetPeerPort(port), message.Pass()); return (result == MOJO_RESULT_OK); } void ChannelEndpoint::OnDisconnect() { scoped_refptr message_pipe; unsigned port; { base::AutoLock locker(lock_); if (!message_pipe_.get()) return; // Take a ref, and call |Close()| outside the lock. message_pipe = message_pipe_; port = port_; } message_pipe->Close(port); } void ChannelEndpoint::DetachFromChannel() { base::AutoLock locker(lock_); // This may already be null if we already detached from the channel in // |DetachFromMessagePipe()| by calling |Channel::DetachEndpoint()| (and there // are racing detaches). if (!channel_) return; DCHECK(local_id_.is_valid()); // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid // here as well. channel_ = nullptr; local_id_ = ChannelEndpointId(); remote_id_ = ChannelEndpointId(); } ChannelEndpoint::~ChannelEndpoint() { DCHECK(!message_pipe_.get()); DCHECK(!channel_); DCHECK(!local_id_.is_valid()); DCHECK(!remote_id_.is_valid()); } bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr message) { DCHECK(message); lock_.AssertAcquired(); DCHECK(channel_); DCHECK(local_id_.is_valid()); DCHECK(remote_id_.is_valid()); message->SerializeAndCloseDispatchers(channel_); message->set_source_id(local_id_); message->set_destination_id(remote_id_); return channel_->WriteMessage(message.Pass()); } } // namespace system } // namespace mojo