diff options
author | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2020-10-12 14:27:29 +0200 |
---|---|---|
committer | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2020-10-13 09:35:20 +0000 |
commit | c30a6232df03e1efbd9f3b226777b07e087a1122 (patch) | |
tree | e992f45784689f373bcc38d1b79a239ebe17ee23 /chromium/media/remoting | |
parent | 7b5b123ac58f58ffde0f4f6e488bcd09aa4decd3 (diff) | |
download | qtwebengine-chromium-85-based.tar.gz |
BASELINE: Update Chromium to 85.0.4183.14085-based
Change-Id: Iaa42f4680837c57725b1344f108c0196741f6057
Reviewed-by: Allan Sandfeld Jensen <allan.jensen@qt.io>
Diffstat (limited to 'chromium/media/remoting')
27 files changed, 2804 insertions, 655 deletions
diff --git a/chromium/media/remoting/BUILD.gn b/chromium/media/remoting/BUILD.gn index f302a0571b0..b01ab5ffd6b 100644 --- a/chromium/media/remoting/BUILD.gn +++ b/chromium/media/remoting/BUILD.gn @@ -31,7 +31,7 @@ source_set("rpc") { public_deps = [ ":media_remoting_proto" ] } -source_set("remoting") { +source_set("remoting_sender") { sources = [ "courier_renderer_factory.cc", "courier_renderer_factory.h", @@ -64,16 +64,48 @@ source_set("remoting") { } } +source_set("remoting_constants") { + sources = [ "remoting_constants.h" ] +} + +source_set("remoting_renderer") { + sources = [ + "receiver.cc", + "receiver.h", + "receiver_controller.cc", + "receiver_controller.h", + "remoting_renderer_factory.cc", + "remoting_renderer_factory.h", + "stream_provider.cc", + "stream_provider.h", + ] + + deps = [ + ":remoting_constants", + ":rpc", + "//media/mojo/common:common", + "//media/mojo/mojom:remoting", + ] +} + source_set("media_remoting_tests") { testonly = true sources = [ "fake_remoter.cc", "fake_remoter.h", + "mock_receiver_controller.cc", + "mock_receiver_controller.h", + "receiver_unittest.cc", "renderer_controller_unittest.cc", + "stream_provider_unittest.cc", + "test_utils.cc", + "test_utils.h", ] deps = [ - ":remoting", + ":remoting_renderer", + ":remoting_sender", + ":rpc", "//base", "//base/test:test_support", "//media:test_support", @@ -94,16 +126,13 @@ source_set("media_remoting_tests") { "fake_media_resource.h", "integration_test.cc", "proto_utils_unittest.cc", - "receiver.cc", - "receiver.h", "rpc_broker_unittest.cc", - "stream_provider.cc", - "stream_provider.h", ] deps += [ ":rpc", "//media/test:pipeline_integration_test_base", + "//services/service_manager/public/cpp:cpp", "//ui/gfx:test_support", "//ui/gfx/geometry", ] diff --git a/chromium/media/remoting/courier_renderer.cc b/chromium/media/remoting/courier_renderer.cc index d820952f0ca..dfd4b8a1e2a 100644 --- a/chromium/media/remoting/courier_renderer.cc +++ b/chromium/media/remoting/courier_renderer.cc @@ -74,7 +74,6 @@ CourierRenderer::CourierRenderer( remote_renderer_handle_(RpcBroker::kInvalidHandle), video_renderer_sink_(video_renderer_sink), clock_(base::DefaultTickClock::GetInstance()) { - VLOG(2) << __func__; // Note: The constructor is running on the main thread, but will be destroyed // on the media thread. Therefore, all weak pointers must be dereferenced on // the media thread. @@ -85,7 +84,6 @@ CourierRenderer::CourierRenderer( } CourierRenderer::~CourierRenderer() { - VLOG(2) << __func__; DCHECK(media_task_runner_->BelongsToCurrentThread()); // Post task on main thread to unregister message receiver. @@ -102,7 +100,6 @@ CourierRenderer::~CourierRenderer() { void CourierRenderer::Initialize(MediaResource* media_resource, RendererClient* client, PipelineStatusCallback init_cb) { - VLOG(2) << __func__; DCHECK(media_task_runner_->BelongsToCurrentThread()); DCHECK(media_resource); DCHECK(client); @@ -150,19 +147,10 @@ void CourierRenderer::Initialize(MediaResource* media_resource, rpc_broker_))); } -void CourierRenderer::SetCdm(CdmContext* cdm_context, - CdmAttachedCB cdm_attached_cb) { - DCHECK(media_task_runner_->BelongsToCurrentThread()); - - // Media remoting doesn't support encrypted content. - NOTIMPLEMENTED(); -} - void CourierRenderer::SetLatencyHint( base::Optional<base::TimeDelta> latency_hint) {} void CourierRenderer::Flush(base::OnceClosure flush_cb) { - VLOG(2) << __func__; DCHECK(media_task_runner_->BelongsToCurrentThread()); DCHECK(!flush_cb_); @@ -188,14 +176,13 @@ void CourierRenderer::Flush(base::OnceClosure flush_cb) { (video_demuxer_stream_adapter_ && !flush_video_count.has_value()) || (audio_demuxer_stream_adapter_ && video_demuxer_stream_adapter_ && flush_audio_count.has_value() != flush_video_count.has_value())) { - VLOG(1) << "Ignoring flush request while under flushing operation"; return; } flush_cb_ = std::move(flush_cb); // Issues RPC_R_FLUSHUNTIL RPC message. - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_renderer_handle_); rpc->set_proc(pb::RpcMessage::RPC_R_FLUSHUNTIL); pb::RendererFlushUntil* message = rpc->mutable_renderer_flushuntil_rpc(); @@ -204,15 +191,10 @@ void CourierRenderer::Flush(base::OnceClosure flush_cb) { if (flush_video_count.has_value()) message->set_video_count(*flush_video_count); message->set_callback_handle(rpc_handle_); - VLOG(2) << __func__ << ": Sending RPC_R_FLUSHUNTIL to " << rpc->handle() - << " with audio_count=" << message->audio_count() - << ", video_count=" << message->video_count() - << ", callback_handle=" << message->callback_handle(); SendRpcToRemote(std::move(rpc)); } void CourierRenderer::StartPlayingFrom(base::TimeDelta time) { - VLOG(2) << __func__ << ": " << time.InMicroseconds(); DCHECK(media_task_runner_->BelongsToCurrentThread()); if (state_ != STATE_PLAYING) { @@ -221,12 +203,10 @@ void CourierRenderer::StartPlayingFrom(base::TimeDelta time) { } // Issues RPC_R_STARTPLAYINGFROM RPC message. - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_renderer_handle_); rpc->set_proc(pb::RpcMessage::RPC_R_STARTPLAYINGFROM); rpc->set_integer64_value(time.InMicroseconds()); - VLOG(2) << __func__ << ": Sending RPC_R_STARTPLAYINGFROM to " << rpc->handle() - << " with time_usec=" << rpc->integer64_value(); SendRpcToRemote(std::move(rpc)); { @@ -237,7 +217,6 @@ void CourierRenderer::StartPlayingFrom(base::TimeDelta time) { } void CourierRenderer::SetPlaybackRate(double playback_rate) { - VLOG(2) << __func__ << ": " << playback_rate; DCHECK(media_task_runner_->BelongsToCurrentThread()); if (state_ != STATE_FLUSHING && state_ != STATE_PLAYING) { @@ -246,19 +225,16 @@ void CourierRenderer::SetPlaybackRate(double playback_rate) { } // Issues RPC_R_SETPLAYBACKRATE RPC message. - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_renderer_handle_); rpc->set_proc(pb::RpcMessage::RPC_R_SETPLAYBACKRATE); rpc->set_double_value(playback_rate); - VLOG(2) << __func__ << ": Sending RPC_R_SETPLAYBACKRATE to " << rpc->handle() - << " with rate=" << rpc->double_value(); SendRpcToRemote(std::move(rpc)); playback_rate_ = playback_rate; ResetMeasurements(); } void CourierRenderer::SetVolume(float volume) { - VLOG(2) << __func__ << ": " << volume; DCHECK(media_task_runner_->BelongsToCurrentThread()); if (state_ != STATE_FLUSHING && state_ != STATE_PLAYING) { @@ -267,12 +243,10 @@ void CourierRenderer::SetVolume(float volume) { } // Issues RPC_R_SETVOLUME RPC message. - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_renderer_handle_); rpc->set_proc(pb::RpcMessage::RPC_R_SETVOLUME); rpc->set_double_value(volume); - VLOG(2) << __func__ << ": Sending RPC_R_SETVOLUME to " << rpc->handle() - << " with volume=" << rpc->double_value(); SendRpcToRemote(std::move(rpc)); } @@ -314,7 +288,6 @@ void CourierRenderer::OnDataPipeCreated( mojo::ScopedDataPipeProducerHandle video_handle, int audio_rpc_handle, int video_rpc_handle) { - VLOG(2) << __func__; DCHECK(media_task_runner_->BelongsToCurrentThread()); if (state_ == STATE_ERROR) @@ -332,7 +305,6 @@ void CourierRenderer::OnDataPipeCreated( // Create audio demuxer stream adapter if audio is available. if (audio_demuxer_stream && audio.is_valid() && audio_handle.is_valid() && audio_rpc_handle != RpcBroker::kInvalidHandle) { - VLOG(2) << "Initialize audio"; audio_demuxer_stream_adapter_.reset(new DemuxerStreamAdapter( main_task_runner_, media_task_runner_, "audio", audio_demuxer_stream, rpc_broker_, audio_rpc_handle, std::move(audio), @@ -344,7 +316,6 @@ void CourierRenderer::OnDataPipeCreated( // Create video demuxer stream adapter if video is available. if (video_demuxer_stream && video.is_valid() && video_handle.is_valid() && video_rpc_handle != RpcBroker::kInvalidHandle) { - VLOG(2) << "Initialize video"; video_demuxer_stream_adapter_.reset(new DemuxerStreamAdapter( main_task_runner_, media_task_runner_, "video", video_demuxer_stream, rpc_broker_, video_rpc_handle, std::move(video), @@ -360,13 +331,27 @@ void CourierRenderer::OnDataPipeCreated( } state_ = STATE_ACQUIRING; + + // Issues RPC_ACQUIRE_DEMUXER RPC message. + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(RpcBroker::kAcquireDemuxerHandle); + rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_DEMUXER); + pb::AcquireDemuxer* message = rpc->mutable_acquire_demuxer_rpc(); + message->set_audio_demuxer_handle( + audio_demuxer_stream_adapter_ + ? audio_demuxer_stream_adapter_->rpc_handle() + : RpcBroker::kInvalidHandle); + message->set_video_demuxer_handle( + video_demuxer_stream_adapter_ + ? video_demuxer_stream_adapter_->rpc_handle() + : RpcBroker::kInvalidHandle); + SendRpcToRemote(std::move(rpc)); + // Issues RPC_ACQUIRE_RENDERER RPC message. - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); - rpc->set_handle(RpcBroker::kAcquireHandle); + rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(RpcBroker::kAcquireRendererHandle); rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER); rpc->set_integer_value(rpc_handle_); - VLOG(2) << __func__ << ": Sending RPC_ACQUIRE_RENDERER to " << rpc->handle() - << " with rpc_handle=" << rpc->integer_value(); SendRpcToRemote(std::move(rpc)); } @@ -403,11 +388,9 @@ void CourierRenderer::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) { OnBufferingStateChange(std::move(message)); break; case pb::RpcMessage::RPC_RC_ONENDED: - VLOG(2) << __func__ << ": Received RPC_RC_ONENDED."; client_->OnEnded(); break; case pb::RpcMessage::RPC_RC_ONERROR: - VLOG(2) << __func__ << ": Received RPC_RC_ONERROR."; OnFatalError(RECEIVER_PIPELINE_ERROR); break; case pb::RpcMessage::RPC_RC_ONAUDIOCONFIGCHANGE: @@ -426,12 +409,11 @@ void CourierRenderer::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) { OnStatisticsUpdate(std::move(message)); break; case pb::RpcMessage::RPC_RC_ONWAITINGFORDECRYPTIONKEY: - VLOG(2) << __func__ << ": Received RPC_RC_ONWAITINGFORDECRYPTIONKEY."; client_->OnWaiting(WaitingReason::kNoDecryptionKey); break; default: - VLOG(1) << "Unknown RPC: " << message->proc(); + DVLOG(1) << "Unknown RPC: " << message->proc(); } } @@ -449,19 +431,15 @@ void CourierRenderer::AcquireRendererDone( DCHECK(message); remote_renderer_handle_ = message->integer_value(); - VLOG(2) << __func__ - << ": Received RPC_ACQUIRE_RENDERER_DONE with remote_renderer_handle=" - << remote_renderer_handle_; if (state_ != STATE_ACQUIRING || init_workflow_done_callback_.is_null()) { - LOG(WARNING) << "Unexpected acquire renderer done RPC."; OnFatalError(PEERS_OUT_OF_SYNC); return; } state_ = STATE_INITIALIZING; // Issues RPC_R_INITIALIZE RPC message to initialize renderer. - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_renderer_handle_); rpc->set_proc(pb::RpcMessage::RPC_R_INITIALIZE); pb::RendererInitialize* init = rpc->mutable_renderer_initialize_rpc(); @@ -475,11 +453,6 @@ void CourierRenderer::AcquireRendererDone( ? video_demuxer_stream_adapter_->rpc_handle() : RpcBroker::kInvalidHandle); init->set_callback_handle(rpc_handle_); - VLOG(2) << __func__ << ": Sending RPC_R_INITIALIZE to " << rpc->handle() - << " with client_handle=" << init->client_handle() - << ", audio_demuxer_handle=" << init->audio_demuxer_handle() - << ", video_demuxer_handle=" << init->video_demuxer_handle() - << ", callback_handle=" << init->callback_handle(); SendRpcToRemote(std::move(rpc)); } @@ -489,11 +462,7 @@ void CourierRenderer::InitializeCallback( DCHECK(message); const bool success = message->boolean_value(); - VLOG(2) << __func__ - << ": Received RPC_R_INITIALIZE_CALLBACK with success=" << success; - if (state_ != STATE_INITIALIZING || init_workflow_done_callback_.is_null()) { - LOG(WARNING) << "Unexpected initialize callback RPC."; OnFatalError(PEERS_OUT_OF_SYNC); return; } @@ -511,10 +480,8 @@ void CourierRenderer::InitializeCallback( void CourierRenderer::FlushUntilCallback() { DCHECK(media_task_runner_->BelongsToCurrentThread()); - VLOG(2) << __func__ << ": Received RPC_R_FLUSHUNTIL_CALLBACK"; if (state_ != STATE_FLUSHING || !flush_cb_) { - LOG(WARNING) << "Unexpected flushuntil callback RPC."; OnFatalError(PEERS_OUT_OF_SYNC); return; } @@ -531,9 +498,6 @@ void CourierRenderer::FlushUntilCallback() { void CourierRenderer::SetCdmCallback(std::unique_ptr<pb::RpcMessage> message) { DCHECK(media_task_runner_->BelongsToCurrentThread()); DCHECK(message); - VLOG(2) << __func__ << ": Received RPC_R_SETCDM_CALLBACK with cdm_id=" - << message->renderer_set_cdm_rpc().cdm_id() << ", callback_handle=" - << message->renderer_set_cdm_rpc().callback_handle(); // TODO(erickung): add implementation once Remote CDM implementation is done. NOTIMPLEMENTED(); } @@ -543,7 +507,6 @@ void CourierRenderer::OnTimeUpdate(std::unique_ptr<pb::RpcMessage> message) { DCHECK(message); // Shutdown remoting session if receiving malformed RPC message. if (!message->has_rendererclient_ontimeupdate_rpc()) { - VLOG(1) << __func__ << " missing required RPC message"; OnFatalError(RPC_INVALID); return; } @@ -551,9 +514,6 @@ void CourierRenderer::OnTimeUpdate(std::unique_ptr<pb::RpcMessage> message) { message->rendererclient_ontimeupdate_rpc().time_usec(); const int64_t max_time_usec = message->rendererclient_ontimeupdate_rpc().max_time_usec(); - VLOG(2) << __func__ - << ": Received RPC_RC_ONTIMEUPDATE with time_usec=" << time_usec - << ", max_time_usec=" << max_time_usec; // Ignores invalid time, such as negative value, or time larger than max value // (usually the time stamp that all streams are pushed into AV pipeline). if (time_usec < 0 || max_time_usec < 0 || time_usec > max_time_usec) @@ -575,12 +535,9 @@ void CourierRenderer::OnBufferingStateChange( DCHECK(media_task_runner_->BelongsToCurrentThread()); DCHECK(message); if (!message->has_rendererclient_onbufferingstatechange_rpc()) { - VLOG(1) << __func__ << " missing required RPC message"; OnFatalError(RPC_INVALID); return; } - VLOG(2) << __func__ << ": Received RPC_RC_ONBUFFERINGSTATECHANGE with state=" - << message->rendererclient_onbufferingstatechange_rpc().state(); base::Optional<BufferingState> state = ToMediaBufferingState( message->rendererclient_onbufferingstatechange_rpc().state()); BufferingStateChangeReason reason = BUFFERING_CHANGE_REASON_UNKNOWN; @@ -605,7 +562,6 @@ void CourierRenderer::OnAudioConfigChange( DCHECK(message); // Shutdown remoting session if receiving malformed RPC message. if (!message->has_rendererclient_onaudioconfigchange_rpc()) { - VLOG(1) << __func__ << " missing required RPC message"; OnFatalError(RPC_INVALID); return; } @@ -618,8 +574,6 @@ void CourierRenderer::OnAudioConfigChange( ConvertProtoToAudioDecoderConfig(pb_audio_config, &out_audio_config); DCHECK(out_audio_config.IsValidConfig()); - VLOG(2) << __func__ << ": Received RPC_RC_ONAUDIOCONFIGCHANGE with config:" - << out_audio_config.AsHumanReadableString(); client_->OnAudioConfigChange(out_audio_config); } @@ -629,7 +583,6 @@ void CourierRenderer::OnVideoConfigChange( DCHECK(message); // Shutdown remoting session if receiving malformed RPC message. if (!message->has_rendererclient_onvideoconfigchange_rpc()) { - VLOG(1) << __func__ << " missing required RPC message"; OnFatalError(RPC_INVALID); return; } @@ -642,8 +595,6 @@ void CourierRenderer::OnVideoConfigChange( ConvertProtoToVideoDecoderConfig(pb_video_config, &out_video_config); DCHECK(out_video_config.IsValidConfig()); - VLOG(2) << __func__ << ": Received RPC_RC_ONVIDEOCONFIGCHANGE with config:" - << out_video_config.AsHumanReadableString(); client_->OnVideoConfigChange(out_video_config); } @@ -653,14 +604,11 @@ void CourierRenderer::OnVideoNaturalSizeChange( DCHECK(message); // Shutdown remoting session if receiving malformed RPC message. if (!message->has_rendererclient_onvideonatualsizechange_rpc()) { - VLOG(1) << __func__ << " missing required RPC message"; OnFatalError(RPC_INVALID); return; } const auto& size_change = message->rendererclient_onvideonatualsizechange_rpc(); - VLOG(2) << __func__ << ": Received RPC_RC_ONVIDEONATURALSIZECHANGE with size=" - << size_change.width() << 'x' << size_change.height(); if (size_change.width() <= 0 || size_change.height() <= 0) return; client_->OnVideoNaturalSizeChange( @@ -672,8 +620,6 @@ void CourierRenderer::OnVideoOpacityChange( DCHECK(media_task_runner_->BelongsToCurrentThread()); DCHECK(message); const bool opaque = message->boolean_value(); - VLOG(2) << __func__ - << ": Received RPC_RC_ONVIDEOOPACITYCHANGE with opaque=" << opaque; client_->OnVideoOpacityChange(opaque); } @@ -683,7 +629,6 @@ void CourierRenderer::OnStatisticsUpdate( DCHECK(message); // Shutdown remoting session if receiving malformed RPC message. if (!message->has_rendererclient_onstatisticsupdate_rpc()) { - VLOG(1) << __func__ << " missing required RPC message"; OnFatalError(RPC_INVALID); return; } @@ -691,15 +636,6 @@ void CourierRenderer::OnStatisticsUpdate( ConvertProtoToPipelineStatistics( message->rendererclient_onstatisticsupdate_rpc(), &stats); // Note: Each field in |stats| is a delta, not the aggregate amount. - VLOG(2) << __func__ - << ": Received RPC_RC_ONSTATISTICSUPDATE with audio_bytes_decoded=" - << stats.audio_bytes_decoded - << ", video_bytes_decoded=" << stats.video_bytes_decoded - << ", video_frames_decoded=" << stats.video_frames_decoded - << ", video_frames_dropped=" << stats.video_frames_dropped - << ", audio_memory_usage=" << stats.audio_memory_usage - << ", video_memory_usage=" << stats.video_memory_usage; - if (stats.audio_bytes_decoded > 0 || stats.video_frames_decoded > 0 || stats.video_frames_dropped > 0) { metrics_recorder_.OnEvidenceOfPlayoutAtReceiver(); @@ -712,8 +648,6 @@ void CourierRenderer::OnFatalError(StopTrigger stop_trigger) { DCHECK(media_task_runner_->BelongsToCurrentThread()); DCHECK_NE(UNKNOWN_STOP_TRIGGER, stop_trigger); - VLOG(2) << __func__ << " with StopTrigger " << stop_trigger; - // If this is the first error, notify the controller. It is expected the // controller will cause this renderer to shut down shortly. if (state_ != STATE_ERROR) { @@ -761,9 +695,6 @@ void CourierRenderer::OnMediaTimeUpdated() { playback_rate_; if ((media_duration - update_duration).magnitude() >= kMediaPlaybackDelayThreshold) { - VLOG(1) << "Irregular playback detected: Media playback delayed." - << " media_duration = " << media_duration - << " update_duration = " << update_duration; ++times_playback_delayed_; if (times_playback_delayed_ == kPlaybackDelayCountThreshold) OnFatalError(PACING_TOO_SLOWLY); @@ -807,9 +738,6 @@ void CourierRenderer::UpdateVideoStatsQueue(int video_frames_decoded, if (sum_video_frames_decoded_ && sum_video_frames_dropped_ * 100 > sum_video_frames_decoded_ * kMaxNumVideoFramesDroppedPercentage) { - VLOG(1) << "Irregular playback detected: Too many video frames dropped." - << " video_frames_decoded= " << sum_video_frames_decoded_ - << " video_frames_dropped= " << sum_video_frames_dropped_; OnFatalError(FRAME_DROP_RATE_HIGH); } // Prune |video_stats_queue_|. diff --git a/chromium/media/remoting/courier_renderer.h b/chromium/media/remoting/courier_renderer.h index 75111e4174c..38e5e338a44 100644 --- a/chromium/media/remoting/courier_renderer.h +++ b/chromium/media/remoting/courier_renderer.h @@ -5,8 +5,6 @@ #ifndef MEDIA_REMOTING_COURIER_RENDERER_H_ #define MEDIA_REMOTING_COURIER_RENDERER_H_ -#include <stdint.h> - #include <memory> #include "base/callback.h" @@ -20,6 +18,7 @@ #include "media/base/pipeline_status.h" #include "media/base/renderer.h" #include "media/mojo/mojom/remoting.mojom.h" +#include "media/remoting/media_remoting_rpc.pb.h" #include "media/remoting/metrics.h" #include "media/remoting/rpc_broker.h" #include "mojo/public/cpp/bindings/pending_remote.h" @@ -74,7 +73,6 @@ class CourierRenderer : public Renderer { void Initialize(MediaResource* media_resource, RendererClient* client, PipelineStatusCallback init_cb) final; - void SetCdm(CdmContext* cdm_context, CdmAttachedCB cdm_attached_cb) final; void SetLatencyHint(base::Optional<base::TimeDelta> latency_hint) final; void Flush(base::OnceClosure flush_cb) final; void StartPlayingFrom(base::TimeDelta time) final; diff --git a/chromium/media/remoting/courier_renderer_unittest.cc b/chromium/media/remoting/courier_renderer_unittest.cc index a51f3dc86ef..4d13c140856 100644 --- a/chromium/media/remoting/courier_renderer_unittest.cc +++ b/chromium/media/remoting/courier_renderer_unittest.cc @@ -7,6 +7,7 @@ #include <memory> #include "base/bind.h" +#include "base/check.h" #include "base/run_loop.h" #include "base/test/simple_test_tick_clock.h" #include "base/test/task_environment.h" @@ -20,6 +21,7 @@ #include "media/remoting/proto_enum_utils.h" #include "media/remoting/proto_utils.h" #include "media/remoting/renderer_controller.h" +#include "media/remoting/rpc_broker.h" #include "testing/gmock/include/gmock/gmock.h" #include "testing/gtest/include/gtest/gtest.h" @@ -146,16 +148,7 @@ class RendererClientImpl final : public RendererClient { class CourierRendererTest : public testing::Test { public: - CourierRendererTest() - : receiver_renderer_handle_(10), - receiver_audio_demuxer_callback_handle_(11), - receiver_video_demuxer_callback_handle_(12), - sender_client_handle_(RpcBroker::kInvalidHandle), - sender_renderer_callback_handle_(RpcBroker::kInvalidHandle), - sender_audio_demuxer_handle_(RpcBroker::kInvalidHandle), - sender_video_demuxer_handle_(RpcBroker::kInvalidHandle), - received_audio_ds_init_cb_(false), - received_video_ds_init_cb_(false) {} + CourierRendererTest() = default; ~CourierRendererTest() override = default; // Use this function to mimic receiver to handle RPC message for renderer @@ -165,40 +158,87 @@ class CourierRendererTest : public testing::Test { ASSERT_TRUE(rpc->ParseFromArray(message->data(), message->size())); switch (rpc->proc()) { case pb::RpcMessage::RPC_ACQUIRE_RENDERER: { + DCHECK(rpc->has_integer_value()); + sender_renderer_handle_ = rpc->integer_value(); // Issues RPC_ACQUIRE_RENDERER_DONE RPC message. - std::unique_ptr<pb::RpcMessage> acquire_done(new pb::RpcMessage()); - acquire_done->set_handle(rpc->integer_value()); + auto acquire_done = std::make_unique<pb::RpcMessage>(); + acquire_done->set_handle(sender_renderer_handle_); acquire_done->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE); acquire_done->set_integer_value(receiver_renderer_handle_); controller_->GetRpcBroker()->ProcessMessageFromRemote( std::move(acquire_done)); } break; + case pb::RpcMessage::RPC_ACQUIRE_DEMUXER: { + if (!is_backward_compatible_mode_) { + int acquire_demuxer_handle = RpcBroker::kAcquireDemuxerHandle; + EXPECT_EQ(rpc->handle(), acquire_demuxer_handle); + sender_audio_demuxer_handle_ = + rpc->acquire_demuxer_rpc().audio_demuxer_handle(); + sender_video_demuxer_handle_ = + rpc->acquire_demuxer_rpc().video_demuxer_handle(); + + // Issues audio RPC_DS_INITIALIZE RPC message. + if (sender_audio_demuxer_handle_ != RpcBroker::kInvalidHandle) { + auto ds_init = std::make_unique<pb::RpcMessage>(); + ds_init->set_handle(sender_audio_demuxer_handle_); + ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE); + ds_init->set_integer_value(receiver_audio_demuxer_callback_handle_); + controller_->GetRpcBroker()->ProcessMessageFromRemote( + std::move(ds_init)); + } + + // Issues video RPC_DS_INITIALIZE RPC message. + if (sender_video_demuxer_handle_ != RpcBroker::kInvalidHandle) { + auto ds_init = std::make_unique<pb::RpcMessage>(); + ds_init->set_handle(sender_video_demuxer_handle_); + ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE); + ds_init->set_integer_value(receiver_video_demuxer_callback_handle_); + controller_->GetRpcBroker()->ProcessMessageFromRemote( + std::move(ds_init)); + } + } + } break; case pb::RpcMessage::RPC_R_INITIALIZE: { - EXPECT_EQ(rpc->handle(), receiver_renderer_handle_); sender_renderer_callback_handle_ = rpc->renderer_initialize_rpc().callback_handle(); sender_client_handle_ = rpc->renderer_initialize_rpc().client_handle(); - sender_audio_demuxer_handle_ = - rpc->renderer_initialize_rpc().audio_demuxer_handle(); - sender_video_demuxer_handle_ = - rpc->renderer_initialize_rpc().video_demuxer_handle(); - - // Issues audio RPC_DS_INITIALIZE RPC message. - if (sender_audio_demuxer_handle_ != RpcBroker::kInvalidHandle) { - std::unique_ptr<pb::RpcMessage> ds_init(new pb::RpcMessage()); - ds_init->set_handle(sender_audio_demuxer_handle_); - ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE); - ds_init->set_integer_value(receiver_audio_demuxer_callback_handle_); - controller_->GetRpcBroker()->ProcessMessageFromRemote( - std::move(ds_init)); - } - if (sender_video_demuxer_handle_ != RpcBroker::kInvalidHandle) { - std::unique_ptr<pb::RpcMessage> ds_init(new pb::RpcMessage()); - ds_init->set_handle(sender_video_demuxer_handle_); - ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE); - ds_init->set_integer_value(receiver_video_demuxer_callback_handle_); + + if (is_backward_compatible_mode_) { + EXPECT_EQ(rpc->handle(), receiver_renderer_handle_); + + sender_audio_demuxer_handle_ = + rpc->renderer_initialize_rpc().audio_demuxer_handle(); + sender_video_demuxer_handle_ = + rpc->renderer_initialize_rpc().video_demuxer_handle(); + + // Issues audio RPC_DS_INITIALIZE RPC message. + if (sender_audio_demuxer_handle_ != RpcBroker::kInvalidHandle) { + auto ds_init = std::make_unique<pb::RpcMessage>(); + ds_init->set_handle(sender_audio_demuxer_handle_); + ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE); + ds_init->set_integer_value(receiver_audio_demuxer_callback_handle_); + controller_->GetRpcBroker()->ProcessMessageFromRemote( + std::move(ds_init)); + } + + // Issues video RPC_DS_INITIALIZE RPC message. + if (sender_video_demuxer_handle_ != RpcBroker::kInvalidHandle) { + auto ds_init = std::make_unique<pb::RpcMessage>(); + ds_init->set_handle(sender_video_demuxer_handle_); + ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE); + ds_init->set_integer_value(receiver_video_demuxer_callback_handle_); + controller_->GetRpcBroker()->ProcessMessageFromRemote( + std::move(ds_init)); + } + } else { + // Issues RPC_R_INITIALIZE_CALLBACK RPC message when receiving + // RPC_R_INITIALIZE. + auto init_cb = std::make_unique<pb::RpcMessage>(); + init_cb->set_handle(sender_renderer_callback_handle_); + init_cb->set_proc(pb::RpcMessage::RPC_R_INITIALIZE_CALLBACK); + init_cb->set_boolean_value(is_successfully_initialized_); controller_->GetRpcBroker()->ProcessMessageFromRemote( - std::move(ds_init)); + std::move(init_cb)); } } break; case pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK: { @@ -207,20 +247,24 @@ class CourierRendererTest : public testing::Test { if (rpc->handle() == receiver_video_demuxer_callback_handle_) received_video_ds_init_cb_ = true; - // Issues RPC_R_INITIALIZE_CALLBACK RPC message when receiving - // RPC_DS_INITIALIZE_CALLBACK on available streams. + // Check whether the demuxer at the receiver end is initialized. if (received_audio_ds_init_cb_ == (sender_audio_demuxer_handle_ != RpcBroker::kInvalidHandle) && received_video_ds_init_cb_ == (sender_video_demuxer_handle_ != RpcBroker::kInvalidHandle)) { - std::unique_ptr<pb::RpcMessage> init_cb(new pb::RpcMessage()); + is_receiver_demuxer_initialized_ = true; + } + + if (is_backward_compatible_mode_ && is_receiver_demuxer_initialized_) { + // Issues RPC_R_INITIALIZE_CALLBACK RPC message when receiving + // RPC_DS_INITIALIZE_CALLBACK on available streams. + auto init_cb = std::make_unique<pb::RpcMessage>(); init_cb->set_handle(sender_renderer_callback_handle_); init_cb->set_proc(pb::RpcMessage::RPC_R_INITIALIZE_CALLBACK); init_cb->set_boolean_value(is_successfully_initialized_); controller_->GetRpcBroker()->ProcessMessageFromRemote( std::move(init_cb)); } - } break; case pb::RpcMessage::RPC_R_FLUSHUNTIL: { // Issues RPC_R_FLUSHUNTIL_CALLBACK RPC message. @@ -269,8 +313,16 @@ class CourierRendererTest : public testing::Test { RunPendingTasks(); } + void InitializeRendererBackwardsCompatible() { + is_backward_compatible_mode_ = true; + InitializeRenderer(); + } + bool IsRendererInitialized() const { - return renderer_->state_ == CourierRenderer::STATE_PLAYING; + EXPECT_TRUE(received_audio_ds_init_cb_); + EXPECT_TRUE(received_video_ds_init_cb_); + return renderer_->state_ == CourierRenderer::STATE_PLAYING && + is_receiver_demuxer_initialized_; } bool DidEncounterFatalError() const { @@ -402,17 +454,24 @@ class CourierRendererTest : public testing::Test { base::SimpleTestTickClock clock_; // RPC handles. - const int receiver_renderer_handle_; - const int receiver_audio_demuxer_callback_handle_; - const int receiver_video_demuxer_callback_handle_; - int sender_client_handle_; - int sender_renderer_callback_handle_; - int sender_audio_demuxer_handle_; - int sender_video_demuxer_handle_; + const int receiver_renderer_handle_{10}; + const int receiver_audio_demuxer_callback_handle_{11}; + const int receiver_video_demuxer_callback_handle_{12}; + int sender_renderer_handle_; + int sender_client_handle_{RpcBroker::kInvalidHandle}; + int sender_renderer_callback_handle_{RpcBroker::kInvalidHandle}; + int sender_audio_demuxer_handle_{RpcBroker::kInvalidHandle}; + int sender_video_demuxer_handle_{RpcBroker::kInvalidHandle}; + + // Indicates whether the test runs in backward-compatible mode. + bool is_backward_compatible_mode_ = false; + + // Indicates whether the demuxer at receiver is initialized or not. + bool is_receiver_demuxer_initialized_ = false; // Indicate whether RPC_DS_INITIALIZE_CALLBACK RPC messages are received. - bool received_audio_ds_init_cb_; - bool received_video_ds_init_cb_; + bool received_audio_ds_init_cb_ = false; + bool received_video_ds_init_cb_ = false; // Indicates whether the test wants to simulate successful initialization in // the renderer on the receiver side. @@ -433,6 +492,14 @@ TEST_F(CourierRendererTest, Initialize) { ASSERT_EQ(render_client_->status(), PIPELINE_OK); } +TEST_F(CourierRendererTest, InitializeBackwardCompatible) { + InitializeRendererBackwardsCompatible(); + RunPendingTasks(); + + ASSERT_TRUE(IsRendererInitialized()); + ASSERT_EQ(render_client_->status(), PIPELINE_OK); +} + TEST_F(CourierRendererTest, InitializeFailed) { is_successfully_initialized_ = false; InitializeRenderer(); diff --git a/chromium/media/remoting/demuxer_stream_adapter.cc b/chromium/media/remoting/demuxer_stream_adapter.cc index 2efbf3873e1..63b7b9d201d 100644 --- a/chromium/media/remoting/demuxer_stream_adapter.cc +++ b/chromium/media/remoting/demuxer_stream_adapter.cc @@ -128,7 +128,9 @@ void DemuxerStreamAdapter::OnReceivedRpc( case pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER: EnableBitstreamConverter(); break; - + case pb::RpcMessage::RPC_DS_ONERROR: + OnFatalError(UNEXPECTED_FAILURE); + break; default: DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc(); } diff --git a/chromium/media/remoting/end2end_test_renderer.cc b/chromium/media/remoting/end2end_test_renderer.cc index f08c848d0c4..932e0ab2fdc 100644 --- a/chromium/media/remoting/end2end_test_renderer.cc +++ b/chromium/media/remoting/end2end_test_renderer.cc @@ -4,23 +4,29 @@ #include "media/remoting/end2end_test_renderer.h" -#include <memory> - #include "base/bind.h" #include "base/bind_helpers.h" #include "base/callback.h" +#include "base/check.h" +#include "base/notreached.h" #include "base/threading/thread_task_runner_handle.h" +#include "media/base/demuxer_stream.h" #include "media/mojo/common/mojo_data_pipe_read_write.h" +#include "media/mojo/common/mojo_decoder_buffer_converter.h" #include "media/mojo/mojom/remoting.mojom.h" #include "media/remoting/courier_renderer.h" #include "media/remoting/proto_utils.h" #include "media/remoting/receiver.h" +#include "media/remoting/receiver_controller.h" #include "media/remoting/renderer_controller.h" +#include "media/remoting/stream_provider.h" +#include "media/remoting/test_utils.h" #include "mojo/public/cpp/bindings/pending_receiver.h" #include "mojo/public/cpp/bindings/pending_remote.h" #include "mojo/public/cpp/bindings/receiver.h" #include "mojo/public/cpp/bindings/remote.h" #include "mojo/public/cpp/bindings/self_owned_receiver.h" +#include "mojo/public/cpp/system/data_pipe.h" namespace media { namespace remoting { @@ -30,7 +36,8 @@ namespace { class TestStreamSender final : public mojom::RemotingDataStreamSender { public: using SendFrameToSinkCallback = - base::RepeatingCallback<void(const std::vector<uint8_t>& data, + base::RepeatingCallback<void(uint32_t frame_count, + const std::vector<uint8_t>& data, DemuxerStream::Type type)>; TestStreamSender( mojo::PendingReceiver<mojom::RemotingDataStreamSender> receiver, @@ -49,19 +56,21 @@ class TestStreamSender final : public mojom::RemotingDataStreamSender { next_frame_data_.resize(frame_size); data_pipe_reader_.Read( next_frame_data_.data(), frame_size, - base::BindOnce(&TestStreamSender::OnFrameRead, base::Unretained(this))); + base::BindOnce(&TestStreamSender::OnFrameRead, base::Unretained(this), + frame_count_++)); } void CancelInFlightData() override { next_frame_data_.resize(0); } private: - void OnFrameRead(bool success) { + void OnFrameRead(uint32_t count, bool success) { DCHECK(success); if (send_frame_to_sink_cb_) - send_frame_to_sink_cb_.Run(next_frame_data_, type_); + send_frame_to_sink_cb_.Run(count, next_frame_data_, type_); next_frame_data_.resize(0); } + uint32_t frame_count_ = 0; mojo::Receiver<RemotingDataStreamSender> receiver_; MojoDataPipeReader data_pipe_reader_; const DemuxerStream::Type type_; @@ -153,33 +162,201 @@ std::unique_ptr<RendererController> CreateController( } // namespace +class End2EndTestRenderer::TestRemotee : public mojom::Remotee { + public: + explicit TestRemotee(RendererController* controller) + : controller_(controller) {} + + ~TestRemotee() override = default; + + void OnAudioFrame(uint32_t frame_count, + scoped_refptr<DecoderBuffer> decoder_buffer) { + ::media::mojom::DecoderBufferPtr mojo_buffer = + audio_buffer_writer_->WriteDecoderBuffer(std::move(decoder_buffer)); + audio_stream_->ReceiveFrame(frame_count, std::move(mojo_buffer)); + } + + void OnVideoFrame(uint32_t frame_count, + scoped_refptr<DecoderBuffer> decoder_buffer) { + ::media::mojom::DecoderBufferPtr mojo_buffer = + video_buffer_writer_->WriteDecoderBuffer(std::move(decoder_buffer)); + video_stream_->ReceiveFrame(frame_count, std::move(mojo_buffer)); + } + + void BindMojoReceiver(mojo::PendingReceiver<mojom::Remotee> receiver) { + mojo_receiver_.Bind(std::move(receiver)); + } + + void OnMessage(const std::vector<uint8_t>& message) { + receiver_controller_->OnMessageFromSource(message); + } + + // mojom::Remotee implementation + void OnRemotingSinkReady( + mojo::PendingRemote<::media::mojom::RemotingSink> sink) override { + receiver_controller_.Bind(std::move(sink)); + } + + void SendMessageToSource(const std::vector<uint8_t>& message) override { + controller_->OnMessageFromSink(message); + } + + void StartDataStreams( + mojo::PendingRemote<::media::mojom::RemotingDataStreamReceiver> + audio_stream, + mojo::PendingRemote<::media::mojom::RemotingDataStreamReceiver> + video_stream) override { + if (audio_stream.is_valid()) { + // initialize data pipe for audio data stream receiver + mojo::ScopedDataPipeConsumerHandle audio_data_pipe; + audio_stream_.Bind(std::move(audio_stream)); + audio_buffer_writer_ = ::media::MojoDecoderBufferWriter::Create( + GetDefaultDecoderBufferConverterCapacity( + ::media::DemuxerStream::AUDIO), + &audio_data_pipe); + audio_stream_->InitializeDataPipe(std::move(audio_data_pipe)); + } + + if (video_stream.is_valid()) { + // initialize data pipe for video data stream receiver + mojo::ScopedDataPipeConsumerHandle video_data_pipe; + video_stream_.Bind(std::move(video_stream)); + video_buffer_writer_ = ::media::MojoDecoderBufferWriter::Create( + GetDefaultDecoderBufferConverterCapacity( + ::media::DemuxerStream::VIDEO), + &video_data_pipe); + video_stream_->InitializeDataPipe(std::move(video_data_pipe)); + } + } + + void OnFlushUntil(uint32_t audio_frame_count, + uint32_t video_frame_count) override {} + + void OnVideoNaturalSizeChange(const gfx::Size& size) override {} + + private: + RendererController* controller_; + + std::unique_ptr<MojoDecoderBufferWriter> audio_buffer_writer_; + std::unique_ptr<MojoDecoderBufferWriter> video_buffer_writer_; + + mojo::Remote<mojom::RemotingDataStreamReceiver> audio_stream_; + mojo::Remote<mojom::RemotingDataStreamReceiver> video_stream_; + + mojo::Remote<mojom::RemotingSink> receiver_controller_; + mojo::Receiver<mojom::Remotee> mojo_receiver_{this}; +}; + End2EndTestRenderer::End2EndTestRenderer(std::unique_ptr<Renderer> renderer) - : receiver_rpc_broker_( - base::BindRepeating(&End2EndTestRenderer::OnMessageFromSink, - base::Unretained(this))), - receiver_(new Receiver(std::move(renderer), &receiver_rpc_broker_)) { + : courier_renderer_initialized_(false), receiver_initialized_(false) { + // create sender components controller_ = CreateController( base::BindRepeating(&End2EndTestRenderer::SendMessageToSink, weak_factory_.GetWeakPtr()), base::BindRepeating(&End2EndTestRenderer::SendFrameToSink, weak_factory_.GetWeakPtr())); - courier_renderer_.reset(new CourierRenderer( - base::ThreadTaskRunnerHandle::Get(), controller_->GetWeakPtr(), nullptr)); + courier_renderer_ = std::make_unique<CourierRenderer>( + base::ThreadTaskRunnerHandle::Get(), controller_->GetWeakPtr(), nullptr); + + // create receiver components + media_remotee_ = std::make_unique<TestRemotee>(controller_.get()); + + receiver_controller_ = ReceiverController::GetInstance(); + ResetForTesting(receiver_controller_); + + receiver_rpc_broker_ = receiver_controller_->rpc_broker(); + receiver_renderer_handle_ = receiver_rpc_broker_->GetUniqueHandle(); + + receiver_rpc_broker_->RegisterMessageReceiverCallback( + RpcBroker::kAcquireRendererHandle, + base::BindRepeating(&End2EndTestRenderer::OnReceivedRpc, + weak_factory_.GetWeakPtr())); + + receiver_ = std::make_unique<Receiver>( + receiver_renderer_handle_, sender_renderer_handle_, receiver_controller_, + base::ThreadTaskRunnerHandle::Get(), std::move(renderer), + base::BindOnce(&End2EndTestRenderer::OnAcquireRendererDone, + weak_factory_.GetWeakPtr())); + + mojo::PendingRemote<media::mojom::Remotee> remotee; + media_remotee_->BindMojoReceiver(remotee.InitWithNewPipeAndPassReceiver()); + receiver_controller_->Initialize(std::move(remotee)); + stream_provider_ = std::make_unique<StreamProvider>( + receiver_controller_, base::ThreadTaskRunnerHandle::Get()); } -End2EndTestRenderer::~End2EndTestRenderer() = default; +End2EndTestRenderer::~End2EndTestRenderer() { + receiver_rpc_broker_->UnregisterMessageReceiverCallback( + RpcBroker::kAcquireRendererHandle); +} void End2EndTestRenderer::Initialize(MediaResource* media_resource, RendererClient* client, PipelineStatusCallback init_cb) { - courier_renderer_->Initialize(media_resource, client, std::move(init_cb)); + init_cb_ = std::move(init_cb); + + stream_provider_->Initialize( + nullptr, base::BindOnce(&End2EndTestRenderer::InitializeReceiverRenderer, + weak_factory_.GetWeakPtr())); + + courier_renderer_->Initialize( + media_resource, client, + base::BindOnce(&End2EndTestRenderer::OnCourierRendererInitialized, + weak_factory_.GetWeakPtr())); +} + +void End2EndTestRenderer::InitializeReceiverRenderer(PipelineStatus status) { + DCHECK_EQ(PIPELINE_OK, status); + receiver_->Initialize( + stream_provider_.get(), nullptr, + base::BindOnce(&End2EndTestRenderer::OnReceiverInitalized, + weak_factory_.GetWeakPtr())); } -void End2EndTestRenderer::SetCdm(CdmContext* cdm_context, - CdmAttachedCB cdc_attached_cb) { - // TODO(xjz): Add the implementation when media remoting starts supporting - // encrypted contents. - NOTIMPLEMENTED() << "Media Remoting doesn't support EME for now."; +void End2EndTestRenderer::OnCourierRendererInitialized(PipelineStatus status) { + DCHECK_EQ(PIPELINE_OK, status); + courier_renderer_initialized_ = true; + CompleteInitialize(); +} + +void End2EndTestRenderer::OnReceiverInitalized(PipelineStatus status) { + DCHECK_EQ(PIPELINE_OK, status); + receiver_initialized_ = true; + CompleteInitialize(); +} +void End2EndTestRenderer::CompleteInitialize() { + if (!courier_renderer_initialized_ || !receiver_initialized_) + return; + + DCHECK(init_cb_); + std::move(init_cb_).Run(PIPELINE_OK); +} + +void End2EndTestRenderer::OnReceivedRpc( + std::unique_ptr<media::remoting::pb::RpcMessage> message) { + DCHECK(message); + DCHECK_EQ(message->proc(), + media::remoting::pb::RpcMessage::RPC_ACQUIRE_RENDERER); + OnAcquireRenderer(std::move(message)); +} + +void End2EndTestRenderer::OnAcquireRenderer( + std::unique_ptr<media::remoting::pb::RpcMessage> message) { + DCHECK(message->has_integer_value()); + DCHECK(message->integer_value() != RpcBroker::kInvalidHandle); + + if (sender_renderer_handle_ == RpcBroker::kInvalidHandle) { + sender_renderer_handle_ = message->integer_value(); + receiver_->SetRemoteHandle(sender_renderer_handle_); + } +} + +void End2EndTestRenderer::OnAcquireRendererDone(int receiver_renderer_handle) { + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(sender_renderer_handle_); + rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE); + rpc->set_integer_value(receiver_renderer_handle); + receiver_rpc_broker_->SendMessageToRemote(std::move(rpc)); } void End2EndTestRenderer::SetLatencyHint( @@ -187,6 +364,10 @@ void End2EndTestRenderer::SetLatencyHint( courier_renderer_->SetLatencyHint(latency_hint); } +void End2EndTestRenderer::SetPreservesPitch(bool preserves_pitch) { + courier_renderer_->SetPreservesPitch(preserves_pitch); +} + void End2EndTestRenderer::Flush(base::OnceClosure flush_cb) { courier_renderer_->Flush(std::move(flush_cb)); } @@ -209,19 +390,21 @@ base::TimeDelta End2EndTestRenderer::GetMediaTime() { void End2EndTestRenderer::SendMessageToSink( const std::vector<uint8_t>& message) { - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); - if (!rpc->ParseFromArray(message.data(), message.size())) { - VLOG(1) << __func__ << ": Received corrupted Rpc message."; - return; - } - receiver_rpc_broker_.ProcessMessageFromRemote(std::move(rpc)); + media_remotee_->OnMessage(message); } -void End2EndTestRenderer::SendFrameToSink(const std::vector<uint8_t>& frame, +void End2EndTestRenderer::SendFrameToSink(uint32_t frame_count, + const std::vector<uint8_t>& frame, DemuxerStream::Type type) { scoped_refptr<DecoderBuffer> decoder_buffer = ByteArrayToDecoderBuffer(frame.data(), frame.size()); - receiver_->OnReceivedBuffer(type, decoder_buffer); + if (type == DemuxerStream::Type::AUDIO) { + media_remotee_->OnAudioFrame(frame_count, decoder_buffer); + } else if (type == DemuxerStream::Type::VIDEO) { + media_remotee_->OnVideoFrame(frame_count, decoder_buffer); + } else { + NOTREACHED(); + } } void End2EndTestRenderer::OnMessageFromSink( diff --git a/chromium/media/remoting/end2end_test_renderer.h b/chromium/media/remoting/end2end_test_renderer.h index 1e8e635f878..905e27f5776 100644 --- a/chromium/media/remoting/end2end_test_renderer.h +++ b/chromium/media/remoting/end2end_test_renderer.h @@ -5,12 +5,14 @@ #ifndef MEDIA_REMOTING_END2END_RENDERER_H_ #define MEDIA_REMOTING_END2END_RENDERER_H_ +#include <memory> #include <vector> #include "base/memory/weak_ptr.h" #include "media/base/demuxer_stream.h" #include "media/base/renderer.h" #include "media/remoting/rpc_broker.h" +#include "media/remoting/stream_provider.h" namespace media { namespace remoting { @@ -18,6 +20,7 @@ namespace remoting { class RendererController; class CourierRenderer; class Receiver; +class ReceiverController; // Simulates the media remoting pipeline. class End2EndTestRenderer final : public Renderer { @@ -29,8 +32,8 @@ class End2EndTestRenderer final : public Renderer { void Initialize(MediaResource* media_resource, RendererClient* client, PipelineStatusCallback init_cb) override; - void SetCdm(CdmContext* cdm_context, CdmAttachedCB cdm_attached_cb) override; void SetLatencyHint(base::Optional<base::TimeDelta> latency_hint) override; + void SetPreservesPitch(bool preserves_pitch) override; void Flush(base::OnceClosure flush_cb) override; void StartPlayingFrom(base::TimeDelta time) override; void SetPlaybackRate(double playback_rate) override; @@ -46,28 +49,55 @@ class End2EndTestRenderer final : public Renderer { base::OnceClosure change_completed_cb) override; private: + class TestRemotee; + + void InitTestApi(); + // Called to send RPC messages to |receiver_|. void SendMessageToSink(const std::vector<uint8_t>& message); // Called to send frame data to |receiver_|. - void SendFrameToSink(const std::vector<uint8_t>& data, + void SendFrameToSink(uint32_t frame_count, + const std::vector<uint8_t>& data, DemuxerStream::Type type); // Called when receives RPC messages from |receiver_|. void OnMessageFromSink(std::unique_ptr<std::vector<uint8_t>> message); + void InitializeReceiverRenderer(PipelineStatus status); + void OnCourierRendererInitialized(PipelineStatus status); + void OnReceiverInitalized(PipelineStatus status); + void CompleteInitialize(); + + // Callback function when RPC message is received. + void OnReceivedRpc(std::unique_ptr<media::remoting::pb::RpcMessage> message); + void OnAcquireRenderer( + std::unique_ptr<media::remoting::pb::RpcMessage> message); + void OnAcquireRendererDone(int receiver_renderer_handle); + + PipelineStatusCallback init_cb_; + + bool courier_renderer_initialized_; + bool receiver_initialized_; + + // Sender components. std::unique_ptr<RendererController> controller_; std::unique_ptr<CourierRenderer> courier_renderer_; - // The RpcBroker to handle the RPC messages to/from |receiver_|. - RpcBroker receiver_rpc_broker_; - - // A receiver that renders media streams. + // Receiver components. + std::unique_ptr<TestRemotee> media_remotee_; + ReceiverController* receiver_controller_; std::unique_ptr<Receiver> receiver_; + std::unique_ptr<StreamProvider> stream_provider_; + RpcBroker* receiver_rpc_broker_; - base::WeakPtrFactory<End2EndTestRenderer> weak_factory_{this}; + // Handle of |receiver_| + int receiver_renderer_handle_ = RpcBroker::kInvalidHandle; + // Handle of |courier_renderer_|, it would be sent with AcquireRenderer + // message. + int sender_renderer_handle_ = RpcBroker::kInvalidHandle; - DISALLOW_COPY_AND_ASSIGN(End2EndTestRenderer); + base::WeakPtrFactory<End2EndTestRenderer> weak_factory_{this}; }; } // namespace remoting diff --git a/chromium/media/remoting/fake_media_resource.cc b/chromium/media/remoting/fake_media_resource.cc index e77cf87e10c..710b84a8931 100644 --- a/chromium/media/remoting/fake_media_resource.cc +++ b/chromium/media/remoting/fake_media_resource.cc @@ -101,13 +101,15 @@ void FakeDemuxerStream::CreateFakeFrame(size_t size, } FakeMediaResource::FakeMediaResource() - : demuxer_stream_(new FakeDemuxerStream(true)) {} + : audio_stream_(new FakeDemuxerStream(true)), + video_stream_(new FakeDemuxerStream(false)) {} FakeMediaResource::~FakeMediaResource() = default; std::vector<DemuxerStream*> FakeMediaResource::GetAllStreams() { std::vector<DemuxerStream*> streams; - streams.push_back(demuxer_stream_.get()); + streams.push_back(audio_stream_.get()); + streams.push_back(video_stream_.get()); return streams; } diff --git a/chromium/media/remoting/fake_media_resource.h b/chromium/media/remoting/fake_media_resource.h index 93b53180a05..f6391f8a033 100644 --- a/chromium/media/remoting/fake_media_resource.h +++ b/chromium/media/remoting/fake_media_resource.h @@ -5,6 +5,8 @@ #ifndef MEDIA_REMOTING_FAKE_MEDIA_RESOURCE_H_ #define MEDIA_REMOTING_FAKE_MEDIA_RESOURCE_H_ +#include <memory> + #include "base/containers/circular_deque.h" #include "media/base/audio_decoder_config.h" #include "media/base/demuxer_stream.h" @@ -54,7 +56,8 @@ class FakeMediaResource : public MediaResource { std::vector<DemuxerStream*> GetAllStreams() override; private: - std::unique_ptr<FakeDemuxerStream> demuxer_stream_; + std::unique_ptr<FakeDemuxerStream> audio_stream_; + std::unique_ptr<FakeDemuxerStream> video_stream_; DISALLOW_COPY_AND_ASSIGN(FakeMediaResource); }; diff --git a/chromium/media/remoting/integration_test.cc b/chromium/media/remoting/integration_test.cc index bc0aa888909..bbac6f6a8a8 100644 --- a/chromium/media/remoting/integration_test.cc +++ b/chromium/media/remoting/integration_test.cc @@ -72,8 +72,7 @@ TEST_F(MediaRemotingIntegrationTest, MediaSource_ConfigChange_WebM) { Stop(); } -// Flaky: http://crbug.com/1043812. -TEST_F(MediaRemotingIntegrationTest, DISABLED_SeekWhilePlaying) { +TEST_F(MediaRemotingIntegrationTest, SeekWhilePlaying) { ASSERT_EQ(PIPELINE_OK, Start("bear-320x240.webm")); base::TimeDelta duration(pipeline_->GetMediaDuration()); diff --git a/chromium/media/remoting/media_remoting_rpc.proto b/chromium/media/remoting/media_remoting_rpc.proto index bea8599a538..a7b584f07ac 100644 --- a/chromium/media/remoting/media_remoting_rpc.proto +++ b/chromium/media/remoting/media_remoting_rpc.proto @@ -337,6 +337,11 @@ enum CdmSessionType { kPersistentUsageRecord = 2; }; +message AcquireDemuxer { + optional int32 audio_demuxer_handle = 1; + optional int32 video_demuxer_handle = 2; +} + message RendererInitialize { optional int32 client_handle = 1; optional int32 audio_demuxer_handle = 2; @@ -495,6 +500,7 @@ message RpcMessage { RPC_ACQUIRE_RENDERER_DONE = 2; RPC_ACQUIRE_CDM = 3; RPC_ACQUIRE_CDM_DONE = 4; + RPC_ACQUIRE_DEMUXER = 5; // Renderer message RPC_R_INITIALIZE = 1000; RPC_R_FLUSHUNTIL = 1001; @@ -522,6 +528,7 @@ message RpcMessage { RPC_DS_INITIALIZE = 3000; RPC_DS_READUNTIL = 3001; RPC_DS_ENABLEBITSTREAMCONVERTER = 3002; + RPC_DS_ONERROR = 3003; // DemuxerStream callbacks RPC_DS_INITIALIZE_CALLBACK = 3100; RPC_DS_READUNTIL_CALLBACK = 3101; @@ -594,6 +601,9 @@ message RpcMessage { // RPC_R_SETCDM RendererSetCdm renderer_set_cdm_rpc = 102; + // RPC_ACQUIRE_DEMUXER + AcquireDemuxer acquire_demuxer_rpc = 103; + // RPC_RC_ONTIMEUPDATE RendererClientOnTimeUpdate rendererclient_ontimeupdate_rpc = 200; // RPC_RC_ONVIDEONATURALSIZECHANGE diff --git a/chromium/media/remoting/mock_receiver_controller.cc b/chromium/media/remoting/mock_receiver_controller.cc new file mode 100644 index 00000000000..b0e742d7635 --- /dev/null +++ b/chromium/media/remoting/mock_receiver_controller.cc @@ -0,0 +1,118 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/remoting/mock_receiver_controller.h" + +#include "base/check.h" +#include "media/mojo/common/mojo_decoder_buffer_converter.h" +#include "media/remoting/test_utils.h" +#include "mojo/public/cpp/system/data_pipe.h" + +namespace media { +namespace remoting { + +MockRemotee::MockRemotee() = default; + +MockRemotee::~MockRemotee() = default; + +void MockRemotee::BindMojoReceiver(mojo::PendingReceiver<Remotee> receiver) { + DCHECK(receiver); + receiver_.Bind(std::move(receiver)); +} + +void MockRemotee::SendAudioFrame(uint32_t frame_count, + scoped_refptr<DecoderBuffer> buffer) { + mojom::DecoderBufferPtr mojo_buffer = + audio_buffer_writer_->WriteDecoderBuffer(std::move(buffer)); + audio_stream_->ReceiveFrame(frame_count, std::move(mojo_buffer)); +} + +void MockRemotee::SendVideoFrame(uint32_t frame_count, + scoped_refptr<DecoderBuffer> buffer) { + mojom::DecoderBufferPtr mojo_buffer = + video_buffer_writer_->WriteDecoderBuffer(std::move(buffer)); + video_stream_->ReceiveFrame(frame_count, std::move(mojo_buffer)); +} + +void MockRemotee::OnRemotingSinkReady( + mojo::PendingRemote<mojom::RemotingSink> remoting_sink) { + DCHECK(remoting_sink); + remoting_sink_.Bind(std::move(remoting_sink)); +} + +void MockRemotee::SendMessageToSource(const std::vector<uint8_t>& message) {} + +void MockRemotee::StartDataStreams( + mojo::PendingRemote<mojom::RemotingDataStreamReceiver> audio_stream, + mojo::PendingRemote<mojom::RemotingDataStreamReceiver> video_stream) { + if (audio_stream.is_valid()) { + // Initialize data pipe for audio data stream receiver. + audio_stream_.Bind(std::move(audio_stream)); + mojo::ScopedDataPipeConsumerHandle audio_data_pipe; + audio_buffer_writer_ = MojoDecoderBufferWriter::Create( + GetDefaultDecoderBufferConverterCapacity(DemuxerStream::AUDIO), + &audio_data_pipe); + audio_stream_->InitializeDataPipe(std::move(audio_data_pipe)); + } + + if (video_stream.is_valid()) { + // Initialize data pipe for video data stream receiver. + video_stream_.Bind(std::move(video_stream)); + mojo::ScopedDataPipeConsumerHandle video_data_pipe; + video_buffer_writer_ = MojoDecoderBufferWriter::Create( + GetDefaultDecoderBufferConverterCapacity(DemuxerStream::VIDEO), + &video_data_pipe); + video_stream_->InitializeDataPipe(std::move(video_data_pipe)); + } +} + +void MockRemotee::OnFlushUntil(uint32_t audio_count, uint32_t video_count) { + flush_audio_count_ = audio_count; + flush_video_count_ = video_count; + + if (audio_stream_.is_bound()) { + audio_stream_->FlushUntil(audio_count); + } + if (video_stream_.is_bound()) { + video_stream_->FlushUntil(video_count); + } +} + +void MockRemotee::OnVideoNaturalSizeChange(const gfx::Size& size) { + DCHECK(!size.IsEmpty()); + changed_size_ = size; +} + +void MockRemotee::Reset() { + audio_stream_.reset(); + video_stream_.reset(); + receiver_.reset(); + remoting_sink_.reset(); +} + +// static +MockReceiverController* MockReceiverController::GetInstance() { + static base::NoDestructor<MockReceiverController> controller; + ResetForTesting(controller.get()); + controller->mock_remotee_->Reset(); + return controller.get(); +} + +MockReceiverController::MockReceiverController() + : mock_remotee_(new MockRemotee()) { + // Overwrites |rpc_broker_|. + rpc_broker_.SetMessageCallbackForTesting(base::BindRepeating( + &MockReceiverController::OnSendRpc, base::Unretained(this))); +} + +MockReceiverController::~MockReceiverController() = default; + +void MockReceiverController::OnSendRpc( + std::unique_ptr<std::vector<uint8_t>> message) { + std::vector<uint8_t> binary_message = *message; + ReceiverController::OnMessageFromSource(binary_message); +} + +} // namespace remoting +} // namespace media diff --git a/chromium/media/remoting/mock_receiver_controller.h b/chromium/media/remoting/mock_receiver_controller.h new file mode 100644 index 00000000000..e4c6da6c37b --- /dev/null +++ b/chromium/media/remoting/mock_receiver_controller.h @@ -0,0 +1,96 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MEDIA_REMOTING_MOCK_RECEIVER_CONTROLLER_H_ +#define MEDIA_REMOTING_MOCK_RECEIVER_CONTROLLER_H_ + +#include <memory> +#include <vector> + +#include "base/memory/scoped_refptr.h" +#include "base/no_destructor.h" +#include "media/mojo/mojom/remoting.mojom.h" +#include "media/remoting/receiver_controller.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "ui/gfx/geometry/size.h" + +namespace media { + +class MojoDecoderBufferWriter; + +namespace remoting { + +class MockRemotee : public mojom::Remotee { + public: + MockRemotee(); + ~MockRemotee() override; + + void BindMojoReceiver(mojo::PendingReceiver<Remotee> receiver); + + void SendAudioFrame(uint32_t frame_count, + scoped_refptr<DecoderBuffer> buffer); + void SendVideoFrame(uint32_t frame_count, + scoped_refptr<DecoderBuffer> buffer); + + // mojom::Remotee implementation + void OnRemotingSinkReady( + mojo::PendingRemote<mojom::RemotingSink> remoting_sink) override; + void SendMessageToSource(const std::vector<uint8_t>& message) override; + void StartDataStreams( + mojo::PendingRemote<mojom::RemotingDataStreamReceiver> audio_stream, + mojo::PendingRemote<mojom::RemotingDataStreamReceiver> video_stream) + override; + void OnFlushUntil(uint32_t audio_count, uint32_t video_count) override; + void OnVideoNaturalSizeChange(const gfx::Size& size) override; + + void Reset(); + + gfx::Size changed_size() { return changed_size_; } + uint32_t flush_audio_count() { return flush_audio_count_; } + uint32_t flush_video_count() { return flush_video_count_; } + + mojo::PendingRemote<mojom::Remotee> BindNewPipeAndPassRemote() { + return receiver_.BindNewPipeAndPassRemote(); + } + + mojo::Remote<mojom::RemotingDataStreamReceiver> audio_stream_; + mojo::Remote<mojom::RemotingDataStreamReceiver> video_stream_; + + private: + gfx::Size changed_size_; + + uint32_t flush_audio_count_{0}; + uint32_t flush_video_count_{0}; + + std::unique_ptr<MojoDecoderBufferWriter> audio_buffer_writer_; + std::unique_ptr<MojoDecoderBufferWriter> video_buffer_writer_; + + mojo::Remote<mojom::RemotingSink> remoting_sink_; + mojo::Receiver<mojom::Remotee> receiver_{this}; +}; + +class MockReceiverController : public ReceiverController { + public: + static MockReceiverController* GetInstance(); + + MockRemotee* mock_remotee() { return mock_remotee_.get(); } + + private: + friend base::NoDestructor<MockReceiverController>; + friend testing::StrictMock<MockReceiverController>; + friend testing::NiceMock<MockReceiverController>; + + MockReceiverController(); + ~MockReceiverController() override; + + void OnSendRpc(std::unique_ptr<std::vector<uint8_t>> message); + + std::unique_ptr<MockRemotee> mock_remotee_; +}; + +} // namespace remoting +} // namespace media + +#endif // MEDIA_REMOTING_MOCK_RECEIVER_CONTROLLER_H_ diff --git a/chromium/media/remoting/receiver.cc b/chromium/media/remoting/receiver.cc index 6db0c7d3b9d..40bffe10dd3 100644 --- a/chromium/media/remoting/receiver.cc +++ b/chromium/media/remoting/receiver.cc @@ -6,10 +6,14 @@ #include "base/bind.h" #include "base/callback.h" +#include "base/single_thread_task_runner.h" +#include "base/threading/thread_task_runner_handle.h" +#include "media/base/bind_to_current_loop.h" #include "media/base/decoder_buffer.h" #include "media/base/renderer.h" #include "media/remoting/proto_enum_utils.h" #include "media/remoting/proto_utils.h" +#include "media/remoting/receiver_controller.h" #include "media/remoting/stream_provider.h" namespace media { @@ -23,116 +27,168 @@ constexpr base::TimeDelta kTimeUpdateInterval = } // namespace -Receiver::Receiver(std::unique_ptr<Renderer> renderer, RpcBroker* rpc_broker) - : renderer_(std::move(renderer)), - rpc_broker_(rpc_broker), - rpc_handle_(rpc_broker_->GetUniqueHandle()) { - DCHECK(renderer_); +Receiver::Receiver( + int rpc_handle, + int remote_handle, + ReceiverController* receiver_controller, + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner, + std::unique_ptr<Renderer> renderer, + base::OnceCallback<void(int)> acquire_renderer_done_cb) + : rpc_handle_(rpc_handle), + remote_handle_(remote_handle), + receiver_controller_(receiver_controller), + rpc_broker_(receiver_controller_->rpc_broker()), + main_task_runner_(base::ThreadTaskRunnerHandle::Get()), + media_task_runner_(media_task_runner), + renderer_(std::move(renderer)), + acquire_renderer_done_cb_(std::move(acquire_renderer_done_cb)) { + DCHECK(rpc_handle_ != RpcBroker::kInvalidHandle); + DCHECK(receiver_controller_); DCHECK(rpc_broker_); - rpc_broker_->RegisterMessageReceiverCallback( - rpc_handle_, base::BindRepeating(&Receiver::OnReceivedRpc, - weak_factory_.GetWeakPtr())); - rpc_broker_->RegisterMessageReceiverCallback( - RpcBroker::kAcquireHandle, - base::BindRepeating(&Receiver::OnReceivedRpc, - weak_factory_.GetWeakPtr())); + DCHECK(renderer_); + + // Note: The constructor is running on the main thread, but will be destroyed + // on the media thread. Therefore, all weak pointers must be dereferenced on + // the media thread. + const RpcBroker::ReceiveMessageCallback receive_callback = BindToLoop( + media_task_runner_, + BindRepeating(&Receiver::OnReceivedRpc, weak_factory_.GetWeakPtr())); + + // Listening all renderer rpc messages. + rpc_broker_->RegisterMessageReceiverCallback(rpc_handle_, receive_callback); + VerifyAcquireRendererDone(); } Receiver::~Receiver() { rpc_broker_->UnregisterMessageReceiverCallback(rpc_handle_); - rpc_broker_->UnregisterMessageReceiverCallback(RpcBroker::kAcquireHandle); + rpc_broker_->UnregisterMessageReceiverCallback( + RpcBroker::kAcquireRendererHandle); +} + +// Receiver::Initialize() will be called by the local pipeline, it would only +// keep the |init_cb| in order to continue the initialization once it receives +// RPC_R_INITIALIZE, which means Receiver::RpcInitialize() is called. +void Receiver::Initialize(MediaResource* media_resource, + RendererClient* client, + PipelineStatusCallback init_cb) { + demuxer_ = media_resource; + init_cb_ = std::move(init_cb); + ShouldInitializeRenderer(); +} + +/* CDM is not supported for remoting media */ +void Receiver::SetCdm(CdmContext* cdm_context, CdmAttachedCB cdm_attached_cb) { + NOTREACHED(); +} + +// No-op. Controlled by sender via RPC calls instead. +void Receiver::SetLatencyHint(base::Optional<base::TimeDelta> latency_hint) {} + +// No-op. Controlled by sender via RPC calls instead. +void Receiver::Flush(base::OnceClosure flush_cb) {} + +// No-op. Controlled by sender via RPC calls instead. +void Receiver::StartPlayingFrom(base::TimeDelta time) {} + +// No-op. Controlled by sender via RPC calls instead. +void Receiver::SetPlaybackRate(double playback_rate) {} + +// No-op. Controlled by sender via RPC calls instead. +void Receiver::SetVolume(float volume) {} + +// No-op. Controlled by sender via RPC calls instead. +base::TimeDelta Receiver::GetMediaTime() { + return base::TimeDelta(); +} + +void Receiver::SendRpcMessageOnMainThread( + std::unique_ptr<pb::RpcMessage> message) { + // |rpc_broker_| is owned by |receiver_controller_| which is a singleton per + // process, so it's safe to use Unretained() here. + main_task_runner_->PostTask( + FROM_HERE, + base::BindOnce(&RpcBroker::SendMessageToRemote, + base::Unretained(rpc_broker_), std::move(message))); } void Receiver::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); DCHECK(message); switch (message->proc()) { - case pb::RpcMessage::RPC_ACQUIRE_RENDERER: - AcquireRenderer(std::move(message)); + case pb::RpcMessage::RPC_R_INITIALIZE: + RpcInitialize(std::move(message)); break; case pb::RpcMessage::RPC_R_FLUSHUNTIL: - FlushUntil(std::move(message)); + RpcFlushUntil(std::move(message)); break; case pb::RpcMessage::RPC_R_STARTPLAYINGFROM: - StartPlayingFrom(std::move(message)); + RpcStartPlayingFrom(std::move(message)); break; case pb::RpcMessage::RPC_R_SETPLAYBACKRATE: - SetPlaybackRate(std::move(message)); + RpcSetPlaybackRate(std::move(message)); break; case pb::RpcMessage::RPC_R_SETVOLUME: - SetVolume(std::move(message)); - break; - case pb::RpcMessage::RPC_R_INITIALIZE: - Initialize(std::move(message)); + RpcSetVolume(std::move(message)); break; default: - VLOG(1) << __func__ << ": Unknow RPC message. proc=" << message->proc(); + VLOG(1) << __func__ << ": Unknown RPC message. proc=" << message->proc(); } } -void Receiver::AcquireRenderer(std::unique_ptr<pb::RpcMessage> message) { - DVLOG(3) << __func__ << ": Receives RPC_ACQUIRE_RENDERER with remote_handle= " - << message->integer_value(); +void Receiver::SetRemoteHandle(int remote_handle) { + DCHECK_NE(remote_handle, RpcBroker::kInvalidHandle); + DCHECK_EQ(remote_handle_, RpcBroker::kInvalidHandle); + remote_handle_ = remote_handle; + VerifyAcquireRendererDone(); +} - remote_handle_ = message->integer_value(); - if (stream_provider_) { - VLOG(1) << "Acquire renderer error: Already acquired."; - OnError(PipelineStatus::PIPELINE_ERROR_DECODE); +void Receiver::VerifyAcquireRendererDone() { + if (remote_handle_ == RpcBroker::kInvalidHandle) return; - } - - stream_provider_.reset(new StreamProvider( - rpc_broker_, - base::BindOnce(&Receiver::OnError, weak_factory_.GetWeakPtr(), - PipelineStatus::PIPELINE_ERROR_DECODE))); - DVLOG(3) << __func__ - << ": Issues RPC_ACQUIRE_RENDERER_DONE RPC message. remote_handle=" - << remote_handle_ << " rpc_handle=" << rpc_handle_; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); - rpc->set_handle(remote_handle_); - rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE); - rpc->set_integer_value(rpc_handle_); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + DCHECK(acquire_renderer_done_cb_); + std::move(acquire_renderer_done_cb_).Run(rpc_handle_); } -void Receiver::Initialize(std::unique_ptr<pb::RpcMessage> message) { - DCHECK(stream_provider_); - DVLOG(3) << __func__ << ": Receives RPC_R_INITIALIZE with callback handle= " - << message->renderer_initialize_rpc().callback_handle(); - DCHECK(message->renderer_initialize_rpc().callback_handle() == - remote_handle_); - if (!stream_provider_) - OnRendererInitialized(PipelineStatus::PIPELINE_ERROR_INITIALIZATION_FAILED); - - stream_provider_->Initialize( - message->renderer_initialize_rpc().audio_demuxer_handle(), - message->renderer_initialize_rpc().video_demuxer_handle(), - base::BindOnce(&Receiver::OnStreamInitialized, - weak_factory_.GetWeakPtr())); +void Receiver::RpcInitialize(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(renderer_); + rpc_initialize_received_ = true; + ShouldInitializeRenderer(); } -void Receiver::OnStreamInitialized() { - DCHECK(stream_provider_); - renderer_->Initialize(stream_provider_.get(), this, +void Receiver::ShouldInitializeRenderer() { + // ShouldInitializeRenderer() will be called from Initialize() and + // RpcInitialize() in different orders. + // + // |renderer_| must be initialized when both Initialize() and + // RpcInitialize() are called. + if (!rpc_initialize_received_ || !init_cb_) + return; + + DCHECK(media_task_runner_->BelongsToCurrentThread()); + DCHECK(renderer_); + DCHECK(demuxer_); + renderer_->Initialize(demuxer_, this, base::BindOnce(&Receiver::OnRendererInitialized, weak_factory_.GetWeakPtr())); } void Receiver::OnRendererInitialized(PipelineStatus status) { - DVLOG(3) << __func__ << ": Issues RPC_R_INITIALIZE_CALLBACK RPC message." - << "remote_handle=" << remote_handle_; + DCHECK(media_task_runner_->BelongsToCurrentThread()); + DCHECK(init_cb_); + std::move(init_cb_).Run(status); - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_R_INITIALIZE_CALLBACK); rpc->set_boolean_value(status == PIPELINE_OK); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } -void Receiver::SetPlaybackRate(std::unique_ptr<pb::RpcMessage> message) { +void Receiver::RpcSetPlaybackRate(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + const double playback_rate = message->double_value(); - DVLOG(3) << __func__ - << ": Receives RPC_R_SETPLAYBACKRATE with rate=" << playback_rate; renderer_->SetPlaybackRate(playback_rate); if (playback_rate == 0.0) { @@ -147,38 +203,32 @@ void Receiver::SetPlaybackRate(std::unique_ptr<pb::RpcMessage> message) { } } -void Receiver::FlushUntil(std::unique_ptr<pb::RpcMessage> message) { - DVLOG(3) << __func__ << ": Receives RPC_R_FLUSHUNTIL RPC message."; +void Receiver::RpcFlushUntil(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + DCHECK(message->has_renderer_flushuntil_rpc()); const pb::RendererFlushUntil flush_message = message->renderer_flushuntil_rpc(); DCHECK_EQ(flush_message.callback_handle(), remote_handle_); - if (stream_provider_) { - if (flush_message.has_audio_count()) { - stream_provider_->FlushUntil(DemuxerStream::AUDIO, - flush_message.audio_count()); - } - if (flush_message.has_video_count()) { - stream_provider_->FlushUntil(DemuxerStream::VIDEO, - flush_message.video_count()); - } - } + + receiver_controller_->OnRendererFlush(flush_message.audio_count(), + flush_message.video_count()); + time_update_timer_.Stop(); renderer_->Flush( base::BindOnce(&Receiver::OnFlushDone, weak_factory_.GetWeakPtr())); } void Receiver::OnFlushDone() { - DVLOG(3) << __func__ << ": Issues RPC_R_FLUSHUNTIL_CALLBACK RPC message."; - - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_R_FLUSHUNTIL_CALLBACK); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } -void Receiver::StartPlayingFrom(std::unique_ptr<pb::RpcMessage> message) { - DVLOG(3) << __func__ << ": Receives RPC_R_STARTPLAYINGFROM message."; +void Receiver::RpcStartPlayingFrom(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + base::TimeDelta time = base::TimeDelta::FromMicroseconds(message->integer64_value()); renderer_->StartPlayingFrom(time); @@ -194,14 +244,14 @@ void Receiver::ScheduleMediaTimeUpdates() { weak_factory_.GetWeakPtr())); } -void Receiver::SetVolume(std::unique_ptr<pb::RpcMessage> message) { - DVLOG(3) << __func__ << ": Receives RPC_R_SETVOLUME message."; +void Receiver::RpcSetVolume(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); renderer_->SetVolume(message->double_value()); } void Receiver::SendMediaTimeUpdate() { // Issues RPC_RC_ONTIMEUPDATE RPC message. - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONTIMEUPDATE); auto* message = rpc->mutable_rendererclient_ontimeupdate_rpc(); @@ -209,40 +259,26 @@ void Receiver::SendMediaTimeUpdate() { message->set_time_usec(media_time.InMicroseconds()); base::TimeDelta max_time = media_time; message->set_max_time_usec(max_time.InMicroseconds()); - DVLOG(3) << __func__ << ": Issues RPC_RC_ONTIMEUPDATE message." - << " media_time = " << media_time.InMicroseconds() - << " max_time= " << max_time.InMicroseconds(); - rpc_broker_->SendMessageToRemote(std::move(rpc)); -} - -void Receiver::OnReceivedBuffer(DemuxerStream::Type type, - scoped_refptr<DecoderBuffer> buffer) { - DVLOG(3) << __func__ - << ": type=" << (type == DemuxerStream::AUDIO ? "Audio" : "Video"); - DCHECK(stream_provider_); - stream_provider_->AppendBuffer(type, buffer); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnError(PipelineStatus status) { - VLOG(1) << __func__ << ": Issues RPC_RC_ONERROR message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONERROR); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnEnded() { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONENDED message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONENDED); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); time_update_timer_.Stop(); } void Receiver::OnStatisticsUpdate(const PipelineStatistics& stats) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONSTATISTICSUPDATE message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONSTATISTICSUPDATE); auto* message = rpc->mutable_rendererclient_onstatisticsupdate_rpc(); @@ -252,77 +288,68 @@ void Receiver::OnStatisticsUpdate(const PipelineStatistics& stats) { message->set_video_frames_dropped(stats.video_frames_dropped); message->set_audio_memory_usage(stats.audio_memory_usage); message->set_video_memory_usage(stats.video_memory_usage); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnBufferingStateChange(BufferingState state, BufferingStateChangeReason reason) { - DVLOG(3) << __func__ - << ": Issues RPC_RC_ONBUFFERINGSTATECHANGE message: state=" << state; - - // The |reason| is determined on the other side of the RPC in CourierRenderer. - // For now, there is no reason to provide this in the |message| below. - DCHECK_EQ(reason, BUFFERING_CHANGE_REASON_UNKNOWN); - - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONBUFFERINGSTATECHANGE); auto* message = rpc->mutable_rendererclient_onbufferingstatechange_rpc(); message->set_state(ToProtoMediaBufferingState(state).value()); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } // TODO: Passes |reason| over. void Receiver::OnWaiting(WaitingReason reason) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONWAITINGFORDECRYPTIONKEY message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONWAITINGFORDECRYPTIONKEY); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnAudioConfigChange(const AudioDecoderConfig& config) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONAUDIOCONFIGCHANGE message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONAUDIOCONFIGCHANGE); auto* message = rpc->mutable_rendererclient_onaudioconfigchange_rpc(); pb::AudioDecoderConfig* proto_audio_config = message->mutable_audio_decoder_config(); ConvertAudioDecoderConfigToProto(config, proto_audio_config); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnVideoConfigChange(const VideoDecoderConfig& config) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONVIDEOCONFIGCHANGE message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONVIDEOCONFIGCHANGE); auto* message = rpc->mutable_rendererclient_onvideoconfigchange_rpc(); pb::VideoDecoderConfig* proto_video_config = message->mutable_video_decoder_config(); ConvertVideoDecoderConfigToProto(config, proto_video_config); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnVideoNaturalSizeChange(const gfx::Size& size) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONVIDEONATURALSIZECHANGE message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONVIDEONATURALSIZECHANGE); auto* message = rpc->mutable_rendererclient_onvideonatualsizechange_rpc(); message->set_width(size.width()); message->set_height(size.height()); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); + + // Notify the host. + receiver_controller_->OnVideoNaturalSizeChange(size); } void Receiver::OnVideoOpacityChange(bool opaque) { - DVLOG(3) << __func__ << ": Issues RPC_RC_ONVIDEOOPACITYCHANGE message."; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_RC_ONVIDEOOPACITYCHANGE); rpc->set_boolean_value(opaque); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); } void Receiver::OnVideoFrameRateChange(base::Optional<int>) {} diff --git a/chromium/media/remoting/receiver.h b/chromium/media/remoting/receiver.h index 38d5f18f91b..8f0ac1154b2 100644 --- a/chromium/media/remoting/receiver.h +++ b/chromium/media/remoting/receiver.h @@ -5,30 +5,61 @@ #ifndef MEDIA_REMOTING_RECEIVER_H_ #define MEDIA_REMOTING_RECEIVER_H_ +#include <memory> + +#include "base/callback_forward.h" +#include "base/memory/scoped_refptr.h" #include "base/memory/weak_ptr.h" +#include "base/single_thread_task_runner.h" #include "base/timer/timer.h" #include "media/base/buffering_state.h" #include "media/base/demuxer_stream.h" +#include "media/base/renderer.h" #include "media/base/renderer_client.h" +#include "media/remoting/media_remoting_rpc.pb.h" #include "media/remoting/rpc_broker.h" -namespace media { -class Renderer; -class DecoderBuffer; -} // namespace media +namespace base { +class SingleThreadTaskRunner; +} // namespace base namespace media { namespace remoting { +class ReceiverController; class RpcBroker; -class StreamProvider; -// Media remoting receiver. Media streams are rendered by |renderer|. -// |rpc_broker| outlives this class. -class Receiver final : public RendererClient { +// Receiver runs on a remote device, and forwards the information sent from a +// CourierRenderer to |renderer_|, which actually renders the media. +// +// Receiver implements media::Renderer to be able to work with +// WebMediaPlayerImpl. However, most of the APIs of media::Renderer are dummy +// functions, because the media playback of the remoting media is not controlled +// by the local pipeline of WMPI. It should be controlled by the remoting sender +// via RPC calls. When Receiver receives RPC calls, it will call the +// corresponding functions of |renderer_| to control the media playback of +// the remoting media. +class Receiver final : public Renderer, public RendererClient { public: - Receiver(std::unique_ptr<Renderer> renderer, RpcBroker* rpc_broker); - ~Receiver(); + Receiver(int rpc_handle, + int remote_handle, + ReceiverController* receiver_controller, + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner, + std::unique_ptr<Renderer> renderer, + base::OnceCallback<void(int)> acquire_renderer_done_cb); + ~Receiver() override; + + // Renderer implementation + void Initialize(MediaResource* media_resource, + RendererClient* client, + PipelineStatusCallback init_cb) override; + void SetCdm(CdmContext* cdm_context, CdmAttachedCB cdm_attached_cb) override; + void SetLatencyHint(base::Optional<base::TimeDelta> latency_hint) override; + void Flush(base::OnceClosure flush_cb) override; + void StartPlayingFrom(base::TimeDelta time) override; + void SetPlaybackRate(double playback_rate) override; + void SetVolume(float volume) override; + base::TimeDelta GetMediaTime() override; // RendererClient implementation. void OnError(PipelineStatus status) override; @@ -43,46 +74,70 @@ class Receiver final : public RendererClient { void OnVideoOpacityChange(bool opaque) override; void OnVideoFrameRateChange(base::Optional<int>) override; - void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message); - void OnReceivedBuffer(DemuxerStream::Type type, - scoped_refptr<DecoderBuffer> buffer); + // Used to set |remote_handle_| after Receiver is created, because the remote + // handle might be received after Receiver is created. + void SetRemoteHandle(int remote_handle); + + base::WeakPtr<Receiver> GetWeakPtr() { return weak_factory_.GetWeakPtr(); } private: + // Send RPC message on |main_task_runner_|. + void SendRpcMessageOnMainThread(std::unique_ptr<pb::RpcMessage> message); + + // Callback function when RPC message is received. + void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message); + // RPC message handlers. - void AcquireRenderer(std::unique_ptr<pb::RpcMessage> message); - void Initialize(std::unique_ptr<pb::RpcMessage> message); - void SetPlaybackRate(std::unique_ptr<pb::RpcMessage> message); - void FlushUntil(std::unique_ptr<pb::RpcMessage> message); - void StartPlayingFrom(std::unique_ptr<pb::RpcMessage> message); - void SetVolume(std::unique_ptr<pb::RpcMessage> message); - - // Initialization callbacks. - void OnStreamInitialized(); - void OnRendererInitialized(PipelineStatus status); + void RpcInitialize(std::unique_ptr<pb::RpcMessage> message); + void RpcSetPlaybackRate(std::unique_ptr<pb::RpcMessage> message); + void RpcFlushUntil(std::unique_ptr<pb::RpcMessage> message); + void RpcStartPlayingFrom(std::unique_ptr<pb::RpcMessage> message); + void RpcSetVolume(std::unique_ptr<pb::RpcMessage> message); + void ShouldInitializeRenderer(); + void OnRendererInitialized(PipelineStatus status); + void VerifyAcquireRendererDone(); void OnFlushDone(); // Periodically send the UpdateTime RPC message to update the media time. void ScheduleMediaTimeUpdates(); void SendMediaTimeUpdate(); - const std::unique_ptr<Renderer> renderer_; - RpcBroker* const rpc_broker_; // Outlives this class. + // The callback function to call when |this| is initialized. + PipelineStatusCallback init_cb_; + + // Indicates whether |this| received RPC_R_INITIALIZE message or not. + bool rpc_initialize_received_ = false; + + // Owns by the WebMediaPlayerImpl instance. + MediaResource* demuxer_ = nullptr; + + // The handle of |this| for listening RPC messages. + const int rpc_handle_; - // The CourierRenderer handle on sender side. Set when AcauireRenderer() is - // called. - int remote_handle_ = RpcBroker::kInvalidHandle; + // The CourierRenderer handle on sender side. |remote_handle_| could be set + // through the ctor or SetRemoteHandle(). + int remote_handle_; - int rpc_handle_ = RpcBroker::kInvalidHandle; + ReceiverController* const receiver_controller_; // Outlives this class. + RpcBroker* const rpc_broker_; // Outlives this class. - std::unique_ptr<StreamProvider> stream_provider_; + // Calling SendMessageCallback() of |rpc_broker_| should be on main thread. + const scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_; + + // Media tasks should run on media thread. + const scoped_refptr<base::SingleThreadTaskRunner> media_task_runner_; + + // |renderer_| is the real renderer to render media. + std::unique_ptr<Renderer> renderer_; + + // The callback function to send RPC_ACQUIRE_RENDERER_DONE. + base::OnceCallback<void(int)> acquire_renderer_done_cb_; // The timer to periodically update the media time. base::RepeatingTimer time_update_timer_; base::WeakPtrFactory<Receiver> weak_factory_{this}; - - DISALLOW_COPY_AND_ASSIGN(Receiver); }; } // namespace remoting diff --git a/chromium/media/remoting/receiver_controller.cc b/chromium/media/remoting/receiver_controller.cc new file mode 100644 index 00000000000..549087cf391 --- /dev/null +++ b/chromium/media/remoting/receiver_controller.cc @@ -0,0 +1,116 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/remoting/receiver_controller.h" + +#include "base/single_thread_task_runner.h" +#include "base/threading/thread_task_runner_handle.h" + +namespace media { +namespace remoting { + +// static +ReceiverController* ReceiverController::GetInstance() { + static base::NoDestructor<ReceiverController> controller; + return controller.get(); +} + +ReceiverController::ReceiverController() + : rpc_broker_(base::BindRepeating(&ReceiverController::OnSendRpc, + base::Unretained(this))), + main_task_runner_(base::ThreadTaskRunnerHandle::Get()) {} + +ReceiverController::~ReceiverController() = default; + +void ReceiverController::Initialize( + mojo::PendingRemote<mojom::Remotee> remotee) { + DCHECK(main_task_runner_->BelongsToCurrentThread()); + DCHECK(!media_remotee_.is_bound()); + media_remotee_.Bind(std::move(remotee)); + + // Calling NotifyRemotingSinkReady() to notify the host that RemotingSink is + // ready. + media_remotee_->OnRemotingSinkReady(receiver_.BindNewPipeAndPassRemote()); +} + +void ReceiverController::OnRendererFlush(uint32_t audio_count, + uint32_t video_count) { + if (!main_task_runner_->BelongsToCurrentThread()) { + // |this| is a singleton per process, it would be safe to use + // base::Unretained() here. + main_task_runner_->PostTask( + FROM_HERE, + base::BindOnce(&ReceiverController::OnRendererFlush, + base::Unretained(this), audio_count, video_count)); + return; + } + + if (media_remotee_.is_bound()) + media_remotee_->OnFlushUntil(audio_count, video_count); +} + +void ReceiverController::OnVideoNaturalSizeChange(const gfx::Size& size) { + if (!main_task_runner_->BelongsToCurrentThread()) { + // |this| is a singleton per process, it would be safe to use + // base::Unretained() here. + main_task_runner_->PostTask( + FROM_HERE, base::BindOnce(&ReceiverController::OnVideoNaturalSizeChange, + base::Unretained(this), size)); + return; + } + + if (media_remotee_.is_bound()) + media_remotee_->OnVideoNaturalSizeChange(size); +} + +void ReceiverController::StartDataStreams( + mojo::PendingRemote<::media::mojom::RemotingDataStreamReceiver> + audio_stream, + mojo::PendingRemote<::media::mojom::RemotingDataStreamReceiver> + video_stream) { + if (!main_task_runner_->BelongsToCurrentThread()) { + // |this| is a singleton per process, it would be safe to use + // base::Unretained() here. + main_task_runner_->PostTask( + FROM_HERE, + base::BindOnce(&ReceiverController::StartDataStreams, + base::Unretained(this), std::move(audio_stream), + std::move(video_stream))); + return; + } + if (media_remotee_.is_bound()) { + media_remotee_->StartDataStreams(std::move(audio_stream), + std::move(video_stream)); + } +} + +void ReceiverController::OnMessageFromSource( + const std::vector<uint8_t>& message) { + DCHECK(main_task_runner_->BelongsToCurrentThread()); + auto rpc_message = std::make_unique<pb::RpcMessage>(pb::RpcMessage()); + if (!rpc_message->ParseFromArray(message.data(), message.size())) + return; + + rpc_broker_.ProcessMessageFromRemote(std::move(rpc_message)); +} + +void ReceiverController::OnSendRpc( + std::unique_ptr<std::vector<uint8_t>> message) { + if (!main_task_runner_->BelongsToCurrentThread()) { + // |this| is a singleton per process, it would be safe to use + // base::Unretained() here. + main_task_runner_->PostTask( + FROM_HERE, base::BindOnce(&ReceiverController::OnSendRpc, + base::Unretained(this), std::move(message))); + return; + } + + DCHECK(media_remotee_.is_bound()); + std::vector<uint8_t> binary_message = *message; + if (media_remotee_.is_bound()) + media_remotee_->SendMessageToSource(binary_message); +} + +} // namespace remoting +} // namespace media diff --git a/chromium/media/remoting/receiver_controller.h b/chromium/media/remoting/receiver_controller.h new file mode 100644 index 00000000000..1071de2760a --- /dev/null +++ b/chromium/media/remoting/receiver_controller.h @@ -0,0 +1,70 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MEDIA_REMOTING_RECEIVER_CONTROLLER_H_ +#define MEDIA_REMOTING_RECEIVER_CONTROLLER_H_ + +#include <memory> + +#include "base/no_destructor.h" +#include "media/mojo/mojom/remoting.mojom.h" +#include "media/remoting/rpc_broker.h" +#include "mojo/public/cpp/bindings/pending_remote.h" +#include "mojo/public/cpp/bindings/receiver.h" +#include "mojo/public/cpp/bindings/remote.h" + +namespace media { +namespace remoting { + +// ReceiverController is the bridge that owns |rpc_broker_| to allow Receivers +// and StreamProvider::MediaStreams to communicate with the sender via RPC +// calls. +// +// It also forwards calls to a |media_remotee_| instance, which will be +// implemented the browser process. Currently, the only use case will be on +// Chromecast, the Remotee implementation will be implemented in the browser +// code on Chromecast. +// +// NOTE: ReceiverController is a singleton per process. +class ReceiverController : mojom::RemotingSink { + public: + static ReceiverController* GetInstance(); + void Initialize(mojo::PendingRemote<mojom::Remotee> remotee); + + // Proxy functions to |media_remotee_|. + void OnRendererFlush(uint32_t audio_count, uint32_t video_count); + void OnVideoNaturalSizeChange(const gfx::Size& size); + void StartDataStreams( + mojo::PendingRemote<mojom::RemotingDataStreamReceiver> audio_stream, + mojo::PendingRemote<mojom::RemotingDataStreamReceiver> video_stream); + + // The reference of |rpc_broker_|. + media::remoting::RpcBroker* rpc_broker() { return &rpc_broker_; } + + private: + friend base::NoDestructor<ReceiverController>; + friend class MockReceiverController; + friend void ResetForTesting(ReceiverController* controller); + + ReceiverController(); + ~ReceiverController() override; + + // media::mojom::RemotingSink implementation. + void OnMessageFromSource(const std::vector<uint8_t>& message) override; + + // Callback for |rpc_broker_| to send messages. + void OnSendRpc(std::unique_ptr<std::vector<uint8_t>> message); + + RpcBroker rpc_broker_; + + const scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_; + + mojo::Remote<media::mojom::Remotee> media_remotee_; + mojo::Receiver<media::mojom::RemotingSink> receiver_{this}; +}; + +} // namespace remoting +} // namespace media + +#endif // MEDIA_REMOTING_RECEIVER_CONTROLLER_H_ diff --git a/chromium/media/remoting/receiver_unittest.cc b/chromium/media/remoting/receiver_unittest.cc new file mode 100644 index 00000000000..94cb8cc50ef --- /dev/null +++ b/chromium/media/remoting/receiver_unittest.cc @@ -0,0 +1,471 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/remoting/receiver.h" + +#include "base/check.h" +#include "base/optional.h" +#include "base/test/gmock_callback_support.h" +#include "base/test/task_environment.h" +#include "media/base/audio_decoder_config.h" +#include "media/base/media_util.h" +#include "media/base/mock_filters.h" +#include "media/base/renderer.h" +#include "media/base/test_helpers.h" +#include "media/base/video_decoder_config.h" +#include "media/remoting/mock_receiver_controller.h" +#include "media/remoting/proto_enum_utils.h" +#include "media/remoting/proto_utils.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using base::test::RunOnceCallback; +using testing::_; +using testing::AtLeast; +using testing::NiceMock; +using testing::StrictMock; + +namespace media { +namespace remoting { + +class MockSender { + public: + MockSender(RpcBroker* rpc_broker, int remote_handle) + : rpc_broker_(rpc_broker), + rpc_handle_(rpc_broker->GetUniqueHandle()), + remote_handle_(remote_handle) { + rpc_broker_->RegisterMessageReceiverCallback( + rpc_handle_, base::BindRepeating(&MockSender::OnReceivedRpc, + base::Unretained(this))); + } + + MOCK_METHOD(void, AcquireRendererDone, ()); + MOCK_METHOD(void, InitializeCallback, (bool)); + MOCK_METHOD(void, FlushUntilCallback, ()); + MOCK_METHOD(void, OnTimeUpdate, (int64_t, int64_t)); + MOCK_METHOD(void, OnBufferingStateChange, (BufferingState)); + MOCK_METHOD(void, OnEnded, ()); + MOCK_METHOD(void, OnFatalError, ()); + MOCK_METHOD(void, OnAudioConfigChange, (AudioDecoderConfig)); + MOCK_METHOD(void, OnVideoConfigChange, (VideoDecoderConfig)); + MOCK_METHOD(void, OnVideoNaturalSizeChange, (gfx::Size)); + MOCK_METHOD(void, OnVideoOpacityChange, (bool)); + MOCK_METHOD(void, OnStatisticsUpdate, (PipelineStatistics)); + MOCK_METHOD(void, OnWaiting, ()); + + void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(message); + switch (message->proc()) { + case pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE: + AcquireRendererDone(); + break; + case pb::RpcMessage::RPC_R_INITIALIZE_CALLBACK: + InitializeCallback(message->boolean_value()); + break; + case pb::RpcMessage::RPC_R_FLUSHUNTIL_CALLBACK: + FlushUntilCallback(); + break; + case pb::RpcMessage::RPC_RC_ONTIMEUPDATE: { + DCHECK(message->has_rendererclient_ontimeupdate_rpc()); + const int64_t time_usec = + message->rendererclient_ontimeupdate_rpc().time_usec(); + const int64_t max_time_usec = + message->rendererclient_ontimeupdate_rpc().max_time_usec(); + OnTimeUpdate(time_usec, max_time_usec); + break; + } + case pb::RpcMessage::RPC_RC_ONBUFFERINGSTATECHANGE: { + base::Optional<BufferingState> state = ToMediaBufferingState( + message->rendererclient_onbufferingstatechange_rpc().state()); + if (state.has_value()) + OnBufferingStateChange(state.value()); + break; + } + case pb::RpcMessage::RPC_RC_ONENDED: + OnEnded(); + break; + case pb::RpcMessage::RPC_RC_ONERROR: + OnFatalError(); + break; + case pb::RpcMessage::RPC_RC_ONAUDIOCONFIGCHANGE: { + DCHECK(message->has_rendererclient_onaudioconfigchange_rpc()); + const auto* audio_config_message = + message->mutable_rendererclient_onaudioconfigchange_rpc(); + const pb::AudioDecoderConfig pb_audio_config = + audio_config_message->audio_decoder_config(); + AudioDecoderConfig out_audio_config; + ConvertProtoToAudioDecoderConfig(pb_audio_config, &out_audio_config); + DCHECK(out_audio_config.IsValidConfig()); + OnAudioConfigChange(out_audio_config); + break; + } + case pb::RpcMessage::RPC_RC_ONVIDEOCONFIGCHANGE: { + DCHECK(message->has_rendererclient_onvideoconfigchange_rpc()); + const auto* video_config_message = + message->mutable_rendererclient_onvideoconfigchange_rpc(); + const pb::VideoDecoderConfig pb_video_config = + video_config_message->video_decoder_config(); + VideoDecoderConfig out_video_config; + ConvertProtoToVideoDecoderConfig(pb_video_config, &out_video_config); + DCHECK(out_video_config.IsValidConfig()); + + OnVideoConfigChange(out_video_config); + break; + } + case pb::RpcMessage::RPC_RC_ONVIDEONATURALSIZECHANGE: { + DCHECK(message->has_rendererclient_onvideonatualsizechange_rpc()); + + gfx::Size size( + message->rendererclient_onvideonatualsizechange_rpc().width(), + message->rendererclient_onvideonatualsizechange_rpc().height()); + OnVideoNaturalSizeChange(size); + break; + } + case pb::RpcMessage::RPC_RC_ONVIDEOOPACITYCHANGE: + OnVideoOpacityChange(message->boolean_value()); + break; + case pb::RpcMessage::RPC_RC_ONSTATISTICSUPDATE: { + DCHECK(message->has_rendererclient_onstatisticsupdate_rpc()); + auto rpc_message = message->rendererclient_onstatisticsupdate_rpc(); + PipelineStatistics statistics; + statistics.audio_bytes_decoded = rpc_message.audio_bytes_decoded(); + statistics.video_bytes_decoded = rpc_message.video_bytes_decoded(); + statistics.video_frames_decoded = rpc_message.video_frames_decoded(); + statistics.video_frames_dropped = rpc_message.video_frames_dropped(); + statistics.audio_memory_usage = rpc_message.audio_memory_usage(); + statistics.video_memory_usage = rpc_message.video_memory_usage(); + OnStatisticsUpdate(statistics); + break; + } + case pb::RpcMessage::RPC_RC_ONWAITINGFORDECRYPTIONKEY: + OnWaiting(); + break; + + default: + VLOG(1) << "Unknown RPC: " << message->proc(); + } + } + + void SendRpcAcquireRenderer() { + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(RpcBroker::kAcquireRendererHandle); + rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER); + rpc->set_integer_value(rpc_handle_); + rpc_broker_->SendMessageToRemote(std::move(rpc)); + } + + void SendRpcInitialize() { + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(remote_handle_); + rpc->set_proc(pb::RpcMessage::RPC_R_INITIALIZE); + rpc_broker_->SendMessageToRemote(std::move(rpc)); + } + + void SendRpcSetPlaybackRate(double playback_rate) { + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(remote_handle_); + rpc->set_proc(pb::RpcMessage::RPC_R_SETPLAYBACKRATE); + rpc->set_double_value(playback_rate); + rpc_broker_->SendMessageToRemote(std::move(rpc)); + } + + void SendRpcFlushUntil(uint32_t audio_count, uint32_t video_count) { + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(remote_handle_); + rpc->set_proc(pb::RpcMessage::RPC_R_FLUSHUNTIL); + pb::RendererFlushUntil* message = rpc->mutable_renderer_flushuntil_rpc(); + message->set_audio_count(audio_count); + message->set_video_count(video_count); + message->set_callback_handle(rpc_handle_); + rpc_broker_->SendMessageToRemote(std::move(rpc)); + } + + void SendRpcStartPlayingFrom(base::TimeDelta time) { + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(remote_handle_); + rpc->set_proc(pb::RpcMessage::RPC_R_STARTPLAYINGFROM); + rpc->set_integer64_value(time.InMicroseconds()); + rpc_broker_->SendMessageToRemote(std::move(rpc)); + } + + void SendRpcSetVolume(float volume) { + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(remote_handle_); + rpc->set_proc(pb::RpcMessage::RPC_R_SETVOLUME); + rpc->set_double_value(volume); + rpc_broker_->SendMessageToRemote(std::move(rpc)); + } + + private: + RpcBroker* const rpc_broker_; + const int rpc_handle_; + const int remote_handle_; +}; + +class ReceiverTest : public ::testing::Test { + public: + ReceiverTest() = default; + + void SetUp() override { + mock_controller_ = MockReceiverController::GetInstance(); + mock_controller_->Initialize( + mock_controller_->mock_remotee()->BindNewPipeAndPassRemote()); + mock_remotee_ = mock_controller_->mock_remotee(); + + rpc_broker_ = mock_controller_->rpc_broker(); + receiver_renderer_handle_ = rpc_broker_->GetUniqueHandle(); + + mock_sender_ = std::make_unique<StrictMock<MockSender>>( + rpc_broker_, receiver_renderer_handle_); + + rpc_broker_->RegisterMessageReceiverCallback( + RpcBroker::kAcquireRendererHandle, + base::BindRepeating(&ReceiverTest::OnReceivedRpc, + weak_factory_.GetWeakPtr())); + } + + void TearDown() override { + rpc_broker_->UnregisterMessageReceiverCallback( + RpcBroker::kAcquireRendererHandle); + } + + void OnReceivedRpc(std::unique_ptr<media::remoting::pb::RpcMessage> message) { + DCHECK(message); + EXPECT_EQ(message->proc(), + media::remoting::pb::RpcMessage::RPC_ACQUIRE_RENDERER); + OnAcquireRenderer(std::move(message)); + } + + void OnAcquireRenderer( + std::unique_ptr<media::remoting::pb::RpcMessage> message) { + DCHECK(message->has_integer_value()); + DCHECK(message->integer_value() != RpcBroker::kInvalidHandle); + + if (sender_renderer_handle_ == RpcBroker::kInvalidHandle) { + sender_renderer_handle_ = message->integer_value(); + SetRemoteHandle(); + } + } + + void OnAcquireRendererDone(int receiver_renderer_handle) { + DVLOG(3) << __func__ + << ": Issues RPC_ACQUIRE_RENDERER_DONE RPC message. remote_handle=" + << sender_renderer_handle_ + << " rpc_handle=" << receiver_renderer_handle; + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(sender_renderer_handle_); + rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE); + rpc->set_integer_value(receiver_renderer_handle); + rpc_broker_->SendMessageToRemote(std::move(rpc)); + } + + void CreateReceiver() { + auto renderer = std::make_unique<NiceMock<MockRenderer>>(); + mock_renderer_ = renderer.get(); + receiver_ = std::make_unique<Receiver>( + receiver_renderer_handle_, sender_renderer_handle_, mock_controller_, + base::ThreadTaskRunnerHandle::Get(), std::move(renderer), + base::BindOnce(&ReceiverTest::OnAcquireRendererDone, + weak_factory_.GetWeakPtr())); + } + + void SetRemoteHandle() { + if (!receiver_) + return; + receiver_->SetRemoteHandle(sender_renderer_handle_); + } + + void InitializeReceiver() { + receiver_->Initialize(&mock_media_resource_, nullptr, + base::BindOnce(&ReceiverTest::OnRendererInitialized, + weak_factory_.GetWeakPtr())); + } + + MOCK_METHOD(void, OnRendererInitialized, (PipelineStatus)); + + base::test::TaskEnvironment task_environment_; + + int sender_renderer_handle_ = RpcBroker::kInvalidHandle; + int receiver_renderer_handle_ = RpcBroker::kInvalidHandle; + + MockMediaResource mock_media_resource_; + MockRenderer* mock_renderer_; + std::unique_ptr<MockSender> mock_sender_; + + RpcBroker* rpc_broker_; + MockRemotee* mock_remotee_; + MockReceiverController* mock_controller_; + std::unique_ptr<Receiver> receiver_; + + base::WeakPtrFactory<ReceiverTest> weak_factory_{this}; +}; + +TEST_F(ReceiverTest, AcquireRendererBeforeCreateReceiver) { + mock_sender_->SendRpcAcquireRenderer(); + EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1); + CreateReceiver(); + task_environment_.RunUntilIdle(); +} + +TEST_F(ReceiverTest, AcquireRendererAfterCreateReceiver) { + CreateReceiver(); + EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1); + mock_sender_->SendRpcAcquireRenderer(); + task_environment_.RunUntilIdle(); +} + +// |Receiver::Initialize| will be called by the local pipeline, and the +// |Receiver::RpcInitialize| will be called once it received the +// RPC_R_INITIALIZE messages, so these two initialization functions are possible +// to be called in difference orders. +// +// Call |Receiver::Initialize| first, then send RPC_R_INITIALIZE. +TEST_F(ReceiverTest, InitializeBeforeRpcInitialize) { + EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1); + mock_sender_->SendRpcAcquireRenderer(); + CreateReceiver(); + + EXPECT_CALL(*mock_renderer_, + OnInitialize(&mock_media_resource_, receiver_.get(), _)) + .WillOnce(RunOnceCallback<2>(PipelineStatus::PIPELINE_OK)); + EXPECT_CALL(*this, OnRendererInitialized(PipelineStatus::PIPELINE_OK)) + .Times(1); + EXPECT_CALL(*mock_sender_, InitializeCallback(true)).Times(1); + + InitializeReceiver(); + mock_sender_->SendRpcInitialize(); + task_environment_.RunUntilIdle(); +} + +// Send RPC_R_INITIALIZE first, then call |Receiver::Initialize|. +TEST_F(ReceiverTest, InitializeAfterRpcInitialize) { + EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1); + mock_sender_->SendRpcAcquireRenderer(); + CreateReceiver(); + + EXPECT_CALL(*mock_renderer_, + OnInitialize(&mock_media_resource_, receiver_.get(), _)) + .WillOnce(RunOnceCallback<2>(PipelineStatus::PIPELINE_OK)); + EXPECT_CALL(*this, OnRendererInitialized(PipelineStatus::PIPELINE_OK)) + .Times(1); + EXPECT_CALL(*mock_sender_, InitializeCallback(true)).Times(1); + + mock_sender_->SendRpcInitialize(); + InitializeReceiver(); + task_environment_.RunUntilIdle(); +} + +TEST_F(ReceiverTest, RpcRendererMessages) { + EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1); + mock_sender_->SendRpcAcquireRenderer(); + CreateReceiver(); + mock_sender_->SendRpcInitialize(); + InitializeReceiver(); + task_environment_.RunUntilIdle(); + + // SetVolume + const float volume = 0.5; + EXPECT_CALL(*mock_renderer_, SetVolume(volume)).Times(1); + mock_sender_->SendRpcSetVolume(volume); + task_environment_.RunUntilIdle(); + + EXPECT_CALL(*mock_sender_, OnTimeUpdate(_, _)).Times(AtLeast(1)); + + // SetPlaybackRate + const double playback_rate = 1.2; + EXPECT_CALL(*mock_renderer_, SetPlaybackRate(playback_rate)).Times(1); + mock_sender_->SendRpcSetPlaybackRate(playback_rate); + task_environment_.RunUntilIdle(); + + // Flush + const uint32_t flush_audio_count = 10; + const uint32_t flush_video_count = 20; + EXPECT_CALL(*mock_renderer_, OnFlush(_)).WillOnce(RunOnceCallback<0>()); + EXPECT_CALL(*mock_sender_, FlushUntilCallback()).Times(1); + mock_sender_->SendRpcFlushUntil(flush_audio_count, flush_video_count); + task_environment_.RunUntilIdle(); + EXPECT_EQ(flush_audio_count, mock_remotee_->flush_audio_count()); + EXPECT_EQ(flush_video_count, mock_remotee_->flush_video_count()); + + // StartPlayingFrom + const base::TimeDelta time = base::TimeDelta::FromSeconds(100); + EXPECT_CALL(*mock_renderer_, StartPlayingFrom(time)).Times(1); + mock_sender_->SendRpcStartPlayingFrom(time); + task_environment_.RunUntilIdle(); +} + +TEST_F(ReceiverTest, RendererClientInterface) { + EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1); + mock_sender_->SendRpcAcquireRenderer(); + CreateReceiver(); + mock_sender_->SendRpcInitialize(); + InitializeReceiver(); + task_environment_.RunUntilIdle(); + + // OnBufferingStateChange + EXPECT_CALL(*mock_sender_, OnBufferingStateChange(BUFFERING_HAVE_ENOUGH)) + .Times(1); + receiver_->OnBufferingStateChange(BUFFERING_HAVE_ENOUGH, + BUFFERING_CHANGE_REASON_UNKNOWN); + task_environment_.RunUntilIdle(); + + // OnEnded + EXPECT_CALL(*mock_sender_, OnEnded()).Times(1); + receiver_->OnEnded(); + task_environment_.RunUntilIdle(); + + // OnError + EXPECT_CALL(*mock_sender_, OnFatalError()).Times(1); + receiver_->OnError(PipelineStatus::AUDIO_RENDERER_ERROR); + task_environment_.RunUntilIdle(); + + // OnAudioConfigChange + const auto kNewAudioConfig = TestAudioConfig::Normal(); + EXPECT_CALL(*mock_sender_, + OnAudioConfigChange(DecoderConfigEq(kNewAudioConfig))) + .Times(1); + receiver_->OnAudioConfigChange(kNewAudioConfig); + task_environment_.RunUntilIdle(); + + // OnVideoConfigChange + const auto kNewVideoConfig = TestVideoConfig::Normal(); + EXPECT_CALL(*mock_sender_, + OnVideoConfigChange(DecoderConfigEq(kNewVideoConfig))) + .Times(1); + receiver_->OnVideoConfigChange(kNewVideoConfig); + task_environment_.RunUntilIdle(); + + // OnVideoNaturalSizeChange + const gfx::Size size(100, 200); + EXPECT_CALL(*mock_sender_, OnVideoNaturalSizeChange(size)).Times(1); + receiver_->OnVideoNaturalSizeChange(size); + task_environment_.RunUntilIdle(); + EXPECT_EQ(size, mock_remotee_->changed_size()); + + // OnVideoOpacityChange + const bool opaque = true; + EXPECT_CALL(*mock_sender_, OnVideoOpacityChange(opaque)).Times(1); + receiver_->OnVideoOpacityChange(opaque); + task_environment_.RunUntilIdle(); + + // OnStatisticsUpdate + PipelineStatistics statistics; + statistics.audio_bytes_decoded = 100; + statistics.video_bytes_decoded = 200; + statistics.video_frames_decoded = 300; + statistics.video_frames_dropped = 400; + statistics.audio_memory_usage = 500; + statistics.video_memory_usage = 600; + EXPECT_CALL(*mock_sender_, OnStatisticsUpdate(statistics)).Times(1); + receiver_->OnStatisticsUpdate(statistics); + task_environment_.RunUntilIdle(); + + // OnWaiting + EXPECT_CALL(*mock_sender_, OnWaiting()).Times(1); + receiver_->OnWaiting(WaitingReason::kNoDecryptionKey); + task_environment_.RunUntilIdle(); +} + +} // namespace remoting +} // namespace media diff --git a/chromium/media/remoting/remoting_constants.h b/chromium/media/remoting/remoting_constants.h new file mode 100644 index 00000000000..4dd35dd21f5 --- /dev/null +++ b/chromium/media/remoting/remoting_constants.h @@ -0,0 +1,18 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MEDIA_REMOTING_REMOTING_CONSTANTS_H_ +#define MEDIA_REMOTING_REMOTING_CONSTANTS_H_ + +namespace media { +namespace remoting { + +// The src attribute for remoting media should use the URL with this scheme. +// The URL format is "media-remoting:<id>", e.g. "media-remoting:test". +constexpr char kRemotingScheme[] = "media-remoting"; + +} // namespace remoting +} // namespace media + +#endif // MEDIA_REMOTING_REMOTING_CONSTANTS_H_ diff --git a/chromium/media/remoting/remoting_renderer_factory.cc b/chromium/media/remoting/remoting_renderer_factory.cc new file mode 100644 index 00000000000..ea3051e715b --- /dev/null +++ b/chromium/media/remoting/remoting_renderer_factory.cc @@ -0,0 +1,122 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/remoting/remoting_renderer_factory.h" + +#include "media/base/demuxer.h" +#include "media/remoting/receiver.h" +#include "media/remoting/receiver_controller.h" +#include "media/remoting/stream_provider.h" + +namespace media { +namespace remoting { + +RemotingRendererFactory::RemotingRendererFactory( + mojo::PendingRemote<mojom::Remotee> remotee, + std::unique_ptr<RendererFactory> renderer_factory, + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner) + : receiver_controller_(ReceiverController::GetInstance()), + rpc_broker_(receiver_controller_->rpc_broker()), + renderer_handle_(rpc_broker_->GetUniqueHandle()), + waiting_for_remote_handle_receiver_(nullptr), + real_renderer_factory_(std::move(renderer_factory)), + media_task_runner_(media_task_runner) { + DVLOG(2) << __func__; + DCHECK(receiver_controller_); + + // Register the callback to listen RPC_ACQUIRE_RENDERER message. + rpc_broker_->RegisterMessageReceiverCallback( + RpcBroker::kAcquireRendererHandle, + base::BindRepeating(&RemotingRendererFactory::OnAcquireRenderer, + weak_factory_.GetWeakPtr())); + receiver_controller_->Initialize(std::move(remotee)); +} + +RemotingRendererFactory::~RemotingRendererFactory() { + rpc_broker_->UnregisterMessageReceiverCallback( + RpcBroker::kAcquireRendererHandle); +} + +std::unique_ptr<Renderer> RemotingRendererFactory::CreateRenderer( + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner, + const scoped_refptr<base::TaskRunner>& worker_task_runner, + AudioRendererSink* audio_renderer_sink, + VideoRendererSink* video_renderer_sink, + RequestOverlayInfoCB request_overlay_info_cb, + const gfx::ColorSpace& target_color_space) { + DVLOG(2) << __func__; + + auto receiver = std::make_unique<Receiver>( + renderer_handle_, remote_renderer_handle_, receiver_controller_, + media_task_runner, + real_renderer_factory_->CreateRenderer( + media_task_runner, worker_task_runner, audio_renderer_sink, + video_renderer_sink, request_overlay_info_cb, target_color_space), + base::BindOnce(&RemotingRendererFactory::OnAcquireRendererDone, + base::Unretained(this))); + + // If we haven't received a RPC_ACQUIRE_RENDERER yet, keep a reference to + // |receiver|, and set its remote handle when we get the call to + // OnAcquireRenderer(). + if (remote_renderer_handle_ == RpcBroker::kInvalidHandle) + waiting_for_remote_handle_receiver_ = receiver->GetWeakPtr(); + + return std::move(receiver); +} + +void RemotingRendererFactory::OnReceivedRpc( + std::unique_ptr<pb::RpcMessage> message) { + DCHECK(message); + if (message->proc() == pb::RpcMessage::RPC_ACQUIRE_RENDERER) + OnAcquireRenderer(std::move(message)); + else + VLOG(1) << __func__ << ": Unknow RPC message. proc=" << message->proc(); +} + +void RemotingRendererFactory::OnAcquireRenderer( + std::unique_ptr<pb::RpcMessage> message) { + DCHECK(message->has_integer_value()); + DCHECK(message->integer_value() != RpcBroker::kInvalidHandle); + + remote_renderer_handle_ = message->integer_value(); + + // If CreateRenderer() was called before we had a valid + // |remote_renderer_handle_|, set it on the already created Receiver. + if (waiting_for_remote_handle_receiver_) { + // |waiting_for_remote_handle_receiver_| is the WeakPtr of the Receiver + // instance and should be deref in the media thread. + media_task_runner_->PostTask( + FROM_HERE, base::BindOnce(&Receiver::SetRemoteHandle, + waiting_for_remote_handle_receiver_, + remote_renderer_handle_)); + } +} + +void RemotingRendererFactory::OnAcquireRendererDone(int receiver_rpc_handle) { + // RPC_ACQUIRE_RENDERER_DONE should be sent only once. + // + // WebMediaPlayerImpl might destroy and re-create the Receiver instance + // several times for saving resources. However, RPC_ACQUIRE_RENDERER_DONE + // shouldn't be sent multiple times whenever a Receiver instance is created. + if (is_acquire_renderer_done_sent_) + return; + + DVLOG(3) << __func__ + << ": Issues RPC_ACQUIRE_RENDERER_DONE RPC message. remote_handle=" + << remote_renderer_handle_ << " rpc_handle=" << receiver_rpc_handle; + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(remote_renderer_handle_); + rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE); + rpc->set_integer_value(receiver_rpc_handle); + rpc_broker_->SendMessageToRemote(std::move(rpc)); + + // Once RPC_ACQUIRE_RENDERER_DONE is sent, it implies there is no Receiver + // instance that is waiting the remote handle. + waiting_for_remote_handle_receiver_ = nullptr; + + is_acquire_renderer_done_sent_ = true; +} + +} // namespace remoting +} // namespace media diff --git a/chromium/media/remoting/remoting_renderer_factory.h b/chromium/media/remoting/remoting_renderer_factory.h new file mode 100644 index 00000000000..cf74f57efcb --- /dev/null +++ b/chromium/media/remoting/remoting_renderer_factory.h @@ -0,0 +1,72 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MEDIA_REMOTING_REMOTING_RENDERER_FACTORY_H_ +#define MEDIA_REMOTING_REMOTING_RENDERER_FACTORY_H_ + +#include "media/base/renderer_factory.h" +#include "media/mojo/mojom/remoting.mojom.h" +#include "media/remoting/rpc_broker.h" +#include "mojo/public/cpp/bindings/pending_remote.h" + +namespace media { +namespace remoting { + +class Receiver; +class ReceiverController; + +class RemotingRendererFactory : public RendererFactory { + public: + RemotingRendererFactory( + mojo::PendingRemote<mojom::Remotee> remotee, + std::unique_ptr<RendererFactory> renderer_factory, + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner); + ~RemotingRendererFactory() override; + + // RendererFactory implementation + std::unique_ptr<Renderer> CreateRenderer( + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner, + const scoped_refptr<base::TaskRunner>& worker_task_runner, + AudioRendererSink* audio_renderer_sink, + VideoRendererSink* video_renderer_sink, + RequestOverlayInfoCB request_overlay_info_cb, + const gfx::ColorSpace& target_color_space) override; + + private: + // Callback function when RPC message is received. + void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message); + void OnAcquireRenderer(std::unique_ptr<pb::RpcMessage> message); + void OnAcquireRendererDone(int receiver_rpc_handle); + + // Indicates whether RPC_ACQUIRE_RENDERER_DONE is sent or not. + bool is_acquire_renderer_done_sent_ = false; + + ReceiverController* receiver_controller_; + + RpcBroker* rpc_broker_; // Outlives this class. + + // The RPC handle used by all Receiver instances created by |this|. Sent only + // once to the sender side, through RPC_ACQUIRE_RENDERER_DONE, regardless of + // how many times CreateRenderer() is called." + const int renderer_handle_ = RpcBroker::kInvalidHandle; + + // The RPC handle of the CourierRenderer on the sender side. Will be received + // once, via an RPC_ACQUIRE_RENDERER message" + int remote_renderer_handle_ = RpcBroker::kInvalidHandle; + + // Used to set remote handle if receiving RPC_ACQUIRE_RENDERER after + // CreateRenderer() is called. + base::WeakPtr<Receiver> waiting_for_remote_handle_receiver_; + std::unique_ptr<RendererFactory> real_renderer_factory_; + + // Used to instantiate |receiver_|. + const scoped_refptr<base::SingleThreadTaskRunner> media_task_runner_; + + base::WeakPtrFactory<RemotingRendererFactory> weak_factory_{this}; +}; + +} // namespace remoting +} // namespace media + +#endif // MEDIA_REMOTING_REMOTING_RENDERER_FACTORY_H_ diff --git a/chromium/media/remoting/rpc_broker.h b/chromium/media/remoting/rpc_broker.h index 280d63fa112..912e3b61ad4 100644 --- a/chromium/media/remoting/rpc_broker.h +++ b/chromium/media/remoting/rpc_broker.h @@ -46,6 +46,7 @@ class RpcBroker { // Get unique handle value (larger than 0) for RPC message handles. int GetUniqueHandle(); + // TODO(chkuo): Change the parameter to accept const ref of RpcMessage. using ReceiveMessageCallback = base::RepeatingCallback<void(std::unique_ptr<pb::RpcMessage>)>; // Register a component to receive messages via the given @@ -77,10 +78,11 @@ class RpcBroker { // Predefined handle value for RPC messages related to initialization (before // the receiver handle(s) are known). - static constexpr int kAcquireHandle = 0; + static constexpr int kAcquireRendererHandle = 0; + static constexpr int kAcquireDemuxerHandle = 1; // The first handle to return from GetUniqueHandle(). - static constexpr int kFirstHandle = 1; + static constexpr int kFirstHandle = 100; private: // Checks that all method calls occur on the same thread. diff --git a/chromium/media/remoting/stream_provider.cc b/chromium/media/remoting/stream_provider.cc index 7ff69c808c5..198cf6c3f14 100644 --- a/chromium/media/remoting/stream_provider.cc +++ b/chromium/media/remoting/stream_provider.cc @@ -3,16 +3,23 @@ // found in the LICENSE file. #include "media/remoting/stream_provider.h" +#include <vector> #include "base/bind.h" #include "base/callback.h" #include "base/callback_helpers.h" #include "base/containers/circular_deque.h" #include "base/logging.h" +#include "base/single_thread_task_runner.h" +#include "base/threading/thread_task_runner_handle.h" +#include "media/base/bind_to_current_loop.h" #include "media/base/decoder_buffer.h" #include "media/base/video_transformation.h" +#include "media/mojo/common/mojo_decoder_buffer_converter.h" #include "media/remoting/proto_enum_utils.h" #include "media/remoting/proto_utils.h" +#include "media/remoting/receiver_controller.h" +#include "media/remoting/rpc_broker.h" namespace media { namespace remoting { @@ -22,131 +29,140 @@ namespace { constexpr int kNumFramesInEachReadUntil = 10; } -// An implementation of media::DemuxerStream on Media Remoting receiver. -// Receives data from mojo data pipe, and returns one frame or/and status when -// Read() is called. -class MediaStream final : public DemuxerStream { - public: - MediaStream(RpcBroker* rpc_broker, - Type type, - int remote_handle, - base::OnceClosure error_callback); - ~MediaStream() override; - - // DemuxerStream implementation. - void Read(ReadCB read_cb) override; - bool IsReadPending() const override; - AudioDecoderConfig audio_decoder_config() override; - VideoDecoderConfig video_decoder_config() override; - DemuxerStream::Type type() const override; - Liveness liveness() const override; - bool SupportsConfigChanges() override; - - void Initialize(base::OnceClosure init_done_cb); - void FlushUntil(int count); - void AppendBuffer(scoped_refptr<DecoderBuffer> buffer); - - private: - // RPC messages handlers. - void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message); - void OnInitializeCallback(std::unique_ptr<pb::RpcMessage> message); - void OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message); - - // Issues the ReadUntil RPC message when read is pending and buffer is empty. - void SendReadUntil(); - - // Run and reset the read callback. - void CompleteRead(DemuxerStream::Status status); - - // Update the audio/video decoder config When config changes in the mid - // stream, the new config will be stored in - // |next_audio/video_decoder_config_|. Old config will be droped when all - // associated frames are consumed. - void UpdateConfig(const pb::AudioDecoderConfig* audio_message, - const pb::VideoDecoderConfig* video_message); - - // Called when any error occurs. - void OnError(const std::string& error); - - RpcBroker* const rpc_broker_; // Outlives this class. - const Type type_; - const int remote_handle_; - const int rpc_handle_; - - // Set when Initialize() is called, and will be run after initialization is - // done. - base::OnceClosure init_done_callback_; - - // The read until count in the last ReadUntil RPC message. - int last_read_until_count_ = 0; - - // Indicates whether Audio/VideoDecoderConfig changed and the frames with the - // old config are not yet consumed. The new config is stored in the end of - // |audio/video_decoder_config_|; - bool config_changed_ = false; - - // Indicates whether a ReadUntil RPC message was sent without receiving the - // ReadUntilCallback message yet. - bool read_until_sent_ = false; - - // Set when Read() is called. Run only once when read completes. - ReadCB read_complete_callback_; - - base::OnceClosure error_callback_; // Called when first error occurs. - - base::circular_deque<scoped_refptr<DecoderBuffer>> buffers_; - - // Current audio/video config. - AudioDecoderConfig audio_decoder_config_; - VideoDecoderConfig video_decoder_config_; - - // Stores the new auido/video config when config changes. - AudioDecoderConfig next_audio_decoder_config_; - VideoDecoderConfig next_video_decoder_config_; - - base::WeakPtrFactory<MediaStream> weak_factory_{this}; - - DISALLOW_COPY_AND_ASSIGN(MediaStream); -}; - -MediaStream::MediaStream(RpcBroker* rpc_broker, - Type type, - int remote_handle, - base::OnceClosure error_callback) - : rpc_broker_(rpc_broker), +// static +void StreamProvider::MediaStream::CreateOnMainThread( + RpcBroker* rpc_broker, + Type type, + int32_t handle, + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner, + base::OnceCallback<void(MediaStream::UniquePtr)> callback) { + MediaStream::UniquePtr stream( + new MediaStream(rpc_broker, type, handle, media_task_runner), + &DestructionHelper); + std::move(callback).Run(std::move(stream)); +} + +// static +void StreamProvider::MediaStream::DestructionHelper(MediaStream* stream) { + stream->Destroy(); +} + +StreamProvider::MediaStream::MediaStream( + RpcBroker* rpc_broker, + Type type, + int remote_handle, + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner) + : main_task_runner_(base::ThreadTaskRunnerHandle::Get()), + media_task_runner_(media_task_runner), + rpc_broker_(rpc_broker), type_(type), remote_handle_(remote_handle), - rpc_handle_(rpc_broker_->GetUniqueHandle()), - error_callback_(std::move(error_callback)) { + rpc_handle_(rpc_broker_->GetUniqueHandle()) { DCHECK(remote_handle_ != RpcBroker::kInvalidHandle); - rpc_broker_->RegisterMessageReceiverCallback( - rpc_handle_, base::BindRepeating(&MediaStream::OnReceivedRpc, - weak_factory_.GetWeakPtr())); + + media_weak_this_ = media_weak_factory_.GetWeakPtr(); + + const RpcBroker::ReceiveMessageCallback receive_callback = + BindToLoop(media_task_runner_, + BindRepeating(&MediaStream::OnReceivedRpc, media_weak_this_)); + rpc_broker_->RegisterMessageReceiverCallback(rpc_handle_, receive_callback); } -MediaStream::~MediaStream() { +StreamProvider::MediaStream::~MediaStream() { + DCHECK(main_task_runner_->BelongsToCurrentThread()); rpc_broker_->UnregisterMessageReceiverCallback(rpc_handle_); } -void MediaStream::Initialize(base::OnceClosure init_done_cb) { - DCHECK(init_done_cb); - if (!init_done_callback_.is_null()) { +void StreamProvider::MediaStream::Destroy() { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + + // Invalid weak pointers to prevent |this| from receiving RPC calls on the + // media thread. + media_weak_factory_.InvalidateWeakPtrs(); + + // Unbind all mojo pipes and bindings. + receiver_.reset(); + decoder_buffer_reader_.reset(); + + // After invalidating all weak ptrs of |media_weak_factory_|, MediaStream + // won't be access anymore, so using |this| here is safe. + main_task_runner_->DeleteSoon(FROM_HERE, this); +} + +void StreamProvider::MediaStream::SendRpcMessageOnMainThread( + std::unique_ptr<pb::RpcMessage> message) { + // |rpc_broker_| is owned by |receiver_controller_| which is a singleton per + // process, so it's safe to use Unretained() here. + main_task_runner_->PostTask( + FROM_HERE, + base::BindOnce(&RpcBroker::SendMessageToRemote, + base::Unretained(rpc_broker_), std::move(message))); +} + +void StreamProvider::MediaStream::Initialize( + base::OnceClosure init_done_callback) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + DCHECK(init_done_callback); + + if (init_done_callback_) { OnError("Duplicate initialization"); return; } - init_done_callback_ = std::move(init_done_cb); - DVLOG(3) << __func__ << "Issues RpcMessage::RPC_DS_INITIALIZE with " - << "remote_handle=" << remote_handle_ - << " rpc_handle=" << rpc_handle_; - std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); + init_done_callback_ = std::move(init_done_callback); + + auto rpc = std::make_unique<pb::RpcMessage>(); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE); rpc->set_integer_value(rpc_handle_); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); +} + +void StreamProvider::MediaStream::InitializeDataPipe( + mojo::ScopedDataPipeConsumerHandle data_pipe) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + + decoder_buffer_reader_ = + std::make_unique<MojoDecoderBufferReader>(std::move(data_pipe)); + CompleteInitialize(); +} + +void StreamProvider::MediaStream::ReceiveFrame(uint32_t count, + mojom::DecoderBufferPtr buffer) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + DCHECK(decoder_buffer_reader_); + + auto callback = BindToCurrentLoop( + base::BindOnce(&MediaStream::AppendBuffer, media_weak_this_, count)); + decoder_buffer_reader_->ReadDecoderBuffer(std::move(buffer), + std::move(callback)); +} + +void StreamProvider::MediaStream::FlushUntil(uint32_t count) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + + if (count < current_frame_count_) + return; + + uint32_t buffers_to_erase = count - current_frame_count_; + + if (buffers_to_erase > buffers_.size()) { + buffers_.clear(); + } else { + buffers_.erase(buffers_.begin(), buffers_.begin() + buffers_to_erase); + } + + current_frame_count_ = count; + + if (!read_complete_callback_.is_null()) + CompleteRead(DemuxerStream::kAborted); + + read_until_sent_ = false; } -void MediaStream::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) { +void StreamProvider::MediaStream::OnReceivedRpc( + std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); DCHECK(message->handle() == rpc_handle_); switch (message->proc()) { @@ -161,24 +177,21 @@ void MediaStream::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) { } } -void MediaStream::OnInitializeCallback( +void StreamProvider::MediaStream::OnInitializeCallback( std::unique_ptr<pb::RpcMessage> message) { - DVLOG(3) << __func__ << "Receives RPC_DS_INITIALIZE_CALLBACK message."; + DCHECK(media_task_runner_->BelongsToCurrentThread()); const pb::DemuxerStreamInitializeCallback callback_message = message->demuxerstream_initializecb_rpc(); if (callback_message.type() != type_) { OnError("Wrong type"); return; } + if ((type_ == DemuxerStream::AUDIO && audio_decoder_config_.IsValidConfig()) || (type_ == DemuxerStream::VIDEO && video_decoder_config_.IsValidConfig())) { - OnError("Duplicate Iniitialize"); - return; - } - if (init_done_callback_.is_null()) { - OnError("Iniitialize callback missing"); + OnError("Duplicate initialization"); return; } @@ -186,21 +199,41 @@ void MediaStream::OnInitializeCallback( callback_message.has_audio_decoder_config()) { const pb::AudioDecoderConfig audio_message = callback_message.audio_decoder_config(); - UpdateConfig(&audio_message, nullptr); + UpdateAudioConfig(audio_message); } else if (type_ == DemuxerStream::VIDEO && callback_message.has_video_decoder_config()) { const pb::VideoDecoderConfig video_message = callback_message.video_decoder_config(); - UpdateConfig(nullptr, &video_message); + UpdateVideoConfig(video_message); } else { - OnError("config missing"); + OnError("Config missing"); return; } + + rpc_initialized_ = true; + CompleteInitialize(); +} + +void StreamProvider::MediaStream::CompleteInitialize() { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + + // Initialization finished when received RPC_DS_INITIALIZE_CALLBACK and + // |decoder_buffer_reader_| is created. + if (!rpc_initialized_ || !decoder_buffer_reader_) + return; + + if (!init_done_callback_) { + OnError("Initialize callback missing"); + return; + } + std::move(init_done_callback_).Run(); } -void MediaStream::OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message) { - DVLOG(3) << __func__ << ": Receives RPC_DS_READUNTIL_CALLBACK message."; +void StreamProvider::MediaStream::OnReadUntilCallback( + std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + if (!read_until_sent_) { OnError("Unexpected ReadUntilCallback"); return; @@ -208,98 +241,90 @@ void MediaStream::OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message) { read_until_sent_ = false; const pb::DemuxerStreamReadUntilCallback callback_message = message->demuxerstream_readuntilcb_rpc(); - last_read_until_count_ = callback_message.count(); + total_received_frame_count_ = callback_message.count(); + if (ToDemuxerStreamStatus(callback_message.status()) == kConfigChanged) { config_changed_ = true; + if (callback_message.has_audio_decoder_config()) { const pb::AudioDecoderConfig audio_message = callback_message.audio_decoder_config(); - UpdateConfig(&audio_message, nullptr); + UpdateAudioConfig(audio_message); } + if (callback_message.has_video_decoder_config()) { const pb::VideoDecoderConfig video_message = callback_message.video_decoder_config(); - UpdateConfig(nullptr, &video_message); + UpdateVideoConfig(video_message); } + if (buffers_.empty() && !read_complete_callback_.is_null()) CompleteRead(DemuxerStream::kConfigChanged); + return; } + if (buffers_.empty() && !read_complete_callback_.is_null()) SendReadUntil(); } -void MediaStream::UpdateConfig(const pb::AudioDecoderConfig* audio_message, - const pb::VideoDecoderConfig* video_message) { - if (type_ == AUDIO) { - DCHECK(audio_message && !video_message); - AudioDecoderConfig audio_config; - ConvertProtoToAudioDecoderConfig(*audio_message, &audio_config); - if (!audio_config.IsValidConfig()) { - OnError("Invalid audio config"); - return; - } - if (config_changed_) { - DCHECK(audio_decoder_config_.IsValidConfig()); - DCHECK(!next_audio_decoder_config_.IsValidConfig()); - next_audio_decoder_config_ = audio_config; - } else { - DCHECK(!audio_decoder_config_.IsValidConfig()); - audio_decoder_config_ = audio_config; - } - } else if (type_ == VIDEO) { - DCHECK(video_message && !audio_message); - VideoDecoderConfig video_config; - ConvertProtoToVideoDecoderConfig(*video_message, &video_config); - if (!video_config.IsValidConfig()) { - OnError("Invalid video config"); - return; - } - if (config_changed_) { - DCHECK(video_decoder_config_.IsValidConfig()); - DCHECK(!next_video_decoder_config_.IsValidConfig()); - next_video_decoder_config_ = video_config; - } else { - DCHECK(!video_decoder_config_.IsValidConfig()); - video_decoder_config_ = video_config; - } +void StreamProvider::MediaStream::UpdateAudioConfig( + const pb::AudioDecoderConfig& audio_message) { + DCHECK(type_ == AUDIO); + AudioDecoderConfig audio_config; + ConvertProtoToAudioDecoderConfig(audio_message, &audio_config); + if (!audio_config.IsValidConfig()) { + OnError("Invalid audio config"); + return; + } + if (config_changed_) { + DCHECK(audio_decoder_config_.IsValidConfig()); + DCHECK(!next_audio_decoder_config_.IsValidConfig()); + next_audio_decoder_config_ = audio_config; } else { - NOTREACHED() << ": Only supports video or audio stream."; + DCHECK(!audio_decoder_config_.IsValidConfig()); + audio_decoder_config_ = audio_config; } } -void MediaStream::SendReadUntil() { +void StreamProvider::MediaStream::UpdateVideoConfig( + const pb::VideoDecoderConfig& video_message) { + DCHECK(type_ == VIDEO); + VideoDecoderConfig video_config; + ConvertProtoToVideoDecoderConfig(video_message, &video_config); + if (!video_config.IsValidConfig()) { + OnError("Invalid video config"); + return; + } + if (config_changed_) { + DCHECK(video_decoder_config_.IsValidConfig()); + DCHECK(!next_video_decoder_config_.IsValidConfig()); + next_video_decoder_config_ = video_config; + } else { + DCHECK(!video_decoder_config_.IsValidConfig()); + video_decoder_config_ = video_config; + } +} + +void StreamProvider::MediaStream::SendReadUntil() { if (read_until_sent_) return; - DVLOG(3) << "Issues RPC_DS_READUNTIL RPC message to remote_handle_=" - << remote_handle_ << " with callback handle=" << rpc_handle_ - << " count=" << last_read_until_count_; std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL); auto* message = rpc->mutable_demuxerstream_readuntil_rpc(); - last_read_until_count_ += kNumFramesInEachReadUntil; - message->set_count(last_read_until_count_); + message->set_count(total_received_frame_count_ + kNumFramesInEachReadUntil); message->set_callback_handle(rpc_handle_); - rpc_broker_->SendMessageToRemote(std::move(rpc)); + SendRpcMessageOnMainThread(std::move(rpc)); read_until_sent_ = true; } -void MediaStream::FlushUntil(int count) { - while (!buffers_.empty()) { - buffers_.pop_front(); - } - - last_read_until_count_ = count; - if (!read_complete_callback_.is_null()) - CompleteRead(DemuxerStream::kAborted); - read_until_sent_ = false; -} - -void MediaStream::Read(ReadCB read_cb) { +void StreamProvider::MediaStream::Read(ReadCB read_cb) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); DCHECK(read_complete_callback_.is_null()); DCHECK(read_cb); + read_complete_callback_ = std::move(read_cb); if (buffers_.empty() && config_changed_) { CompleteRead(DemuxerStream::kConfigChanged); @@ -315,26 +340,21 @@ void MediaStream::Read(ReadCB read_cb) { CompleteRead(DemuxerStream::kOk); } -bool MediaStream::IsReadPending() const { +bool StreamProvider::MediaStream::IsReadPending() const { return !read_complete_callback_.is_null(); } -void MediaStream::CompleteRead(DemuxerStream::Status status) { - DVLOG(3) << __func__ << ": " << status; +void StreamProvider::MediaStream::CompleteRead(DemuxerStream::Status status) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + switch (status) { case DemuxerStream::kConfigChanged: if (type_ == AUDIO) { DCHECK(next_audio_decoder_config_.IsValidConfig()); audio_decoder_config_ = next_audio_decoder_config_; -#if DCHECK_IS_ON() - next_audio_decoder_config_ = AudioDecoderConfig(); -#endif // DCHECK_IS_ON() } else { DCHECK(next_video_decoder_config_.IsValidConfig()); video_decoder_config_ = next_video_decoder_config_; -#if DCHECK_IS_ON() - next_video_decoder_config_ = VideoDecoderConfig(); -#endif // DCHECK_IS_ON() } config_changed_ = false; std::move(read_complete_callback_).Run(status, nullptr); @@ -344,111 +364,263 @@ void MediaStream::CompleteRead(DemuxerStream::Status status) { std::move(read_complete_callback_).Run(status, nullptr); return; case DemuxerStream::kOk: + DCHECK(read_complete_callback_); DCHECK(!buffers_.empty()); + DCHECK_LT(current_frame_count_, buffered_frame_count_); scoped_refptr<DecoderBuffer> frame_data = buffers_.front(); buffers_.pop_front(); + ++current_frame_count_; std::move(read_complete_callback_).Run(status, frame_data); return; } } -AudioDecoderConfig MediaStream::audio_decoder_config() { - DVLOG(3) << __func__; +AudioDecoderConfig StreamProvider::MediaStream::audio_decoder_config() { DCHECK(type_ == DemuxerStream::AUDIO); return audio_decoder_config_; } -VideoDecoderConfig MediaStream::video_decoder_config() { - DVLOG(3) << __func__; +VideoDecoderConfig StreamProvider::MediaStream::video_decoder_config() { DCHECK(type_ == DemuxerStream::VIDEO); return video_decoder_config_; } -DemuxerStream::Type MediaStream::type() const { +DemuxerStream::Type StreamProvider::MediaStream::type() const { return type_; } -DemuxerStream::Liveness MediaStream::liveness() const { +DemuxerStream::Liveness StreamProvider::MediaStream::liveness() const { return DemuxerStream::LIVENESS_LIVE; } -bool MediaStream::SupportsConfigChanges() { +bool StreamProvider::MediaStream::SupportsConfigChanges() { return true; } -void MediaStream::AppendBuffer(scoped_refptr<DecoderBuffer> buffer) { - DVLOG(3) << __func__; +void StreamProvider::MediaStream::AppendBuffer( + uint32_t count, + scoped_refptr<DecoderBuffer> buffer) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + + // Drop flushed frame. + if (count < current_frame_count_) + return; + + // Continuity check. + DCHECK(buffers_.empty() || buffered_frame_count_ == count); + buffers_.push_back(buffer); + buffered_frame_count_ = count + 1; + if (!read_complete_callback_.is_null()) CompleteRead(DemuxerStream::kOk); } -void MediaStream::OnError(const std::string& error) { - VLOG(1) << __func__ << ": " << error; - if (error_callback_.is_null()) - return; - std::move(error_callback_).Run(); +void StreamProvider::MediaStream::OnError(const std::string& error) { + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(remote_handle_); + rpc->set_proc(pb::RpcMessage::RPC_DS_ONERROR); + SendRpcMessageOnMainThread(std::move(rpc)); } -StreamProvider::StreamProvider(RpcBroker* rpc_broker, - base::OnceClosure error_callback) - : rpc_broker_(rpc_broker), error_callback_(std::move(error_callback)) {} +StreamProvider::StreamProvider( + ReceiverController* receiver_controller, + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner) + : main_task_runner_(base::ThreadTaskRunnerHandle::Get()), + media_task_runner_(media_task_runner), + receiver_controller_(receiver_controller), + rpc_broker_(receiver_controller_->rpc_broker()) { + DCHECK(receiver_controller_); + DCHECK(rpc_broker_); + + media_weak_this_ = media_weak_factory_.GetWeakPtr(); + + auto callback = BindToLoop( + media_task_runner_, + base::BindRepeating(&StreamProvider::OnReceivedRpc, media_weak_this_)); + rpc_broker_->RegisterMessageReceiverCallback(RpcBroker::kAcquireDemuxerHandle, + callback); +} -StreamProvider::~StreamProvider() = default; +StreamProvider::~StreamProvider() { + DCHECK(main_task_runner_->BelongsToCurrentThread()); + rpc_broker_->UnregisterMessageReceiverCallback( + RpcBroker::kAcquireDemuxerHandle); +} -void StreamProvider::Initialize(int remote_audio_handle, - int remote_video_handle, - base::OnceClosure callback) { - DVLOG(3) << __func__ << ": remote_audio_handle=" << remote_audio_handle - << " remote_video_handle=" << remote_video_handle; - if (!init_done_callback_.is_null()) { - OnError("Duplicate initialization."); - return; - } - if (remote_audio_handle == RpcBroker::kInvalidHandle && - remote_video_handle == RpcBroker::kInvalidHandle) { - OnError("Invalid handle."); - return; +std::string StreamProvider::GetDisplayName() const { + return "media::remoting::StreamProvider"; +} + +void StreamProvider::Initialize(DemuxerHost* host, + PipelineStatusCallback status_cb) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + init_done_callback_ = std::move(status_cb); + CompleteInitialize(); +} + +void StreamProvider::AbortPendingReads() {} + +void StreamProvider::StartWaitingForSeek(base::TimeDelta seek_time) {} + +void StreamProvider::CancelPendingSeek(base::TimeDelta seek_time) {} + +void StreamProvider::Seek(base::TimeDelta time, + PipelineStatusCallback seek_cb) { + media_task_runner_->PostTask( + FROM_HERE, + base::BindOnce(std::move(seek_cb), PipelineStatus::PIPELINE_OK)); +} + +void StreamProvider::Stop() {} + +base::TimeDelta StreamProvider::GetStartTime() const { + return base::TimeDelta(); +} + +base::Time StreamProvider::GetTimelineOffset() const { + return base::Time(); +} + +int64_t StreamProvider::GetMemoryUsage() const { + return 0; +} + +base::Optional<container_names::MediaContainerName> +StreamProvider::GetContainerForMetrics() const { + return base::Optional<container_names::MediaContainerName>(); +} + +void StreamProvider::OnEnabledAudioTracksChanged( + const std::vector<MediaTrack::Id>& track_ids, + base::TimeDelta curr_time, + TrackChangeCB change_completed_cb) { + std::vector<DemuxerStream*> streams; + std::move(change_completed_cb).Run(DemuxerStream::AUDIO, streams); + DVLOG(1) << "Track changes are not supported."; +} + +void StreamProvider::OnSelectedVideoTrackChanged( + const std::vector<MediaTrack::Id>& track_ids, + base::TimeDelta curr_time, + TrackChangeCB change_completed_cb) { + std::vector<DemuxerStream*> streams; + std::move(change_completed_cb).Run(DemuxerStream::VIDEO, streams); + DVLOG(1) << "Track changes are not supported."; +} + +void StreamProvider::Destroy() { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + + if (init_done_callback_) + std::move(init_done_callback_).Run(PIPELINE_ERROR_ABORT); + + // Invalid weak pointers to prevent |this| from receiving RPC calls on the + // media thread. + media_weak_factory_.InvalidateWeakPtrs(); + + audio_stream_.reset(); + video_stream_.reset(); + + // After invalidating all weak ptrs of |media_weak_factory_|, StreamProvider + // won't be access anymore, so using |this| here is safe. + main_task_runner_->DeleteSoon(FROM_HERE, this); +} + +void StreamProvider::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) { + switch (message->proc()) { + case pb::RpcMessage::RPC_ACQUIRE_DEMUXER: + OnAcquireDemuxer(std::move(message)); + break; + default: + VLOG(3) << __func__ << "Unknown RPC message."; } +} - init_done_callback_ = std::move(callback); - if (remote_audio_handle != RpcBroker::kInvalidHandle) { - audio_stream_.reset(new MediaStream( - rpc_broker_, DemuxerStream::AUDIO, remote_audio_handle, - base::BindOnce(&StreamProvider::OnError, weak_factory_.GetWeakPtr(), - "Media stream error"))); - audio_stream_->Initialize(base::BindOnce( - &StreamProvider::AudioStreamInitialized, weak_factory_.GetWeakPtr())); +void StreamProvider::OnAcquireDemuxer(std::unique_ptr<pb::RpcMessage> message) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + DCHECK(message->has_acquire_demuxer_rpc()); + + int32_t audio_demuxer_handle = + message->acquire_demuxer_rpc().audio_demuxer_handle(); + int32_t video_demuxer_handle = + message->acquire_demuxer_rpc().video_demuxer_handle(); + has_audio_ = audio_demuxer_handle != RpcBroker::kInvalidHandle; + has_video_ = video_demuxer_handle != RpcBroker::kInvalidHandle; + + DCHECK(has_audio_ || has_video_); + + if (has_audio_) { + auto callback = BindToCurrentLoop(base::BindOnce( + &StreamProvider::OnAudioStreamCreated, media_weak_this_)); + main_task_runner_->PostTask( + FROM_HERE, base::BindOnce(&MediaStream::CreateOnMainThread, rpc_broker_, + DemuxerStream::AUDIO, audio_demuxer_handle, + media_task_runner_, std::move(callback))); } - if (remote_video_handle != RpcBroker::kInvalidHandle) { - video_stream_.reset(new MediaStream( - rpc_broker_, DemuxerStream::VIDEO, remote_video_handle, - base::BindOnce(&StreamProvider::OnError, weak_factory_.GetWeakPtr(), - "Media stream error"))); - video_stream_->Initialize(base::BindOnce( - &StreamProvider::VideoStreamInitialized, weak_factory_.GetWeakPtr())); + + if (has_video_) { + auto callback = BindToCurrentLoop(base::BindOnce( + &StreamProvider::OnVideoStreamCreated, media_weak_this_)); + main_task_runner_->PostTask( + FROM_HERE, base::BindOnce(&MediaStream::CreateOnMainThread, rpc_broker_, + DemuxerStream::VIDEO, video_demuxer_handle, + media_task_runner_, std::move(callback))); } } -void StreamProvider::OnError(const std::string& error) { - VLOG(1) << __func__ << ": " << error; - if (error_callback_.is_null()) +void StreamProvider::OnAudioStreamCreated(MediaStream::UniquePtr stream) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + audio_stream_ = std::move(stream); + audio_stream_->Initialize(base::BindOnce( + &StreamProvider::OnAudioStreamInitialized, media_weak_this_)); + InitializeDataPipe(); +} + +void StreamProvider::OnVideoStreamCreated(MediaStream::UniquePtr stream) { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + video_stream_ = std::move(stream); + video_stream_->Initialize(base::BindOnce( + &StreamProvider::OnVideoStreamInitialized, media_weak_this_)); + InitializeDataPipe(); +} + +void StreamProvider::InitializeDataPipe() { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + + if ((has_audio_ && !audio_stream_) || (has_video_ && !video_stream_)) return; - std::move(error_callback_).Run(); + + receiver_controller_->StartDataStreams( + has_audio_ ? audio_stream_->BindNewPipeAndPassRemote() + : mojo::NullRemote(), + has_video_ ? video_stream_->BindNewPipeAndPassRemote() + : mojo::NullRemote()); } -void StreamProvider::AudioStreamInitialized() { - DCHECK(!init_done_callback_.is_null()); +void StreamProvider::OnAudioStreamInitialized() { audio_stream_initialized_ = true; - if (video_stream_initialized_ || !video_stream_) - std::move(init_done_callback_).Run(); + CompleteInitialize(); } -void StreamProvider::VideoStreamInitialized() { - DCHECK(!init_done_callback_.is_null()); +void StreamProvider::OnVideoStreamInitialized() { video_stream_initialized_ = true; - if (audio_stream_initialized_ || !audio_stream_) - std::move(init_done_callback_).Run(); + CompleteInitialize(); +} + +void StreamProvider::CompleteInitialize() { + DCHECK(media_task_runner_->BelongsToCurrentThread()); + + // Haven't receive RpcAcquireRenderer message + if (!has_audio_ && !has_video_) + return; + + if ((has_audio_ && !audio_stream_initialized_) || + (has_video_ && !video_stream_initialized_) || !init_done_callback_) + return; + + // |init_done_callback_| should be called on |media_task_runner_|. + std::move(init_done_callback_).Run(PipelineStatus::PIPELINE_OK); } std::vector<DemuxerStream*> StreamProvider::GetAllStreams() { @@ -460,25 +632,14 @@ std::vector<DemuxerStream*> StreamProvider::GetAllStreams() { return streams; } -void StreamProvider::AppendBuffer(DemuxerStream::Type type, - scoped_refptr<DecoderBuffer> buffer) { - if (type == DemuxerStream::AUDIO) - audio_stream_->AppendBuffer(buffer); - else if (type == DemuxerStream::VIDEO) - video_stream_->AppendBuffer(buffer); - else - NOTREACHED() << ": Only supports video or audio stream."; -} +} // namespace remoting +} // namespace media + +namespace std { -void StreamProvider::FlushUntil(DemuxerStream::Type type, int count) { - DVLOG(3) << __func__ << ": type=" << type << " count=" << count; - if (type == DemuxerStream::AUDIO) - audio_stream_->FlushUntil(count); - else if (type == DemuxerStream::VIDEO) - video_stream_->FlushUntil(count); - else - NOTREACHED() << ": Only supports video or audio stream."; +void default_delete<media::remoting::StreamProvider>::operator()( + media::remoting::StreamProvider* ptr) const { + ptr->Destroy(); } -} // namespace remoting -} // namespace media +} // namespace std diff --git a/chromium/media/remoting/stream_provider.h b/chromium/media/remoting/stream_provider.h index f6c7802008b..95d2c214279 100644 --- a/chromium/media/remoting/stream_provider.h +++ b/chromium/media/remoting/stream_provider.h @@ -5,62 +5,280 @@ #ifndef MEDIA_REMOTING_STREAM_PROVIDER_H_ #define MEDIA_REMOTING_STREAM_PROVIDER_H_ +#include "base/callback_forward.h" +#include "base/containers/circular_deque.h" +#include "base/memory/scoped_refptr.h" #include "base/memory/weak_ptr.h" +#include "base/sequenced_task_runner_helpers.h" +#include "base/single_thread_task_runner.h" #include "media/base/audio_decoder_config.h" +#include "media/base/demuxer.h" #include "media/base/demuxer_stream.h" -#include "media/base/media_resource.h" #include "media/base/video_decoder_config.h" -#include "media/remoting/rpc_broker.h" +#include "media/mojo/mojom/remoting.mojom.h" +#include "media/remoting/media_remoting_rpc.pb.h" +#include "mojo/public/cpp/bindings/receiver.h" +#include "mojo/public/cpp/bindings/remote.h" + +namespace base { +class SingleThreadTaskRunner; +} // namespace base namespace media { + +class MojoDecoderBufferReader; + namespace remoting { -class MediaStream; +class ReceiverController; +class RpcBroker; // The media stream provider for Media Remoting receiver. -class StreamProvider final : public MediaResource { +class StreamProvider final : public Demuxer { public: - StreamProvider(RpcBroker* rpc_broker, base::OnceClosure error_callback); + StreamProvider( + ReceiverController* receiver_controller, + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner); - ~StreamProvider() override; - - // MediaResource implemenation. + // Demuxer implementation. std::vector<DemuxerStream*> GetAllStreams() override; + std::string GetDisplayName() const override; + void Initialize(DemuxerHost* host, PipelineStatusCallback status_cb) override; + void AbortPendingReads() override; + void StartWaitingForSeek(base::TimeDelta seek_time) override; + void CancelPendingSeek(base::TimeDelta seek_time) override; + void Seek(base::TimeDelta time, PipelineStatusCallback status_cb) override; + void Stop() override; + base::TimeDelta GetStartTime() const override; + base::Time GetTimelineOffset() const override; + int64_t GetMemoryUsage() const override; + base::Optional<container_names::MediaContainerName> GetContainerForMetrics() + const override; + void OnEnabledAudioTracksChanged(const std::vector<MediaTrack::Id>& track_ids, + base::TimeDelta curr_time, + TrackChangeCB change_completed_cb) override; + void OnSelectedVideoTrackChanged(const std::vector<MediaTrack::Id>& track_ids, + base::TimeDelta curr_time, + TrackChangeCB change_completed_cb) override; - void Initialize(int remote_audio_handle, - int remote_video_handle, - base::OnceClosure callback); - void AppendBuffer(DemuxerStream::Type type, - scoped_refptr<DecoderBuffer> buffer); - void FlushUntil(DemuxerStream::Type type, int count); + protected: + // Deletion is only allowed via Destroy(). + ~StreamProvider() override; private: - // Called when audio/video stream is initialized. - void AudioStreamInitialized(); - void VideoStreamInitialized(); + // An implementation of media::DemuxerStream on Media Remoting receiver. + // Receives data from mojo data pipe, and returns one frame or/and status when + // Read() is called. + class MediaStream final : public DemuxerStream, + public mojom::RemotingDataStreamReceiver { + public: + using UniquePtr = + std::unique_ptr<MediaStream, std::function<void(MediaStream*)>>; - // Called when any error occurs. - void OnError(const std::string& error); + // MediaStream should be created on the main thread to be able to get unique + // handle ID from |rpc_broker_|. + static void CreateOnMainThread( + RpcBroker* rpc_broker, + Type type, + int32_t handle, + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner, + base::OnceCallback<void(MediaStream::UniquePtr)> callback); - RpcBroker* const rpc_broker_; // Outlives this class. - std::unique_ptr<MediaStream> video_stream_; - std::unique_ptr<MediaStream> audio_stream_; - bool audio_stream_initialized_ = false; - bool video_stream_initialized_ = false; + // In order to destroy members in the right thread, MediaStream has to use + // DestructionHelper() to destroy itself. + static void DestructionHelper(MediaStream* stream); - // Set when Initialize() is called, and will run when both video and audio - // streams are initialized or error occurs. - base::OnceClosure init_done_callback_; + MediaStream( + RpcBroker* rpc_broker, + Type type, + int32_t remote_handle, + const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner); + + // DemuxerStream implementation. + void Read(ReadCB read_cb) override; + bool IsReadPending() const override; + AudioDecoderConfig audio_decoder_config() override; + VideoDecoderConfig video_decoder_config() override; + DemuxerStream::Type type() const override; + Liveness liveness() const override; + bool SupportsConfigChanges() override; + + void Initialize(base::OnceClosure init_done_cb); + + mojo::PendingRemote<mojom::RemotingDataStreamReceiver> + BindNewPipeAndPassRemote() { + return receiver_.BindNewPipeAndPassRemote(); + } + + private: + friend class base::DeleteHelper<MediaStream>; // For using DeleteSoon(). + // For testing. + friend class StreamProviderTest; + + // Prevent from unique_ptr using ~MediaStream() to destroy MediaStream + // instances. Use DestructionHelper() as the custom deleter with unique_ptr + // to destroy MediaStream instances. + ~MediaStream() override; + + void Destroy(); + + // Send RPC message on |main_task_runner_|. + void SendRpcMessageOnMainThread(std::unique_ptr<pb::RpcMessage> message); + + // mojom::RemotingDataStreamReceiver implementation. + void InitializeDataPipe( + mojo::ScopedDataPipeConsumerHandle data_pipe) override; + void ReceiveFrame(uint32_t count, mojom::DecoderBufferPtr buffer) override; + void FlushUntil(uint32_t count) override; + + // RPC messages handlers. + void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message); + void OnInitializeCallback(std::unique_ptr<pb::RpcMessage> message); + void OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message); + + // Issues the ReadUntil RPC message when read is pending and buffer is + // empty. + void SendReadUntil(); + + // Run |init_done_callback_| when MojoDecoderBufferReader is created and + // received RPC_DS_INITIALIZE_CALLBACK + void CompleteInitialize(); + + // Append a frame into |buffers_|. + void AppendBuffer(uint32_t count, scoped_refptr<DecoderBuffer> buffer); + + // Run and reset the read callback. + void CompleteRead(DemuxerStream::Status status); + + // Update the audio/video decoder config. When config changes in the mid + // stream, the new config will be stored in |next_audio_decoder_config_|. + // Old config will be dropped when all associated frames are consumed. + void UpdateAudioConfig(const pb::AudioDecoderConfig& audio_message); + void UpdateVideoConfig(const pb::VideoDecoderConfig& video_message); + + // Called when any error occurs. + void OnError(const std::string& error); + + scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_; + scoped_refptr<base::SingleThreadTaskRunner> media_task_runner_; + + RpcBroker* const rpc_broker_; // Outlives this class. + const Type type_; + const int remote_handle_; + const int rpc_handle_; + + // Set when Initialize() is called. + base::OnceClosure init_done_callback_; - // Run when first error occurs; - base::OnceClosure error_callback_; + // The frame count of the frame to be returned on the next Read call. It + // will be increased whenever a frame is read. It will be updated when + // FlushUntil() is called. + uint32_t current_frame_count_ = 0; - base::WeakPtrFactory<StreamProvider> weak_factory_{this}; + // One plus the last frame count received over RTP. Used for continuity + // check. + uint32_t buffered_frame_count_ = 0; - DISALLOW_COPY_AND_ASSIGN(StreamProvider); + // The total number of frames received from the sender side. It will be used + // as the base value for sending ReadUntil() to request more frames and be + // updated in OnReadUntilCallback() which would get the message that + // contains how many frames are sent. + uint32_t total_received_frame_count_ = 0; + + // Indicates whether Audio/VideoDecoderConfig changed and the frames with + // the old config are not yet consumed. The new config is stored in the end + // of |audio/video_decoder_config_|. + bool config_changed_ = false; + + // Indicates whether a ReadUntil RPC message was sent without receiving the + // ReadUntilCallback message yet. + bool read_until_sent_ = false; + + // Indicates whether RPC_DS_INITIALIZE_CALLBACK received. + bool rpc_initialized_ = false; + + // Set when Read() is called. Run only once when read completes. + ReadCB read_complete_callback_; + + // The frame data would be sent via Mojo IPC as MojoDecoderBuffer. When a + // frame is sent to |this| from host by calling ReceiveFrame(), + // |decoder_buffer_reader_| is used to read the frame date from data pipe. + std::unique_ptr<MojoDecoderBufferReader> decoder_buffer_reader_; + + base::circular_deque<scoped_refptr<DecoderBuffer>> buffers_; + + // Current audio/video config. + AudioDecoderConfig audio_decoder_config_; + VideoDecoderConfig video_decoder_config_; + + // Stores the new audio/video config when config changes. + AudioDecoderConfig next_audio_decoder_config_; + VideoDecoderConfig next_video_decoder_config_; + + mojo::Receiver<mojom::RemotingDataStreamReceiver> receiver_{this}; + + base::WeakPtr<MediaStream> media_weak_this_; + base::WeakPtrFactory<MediaStream> media_weak_factory_{this}; + }; + + friend std::default_delete<StreamProvider>; + friend class base::DeleteHelper<StreamProvider>; // For using DeleteSoon(). + + // For testing. + friend class StreamProviderTest; + + void Destroy(); + + // RPC messages handlers. + void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message); + void OnAcquireDemuxer(std::unique_ptr<pb::RpcMessage> message); + + // Called when audio/video stream is created and initialized. + void InitializeDataPipe(); + void OnAudioStreamCreated(MediaStream::UniquePtr stream); + void OnVideoStreamCreated(MediaStream::UniquePtr stream); + void OnAudioStreamInitialized(); + void OnVideoStreamInitialized(); + void CompleteInitialize(); + + scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_; + scoped_refptr<base::SingleThreadTaskRunner> media_task_runner_; + ReceiverController* const receiver_controller_; // Outlives this class + RpcBroker* const rpc_broker_; // Outlives this class + MediaStream::UniquePtr audio_stream_; + MediaStream::UniquePtr video_stream_; + bool has_audio_{false}; + bool has_video_{false}; + bool audio_stream_initialized_{false}; + bool video_stream_initialized_{false}; + + // Set when Initialize() is called, and will run when both video and audio + // streams are initialized or error occurs. + PipelineStatusCallback init_done_callback_; + + base::WeakPtr<StreamProvider> media_weak_this_; + base::WeakPtrFactory<StreamProvider> media_weak_factory_{this}; }; } // namespace remoting } // namespace media +namespace std { + +// Specialize std::default_delete to call Destroy(). +template <> +struct default_delete<media::remoting::StreamProvider> { + constexpr default_delete() = default; + + template <typename U, + typename = typename std::enable_if<std::is_convertible< + U*, + media::remoting::StreamProvider*>::value>::type> + explicit default_delete(const default_delete<U>& d) {} + + void operator()(media::remoting::StreamProvider* ptr) const; +}; + +} // namespace std + #endif // MEDIA_REMOTING_STREAM_PROVIDER_H_ diff --git a/chromium/media/remoting/stream_provider_unittest.cc b/chromium/media/remoting/stream_provider_unittest.cc new file mode 100644 index 00000000000..7ad43222db0 --- /dev/null +++ b/chromium/media/remoting/stream_provider_unittest.cc @@ -0,0 +1,316 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/remoting/stream_provider.h" + +#include "base/test/task_environment.h" +#include "media/base/audio_decoder_config.h" +#include "media/base/demuxer_stream.h" +#include "media/base/media_util.h" +#include "media/base/test_helpers.h" +#include "media/base/video_decoder_config.h" +#include "media/remoting/mock_receiver_controller.h" +#include "media/remoting/proto_enum_utils.h" +#include "media/remoting/proto_utils.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using testing::NiceMock; + +namespace { +constexpr int kBufferSize = 10; +} // namespace + +namespace media { +namespace remoting { + +class StreamProviderTest : public testing::Test { + public: + StreamProviderTest() + : audio_config_(TestAudioConfig::Normal()), + video_config_(TestVideoConfig::Normal()), + audio_buffer_(new DecoderBuffer(kBufferSize)), + video_buffer_(DecoderBuffer::CreateEOSBuffer()) {} + + void SetUp() override { + mock_controller_ = MockReceiverController::GetInstance(); + mock_controller_->Initialize( + mock_controller_->mock_remotee()->BindNewPipeAndPassRemote()); + mock_remotee_ = mock_controller_->mock_remotee(); + stream_provider_ = std::make_unique<StreamProvider>( + mock_controller_, base::ThreadTaskRunnerHandle::Get()); + + rpc_broker_ = mock_controller_->rpc_broker(); + sender_audio_demuxer_stream_handle_ = rpc_broker_->GetUniqueHandle(); + sender_video_demuxer_stream_handle_ = rpc_broker_->GetUniqueHandle(); + rpc_broker_->RegisterMessageReceiverCallback( + sender_audio_demuxer_stream_handle_, + base::BindRepeating(&StreamProviderTest::OnDemuxerStreamReceivedRpc, + base::Unretained(this), + DemuxerStream::Type::AUDIO)); + rpc_broker_->RegisterMessageReceiverCallback( + sender_video_demuxer_stream_handle_, + base::BindRepeating(&StreamProviderTest::OnDemuxerStreamReceivedRpc, + base::Unretained(this), + DemuxerStream::Type::VIDEO)); + } + + void TearDown() override { + stream_provider_.reset(); + task_environment_.RunUntilIdle(); + } + + void OnDemuxerStreamReceivedRpc(DemuxerStream::Type type, + std::unique_ptr<pb::RpcMessage> message) { + DCHECK(message); + switch (message->proc()) { + case pb::RpcMessage::RPC_DS_INITIALIZE: + if (type == DemuxerStream::Type::AUDIO) { + receiver_audio_demuxer_stream_handle_ = message->integer_value(); + } else if (type == DemuxerStream::Type::VIDEO) { + receiver_video_demuxer_stream_handle_ = message->integer_value(); + } else { + NOTREACHED(); + } + + RpcInitializeCallback(type); + break; + + case pb::RpcMessage::RPC_DS_READUNTIL: + ReadUntil(type); + break; + + default: + DVLOG(1) << __func__ << "Unknown supported message."; + } + } + + void RpcInitializeCallback(DemuxerStream::Type type) { + // Issues RPC_DS_INITIALIZE_CALLBACK RPC message. + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(type == DemuxerStream::Type::AUDIO + ? receiver_audio_demuxer_stream_handle_ + : receiver_video_demuxer_stream_handle_); + rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK); + auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc(); + init_cb_message->set_type(type); + + switch (type) { + case DemuxerStream::Type::AUDIO: { + pb::AudioDecoderConfig* audio_message = + init_cb_message->mutable_audio_decoder_config(); + ConvertAudioDecoderConfigToProto(audio_config_, audio_message); + break; + } + + case DemuxerStream::Type::VIDEO: { + pb::VideoDecoderConfig* video_message = + init_cb_message->mutable_video_decoder_config(); + ConvertVideoDecoderConfigToProto(video_config_, video_message); + break; + } + + default: + NOTREACHED(); + } + + rpc_broker_->SendMessageToRemote(std::move(rpc)); + } + + void ReadUntil(DemuxerStream::Type type) { + switch (type) { + case DemuxerStream::Type::AUDIO: + SendAudioFrame(); + break; + case DemuxerStream::Type::VIDEO: + SendVideoFrame(); + break; + default: + NOTREACHED(); + } + } + + void SendRpcAcquireDemuxer() { + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(RpcBroker::kAcquireDemuxerHandle); + rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_DEMUXER); + pb::AcquireDemuxer* message = rpc->mutable_acquire_demuxer_rpc(); + message->set_audio_demuxer_handle(sender_audio_demuxer_stream_handle_); + message->set_video_demuxer_handle(sender_video_demuxer_stream_handle_); + rpc_broker_->SendMessageToRemote(std::move(rpc)); + } + + void OnStreamProviderInitialized(PipelineStatus status) { + EXPECT_EQ(PipelineStatus::PIPELINE_OK, status); + stream_provider_initialized_ = true; + audio_stream_ = + stream_provider_->GetFirstStream(DemuxerStream::Type::AUDIO); + video_stream_ = + stream_provider_->GetFirstStream(DemuxerStream::Type::VIDEO); + + EXPECT_TRUE(audio_stream_); + EXPECT_TRUE(video_stream_); + } + + void InitializeDemuxer() { + DCHECK(stream_provider_); + stream_provider_->Initialize( + nullptr, + base::BindOnce(&StreamProviderTest::OnStreamProviderInitialized, + base::Unretained(this))); + } + + void SendAudioFrame() { + mock_remotee_->SendAudioFrame(0, audio_buffer_); + SendRpcReadUntilCallback(DemuxerStream::Type::AUDIO); + } + + void SendVideoFrame() { + mock_remotee_->SendVideoFrame(0, video_buffer_); + SendRpcReadUntilCallback(DemuxerStream::Type::VIDEO); + } + + void SendRpcReadUntilCallback(DemuxerStream::Type type) { + // Issues RPC_DS_READUNTIL_CALLBACK RPC message. + auto rpc = std::make_unique<pb::RpcMessage>(); + rpc->set_handle(type == DemuxerStream::Type::AUDIO + ? receiver_audio_demuxer_stream_handle_ + : receiver_video_demuxer_stream_handle_); + rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); + auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc(); + message->set_count(0); + message->set_status( + ToProtoDemuxerStreamStatus(DemuxerStream::Status::kOk).value()); + rpc_broker_->SendMessageToRemote(std::move(rpc)); + } + + void FlushUntil(uint32_t flush_audio_count, uint32_t flush_video_count) { + mock_remotee_->OnFlushUntil(flush_audio_count, flush_video_count); + } + + uint32_t GetAudioCurrentFrameCount() { + return stream_provider_->audio_stream_->current_frame_count_; + } + + uint32_t GetVideoCurrentFrameCount() { + return stream_provider_->video_stream_->current_frame_count_; + } + + void OnBufferReadFromDemuxerStream(DemuxerStream::Type type, + DemuxerStream::Status status, + scoped_refptr<DecoderBuffer> buffer) { + EXPECT_EQ(status, DemuxerStream::Status::kOk); + switch (type) { + case DemuxerStream::Type::AUDIO: + received_audio_buffer_ = buffer; + break; + case DemuxerStream::Type::VIDEO: + received_video_buffer_ = buffer; + break; + default: + NOTREACHED(); + } + } + + base::test::TaskEnvironment task_environment_; + + AudioDecoderConfig audio_config_; + VideoDecoderConfig video_config_; + + DemuxerStream* audio_stream_; + DemuxerStream* video_stream_; + + scoped_refptr<DecoderBuffer> audio_buffer_; + scoped_refptr<DecoderBuffer> video_buffer_; + + bool stream_provider_initialized_{false}; + scoped_refptr<DecoderBuffer> received_audio_buffer_; + scoped_refptr<DecoderBuffer> received_video_buffer_; + + int sender_audio_demuxer_stream_handle_ = RpcBroker::kInvalidHandle; + int sender_video_demuxer_stream_handle_ = RpcBroker::kInvalidHandle; + int receiver_audio_demuxer_stream_handle_ = RpcBroker::kInvalidHandle; + int receiver_video_demuxer_stream_handle_ = RpcBroker::kInvalidHandle; + + RpcBroker* rpc_broker_; + MockReceiverController* mock_controller_; + MockRemotee* mock_remotee_; + std::unique_ptr<StreamProvider> stream_provider_; +}; + +TEST_F(StreamProviderTest, InitializeBeforeRpcAcquireDemuxer) { + InitializeDemuxer(); + EXPECT_FALSE(stream_provider_initialized_); + + SendRpcAcquireDemuxer(); + task_environment_.RunUntilIdle(); + + EXPECT_TRUE(mock_remotee_->audio_stream_.is_bound()); + EXPECT_TRUE(mock_remotee_->video_stream_.is_bound()); + EXPECT_TRUE(stream_provider_initialized_); + + // 1 audio stream and 1 video stream + EXPECT_EQ(size_t(2), stream_provider_->GetAllStreams().size()); +} + +TEST_F(StreamProviderTest, InitializeAfterRpcAcquireDemuxer) { + SendRpcAcquireDemuxer(); + EXPECT_FALSE(stream_provider_initialized_); + + InitializeDemuxer(); + task_environment_.RunUntilIdle(); + + EXPECT_TRUE(mock_remotee_->audio_stream_.is_bound()); + EXPECT_TRUE(mock_remotee_->video_stream_.is_bound()); + EXPECT_TRUE(stream_provider_initialized_); + + // 1 audio stream and 1 video stream + EXPECT_EQ(size_t(2), stream_provider_->GetAllStreams().size()); +} + +TEST_F(StreamProviderTest, ReadBuffer) { + InitializeDemuxer(); + SendRpcAcquireDemuxer(); + task_environment_.RunUntilIdle(); + EXPECT_TRUE(mock_remotee_->audio_stream_.is_bound()); + EXPECT_TRUE(mock_remotee_->video_stream_.is_bound()); + EXPECT_TRUE(stream_provider_initialized_); + + audio_stream_->Read( + base::BindOnce(&StreamProviderTest::OnBufferReadFromDemuxerStream, + base::Unretained(this), DemuxerStream::Type::AUDIO)); + task_environment_.RunUntilIdle(); + EXPECT_EQ(audio_buffer_->data_size(), received_audio_buffer_->data_size()); + EXPECT_EQ(audio_buffer_->end_of_stream(), + received_audio_buffer_->end_of_stream()); + EXPECT_EQ(audio_buffer_->is_key_frame(), + received_audio_buffer_->is_key_frame()); + + video_stream_->Read( + base::BindOnce(&StreamProviderTest::OnBufferReadFromDemuxerStream, + base::Unretained(this), DemuxerStream::Type::VIDEO)); + task_environment_.RunUntilIdle(); + EXPECT_EQ(video_buffer_->end_of_stream(), + received_video_buffer_->end_of_stream()); +} + +TEST_F(StreamProviderTest, FlushUntil) { + InitializeDemuxer(); + SendRpcAcquireDemuxer(); + task_environment_.RunUntilIdle(); + EXPECT_TRUE(mock_remotee_->audio_stream_.is_bound()); + EXPECT_TRUE(mock_remotee_->video_stream_.is_bound()); + EXPECT_TRUE(stream_provider_initialized_); + + uint32_t flush_audio_count = 10; + uint32_t flush_video_count = 20; + FlushUntil(flush_audio_count, flush_video_count); + task_environment_.RunUntilIdle(); + + EXPECT_EQ(GetAudioCurrentFrameCount(), flush_audio_count); + EXPECT_EQ(GetVideoCurrentFrameCount(), flush_video_count); +} + +} // namespace remoting +} // namespace media diff --git a/chromium/media/remoting/test_utils.cc b/chromium/media/remoting/test_utils.cc new file mode 100644 index 00000000000..d3ec82254d4 --- /dev/null +++ b/chromium/media/remoting/test_utils.cc @@ -0,0 +1,17 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/remoting/test_utils.h" +#include "media/remoting/receiver_controller.h" + +namespace media { +namespace remoting { + +void ResetForTesting(ReceiverController* controller) { + controller->receiver_.reset(); + controller->media_remotee_.reset(); +} + +} // namespace remoting +} // namespace media diff --git a/chromium/media/remoting/test_utils.h b/chromium/media/remoting/test_utils.h new file mode 100644 index 00000000000..2b5a3e77de3 --- /dev/null +++ b/chromium/media/remoting/test_utils.h @@ -0,0 +1,19 @@ +// Copyright 2020 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MEDIA_REMOTING_TEST_UTILS_H_ +#define MEDIA_REMOTING_TEST_UTILS_H_ + +namespace media { +namespace remoting { + +class ReceiverController; + +// Friend function for resetting the mojo binding in ReceiverController. +void ResetForTesting(ReceiverController* controller); + +} // namespace remoting +} // namespace media + +#endif // MEDIA_REMOTING_TEST_UTILS_H_ |