diff options
Diffstat (limited to 'chromium/fuchsia/cast_streaming/stream_consumer.cc')
-rw-r--r-- | chromium/fuchsia/cast_streaming/stream_consumer.cc | 148 |
1 files changed, 124 insertions, 24 deletions
diff --git a/chromium/fuchsia/cast_streaming/stream_consumer.cc b/chromium/fuchsia/cast_streaming/stream_consumer.cc index bcfb8a4d6b1..353ac52119a 100644 --- a/chromium/fuchsia/cast_streaming/stream_consumer.cc +++ b/chromium/fuchsia/cast_streaming/stream_consumer.cc @@ -5,27 +5,132 @@ #include "fuchsia/cast_streaming/stream_consumer.h" #include "base/logging.h" +#include "base/time/time.h" +#include "media/base/media_util.h" namespace cast_streaming { StreamConsumer::StreamConsumer(openscreen::cast::Receiver* receiver, + mojo::ScopedDataPipeProducerHandle data_pipe, FrameReceivedCB frame_received_cb) - : receiver_(receiver), frame_received_cb_(std::move(frame_received_cb)) { + : receiver_(receiver), + data_pipe_(std::move(data_pipe)), + frame_received_cb_(std::move(frame_received_cb)), + pipe_watcher_(FROM_HERE, + mojo::SimpleWatcher::ArmingPolicy::MANUAL, + base::SequencedTaskRunnerHandle::Get()) { DCHECK(receiver_); receiver_->SetConsumer(this); + MojoResult result = + pipe_watcher_.Watch(data_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, + base::BindRepeating(&StreamConsumer::OnPipeWritable, + base::Unretained(this))); + if (result != MOJO_RESULT_OK) { + CloseDataPipeOnError(); + } } StreamConsumer::~StreamConsumer() { receiver_->SetConsumer(nullptr); } +void StreamConsumer::CloseDataPipeOnError() { + DLOG(WARNING) << "[ssrc:" << receiver_->ssrc() << "] Data pipe closed."; + receiver_->SetConsumer(nullptr); + pipe_watcher_.Cancel(); + data_pipe_.reset(); +} + +void StreamConsumer::OnPipeWritable(MojoResult result) { + DCHECK(data_pipe_); + + if (result != MOJO_RESULT_OK) { + CloseDataPipeOnError(); + return; + } + + uint32_t bytes_written = pending_buffer_remaining_bytes_; + result = data_pipe_->WriteData(pending_buffer_ + pending_buffer_offset_, + &bytes_written, MOJO_WRITE_DATA_FLAG_NONE); + if (result != MOJO_RESULT_OK) { + CloseDataPipeOnError(); + return; + } + + pending_buffer_offset_ += bytes_written; + pending_buffer_remaining_bytes_ -= bytes_written; + if (pending_buffer_remaining_bytes_ != 0) { + pipe_watcher_.ArmOrNotify(); + return; + } + + // Advance to the next frame if a new one is ready. + int next_frame_buffer_size = receiver_->AdvanceToNextFrame(); + if (next_frame_buffer_size != openscreen::cast::Receiver::kNoFramesReady) + OnFramesReady(next_frame_buffer_size); +} + void StreamConsumer::OnFramesReady(int next_frame_buffer_size) { - // TODO(crbug.com/1042501): Do not allocate a buffer for every new frame, use - // a buffer pool. - std::unique_ptr<uint8_t[]> buffer = - std::make_unique<uint8_t[]>(next_frame_buffer_size); - openscreen::cast::EncodedFrame encoded_frame = receiver_->ConsumeNextFrame( - absl::Span<uint8_t>(buffer.get(), next_frame_buffer_size)); + DCHECK(data_pipe_); + + if (pending_buffer_remaining_bytes_ != 0) { + // There already is a pending frame. Ignore this one for now. + return; + } + + void* buffer = nullptr; + uint32_t buffer_size = next_frame_buffer_size; + uint32_t mojo_buffer_size = next_frame_buffer_size; + + if (buffer_size > kMaxFrameSize) { + LOG(ERROR) << "[ssrc:" << receiver_->ssrc() << "] " + << "Frame size too big: " << buffer_size; + CloseDataPipeOnError(); + return; + } + + MojoResult result = data_pipe_->BeginWriteData( + &buffer, &mojo_buffer_size, MOJO_BEGIN_WRITE_DATA_FLAG_NONE); + + if (result == MOJO_RESULT_SHOULD_WAIT) { + pipe_watcher_.ArmOrNotify(); + return; + } + + if (result != MOJO_RESULT_OK) { + CloseDataPipeOnError(); + return; + } + + openscreen::cast::EncodedFrame encoded_frame; + size_t bytes_written = 0; + + if (mojo_buffer_size < buffer_size) { + DVLOG(2) << "[ssrc:" << receiver_->ssrc() << "] " + << "Mojo data pipe full"; + + // The |data_pipe_| buffer cannot take the full frame, write to + // |pending_buffer_| instead. + encoded_frame = receiver_->ConsumeNextFrame( + absl::Span<uint8_t>(pending_buffer_, buffer_size)); + + // Write as much as we can to the |data_pipe_| buffer. + memcpy(buffer, pending_buffer_, mojo_buffer_size); + pending_buffer_offset_ = mojo_buffer_size; + pending_buffer_remaining_bytes_ = buffer_size - mojo_buffer_size; + bytes_written = mojo_buffer_size; + } else { + // Write directly to the |data_pipe_| buffer. + encoded_frame = receiver_->ConsumeNextFrame( + absl::Span<uint8_t>(static_cast<uint8_t*>(buffer), buffer_size)); + bytes_written = buffer_size; + } + + result = data_pipe_->EndWriteData(bytes_written); + if (result != MOJO_RESULT_OK) { + CloseDataPipeOnError(); + return; + } const bool is_key_frame = encoded_frame.dependency == @@ -37,26 +142,21 @@ void StreamConsumer::OnFramesReady(int next_frame_buffer_size) { receiver_->rtp_timebase()) .count()); - DVLOG(3) << "[ssrc:" << receiver_->ssrc() - << "] Received new frame. Timestamp: " << playout_time + DVLOG(3) << "[ssrc:" << receiver_->ssrc() << "] " + << "Received new frame. Timestamp: " << playout_time << ", is_key_frame: " << is_key_frame; - if (playout_time <= last_playout_time_) { - // TODO(b/156129097): We sometimes receive identical playout time for two - // frames in a row. - DVLOG(2) << "[ssrc:" << receiver_->ssrc() - << "] Droped frame due to identical playout time."; - return; - } - last_playout_time_ = playout_time; + frame_received_cb_.Run(media::mojom::DecoderBuffer::New( + playout_time /* timestamp */, base::TimeDelta() /* duration */, + false /* is_end_of_stream */, buffer_size, is_key_frame, + media::EmptyExtraData(), media::mojom::DecryptConfigPtr(), + base::TimeDelta() /* front_discard */, + base::TimeDelta() /* back_discard */ + )); - scoped_refptr<media::DecoderBuffer> decoder_buffer = - media::DecoderBuffer::FromArray(std::move(buffer), - next_frame_buffer_size); - decoder_buffer->set_is_key_frame(is_key_frame); - decoder_buffer->set_timestamp(playout_time); - - frame_received_cb_.Run(decoder_buffer); + if (pending_buffer_remaining_bytes_ != 0) { + pipe_watcher_.ArmOrNotify(); + } } } // namespace cast_streaming |