diff options
Diffstat (limited to 'chromium/jingle/glue/channel_socket_adapter.cc')
-rw-r--r-- | chromium/jingle/glue/channel_socket_adapter.cc | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/chromium/jingle/glue/channel_socket_adapter.cc b/chromium/jingle/glue/channel_socket_adapter.cc new file mode 100644 index 00000000000..0f68d502c73 --- /dev/null +++ b/chromium/jingle/glue/channel_socket_adapter.cc @@ -0,0 +1,194 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "jingle/glue/channel_socket_adapter.h" + +#include <limits> + +#include "base/callback.h" +#include "base/logging.h" +#include "base/message_loop/message_loop.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "third_party/libjingle/source/talk/p2p/base/transportchannel.h" + +namespace jingle_glue { + +TransportChannelSocketAdapter::TransportChannelSocketAdapter( + cricket::TransportChannel* channel) + : message_loop_(base::MessageLoop::current()), + channel_(channel), + closed_error_code_(net::OK) { + DCHECK(channel_); + + channel_->SignalReadPacket.connect( + this, &TransportChannelSocketAdapter::OnNewPacket); + channel_->SignalWritableState.connect( + this, &TransportChannelSocketAdapter::OnWritableState); + channel_->SignalDestroyed.connect( + this, &TransportChannelSocketAdapter::OnChannelDestroyed); +} + +TransportChannelSocketAdapter::~TransportChannelSocketAdapter() { + if (!destruction_callback_.is_null()) + destruction_callback_.Run(); +} + +void TransportChannelSocketAdapter::SetOnDestroyedCallback( + const base::Closure& callback) { + destruction_callback_ = callback; +} + +int TransportChannelSocketAdapter::Read( + net::IOBuffer* buf, + int buffer_size, + const net::CompletionCallback& callback) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_); + DCHECK(buf); + DCHECK(!callback.is_null()); + CHECK(read_callback_.is_null()); + + if (!channel_) { + DCHECK(closed_error_code_ != net::OK); + return closed_error_code_; + } + + read_callback_ = callback; + read_buffer_ = buf; + read_buffer_size_ = buffer_size; + + return net::ERR_IO_PENDING; +} + +int TransportChannelSocketAdapter::Write( + net::IOBuffer* buffer, + int buffer_size, + const net::CompletionCallback& callback) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_); + DCHECK(buffer); + DCHECK(!callback.is_null()); + CHECK(write_callback_.is_null()); + + if (!channel_) { + DCHECK(closed_error_code_ != net::OK); + return closed_error_code_; + } + + int result; + if (channel_->writable()) { + result = channel_->SendPacket(buffer->data(), buffer_size); + if (result < 0) { + result = net::MapSystemError(channel_->GetError()); + + // If the underlying socket returns IO pending where it shouldn't we + // pretend the packet is dropped and return as succeeded because no + // writeable callback will happen. + if (result == net::ERR_IO_PENDING) + result = net::OK; + } + } else { + // Channel is not writable yet. + result = net::ERR_IO_PENDING; + write_callback_ = callback; + write_buffer_ = buffer; + write_buffer_size_ = buffer_size; + } + + return result; +} + +bool TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_); + return channel_->SetOption(talk_base::Socket::OPT_RCVBUF, size) == 0; +} + +bool TransportChannelSocketAdapter::SetSendBufferSize(int32 size) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_); + return channel_->SetOption(talk_base::Socket::OPT_SNDBUF, size) == 0; +} + +void TransportChannelSocketAdapter::Close(int error_code) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_); + + if (!channel_) // Already closed. + return; + + DCHECK(error_code != net::OK); + closed_error_code_ = error_code; + channel_->SignalReadPacket.disconnect(this); + channel_->SignalDestroyed.disconnect(this); + channel_ = NULL; + + if (!read_callback_.is_null()) { + net::CompletionCallback callback = read_callback_; + read_callback_.Reset(); + read_buffer_ = NULL; + callback.Run(error_code); + } + + if (!write_callback_.is_null()) { + net::CompletionCallback callback = write_callback_; + write_callback_.Reset(); + write_buffer_ = NULL; + callback.Run(error_code); + } +} + +void TransportChannelSocketAdapter::OnNewPacket( + cricket::TransportChannel* channel, + const char* data, + size_t data_size, + int flags) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_); + DCHECK_EQ(channel, channel_); + if (!read_callback_.is_null()) { + DCHECK(read_buffer_.get()); + CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max())); + + if (read_buffer_size_ < static_cast<int>(data_size)) { + LOG(WARNING) << "Data buffer is smaller than the received packet. " + << "Dropping the data that doesn't fit."; + data_size = read_buffer_size_; + } + + memcpy(read_buffer_->data(), data, data_size); + + net::CompletionCallback callback = read_callback_; + read_callback_.Reset(); + read_buffer_ = NULL; + + callback.Run(data_size); + } else { + LOG(WARNING) + << "Data was received without a callback. Dropping the packet."; + } +} + +void TransportChannelSocketAdapter::OnWritableState( + cricket::TransportChannel* channel) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_); + // Try to send the packet if there is a pending write. + if (!write_callback_.is_null()) { + int result = channel_->SendPacket(write_buffer_->data(), + write_buffer_size_); + if (result < 0) + result = net::MapSystemError(channel_->GetError()); + + if (result != net::ERR_IO_PENDING) { + net::CompletionCallback callback = write_callback_; + write_callback_.Reset(); + write_buffer_ = NULL; + callback.Run(result); + } + } +} + +void TransportChannelSocketAdapter::OnChannelDestroyed( + cricket::TransportChannel* channel) { + DCHECK_EQ(base::MessageLoop::current(), message_loop_); + DCHECK_EQ(channel, channel_); + Close(net::ERR_CONNECTION_ABORTED); +} + +} // namespace jingle_glue |