summaryrefslogtreecommitdiff
path: root/chromium/fuchsia/cast_streaming
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/fuchsia/cast_streaming')
-rw-r--r--chromium/fuchsia/cast_streaming/BUILD.gn9
-rw-r--r--chromium/fuchsia/cast_streaming/DEPS2
-rw-r--r--chromium/fuchsia/cast_streaming/cast_streaming.cc24
-rw-r--r--chromium/fuchsia/cast_streaming/cast_streaming_session.cc123
-rw-r--r--chromium/fuchsia/cast_streaming/public/cast_streaming.h25
-rw-r--r--chromium/fuchsia/cast_streaming/public/cast_streaming_session.h53
-rw-r--r--chromium/fuchsia/cast_streaming/stream_consumer.cc148
-rw-r--r--chromium/fuchsia/cast_streaming/stream_consumer.h46
8 files changed, 351 insertions, 79 deletions
diff --git a/chromium/fuchsia/cast_streaming/BUILD.gn b/chromium/fuchsia/cast_streaming/BUILD.gn
index c691165e93d..94d567cc610 100644
--- a/chromium/fuchsia/cast_streaming/BUILD.gn
+++ b/chromium/fuchsia/cast_streaming/BUILD.gn
@@ -11,16 +11,23 @@ source_set("cast_streaming") {
"//components/openscreen_platform:openscreen_platform_network_service",
"//fuchsia/base",
"//media",
+ "//media/mojo/common",
+ "//media/mojo/mojom",
+ "//mojo/public/cpp/system",
"//third_party/fuchsia-sdk/sdk/fidl/fuchsia.web",
"//third_party/openscreen/src/cast/streaming:receiver",
"//third_party/openscreen/src/platform:api",
"//third_party/openscreen/src/util",
]
visibility = [ "//fuchsia/engine/*" ]
- public = [ "public/cast_streaming_session.h" ]
+ public = [
+ "public/cast_streaming.h",
+ "public/cast_streaming_session.h",
+ ]
sources = [
"cast_message_port_impl.cc",
"cast_message_port_impl.h",
+ "cast_streaming.cc",
"cast_streaming_session.cc",
"stream_consumer.cc",
"stream_consumer.h",
diff --git a/chromium/fuchsia/cast_streaming/DEPS b/chromium/fuchsia/cast_streaming/DEPS
index 13d521158c3..5208c523d56 100644
--- a/chromium/fuchsia/cast_streaming/DEPS
+++ b/chromium/fuchsia/cast_streaming/DEPS
@@ -1,5 +1,7 @@
include_rules = [
"+components/openscreen_platform",
"+media/base",
+ "+media/mojo",
+ "+mojo/public",
"+third_party/openscreen/src",
]
diff --git a/chromium/fuchsia/cast_streaming/cast_streaming.cc b/chromium/fuchsia/cast_streaming/cast_streaming.cc
new file mode 100644
index 00000000000..2044900b05b
--- /dev/null
+++ b/chromium/fuchsia/cast_streaming/cast_streaming.cc
@@ -0,0 +1,24 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "fuchsia/cast_streaming/public/cast_streaming.h"
+
+#include "base/strings/string_piece.h"
+
+namespace cast_streaming {
+
+bool IsCastStreamingAppOrigin(base::StringPiece origin) {
+ constexpr char kCastStreamingMessagePortOrigin[] = "cast-streaming:receiver";
+ return origin == kCastStreamingMessagePortOrigin;
+}
+
+bool IsValidCastStreamingMessage(const fuchsia::web::WebMessage& message) {
+ // |message| should contain exactly one OutgoingTransferrable, with a single
+ // MessagePort.
+ return message.has_outgoing_transfer() &&
+ message.outgoing_transfer().size() == 1u &&
+ message.outgoing_transfer()[0].is_message_port();
+}
+
+} // namespace cast_streaming
diff --git a/chromium/fuchsia/cast_streaming/cast_streaming_session.cc b/chromium/fuchsia/cast_streaming/cast_streaming_session.cc
index 71f931737bc..565df0defae 100644
--- a/chromium/fuchsia/cast_streaming/cast_streaming_session.cc
+++ b/chromium/fuchsia/cast_streaming/cast_streaming_session.cc
@@ -8,11 +8,14 @@
#include "base/bind.h"
#include "base/notreached.h"
+#include "components/openscreen_platform/network_context.h"
#include "components/openscreen_platform/network_util.h"
#include "components/openscreen_platform/task_runner.h"
#include "fuchsia/cast_streaming/cast_message_port_impl.h"
#include "fuchsia/cast_streaming/stream_consumer.h"
#include "media/base/media_util.h"
+#include "media/mojo/common/mojo_decoder_buffer_converter.h"
+#include "mojo/public/cpp/system/data_pipe.h"
#include "third_party/openscreen/src/cast/streaming/receiver.h"
#include "third_party/openscreen/src/cast/streaming/receiver_session.h"
@@ -27,6 +30,12 @@ constexpr char kVideoCodecVp8[] = "vp8";
namespace cast_streaming {
+// static
+void CastStreamingSession::SetNetworkContextGetter(
+ NetworkContextGetter getter) {
+ openscreen_platform::SetNetworkContextGetter(std::move(getter));
+}
+
// Owns the Open Screen ReceiverSession. The Cast Streaming Session is tied to
// the lifespan of this object.
class CastStreamingSession::Internal
@@ -37,12 +46,9 @@ class CastStreamingSession::Internal
fidl::InterfaceRequest<fuchsia::web::MessagePort> message_port_request,
scoped_refptr<base::SequencedTaskRunner> task_runner)
: task_runner_(task_runner),
- environment_(&openscreen::Clock::now,
- &task_runner_,
- openscreen::IPEndpoint{
- openscreen::IPAddress(0, 0, 0, 0, 0, 0, 0, 0), 0}),
+ environment_(&openscreen::Clock::now, &task_runner_),
cast_message_port_impl_(std::move(message_port_request)),
- // TODO(crbug.com/1042501): Add streaming session Constraints and
+ // TODO(crbug.com/1087520): Add streaming session Constraints and
// DisplayDescription.
receiver_session_(
this,
@@ -73,14 +79,31 @@ class CastStreamingSession::Internal
DVLOG(1) << __func__;
DCHECK_EQ(session, &receiver_session_);
- base::Optional<media::AudioDecoderConfig> audio_decoder_config;
+ base::Optional<AudioStreamInfo> audio_stream_info;
if (receivers.audio) {
+ // Creare the audio data pipe.
+ const MojoCreateDataPipeOptions data_pipe_options{
+ sizeof(MojoCreateDataPipeOptions), MOJO_CREATE_DATA_PIPE_FLAG_NONE,
+ 1u /* element_num_bytes */,
+ media::GetDefaultDecoderBufferConverterCapacity(
+ media::DemuxerStream::Type::AUDIO)};
+ mojo::ScopedDataPipeProducerHandle data_pipe_producer;
+ mojo::ScopedDataPipeConsumerHandle data_pipe_consumer;
+ MojoResult result = mojo::CreateDataPipe(
+ &data_pipe_options, &data_pipe_producer, &data_pipe_consumer);
+ if (result != MOJO_RESULT_OK) {
+ client_->OnInitializationFailure();
+ return;
+ }
+
+ // Initialize the audio consumer.
audio_consumer_ = std::make_unique<StreamConsumer>(
- receivers.audio->receiver,
+ receivers.audio->receiver, std::move(data_pipe_producer),
base::BindRepeating(
- &CastStreamingSession::Client::OnAudioFrameReceived,
+ &CastStreamingSession::Client::OnAudioBufferReceived,
base::Unretained(client_)));
+ // Gather data for the audio decoder config.
media::ChannelLayout channel_layout =
media::GuessChannelLayout(receivers.audio->receiver_config.channels);
const std::string& audio_codec =
@@ -89,22 +112,42 @@ class CastStreamingSession::Internal
media::StringToAudioCodec(audio_codec);
int samples_per_second = receivers.audio->receiver_config.rtp_timebase;
- audio_decoder_config.emplace(media::AudioDecoderConfig(
- media_audio_codec, media::SampleFormat::kSampleFormatF32,
- channel_layout, samples_per_second, media::EmptyExtraData(),
- media::EncryptionScheme::kUnencrypted));
+ audio_stream_info.emplace(AudioStreamInfo{
+ media::AudioDecoderConfig(
+ media_audio_codec, media::SampleFormat::kSampleFormatF32,
+ channel_layout, samples_per_second, media::EmptyExtraData(),
+ media::EncryptionScheme::kUnencrypted),
+ std::move(data_pipe_consumer)});
- DVLOG(1) << "Initialized audio stream using " << audio_codec << " codec.";
+ DVLOG(1) << "Initialized audio stream. "
+ << audio_stream_info->decoder_config.AsHumanReadableString();
}
- base::Optional<media::VideoDecoderConfig> video_decoder_config;
+ base::Optional<VideoStreamInfo> video_stream_info;
if (receivers.video) {
+ // Creare the video data pipe.
+ const MojoCreateDataPipeOptions data_pipe_options{
+ sizeof(MojoCreateDataPipeOptions), MOJO_CREATE_DATA_PIPE_FLAG_NONE,
+ 1u /* element_num_bytes */,
+ media::GetDefaultDecoderBufferConverterCapacity(
+ media::DemuxerStream::Type::VIDEO)};
+ mojo::ScopedDataPipeProducerHandle data_pipe_producer;
+ mojo::ScopedDataPipeConsumerHandle data_pipe_consumer;
+ MojoResult result = mojo::CreateDataPipe(
+ &data_pipe_options, &data_pipe_producer, &data_pipe_consumer);
+ if (result != MOJO_RESULT_OK) {
+ client_->OnInitializationFailure();
+ return;
+ }
+
+ // Initialize the video consumer.
video_consumer_ = std::make_unique<StreamConsumer>(
- receivers.video->receiver,
+ receivers.video->receiver, std::move(data_pipe_producer),
base::BindRepeating(
- &CastStreamingSession::Client::OnVideoFrameReceived,
+ &CastStreamingSession::Client::OnVideoBufferReceived,
base::Unretained(client_)));
+ // Gather data for the video decoder config.
const std::string& video_codec =
receivers.video->selected_stream.stream.codec_name;
uint32_t video_width =
@@ -114,36 +157,39 @@ class CastStreamingSession::Internal
gfx::Size video_size(video_width, video_height);
gfx::Rect video_rect(video_width, video_height);
+ media::VideoCodec media_video_codec =
+ media::VideoCodec::kUnknownVideoCodec;
+ media::VideoCodecProfile video_codec_profile =
+ media::VideoCodecProfile::VIDEO_CODEC_PROFILE_UNKNOWN;
+
if (video_codec == kVideoCodecH264) {
- video_decoder_config.emplace(media::VideoDecoderConfig(
- media::VideoCodec::kCodecH264,
- media::VideoCodecProfile::H264PROFILE_BASELINE,
- media::VideoDecoderConfig::AlphaMode::kIsOpaque,
- media::VideoColorSpace(), media::VideoTransformation(), video_size,
- video_rect, video_size, media::EmptyExtraData(),
- media::EncryptionScheme::kUnencrypted));
+ media_video_codec = media::VideoCodec::kCodecH264;
+ video_codec_profile = media::VideoCodecProfile::H264PROFILE_BASELINE;
} else if (video_codec == kVideoCodecVp8) {
- video_decoder_config.emplace(media::VideoDecoderConfig(
- media::VideoCodec::kCodecVP8,
- media::VideoCodecProfile::VP8PROFILE_MIN,
- media::VideoDecoderConfig::AlphaMode::kIsOpaque,
- media::VideoColorSpace(), media::VideoTransformation(), video_size,
- video_rect, video_size, media::EmptyExtraData(),
- media::EncryptionScheme::kUnencrypted));
+ media_video_codec = media::VideoCodec::kCodecVP8;
+ video_codec_profile = media::VideoCodecProfile::VP8PROFILE_MIN;
} else {
NOTREACHED();
}
- DVLOG(1) << "Initialized video stream of " << video_width << "x"
- << video_height << " resolution using " << video_codec
- << " codec.";
+ video_stream_info.emplace(VideoStreamInfo{
+ media::VideoDecoderConfig(
+ media_video_codec, video_codec_profile,
+ media::VideoDecoderConfig::AlphaMode::kIsOpaque,
+ media::VideoColorSpace(), media::VideoTransformation(),
+ video_size, video_rect, video_size, media::EmptyExtraData(),
+ media::EncryptionScheme::kUnencrypted),
+ std::move(data_pipe_consumer)});
+
+ DVLOG(1) << "Initialized video stream. "
+ << video_stream_info->decoder_config.AsHumanReadableString();
}
- if (!video_decoder_config && !audio_decoder_config) {
+ if (!audio_stream_info && !video_stream_info) {
client_->OnInitializationFailure();
} else {
- client_->OnInitializationSuccess(std::move(audio_decoder_config),
- std::move(video_decoder_config));
+ client_->OnInitializationSuccess(std::move(audio_stream_info),
+ std::move(video_stream_info));
}
initialized_called_ = true;
}
@@ -192,4 +238,9 @@ void CastStreamingSession::Start(
client, std::move(message_port_request), task_runner);
}
+void CastStreamingSession::Stop() {
+ DCHECK(internal_);
+ internal_.reset();
+}
+
} // namespace cast_streaming
diff --git a/chromium/fuchsia/cast_streaming/public/cast_streaming.h b/chromium/fuchsia/cast_streaming/public/cast_streaming.h
new file mode 100644
index 00000000000..0838317b6ac
--- /dev/null
+++ b/chromium/fuchsia/cast_streaming/public/cast_streaming.h
@@ -0,0 +1,25 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef FUCHSIA_CAST_STREAMING_PUBLIC_CAST_STREAMING_H_
+#define FUCHSIA_CAST_STREAMING_PUBLIC_CAST_STREAMING_H_
+
+#include <fuchsia/web/cpp/fidl.h>
+
+#include "base/strings/string_piece_forward.h"
+
+namespace cast_streaming {
+
+// TODO(crbug.com/1082821): Remove this file once the Cast Streaming Receiver is
+// implemented as a separate component from WebEngine.
+
+// Returns true if |origin| is the Cast Streaming MessagePort origin.
+bool IsCastStreamingAppOrigin(base::StringPiece origin);
+
+// Returns true if |message| contains a valid Cast Streaming Message.
+bool IsValidCastStreamingMessage(const fuchsia::web::WebMessage& message);
+
+} // namespace cast_streaming
+
+#endif // FUCHSIA_CAST_STREAMING_PUBLIC_CAST_STREAMING_H_
diff --git a/chromium/fuchsia/cast_streaming/public/cast_streaming_session.h b/chromium/fuchsia/cast_streaming/public/cast_streaming_session.h
index 6769ba1b49f..bb6703989c7 100644
--- a/chromium/fuchsia/cast_streaming/public/cast_streaming_session.h
+++ b/chromium/fuchsia/cast_streaming/public/cast_streaming_session.h
@@ -9,11 +9,19 @@
#include <fuchsia/web/cpp/fidl.h>
+#include "base/callback.h"
#include "base/optional.h"
#include "base/sequenced_task_runner.h"
#include "media/base/audio_decoder_config.h"
-#include "media/base/decoder_buffer.h"
#include "media/base/video_decoder_config.h"
+#include "media/mojo/mojom/media_types.mojom.h"
+#include "mojo/public/cpp/system/data_pipe.h"
+
+namespace network {
+namespace mojom {
+class NetworkContext;
+} // namespace mojom
+} // namespace network
namespace cast_streaming {
@@ -21,25 +29,44 @@ namespace cast_streaming {
// Cast Streaming Session for a provided FIDL MessagePort request.
class CastStreamingSession {
public:
+ using NetworkContextGetter =
+ base::RepeatingCallback<network::mojom::NetworkContext*()>;
+
+ // Sets the NetworkContextGetter. This must be called before any call to
+ // Start() and must only be called once. If the NetworkContext crashes, any
+ // existing Cast Streaming Session will eventually terminate and call
+ // OnReceiverSessionEnded().
+ static void SetNetworkContextGetter(NetworkContextGetter getter);
+
+ template <class T>
+ struct StreamInfo {
+ T decoder_config;
+ mojo::ScopedDataPipeConsumerHandle data_pipe;
+ };
+ using AudioStreamInfo = StreamInfo<media::AudioDecoderConfig>;
+ using VideoStreamInfo = StreamInfo<media::VideoDecoderConfig>;
+
class Client {
public:
// Called when the Cast Streaming Session has been successfully initialized.
- // It is guaranteed that at least one of |audio_decoder_config| or
- // |video_decoder_config| will be set.
+ // It is guaranteed that at least one of |audio_stream_info| or
+ // |video_stream_info| will be set.
virtual void OnInitializationSuccess(
- base::Optional<media::AudioDecoderConfig> audio_decoder_config,
- base::Optional<media::VideoDecoderConfig> video_decoder_config) = 0;
+ base::Optional<AudioStreamInfo> audio_stream_info,
+ base::Optional<VideoStreamInfo> video_stream_info) = 0;
// Called when the Cast Stream Session failed to initialize.
virtual void OnInitializationFailure() = 0;
- // Called on every new audio frame after OnInitializationSuccess().
- virtual void OnAudioFrameReceived(
- scoped_refptr<media::DecoderBuffer> buffer) = 0;
+ // Called on every new audio buffer after OnInitializationSuccess(). The
+ // frame data must be accessed via the |data_pipe| property in StreamInfo.
+ virtual void OnAudioBufferReceived(
+ media::mojom::DecoderBufferPtr buffer) = 0;
- // Called on every new video frame after OnInitializationSuccess().
- virtual void OnVideoFrameReceived(
- scoped_refptr<media::DecoderBuffer> buffer) = 0;
+ // Called on every new video buffer after OnInitializationSuccess(). The
+ // frame data must be accessed via the |data_pipe| property in StreamInfo.
+ virtual void OnVideoBufferReceived(
+ media::mojom::DecoderBufferPtr buffer) = 0;
// Called when the Cast Streaming Session has ended.
virtual void OnReceiverSessionEnded() = 0;
@@ -66,6 +93,10 @@ class CastStreamingSession {
fidl::InterfaceRequest<fuchsia::web::MessagePort> message_port_request,
scoped_refptr<base::SequencedTaskRunner> task_runner);
+ // Stops the Cast Streaming Session. This can only be called once during the
+ // lifespan of this object and only after a call to Start().
+ void Stop();
+
private:
class Internal;
std::unique_ptr<Internal> internal_;
diff --git a/chromium/fuchsia/cast_streaming/stream_consumer.cc b/chromium/fuchsia/cast_streaming/stream_consumer.cc
index bcfb8a4d6b1..353ac52119a 100644
--- a/chromium/fuchsia/cast_streaming/stream_consumer.cc
+++ b/chromium/fuchsia/cast_streaming/stream_consumer.cc
@@ -5,27 +5,132 @@
#include "fuchsia/cast_streaming/stream_consumer.h"
#include "base/logging.h"
+#include "base/time/time.h"
+#include "media/base/media_util.h"
namespace cast_streaming {
StreamConsumer::StreamConsumer(openscreen::cast::Receiver* receiver,
+ mojo::ScopedDataPipeProducerHandle data_pipe,
FrameReceivedCB frame_received_cb)
- : receiver_(receiver), frame_received_cb_(std::move(frame_received_cb)) {
+ : receiver_(receiver),
+ data_pipe_(std::move(data_pipe)),
+ frame_received_cb_(std::move(frame_received_cb)),
+ pipe_watcher_(FROM_HERE,
+ mojo::SimpleWatcher::ArmingPolicy::MANUAL,
+ base::SequencedTaskRunnerHandle::Get()) {
DCHECK(receiver_);
receiver_->SetConsumer(this);
+ MojoResult result =
+ pipe_watcher_.Watch(data_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
+ base::BindRepeating(&StreamConsumer::OnPipeWritable,
+ base::Unretained(this)));
+ if (result != MOJO_RESULT_OK) {
+ CloseDataPipeOnError();
+ }
}
StreamConsumer::~StreamConsumer() {
receiver_->SetConsumer(nullptr);
}
+void StreamConsumer::CloseDataPipeOnError() {
+ DLOG(WARNING) << "[ssrc:" << receiver_->ssrc() << "] Data pipe closed.";
+ receiver_->SetConsumer(nullptr);
+ pipe_watcher_.Cancel();
+ data_pipe_.reset();
+}
+
+void StreamConsumer::OnPipeWritable(MojoResult result) {
+ DCHECK(data_pipe_);
+
+ if (result != MOJO_RESULT_OK) {
+ CloseDataPipeOnError();
+ return;
+ }
+
+ uint32_t bytes_written = pending_buffer_remaining_bytes_;
+ result = data_pipe_->WriteData(pending_buffer_ + pending_buffer_offset_,
+ &bytes_written, MOJO_WRITE_DATA_FLAG_NONE);
+ if (result != MOJO_RESULT_OK) {
+ CloseDataPipeOnError();
+ return;
+ }
+
+ pending_buffer_offset_ += bytes_written;
+ pending_buffer_remaining_bytes_ -= bytes_written;
+ if (pending_buffer_remaining_bytes_ != 0) {
+ pipe_watcher_.ArmOrNotify();
+ return;
+ }
+
+ // Advance to the next frame if a new one is ready.
+ int next_frame_buffer_size = receiver_->AdvanceToNextFrame();
+ if (next_frame_buffer_size != openscreen::cast::Receiver::kNoFramesReady)
+ OnFramesReady(next_frame_buffer_size);
+}
+
void StreamConsumer::OnFramesReady(int next_frame_buffer_size) {
- // TODO(crbug.com/1042501): Do not allocate a buffer for every new frame, use
- // a buffer pool.
- std::unique_ptr<uint8_t[]> buffer =
- std::make_unique<uint8_t[]>(next_frame_buffer_size);
- openscreen::cast::EncodedFrame encoded_frame = receiver_->ConsumeNextFrame(
- absl::Span<uint8_t>(buffer.get(), next_frame_buffer_size));
+ DCHECK(data_pipe_);
+
+ if (pending_buffer_remaining_bytes_ != 0) {
+ // There already is a pending frame. Ignore this one for now.
+ return;
+ }
+
+ void* buffer = nullptr;
+ uint32_t buffer_size = next_frame_buffer_size;
+ uint32_t mojo_buffer_size = next_frame_buffer_size;
+
+ if (buffer_size > kMaxFrameSize) {
+ LOG(ERROR) << "[ssrc:" << receiver_->ssrc() << "] "
+ << "Frame size too big: " << buffer_size;
+ CloseDataPipeOnError();
+ return;
+ }
+
+ MojoResult result = data_pipe_->BeginWriteData(
+ &buffer, &mojo_buffer_size, MOJO_BEGIN_WRITE_DATA_FLAG_NONE);
+
+ if (result == MOJO_RESULT_SHOULD_WAIT) {
+ pipe_watcher_.ArmOrNotify();
+ return;
+ }
+
+ if (result != MOJO_RESULT_OK) {
+ CloseDataPipeOnError();
+ return;
+ }
+
+ openscreen::cast::EncodedFrame encoded_frame;
+ size_t bytes_written = 0;
+
+ if (mojo_buffer_size < buffer_size) {
+ DVLOG(2) << "[ssrc:" << receiver_->ssrc() << "] "
+ << "Mojo data pipe full";
+
+ // The |data_pipe_| buffer cannot take the full frame, write to
+ // |pending_buffer_| instead.
+ encoded_frame = receiver_->ConsumeNextFrame(
+ absl::Span<uint8_t>(pending_buffer_, buffer_size));
+
+ // Write as much as we can to the |data_pipe_| buffer.
+ memcpy(buffer, pending_buffer_, mojo_buffer_size);
+ pending_buffer_offset_ = mojo_buffer_size;
+ pending_buffer_remaining_bytes_ = buffer_size - mojo_buffer_size;
+ bytes_written = mojo_buffer_size;
+ } else {
+ // Write directly to the |data_pipe_| buffer.
+ encoded_frame = receiver_->ConsumeNextFrame(
+ absl::Span<uint8_t>(static_cast<uint8_t*>(buffer), buffer_size));
+ bytes_written = buffer_size;
+ }
+
+ result = data_pipe_->EndWriteData(bytes_written);
+ if (result != MOJO_RESULT_OK) {
+ CloseDataPipeOnError();
+ return;
+ }
const bool is_key_frame =
encoded_frame.dependency ==
@@ -37,26 +142,21 @@ void StreamConsumer::OnFramesReady(int next_frame_buffer_size) {
receiver_->rtp_timebase())
.count());
- DVLOG(3) << "[ssrc:" << receiver_->ssrc()
- << "] Received new frame. Timestamp: " << playout_time
+ DVLOG(3) << "[ssrc:" << receiver_->ssrc() << "] "
+ << "Received new frame. Timestamp: " << playout_time
<< ", is_key_frame: " << is_key_frame;
- if (playout_time <= last_playout_time_) {
- // TODO(b/156129097): We sometimes receive identical playout time for two
- // frames in a row.
- DVLOG(2) << "[ssrc:" << receiver_->ssrc()
- << "] Droped frame due to identical playout time.";
- return;
- }
- last_playout_time_ = playout_time;
+ frame_received_cb_.Run(media::mojom::DecoderBuffer::New(
+ playout_time /* timestamp */, base::TimeDelta() /* duration */,
+ false /* is_end_of_stream */, buffer_size, is_key_frame,
+ media::EmptyExtraData(), media::mojom::DecryptConfigPtr(),
+ base::TimeDelta() /* front_discard */,
+ base::TimeDelta() /* back_discard */
+ ));
- scoped_refptr<media::DecoderBuffer> decoder_buffer =
- media::DecoderBuffer::FromArray(std::move(buffer),
- next_frame_buffer_size);
- decoder_buffer->set_is_key_frame(is_key_frame);
- decoder_buffer->set_timestamp(playout_time);
-
- frame_received_cb_.Run(decoder_buffer);
+ if (pending_buffer_remaining_bytes_ != 0) {
+ pipe_watcher_.ArmOrNotify();
+ }
}
} // namespace cast_streaming
diff --git a/chromium/fuchsia/cast_streaming/stream_consumer.h b/chromium/fuchsia/cast_streaming/stream_consumer.h
index 4605d158bde..eef5f14b1a7 100644
--- a/chromium/fuchsia/cast_streaming/stream_consumer.h
+++ b/chromium/fuchsia/cast_streaming/stream_consumer.h
@@ -8,23 +8,33 @@
#include <fuchsia/media/cpp/fidl.h>
#include "base/callback.h"
-#include "base/time/time.h"
-#include "media/base/decoder_buffer.h"
+#include "media/mojo/mojom/media_types.mojom.h"
+#include "mojo/public/cpp/system/data_pipe.h"
+#include "mojo/public/cpp/system/simple_watcher.h"
#include "third_party/openscreen/src/cast/streaming/receiver.h"
#include "third_party/openscreen/src/cast/streaming/receiver_session.h"
namespace cast_streaming {
-// Attaches to an Open Screen Receiver to receive frames and invokes
-// |frame_received_cb_| with each received buffer of encoded data.
+// Attaches to an Open Screen Receiver to receive buffers of encoded data and
+// invokes |frame_received_cb_| with each buffer.
+//
+// Internally, this class writes buffers of encoded data directly to
+// |data_pipe_| rather than using a helper class like MojoDecoderBufferWriter.
+// This allows us to use |data_pipe_| as an end-to-end buffer to cap memory
+// usage. Receiving new buffers is delayed until the pipe has free memory again.
+// The Open Screen library takes care of discarding buffers that are too old and
+// requesting new key frames as needed.
class StreamConsumer : public openscreen::cast::Receiver::Consumer {
public:
using FrameReceivedCB =
- base::RepeatingCallback<void(scoped_refptr<media::DecoderBuffer>)>;
+ base::RepeatingCallback<void(media::mojom::DecoderBufferPtr)>;
// |receiver| sends frames to this object. It must outlive this object.
- // |frame_received_cb| is called on every new frame.
+ // |frame_received_cb| is called on every new frame, after a new frame has
+ // been written to |data_pipe|. On error, |data_pipe| will be closed.
StreamConsumer(openscreen::cast::Receiver* receiver,
+ mojo::ScopedDataPipeProducerHandle data_pipe,
FrameReceivedCB frame_received_cb);
~StreamConsumer() final;
@@ -32,12 +42,34 @@ class StreamConsumer : public openscreen::cast::Receiver::Consumer {
StreamConsumer& operator=(const StreamConsumer&) = delete;
private:
+ // Maximum frame size that OnFramesReady() can accept.
+ static constexpr uint32_t kMaxFrameSize = 512 * 1024;
+
+ // Closes |data_pipe_| and resets the Consumer in |receiver_|. No frames will
+ // be received after this call.
+ void CloseDataPipeOnError();
+
+ // Callback when |data_pipe_| can be written to again after it was full.
+ void OnPipeWritable(MojoResult result);
+
// openscreen::cast::Receiver::Consumer implementation.
void OnFramesReady(int next_frame_buffer_size) final;
openscreen::cast::Receiver* const receiver_;
+ mojo::ScopedDataPipeProducerHandle data_pipe_;
const FrameReceivedCB frame_received_cb_;
- base::TimeDelta last_playout_time_;
+
+ // Provides notifications about |data_pipe_| readiness.
+ mojo::SimpleWatcher pipe_watcher_;
+
+ // Buffer used when |data_pipe_| is too full to accept the next frame size.
+ uint8_t pending_buffer_[kMaxFrameSize];
+
+ // Current offset for data |pending_buffer_| to be written to |data_pipe_|.
+ size_t pending_buffer_offset_ = 0;
+
+ // Remaining bytes to write from |pending_buffer_| to |data_pipe_|.
+ size_t pending_buffer_remaining_bytes_ = 0;
};
} // namespace cast_streaming