diff options
Diffstat (limited to 'chromium/fuchsia/cast_streaming')
-rw-r--r-- | chromium/fuchsia/cast_streaming/BUILD.gn | 9 | ||||
-rw-r--r-- | chromium/fuchsia/cast_streaming/DEPS | 2 | ||||
-rw-r--r-- | chromium/fuchsia/cast_streaming/cast_streaming.cc | 24 | ||||
-rw-r--r-- | chromium/fuchsia/cast_streaming/cast_streaming_session.cc | 123 | ||||
-rw-r--r-- | chromium/fuchsia/cast_streaming/public/cast_streaming.h | 25 | ||||
-rw-r--r-- | chromium/fuchsia/cast_streaming/public/cast_streaming_session.h | 53 | ||||
-rw-r--r-- | chromium/fuchsia/cast_streaming/stream_consumer.cc | 148 | ||||
-rw-r--r-- | chromium/fuchsia/cast_streaming/stream_consumer.h | 46 |
8 files changed, 351 insertions, 79 deletions
diff --git a/chromium/fuchsia/cast_streaming/BUILD.gn b/chromium/fuchsia/cast_streaming/BUILD.gn index c691165e93d..94d567cc610 100644 --- a/chromium/fuchsia/cast_streaming/BUILD.gn +++ b/chromium/fuchsia/cast_streaming/BUILD.gn @@ -11,16 +11,23 @@ source_set("cast_streaming") { "//components/openscreen_platform:openscreen_platform_network_service", "//fuchsia/base", "//media", + "//media/mojo/common", + "//media/mojo/mojom", + "//mojo/public/cpp/system", "//third_party/fuchsia-sdk/sdk/fidl/fuchsia.web", "//third_party/openscreen/src/cast/streaming:receiver", "//third_party/openscreen/src/platform:api", "//third_party/openscreen/src/util", ] visibility = [ "//fuchsia/engine/*" ] - public = [ "public/cast_streaming_session.h" ] + public = [ + "public/cast_streaming.h", + "public/cast_streaming_session.h", + ] sources = [ "cast_message_port_impl.cc", "cast_message_port_impl.h", + "cast_streaming.cc", "cast_streaming_session.cc", "stream_consumer.cc", "stream_consumer.h", diff --git a/chromium/fuchsia/cast_streaming/DEPS b/chromium/fuchsia/cast_streaming/DEPS index 13d521158c3..5208c523d56 100644 --- a/chromium/fuchsia/cast_streaming/DEPS +++ b/chromium/fuchsia/cast_streaming/DEPS @@ -1,5 +1,7 @@ include_rules = [ "+components/openscreen_platform", "+media/base", + "+media/mojo", + "+mojo/public", "+third_party/openscreen/src", ] diff --git a/chromium/fuchsia/cast_streaming/cast_streaming.cc b/chromium/fuchsia/cast_streaming/cast_streaming.cc new file mode 100644 index 00000000000..2044900b05b --- /dev/null +++ b/chromium/fuchsia/cast_streaming/cast_streaming.cc @@ -0,0 +1,24 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "fuchsia/cast_streaming/public/cast_streaming.h" + +#include "base/strings/string_piece.h" + +namespace cast_streaming { + +bool IsCastStreamingAppOrigin(base::StringPiece origin) { + constexpr char kCastStreamingMessagePortOrigin[] = "cast-streaming:receiver"; + return origin == kCastStreamingMessagePortOrigin; +} + +bool IsValidCastStreamingMessage(const fuchsia::web::WebMessage& message) { + // |message| should contain exactly one OutgoingTransferrable, with a single + // MessagePort. + return message.has_outgoing_transfer() && + message.outgoing_transfer().size() == 1u && + message.outgoing_transfer()[0].is_message_port(); +} + +} // namespace cast_streaming diff --git a/chromium/fuchsia/cast_streaming/cast_streaming_session.cc b/chromium/fuchsia/cast_streaming/cast_streaming_session.cc index 71f931737bc..565df0defae 100644 --- a/chromium/fuchsia/cast_streaming/cast_streaming_session.cc +++ b/chromium/fuchsia/cast_streaming/cast_streaming_session.cc @@ -8,11 +8,14 @@ #include "base/bind.h" #include "base/notreached.h" +#include "components/openscreen_platform/network_context.h" #include "components/openscreen_platform/network_util.h" #include "components/openscreen_platform/task_runner.h" #include "fuchsia/cast_streaming/cast_message_port_impl.h" #include "fuchsia/cast_streaming/stream_consumer.h" #include "media/base/media_util.h" +#include "media/mojo/common/mojo_decoder_buffer_converter.h" +#include "mojo/public/cpp/system/data_pipe.h" #include "third_party/openscreen/src/cast/streaming/receiver.h" #include "third_party/openscreen/src/cast/streaming/receiver_session.h" @@ -27,6 +30,12 @@ constexpr char kVideoCodecVp8[] = "vp8"; namespace cast_streaming { +// static +void CastStreamingSession::SetNetworkContextGetter( + NetworkContextGetter getter) { + openscreen_platform::SetNetworkContextGetter(std::move(getter)); +} + // Owns the Open Screen ReceiverSession. The Cast Streaming Session is tied to // the lifespan of this object. class CastStreamingSession::Internal @@ -37,12 +46,9 @@ class CastStreamingSession::Internal fidl::InterfaceRequest<fuchsia::web::MessagePort> message_port_request, scoped_refptr<base::SequencedTaskRunner> task_runner) : task_runner_(task_runner), - environment_(&openscreen::Clock::now, - &task_runner_, - openscreen::IPEndpoint{ - openscreen::IPAddress(0, 0, 0, 0, 0, 0, 0, 0), 0}), + environment_(&openscreen::Clock::now, &task_runner_), cast_message_port_impl_(std::move(message_port_request)), - // TODO(crbug.com/1042501): Add streaming session Constraints and + // TODO(crbug.com/1087520): Add streaming session Constraints and // DisplayDescription. receiver_session_( this, @@ -73,14 +79,31 @@ class CastStreamingSession::Internal DVLOG(1) << __func__; DCHECK_EQ(session, &receiver_session_); - base::Optional<media::AudioDecoderConfig> audio_decoder_config; + base::Optional<AudioStreamInfo> audio_stream_info; if (receivers.audio) { + // Creare the audio data pipe. + const MojoCreateDataPipeOptions data_pipe_options{ + sizeof(MojoCreateDataPipeOptions), MOJO_CREATE_DATA_PIPE_FLAG_NONE, + 1u /* element_num_bytes */, + media::GetDefaultDecoderBufferConverterCapacity( + media::DemuxerStream::Type::AUDIO)}; + mojo::ScopedDataPipeProducerHandle data_pipe_producer; + mojo::ScopedDataPipeConsumerHandle data_pipe_consumer; + MojoResult result = mojo::CreateDataPipe( + &data_pipe_options, &data_pipe_producer, &data_pipe_consumer); + if (result != MOJO_RESULT_OK) { + client_->OnInitializationFailure(); + return; + } + + // Initialize the audio consumer. audio_consumer_ = std::make_unique<StreamConsumer>( - receivers.audio->receiver, + receivers.audio->receiver, std::move(data_pipe_producer), base::BindRepeating( - &CastStreamingSession::Client::OnAudioFrameReceived, + &CastStreamingSession::Client::OnAudioBufferReceived, base::Unretained(client_))); + // Gather data for the audio decoder config. media::ChannelLayout channel_layout = media::GuessChannelLayout(receivers.audio->receiver_config.channels); const std::string& audio_codec = @@ -89,22 +112,42 @@ class CastStreamingSession::Internal media::StringToAudioCodec(audio_codec); int samples_per_second = receivers.audio->receiver_config.rtp_timebase; - audio_decoder_config.emplace(media::AudioDecoderConfig( - media_audio_codec, media::SampleFormat::kSampleFormatF32, - channel_layout, samples_per_second, media::EmptyExtraData(), - media::EncryptionScheme::kUnencrypted)); + audio_stream_info.emplace(AudioStreamInfo{ + media::AudioDecoderConfig( + media_audio_codec, media::SampleFormat::kSampleFormatF32, + channel_layout, samples_per_second, media::EmptyExtraData(), + media::EncryptionScheme::kUnencrypted), + std::move(data_pipe_consumer)}); - DVLOG(1) << "Initialized audio stream using " << audio_codec << " codec."; + DVLOG(1) << "Initialized audio stream. " + << audio_stream_info->decoder_config.AsHumanReadableString(); } - base::Optional<media::VideoDecoderConfig> video_decoder_config; + base::Optional<VideoStreamInfo> video_stream_info; if (receivers.video) { + // Creare the video data pipe. + const MojoCreateDataPipeOptions data_pipe_options{ + sizeof(MojoCreateDataPipeOptions), MOJO_CREATE_DATA_PIPE_FLAG_NONE, + 1u /* element_num_bytes */, + media::GetDefaultDecoderBufferConverterCapacity( + media::DemuxerStream::Type::VIDEO)}; + mojo::ScopedDataPipeProducerHandle data_pipe_producer; + mojo::ScopedDataPipeConsumerHandle data_pipe_consumer; + MojoResult result = mojo::CreateDataPipe( + &data_pipe_options, &data_pipe_producer, &data_pipe_consumer); + if (result != MOJO_RESULT_OK) { + client_->OnInitializationFailure(); + return; + } + + // Initialize the video consumer. video_consumer_ = std::make_unique<StreamConsumer>( - receivers.video->receiver, + receivers.video->receiver, std::move(data_pipe_producer), base::BindRepeating( - &CastStreamingSession::Client::OnVideoFrameReceived, + &CastStreamingSession::Client::OnVideoBufferReceived, base::Unretained(client_))); + // Gather data for the video decoder config. const std::string& video_codec = receivers.video->selected_stream.stream.codec_name; uint32_t video_width = @@ -114,36 +157,39 @@ class CastStreamingSession::Internal gfx::Size video_size(video_width, video_height); gfx::Rect video_rect(video_width, video_height); + media::VideoCodec media_video_codec = + media::VideoCodec::kUnknownVideoCodec; + media::VideoCodecProfile video_codec_profile = + media::VideoCodecProfile::VIDEO_CODEC_PROFILE_UNKNOWN; + if (video_codec == kVideoCodecH264) { - video_decoder_config.emplace(media::VideoDecoderConfig( - media::VideoCodec::kCodecH264, - media::VideoCodecProfile::H264PROFILE_BASELINE, - media::VideoDecoderConfig::AlphaMode::kIsOpaque, - media::VideoColorSpace(), media::VideoTransformation(), video_size, - video_rect, video_size, media::EmptyExtraData(), - media::EncryptionScheme::kUnencrypted)); + media_video_codec = media::VideoCodec::kCodecH264; + video_codec_profile = media::VideoCodecProfile::H264PROFILE_BASELINE; } else if (video_codec == kVideoCodecVp8) { - video_decoder_config.emplace(media::VideoDecoderConfig( - media::VideoCodec::kCodecVP8, - media::VideoCodecProfile::VP8PROFILE_MIN, - media::VideoDecoderConfig::AlphaMode::kIsOpaque, - media::VideoColorSpace(), media::VideoTransformation(), video_size, - video_rect, video_size, media::EmptyExtraData(), - media::EncryptionScheme::kUnencrypted)); + media_video_codec = media::VideoCodec::kCodecVP8; + video_codec_profile = media::VideoCodecProfile::VP8PROFILE_MIN; } else { NOTREACHED(); } - DVLOG(1) << "Initialized video stream of " << video_width << "x" - << video_height << " resolution using " << video_codec - << " codec."; + video_stream_info.emplace(VideoStreamInfo{ + media::VideoDecoderConfig( + media_video_codec, video_codec_profile, + media::VideoDecoderConfig::AlphaMode::kIsOpaque, + media::VideoColorSpace(), media::VideoTransformation(), + video_size, video_rect, video_size, media::EmptyExtraData(), + media::EncryptionScheme::kUnencrypted), + std::move(data_pipe_consumer)}); + + DVLOG(1) << "Initialized video stream. " + << video_stream_info->decoder_config.AsHumanReadableString(); } - if (!video_decoder_config && !audio_decoder_config) { + if (!audio_stream_info && !video_stream_info) { client_->OnInitializationFailure(); } else { - client_->OnInitializationSuccess(std::move(audio_decoder_config), - std::move(video_decoder_config)); + client_->OnInitializationSuccess(std::move(audio_stream_info), + std::move(video_stream_info)); } initialized_called_ = true; } @@ -192,4 +238,9 @@ void CastStreamingSession::Start( client, std::move(message_port_request), task_runner); } +void CastStreamingSession::Stop() { + DCHECK(internal_); + internal_.reset(); +} + } // namespace cast_streaming diff --git a/chromium/fuchsia/cast_streaming/public/cast_streaming.h b/chromium/fuchsia/cast_streaming/public/cast_streaming.h new file mode 100644 index 00000000000..0838317b6ac --- /dev/null +++ b/chromium/fuchsia/cast_streaming/public/cast_streaming.h @@ -0,0 +1,25 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef FUCHSIA_CAST_STREAMING_PUBLIC_CAST_STREAMING_H_ +#define FUCHSIA_CAST_STREAMING_PUBLIC_CAST_STREAMING_H_ + +#include <fuchsia/web/cpp/fidl.h> + +#include "base/strings/string_piece_forward.h" + +namespace cast_streaming { + +// TODO(crbug.com/1082821): Remove this file once the Cast Streaming Receiver is +// implemented as a separate component from WebEngine. + +// Returns true if |origin| is the Cast Streaming MessagePort origin. +bool IsCastStreamingAppOrigin(base::StringPiece origin); + +// Returns true if |message| contains a valid Cast Streaming Message. +bool IsValidCastStreamingMessage(const fuchsia::web::WebMessage& message); + +} // namespace cast_streaming + +#endif // FUCHSIA_CAST_STREAMING_PUBLIC_CAST_STREAMING_H_ diff --git a/chromium/fuchsia/cast_streaming/public/cast_streaming_session.h b/chromium/fuchsia/cast_streaming/public/cast_streaming_session.h index 6769ba1b49f..bb6703989c7 100644 --- a/chromium/fuchsia/cast_streaming/public/cast_streaming_session.h +++ b/chromium/fuchsia/cast_streaming/public/cast_streaming_session.h @@ -9,11 +9,19 @@ #include <fuchsia/web/cpp/fidl.h> +#include "base/callback.h" #include "base/optional.h" #include "base/sequenced_task_runner.h" #include "media/base/audio_decoder_config.h" -#include "media/base/decoder_buffer.h" #include "media/base/video_decoder_config.h" +#include "media/mojo/mojom/media_types.mojom.h" +#include "mojo/public/cpp/system/data_pipe.h" + +namespace network { +namespace mojom { +class NetworkContext; +} // namespace mojom +} // namespace network namespace cast_streaming { @@ -21,25 +29,44 @@ namespace cast_streaming { // Cast Streaming Session for a provided FIDL MessagePort request. class CastStreamingSession { public: + using NetworkContextGetter = + base::RepeatingCallback<network::mojom::NetworkContext*()>; + + // Sets the NetworkContextGetter. This must be called before any call to + // Start() and must only be called once. If the NetworkContext crashes, any + // existing Cast Streaming Session will eventually terminate and call + // OnReceiverSessionEnded(). + static void SetNetworkContextGetter(NetworkContextGetter getter); + + template <class T> + struct StreamInfo { + T decoder_config; + mojo::ScopedDataPipeConsumerHandle data_pipe; + }; + using AudioStreamInfo = StreamInfo<media::AudioDecoderConfig>; + using VideoStreamInfo = StreamInfo<media::VideoDecoderConfig>; + class Client { public: // Called when the Cast Streaming Session has been successfully initialized. - // It is guaranteed that at least one of |audio_decoder_config| or - // |video_decoder_config| will be set. + // It is guaranteed that at least one of |audio_stream_info| or + // |video_stream_info| will be set. virtual void OnInitializationSuccess( - base::Optional<media::AudioDecoderConfig> audio_decoder_config, - base::Optional<media::VideoDecoderConfig> video_decoder_config) = 0; + base::Optional<AudioStreamInfo> audio_stream_info, + base::Optional<VideoStreamInfo> video_stream_info) = 0; // Called when the Cast Stream Session failed to initialize. virtual void OnInitializationFailure() = 0; - // Called on every new audio frame after OnInitializationSuccess(). - virtual void OnAudioFrameReceived( - scoped_refptr<media::DecoderBuffer> buffer) = 0; + // Called on every new audio buffer after OnInitializationSuccess(). The + // frame data must be accessed via the |data_pipe| property in StreamInfo. + virtual void OnAudioBufferReceived( + media::mojom::DecoderBufferPtr buffer) = 0; - // Called on every new video frame after OnInitializationSuccess(). - virtual void OnVideoFrameReceived( - scoped_refptr<media::DecoderBuffer> buffer) = 0; + // Called on every new video buffer after OnInitializationSuccess(). The + // frame data must be accessed via the |data_pipe| property in StreamInfo. + virtual void OnVideoBufferReceived( + media::mojom::DecoderBufferPtr buffer) = 0; // Called when the Cast Streaming Session has ended. virtual void OnReceiverSessionEnded() = 0; @@ -66,6 +93,10 @@ class CastStreamingSession { fidl::InterfaceRequest<fuchsia::web::MessagePort> message_port_request, scoped_refptr<base::SequencedTaskRunner> task_runner); + // Stops the Cast Streaming Session. This can only be called once during the + // lifespan of this object and only after a call to Start(). + void Stop(); + private: class Internal; std::unique_ptr<Internal> internal_; diff --git a/chromium/fuchsia/cast_streaming/stream_consumer.cc b/chromium/fuchsia/cast_streaming/stream_consumer.cc index bcfb8a4d6b1..353ac52119a 100644 --- a/chromium/fuchsia/cast_streaming/stream_consumer.cc +++ b/chromium/fuchsia/cast_streaming/stream_consumer.cc @@ -5,27 +5,132 @@ #include "fuchsia/cast_streaming/stream_consumer.h" #include "base/logging.h" +#include "base/time/time.h" +#include "media/base/media_util.h" namespace cast_streaming { StreamConsumer::StreamConsumer(openscreen::cast::Receiver* receiver, + mojo::ScopedDataPipeProducerHandle data_pipe, FrameReceivedCB frame_received_cb) - : receiver_(receiver), frame_received_cb_(std::move(frame_received_cb)) { + : receiver_(receiver), + data_pipe_(std::move(data_pipe)), + frame_received_cb_(std::move(frame_received_cb)), + pipe_watcher_(FROM_HERE, + mojo::SimpleWatcher::ArmingPolicy::MANUAL, + base::SequencedTaskRunnerHandle::Get()) { DCHECK(receiver_); receiver_->SetConsumer(this); + MojoResult result = + pipe_watcher_.Watch(data_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, + base::BindRepeating(&StreamConsumer::OnPipeWritable, + base::Unretained(this))); + if (result != MOJO_RESULT_OK) { + CloseDataPipeOnError(); + } } StreamConsumer::~StreamConsumer() { receiver_->SetConsumer(nullptr); } +void StreamConsumer::CloseDataPipeOnError() { + DLOG(WARNING) << "[ssrc:" << receiver_->ssrc() << "] Data pipe closed."; + receiver_->SetConsumer(nullptr); + pipe_watcher_.Cancel(); + data_pipe_.reset(); +} + +void StreamConsumer::OnPipeWritable(MojoResult result) { + DCHECK(data_pipe_); + + if (result != MOJO_RESULT_OK) { + CloseDataPipeOnError(); + return; + } + + uint32_t bytes_written = pending_buffer_remaining_bytes_; + result = data_pipe_->WriteData(pending_buffer_ + pending_buffer_offset_, + &bytes_written, MOJO_WRITE_DATA_FLAG_NONE); + if (result != MOJO_RESULT_OK) { + CloseDataPipeOnError(); + return; + } + + pending_buffer_offset_ += bytes_written; + pending_buffer_remaining_bytes_ -= bytes_written; + if (pending_buffer_remaining_bytes_ != 0) { + pipe_watcher_.ArmOrNotify(); + return; + } + + // Advance to the next frame if a new one is ready. + int next_frame_buffer_size = receiver_->AdvanceToNextFrame(); + if (next_frame_buffer_size != openscreen::cast::Receiver::kNoFramesReady) + OnFramesReady(next_frame_buffer_size); +} + void StreamConsumer::OnFramesReady(int next_frame_buffer_size) { - // TODO(crbug.com/1042501): Do not allocate a buffer for every new frame, use - // a buffer pool. - std::unique_ptr<uint8_t[]> buffer = - std::make_unique<uint8_t[]>(next_frame_buffer_size); - openscreen::cast::EncodedFrame encoded_frame = receiver_->ConsumeNextFrame( - absl::Span<uint8_t>(buffer.get(), next_frame_buffer_size)); + DCHECK(data_pipe_); + + if (pending_buffer_remaining_bytes_ != 0) { + // There already is a pending frame. Ignore this one for now. + return; + } + + void* buffer = nullptr; + uint32_t buffer_size = next_frame_buffer_size; + uint32_t mojo_buffer_size = next_frame_buffer_size; + + if (buffer_size > kMaxFrameSize) { + LOG(ERROR) << "[ssrc:" << receiver_->ssrc() << "] " + << "Frame size too big: " << buffer_size; + CloseDataPipeOnError(); + return; + } + + MojoResult result = data_pipe_->BeginWriteData( + &buffer, &mojo_buffer_size, MOJO_BEGIN_WRITE_DATA_FLAG_NONE); + + if (result == MOJO_RESULT_SHOULD_WAIT) { + pipe_watcher_.ArmOrNotify(); + return; + } + + if (result != MOJO_RESULT_OK) { + CloseDataPipeOnError(); + return; + } + + openscreen::cast::EncodedFrame encoded_frame; + size_t bytes_written = 0; + + if (mojo_buffer_size < buffer_size) { + DVLOG(2) << "[ssrc:" << receiver_->ssrc() << "] " + << "Mojo data pipe full"; + + // The |data_pipe_| buffer cannot take the full frame, write to + // |pending_buffer_| instead. + encoded_frame = receiver_->ConsumeNextFrame( + absl::Span<uint8_t>(pending_buffer_, buffer_size)); + + // Write as much as we can to the |data_pipe_| buffer. + memcpy(buffer, pending_buffer_, mojo_buffer_size); + pending_buffer_offset_ = mojo_buffer_size; + pending_buffer_remaining_bytes_ = buffer_size - mojo_buffer_size; + bytes_written = mojo_buffer_size; + } else { + // Write directly to the |data_pipe_| buffer. + encoded_frame = receiver_->ConsumeNextFrame( + absl::Span<uint8_t>(static_cast<uint8_t*>(buffer), buffer_size)); + bytes_written = buffer_size; + } + + result = data_pipe_->EndWriteData(bytes_written); + if (result != MOJO_RESULT_OK) { + CloseDataPipeOnError(); + return; + } const bool is_key_frame = encoded_frame.dependency == @@ -37,26 +142,21 @@ void StreamConsumer::OnFramesReady(int next_frame_buffer_size) { receiver_->rtp_timebase()) .count()); - DVLOG(3) << "[ssrc:" << receiver_->ssrc() - << "] Received new frame. Timestamp: " << playout_time + DVLOG(3) << "[ssrc:" << receiver_->ssrc() << "] " + << "Received new frame. Timestamp: " << playout_time << ", is_key_frame: " << is_key_frame; - if (playout_time <= last_playout_time_) { - // TODO(b/156129097): We sometimes receive identical playout time for two - // frames in a row. - DVLOG(2) << "[ssrc:" << receiver_->ssrc() - << "] Droped frame due to identical playout time."; - return; - } - last_playout_time_ = playout_time; + frame_received_cb_.Run(media::mojom::DecoderBuffer::New( + playout_time /* timestamp */, base::TimeDelta() /* duration */, + false /* is_end_of_stream */, buffer_size, is_key_frame, + media::EmptyExtraData(), media::mojom::DecryptConfigPtr(), + base::TimeDelta() /* front_discard */, + base::TimeDelta() /* back_discard */ + )); - scoped_refptr<media::DecoderBuffer> decoder_buffer = - media::DecoderBuffer::FromArray(std::move(buffer), - next_frame_buffer_size); - decoder_buffer->set_is_key_frame(is_key_frame); - decoder_buffer->set_timestamp(playout_time); - - frame_received_cb_.Run(decoder_buffer); + if (pending_buffer_remaining_bytes_ != 0) { + pipe_watcher_.ArmOrNotify(); + } } } // namespace cast_streaming diff --git a/chromium/fuchsia/cast_streaming/stream_consumer.h b/chromium/fuchsia/cast_streaming/stream_consumer.h index 4605d158bde..eef5f14b1a7 100644 --- a/chromium/fuchsia/cast_streaming/stream_consumer.h +++ b/chromium/fuchsia/cast_streaming/stream_consumer.h @@ -8,23 +8,33 @@ #include <fuchsia/media/cpp/fidl.h> #include "base/callback.h" -#include "base/time/time.h" -#include "media/base/decoder_buffer.h" +#include "media/mojo/mojom/media_types.mojom.h" +#include "mojo/public/cpp/system/data_pipe.h" +#include "mojo/public/cpp/system/simple_watcher.h" #include "third_party/openscreen/src/cast/streaming/receiver.h" #include "third_party/openscreen/src/cast/streaming/receiver_session.h" namespace cast_streaming { -// Attaches to an Open Screen Receiver to receive frames and invokes -// |frame_received_cb_| with each received buffer of encoded data. +// Attaches to an Open Screen Receiver to receive buffers of encoded data and +// invokes |frame_received_cb_| with each buffer. +// +// Internally, this class writes buffers of encoded data directly to +// |data_pipe_| rather than using a helper class like MojoDecoderBufferWriter. +// This allows us to use |data_pipe_| as an end-to-end buffer to cap memory +// usage. Receiving new buffers is delayed until the pipe has free memory again. +// The Open Screen library takes care of discarding buffers that are too old and +// requesting new key frames as needed. class StreamConsumer : public openscreen::cast::Receiver::Consumer { public: using FrameReceivedCB = - base::RepeatingCallback<void(scoped_refptr<media::DecoderBuffer>)>; + base::RepeatingCallback<void(media::mojom::DecoderBufferPtr)>; // |receiver| sends frames to this object. It must outlive this object. - // |frame_received_cb| is called on every new frame. + // |frame_received_cb| is called on every new frame, after a new frame has + // been written to |data_pipe|. On error, |data_pipe| will be closed. StreamConsumer(openscreen::cast::Receiver* receiver, + mojo::ScopedDataPipeProducerHandle data_pipe, FrameReceivedCB frame_received_cb); ~StreamConsumer() final; @@ -32,12 +42,34 @@ class StreamConsumer : public openscreen::cast::Receiver::Consumer { StreamConsumer& operator=(const StreamConsumer&) = delete; private: + // Maximum frame size that OnFramesReady() can accept. + static constexpr uint32_t kMaxFrameSize = 512 * 1024; + + // Closes |data_pipe_| and resets the Consumer in |receiver_|. No frames will + // be received after this call. + void CloseDataPipeOnError(); + + // Callback when |data_pipe_| can be written to again after it was full. + void OnPipeWritable(MojoResult result); + // openscreen::cast::Receiver::Consumer implementation. void OnFramesReady(int next_frame_buffer_size) final; openscreen::cast::Receiver* const receiver_; + mojo::ScopedDataPipeProducerHandle data_pipe_; const FrameReceivedCB frame_received_cb_; - base::TimeDelta last_playout_time_; + + // Provides notifications about |data_pipe_| readiness. + mojo::SimpleWatcher pipe_watcher_; + + // Buffer used when |data_pipe_| is too full to accept the next frame size. + uint8_t pending_buffer_[kMaxFrameSize]; + + // Current offset for data |pending_buffer_| to be written to |data_pipe_|. + size_t pending_buffer_offset_ = 0; + + // Remaining bytes to write from |pending_buffer_| to |data_pipe_|. + size_t pending_buffer_remaining_bytes_ = 0; }; } // namespace cast_streaming |