summaryrefslogtreecommitdiff
path: root/chromium/media/remoting
diff options
context:
space:
mode:
authorAllan Sandfeld Jensen <allan.jensen@qt.io>2020-10-12 14:27:29 +0200
committerAllan Sandfeld Jensen <allan.jensen@qt.io>2020-10-13 09:35:20 +0000
commitc30a6232df03e1efbd9f3b226777b07e087a1122 (patch)
treee992f45784689f373bcc38d1b79a239ebe17ee23 /chromium/media/remoting
parent7b5b123ac58f58ffde0f4f6e488bcd09aa4decd3 (diff)
downloadqtwebengine-chromium-85-based.tar.gz
BASELINE: Update Chromium to 85.0.4183.14085-based
Change-Id: Iaa42f4680837c57725b1344f108c0196741f6057 Reviewed-by: Allan Sandfeld Jensen <allan.jensen@qt.io>
Diffstat (limited to 'chromium/media/remoting')
-rw-r--r--chromium/media/remoting/BUILD.gn41
-rw-r--r--chromium/media/remoting/courier_renderer.cc120
-rw-r--r--chromium/media/remoting/courier_renderer.h4
-rw-r--r--chromium/media/remoting/courier_renderer_unittest.cc161
-rw-r--r--chromium/media/remoting/demuxer_stream_adapter.cc4
-rw-r--r--chromium/media/remoting/end2end_test_renderer.cc237
-rw-r--r--chromium/media/remoting/end2end_test_renderer.h46
-rw-r--r--chromium/media/remoting/fake_media_resource.cc6
-rw-r--r--chromium/media/remoting/fake_media_resource.h5
-rw-r--r--chromium/media/remoting/integration_test.cc3
-rw-r--r--chromium/media/remoting/media_remoting_rpc.proto10
-rw-r--r--chromium/media/remoting/mock_receiver_controller.cc118
-rw-r--r--chromium/media/remoting/mock_receiver_controller.h96
-rw-r--r--chromium/media/remoting/receiver.cc295
-rw-r--r--chromium/media/remoting/receiver.h119
-rw-r--r--chromium/media/remoting/receiver_controller.cc116
-rw-r--r--chromium/media/remoting/receiver_controller.h70
-rw-r--r--chromium/media/remoting/receiver_unittest.cc471
-rw-r--r--chromium/media/remoting/remoting_constants.h18
-rw-r--r--chromium/media/remoting/remoting_renderer_factory.cc122
-rw-r--r--chromium/media/remoting/remoting_renderer_factory.h72
-rw-r--r--chromium/media/remoting/rpc_broker.h6
-rw-r--r--chromium/media/remoting/stream_provider.cc687
-rw-r--r--chromium/media/remoting/stream_provider.h280
-rw-r--r--chromium/media/remoting/stream_provider_unittest.cc316
-rw-r--r--chromium/media/remoting/test_utils.cc17
-rw-r--r--chromium/media/remoting/test_utils.h19
27 files changed, 2804 insertions, 655 deletions
diff --git a/chromium/media/remoting/BUILD.gn b/chromium/media/remoting/BUILD.gn
index f302a0571b0..b01ab5ffd6b 100644
--- a/chromium/media/remoting/BUILD.gn
+++ b/chromium/media/remoting/BUILD.gn
@@ -31,7 +31,7 @@ source_set("rpc") {
public_deps = [ ":media_remoting_proto" ]
}
-source_set("remoting") {
+source_set("remoting_sender") {
sources = [
"courier_renderer_factory.cc",
"courier_renderer_factory.h",
@@ -64,16 +64,48 @@ source_set("remoting") {
}
}
+source_set("remoting_constants") {
+ sources = [ "remoting_constants.h" ]
+}
+
+source_set("remoting_renderer") {
+ sources = [
+ "receiver.cc",
+ "receiver.h",
+ "receiver_controller.cc",
+ "receiver_controller.h",
+ "remoting_renderer_factory.cc",
+ "remoting_renderer_factory.h",
+ "stream_provider.cc",
+ "stream_provider.h",
+ ]
+
+ deps = [
+ ":remoting_constants",
+ ":rpc",
+ "//media/mojo/common:common",
+ "//media/mojo/mojom:remoting",
+ ]
+}
+
source_set("media_remoting_tests") {
testonly = true
sources = [
"fake_remoter.cc",
"fake_remoter.h",
+ "mock_receiver_controller.cc",
+ "mock_receiver_controller.h",
+ "receiver_unittest.cc",
"renderer_controller_unittest.cc",
+ "stream_provider_unittest.cc",
+ "test_utils.cc",
+ "test_utils.h",
]
deps = [
- ":remoting",
+ ":remoting_renderer",
+ ":remoting_sender",
+ ":rpc",
"//base",
"//base/test:test_support",
"//media:test_support",
@@ -94,16 +126,13 @@ source_set("media_remoting_tests") {
"fake_media_resource.h",
"integration_test.cc",
"proto_utils_unittest.cc",
- "receiver.cc",
- "receiver.h",
"rpc_broker_unittest.cc",
- "stream_provider.cc",
- "stream_provider.h",
]
deps += [
":rpc",
"//media/test:pipeline_integration_test_base",
+ "//services/service_manager/public/cpp:cpp",
"//ui/gfx:test_support",
"//ui/gfx/geometry",
]
diff --git a/chromium/media/remoting/courier_renderer.cc b/chromium/media/remoting/courier_renderer.cc
index d820952f0ca..dfd4b8a1e2a 100644
--- a/chromium/media/remoting/courier_renderer.cc
+++ b/chromium/media/remoting/courier_renderer.cc
@@ -74,7 +74,6 @@ CourierRenderer::CourierRenderer(
remote_renderer_handle_(RpcBroker::kInvalidHandle),
video_renderer_sink_(video_renderer_sink),
clock_(base::DefaultTickClock::GetInstance()) {
- VLOG(2) << __func__;
// Note: The constructor is running on the main thread, but will be destroyed
// on the media thread. Therefore, all weak pointers must be dereferenced on
// the media thread.
@@ -85,7 +84,6 @@ CourierRenderer::CourierRenderer(
}
CourierRenderer::~CourierRenderer() {
- VLOG(2) << __func__;
DCHECK(media_task_runner_->BelongsToCurrentThread());
// Post task on main thread to unregister message receiver.
@@ -102,7 +100,6 @@ CourierRenderer::~CourierRenderer() {
void CourierRenderer::Initialize(MediaResource* media_resource,
RendererClient* client,
PipelineStatusCallback init_cb) {
- VLOG(2) << __func__;
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(media_resource);
DCHECK(client);
@@ -150,19 +147,10 @@ void CourierRenderer::Initialize(MediaResource* media_resource,
rpc_broker_)));
}
-void CourierRenderer::SetCdm(CdmContext* cdm_context,
- CdmAttachedCB cdm_attached_cb) {
- DCHECK(media_task_runner_->BelongsToCurrentThread());
-
- // Media remoting doesn't support encrypted content.
- NOTIMPLEMENTED();
-}
-
void CourierRenderer::SetLatencyHint(
base::Optional<base::TimeDelta> latency_hint) {}
void CourierRenderer::Flush(base::OnceClosure flush_cb) {
- VLOG(2) << __func__;
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(!flush_cb_);
@@ -188,14 +176,13 @@ void CourierRenderer::Flush(base::OnceClosure flush_cb) {
(video_demuxer_stream_adapter_ && !flush_video_count.has_value()) ||
(audio_demuxer_stream_adapter_ && video_demuxer_stream_adapter_ &&
flush_audio_count.has_value() != flush_video_count.has_value())) {
- VLOG(1) << "Ignoring flush request while under flushing operation";
return;
}
flush_cb_ = std::move(flush_cb);
// Issues RPC_R_FLUSHUNTIL RPC message.
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_renderer_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_FLUSHUNTIL);
pb::RendererFlushUntil* message = rpc->mutable_renderer_flushuntil_rpc();
@@ -204,15 +191,10 @@ void CourierRenderer::Flush(base::OnceClosure flush_cb) {
if (flush_video_count.has_value())
message->set_video_count(*flush_video_count);
message->set_callback_handle(rpc_handle_);
- VLOG(2) << __func__ << ": Sending RPC_R_FLUSHUNTIL to " << rpc->handle()
- << " with audio_count=" << message->audio_count()
- << ", video_count=" << message->video_count()
- << ", callback_handle=" << message->callback_handle();
SendRpcToRemote(std::move(rpc));
}
void CourierRenderer::StartPlayingFrom(base::TimeDelta time) {
- VLOG(2) << __func__ << ": " << time.InMicroseconds();
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (state_ != STATE_PLAYING) {
@@ -221,12 +203,10 @@ void CourierRenderer::StartPlayingFrom(base::TimeDelta time) {
}
// Issues RPC_R_STARTPLAYINGFROM RPC message.
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_renderer_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_STARTPLAYINGFROM);
rpc->set_integer64_value(time.InMicroseconds());
- VLOG(2) << __func__ << ": Sending RPC_R_STARTPLAYINGFROM to " << rpc->handle()
- << " with time_usec=" << rpc->integer64_value();
SendRpcToRemote(std::move(rpc));
{
@@ -237,7 +217,6 @@ void CourierRenderer::StartPlayingFrom(base::TimeDelta time) {
}
void CourierRenderer::SetPlaybackRate(double playback_rate) {
- VLOG(2) << __func__ << ": " << playback_rate;
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (state_ != STATE_FLUSHING && state_ != STATE_PLAYING) {
@@ -246,19 +225,16 @@ void CourierRenderer::SetPlaybackRate(double playback_rate) {
}
// Issues RPC_R_SETPLAYBACKRATE RPC message.
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_renderer_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_SETPLAYBACKRATE);
rpc->set_double_value(playback_rate);
- VLOG(2) << __func__ << ": Sending RPC_R_SETPLAYBACKRATE to " << rpc->handle()
- << " with rate=" << rpc->double_value();
SendRpcToRemote(std::move(rpc));
playback_rate_ = playback_rate;
ResetMeasurements();
}
void CourierRenderer::SetVolume(float volume) {
- VLOG(2) << __func__ << ": " << volume;
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (state_ != STATE_FLUSHING && state_ != STATE_PLAYING) {
@@ -267,12 +243,10 @@ void CourierRenderer::SetVolume(float volume) {
}
// Issues RPC_R_SETVOLUME RPC message.
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_renderer_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_SETVOLUME);
rpc->set_double_value(volume);
- VLOG(2) << __func__ << ": Sending RPC_R_SETVOLUME to " << rpc->handle()
- << " with volume=" << rpc->double_value();
SendRpcToRemote(std::move(rpc));
}
@@ -314,7 +288,6 @@ void CourierRenderer::OnDataPipeCreated(
mojo::ScopedDataPipeProducerHandle video_handle,
int audio_rpc_handle,
int video_rpc_handle) {
- VLOG(2) << __func__;
DCHECK(media_task_runner_->BelongsToCurrentThread());
if (state_ == STATE_ERROR)
@@ -332,7 +305,6 @@ void CourierRenderer::OnDataPipeCreated(
// Create audio demuxer stream adapter if audio is available.
if (audio_demuxer_stream && audio.is_valid() && audio_handle.is_valid() &&
audio_rpc_handle != RpcBroker::kInvalidHandle) {
- VLOG(2) << "Initialize audio";
audio_demuxer_stream_adapter_.reset(new DemuxerStreamAdapter(
main_task_runner_, media_task_runner_, "audio", audio_demuxer_stream,
rpc_broker_, audio_rpc_handle, std::move(audio),
@@ -344,7 +316,6 @@ void CourierRenderer::OnDataPipeCreated(
// Create video demuxer stream adapter if video is available.
if (video_demuxer_stream && video.is_valid() && video_handle.is_valid() &&
video_rpc_handle != RpcBroker::kInvalidHandle) {
- VLOG(2) << "Initialize video";
video_demuxer_stream_adapter_.reset(new DemuxerStreamAdapter(
main_task_runner_, media_task_runner_, "video", video_demuxer_stream,
rpc_broker_, video_rpc_handle, std::move(video),
@@ -360,13 +331,27 @@ void CourierRenderer::OnDataPipeCreated(
}
state_ = STATE_ACQUIRING;
+
+ // Issues RPC_ACQUIRE_DEMUXER RPC message.
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(RpcBroker::kAcquireDemuxerHandle);
+ rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_DEMUXER);
+ pb::AcquireDemuxer* message = rpc->mutable_acquire_demuxer_rpc();
+ message->set_audio_demuxer_handle(
+ audio_demuxer_stream_adapter_
+ ? audio_demuxer_stream_adapter_->rpc_handle()
+ : RpcBroker::kInvalidHandle);
+ message->set_video_demuxer_handle(
+ video_demuxer_stream_adapter_
+ ? video_demuxer_stream_adapter_->rpc_handle()
+ : RpcBroker::kInvalidHandle);
+ SendRpcToRemote(std::move(rpc));
+
// Issues RPC_ACQUIRE_RENDERER RPC message.
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
- rpc->set_handle(RpcBroker::kAcquireHandle);
+ rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(RpcBroker::kAcquireRendererHandle);
rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER);
rpc->set_integer_value(rpc_handle_);
- VLOG(2) << __func__ << ": Sending RPC_ACQUIRE_RENDERER to " << rpc->handle()
- << " with rpc_handle=" << rpc->integer_value();
SendRpcToRemote(std::move(rpc));
}
@@ -403,11 +388,9 @@ void CourierRenderer::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
OnBufferingStateChange(std::move(message));
break;
case pb::RpcMessage::RPC_RC_ONENDED:
- VLOG(2) << __func__ << ": Received RPC_RC_ONENDED.";
client_->OnEnded();
break;
case pb::RpcMessage::RPC_RC_ONERROR:
- VLOG(2) << __func__ << ": Received RPC_RC_ONERROR.";
OnFatalError(RECEIVER_PIPELINE_ERROR);
break;
case pb::RpcMessage::RPC_RC_ONAUDIOCONFIGCHANGE:
@@ -426,12 +409,11 @@ void CourierRenderer::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
OnStatisticsUpdate(std::move(message));
break;
case pb::RpcMessage::RPC_RC_ONWAITINGFORDECRYPTIONKEY:
- VLOG(2) << __func__ << ": Received RPC_RC_ONWAITINGFORDECRYPTIONKEY.";
client_->OnWaiting(WaitingReason::kNoDecryptionKey);
break;
default:
- VLOG(1) << "Unknown RPC: " << message->proc();
+ DVLOG(1) << "Unknown RPC: " << message->proc();
}
}
@@ -449,19 +431,15 @@ void CourierRenderer::AcquireRendererDone(
DCHECK(message);
remote_renderer_handle_ = message->integer_value();
- VLOG(2) << __func__
- << ": Received RPC_ACQUIRE_RENDERER_DONE with remote_renderer_handle="
- << remote_renderer_handle_;
if (state_ != STATE_ACQUIRING || init_workflow_done_callback_.is_null()) {
- LOG(WARNING) << "Unexpected acquire renderer done RPC.";
OnFatalError(PEERS_OUT_OF_SYNC);
return;
}
state_ = STATE_INITIALIZING;
// Issues RPC_R_INITIALIZE RPC message to initialize renderer.
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_renderer_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_INITIALIZE);
pb::RendererInitialize* init = rpc->mutable_renderer_initialize_rpc();
@@ -475,11 +453,6 @@ void CourierRenderer::AcquireRendererDone(
? video_demuxer_stream_adapter_->rpc_handle()
: RpcBroker::kInvalidHandle);
init->set_callback_handle(rpc_handle_);
- VLOG(2) << __func__ << ": Sending RPC_R_INITIALIZE to " << rpc->handle()
- << " with client_handle=" << init->client_handle()
- << ", audio_demuxer_handle=" << init->audio_demuxer_handle()
- << ", video_demuxer_handle=" << init->video_demuxer_handle()
- << ", callback_handle=" << init->callback_handle();
SendRpcToRemote(std::move(rpc));
}
@@ -489,11 +462,7 @@ void CourierRenderer::InitializeCallback(
DCHECK(message);
const bool success = message->boolean_value();
- VLOG(2) << __func__
- << ": Received RPC_R_INITIALIZE_CALLBACK with success=" << success;
-
if (state_ != STATE_INITIALIZING || init_workflow_done_callback_.is_null()) {
- LOG(WARNING) << "Unexpected initialize callback RPC.";
OnFatalError(PEERS_OUT_OF_SYNC);
return;
}
@@ -511,10 +480,8 @@ void CourierRenderer::InitializeCallback(
void CourierRenderer::FlushUntilCallback() {
DCHECK(media_task_runner_->BelongsToCurrentThread());
- VLOG(2) << __func__ << ": Received RPC_R_FLUSHUNTIL_CALLBACK";
if (state_ != STATE_FLUSHING || !flush_cb_) {
- LOG(WARNING) << "Unexpected flushuntil callback RPC.";
OnFatalError(PEERS_OUT_OF_SYNC);
return;
}
@@ -531,9 +498,6 @@ void CourierRenderer::FlushUntilCallback() {
void CourierRenderer::SetCdmCallback(std::unique_ptr<pb::RpcMessage> message) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
- VLOG(2) << __func__ << ": Received RPC_R_SETCDM_CALLBACK with cdm_id="
- << message->renderer_set_cdm_rpc().cdm_id() << ", callback_handle="
- << message->renderer_set_cdm_rpc().callback_handle();
// TODO(erickung): add implementation once Remote CDM implementation is done.
NOTIMPLEMENTED();
}
@@ -543,7 +507,6 @@ void CourierRenderer::OnTimeUpdate(std::unique_ptr<pb::RpcMessage> message) {
DCHECK(message);
// Shutdown remoting session if receiving malformed RPC message.
if (!message->has_rendererclient_ontimeupdate_rpc()) {
- VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
@@ -551,9 +514,6 @@ void CourierRenderer::OnTimeUpdate(std::unique_ptr<pb::RpcMessage> message) {
message->rendererclient_ontimeupdate_rpc().time_usec();
const int64_t max_time_usec =
message->rendererclient_ontimeupdate_rpc().max_time_usec();
- VLOG(2) << __func__
- << ": Received RPC_RC_ONTIMEUPDATE with time_usec=" << time_usec
- << ", max_time_usec=" << max_time_usec;
// Ignores invalid time, such as negative value, or time larger than max value
// (usually the time stamp that all streams are pushed into AV pipeline).
if (time_usec < 0 || max_time_usec < 0 || time_usec > max_time_usec)
@@ -575,12 +535,9 @@ void CourierRenderer::OnBufferingStateChange(
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
if (!message->has_rendererclient_onbufferingstatechange_rpc()) {
- VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
- VLOG(2) << __func__ << ": Received RPC_RC_ONBUFFERINGSTATECHANGE with state="
- << message->rendererclient_onbufferingstatechange_rpc().state();
base::Optional<BufferingState> state = ToMediaBufferingState(
message->rendererclient_onbufferingstatechange_rpc().state());
BufferingStateChangeReason reason = BUFFERING_CHANGE_REASON_UNKNOWN;
@@ -605,7 +562,6 @@ void CourierRenderer::OnAudioConfigChange(
DCHECK(message);
// Shutdown remoting session if receiving malformed RPC message.
if (!message->has_rendererclient_onaudioconfigchange_rpc()) {
- VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
@@ -618,8 +574,6 @@ void CourierRenderer::OnAudioConfigChange(
ConvertProtoToAudioDecoderConfig(pb_audio_config, &out_audio_config);
DCHECK(out_audio_config.IsValidConfig());
- VLOG(2) << __func__ << ": Received RPC_RC_ONAUDIOCONFIGCHANGE with config:"
- << out_audio_config.AsHumanReadableString();
client_->OnAudioConfigChange(out_audio_config);
}
@@ -629,7 +583,6 @@ void CourierRenderer::OnVideoConfigChange(
DCHECK(message);
// Shutdown remoting session if receiving malformed RPC message.
if (!message->has_rendererclient_onvideoconfigchange_rpc()) {
- VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
@@ -642,8 +595,6 @@ void CourierRenderer::OnVideoConfigChange(
ConvertProtoToVideoDecoderConfig(pb_video_config, &out_video_config);
DCHECK(out_video_config.IsValidConfig());
- VLOG(2) << __func__ << ": Received RPC_RC_ONVIDEOCONFIGCHANGE with config:"
- << out_video_config.AsHumanReadableString();
client_->OnVideoConfigChange(out_video_config);
}
@@ -653,14 +604,11 @@ void CourierRenderer::OnVideoNaturalSizeChange(
DCHECK(message);
// Shutdown remoting session if receiving malformed RPC message.
if (!message->has_rendererclient_onvideonatualsizechange_rpc()) {
- VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
const auto& size_change =
message->rendererclient_onvideonatualsizechange_rpc();
- VLOG(2) << __func__ << ": Received RPC_RC_ONVIDEONATURALSIZECHANGE with size="
- << size_change.width() << 'x' << size_change.height();
if (size_change.width() <= 0 || size_change.height() <= 0)
return;
client_->OnVideoNaturalSizeChange(
@@ -672,8 +620,6 @@ void CourierRenderer::OnVideoOpacityChange(
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
const bool opaque = message->boolean_value();
- VLOG(2) << __func__
- << ": Received RPC_RC_ONVIDEOOPACITYCHANGE with opaque=" << opaque;
client_->OnVideoOpacityChange(opaque);
}
@@ -683,7 +629,6 @@ void CourierRenderer::OnStatisticsUpdate(
DCHECK(message);
// Shutdown remoting session if receiving malformed RPC message.
if (!message->has_rendererclient_onstatisticsupdate_rpc()) {
- VLOG(1) << __func__ << " missing required RPC message";
OnFatalError(RPC_INVALID);
return;
}
@@ -691,15 +636,6 @@ void CourierRenderer::OnStatisticsUpdate(
ConvertProtoToPipelineStatistics(
message->rendererclient_onstatisticsupdate_rpc(), &stats);
// Note: Each field in |stats| is a delta, not the aggregate amount.
- VLOG(2) << __func__
- << ": Received RPC_RC_ONSTATISTICSUPDATE with audio_bytes_decoded="
- << stats.audio_bytes_decoded
- << ", video_bytes_decoded=" << stats.video_bytes_decoded
- << ", video_frames_decoded=" << stats.video_frames_decoded
- << ", video_frames_dropped=" << stats.video_frames_dropped
- << ", audio_memory_usage=" << stats.audio_memory_usage
- << ", video_memory_usage=" << stats.video_memory_usage;
-
if (stats.audio_bytes_decoded > 0 || stats.video_frames_decoded > 0 ||
stats.video_frames_dropped > 0) {
metrics_recorder_.OnEvidenceOfPlayoutAtReceiver();
@@ -712,8 +648,6 @@ void CourierRenderer::OnFatalError(StopTrigger stop_trigger) {
DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK_NE(UNKNOWN_STOP_TRIGGER, stop_trigger);
- VLOG(2) << __func__ << " with StopTrigger " << stop_trigger;
-
// If this is the first error, notify the controller. It is expected the
// controller will cause this renderer to shut down shortly.
if (state_ != STATE_ERROR) {
@@ -761,9 +695,6 @@ void CourierRenderer::OnMediaTimeUpdated() {
playback_rate_;
if ((media_duration - update_duration).magnitude() >=
kMediaPlaybackDelayThreshold) {
- VLOG(1) << "Irregular playback detected: Media playback delayed."
- << " media_duration = " << media_duration
- << " update_duration = " << update_duration;
++times_playback_delayed_;
if (times_playback_delayed_ == kPlaybackDelayCountThreshold)
OnFatalError(PACING_TOO_SLOWLY);
@@ -807,9 +738,6 @@ void CourierRenderer::UpdateVideoStatsQueue(int video_frames_decoded,
if (sum_video_frames_decoded_ &&
sum_video_frames_dropped_ * 100 >
sum_video_frames_decoded_ * kMaxNumVideoFramesDroppedPercentage) {
- VLOG(1) << "Irregular playback detected: Too many video frames dropped."
- << " video_frames_decoded= " << sum_video_frames_decoded_
- << " video_frames_dropped= " << sum_video_frames_dropped_;
OnFatalError(FRAME_DROP_RATE_HIGH);
}
// Prune |video_stats_queue_|.
diff --git a/chromium/media/remoting/courier_renderer.h b/chromium/media/remoting/courier_renderer.h
index 75111e4174c..38e5e338a44 100644
--- a/chromium/media/remoting/courier_renderer.h
+++ b/chromium/media/remoting/courier_renderer.h
@@ -5,8 +5,6 @@
#ifndef MEDIA_REMOTING_COURIER_RENDERER_H_
#define MEDIA_REMOTING_COURIER_RENDERER_H_
-#include <stdint.h>
-
#include <memory>
#include "base/callback.h"
@@ -20,6 +18,7 @@
#include "media/base/pipeline_status.h"
#include "media/base/renderer.h"
#include "media/mojo/mojom/remoting.mojom.h"
+#include "media/remoting/media_remoting_rpc.pb.h"
#include "media/remoting/metrics.h"
#include "media/remoting/rpc_broker.h"
#include "mojo/public/cpp/bindings/pending_remote.h"
@@ -74,7 +73,6 @@ class CourierRenderer : public Renderer {
void Initialize(MediaResource* media_resource,
RendererClient* client,
PipelineStatusCallback init_cb) final;
- void SetCdm(CdmContext* cdm_context, CdmAttachedCB cdm_attached_cb) final;
void SetLatencyHint(base::Optional<base::TimeDelta> latency_hint) final;
void Flush(base::OnceClosure flush_cb) final;
void StartPlayingFrom(base::TimeDelta time) final;
diff --git a/chromium/media/remoting/courier_renderer_unittest.cc b/chromium/media/remoting/courier_renderer_unittest.cc
index a51f3dc86ef..4d13c140856 100644
--- a/chromium/media/remoting/courier_renderer_unittest.cc
+++ b/chromium/media/remoting/courier_renderer_unittest.cc
@@ -7,6 +7,7 @@
#include <memory>
#include "base/bind.h"
+#include "base/check.h"
#include "base/run_loop.h"
#include "base/test/simple_test_tick_clock.h"
#include "base/test/task_environment.h"
@@ -20,6 +21,7 @@
#include "media/remoting/proto_enum_utils.h"
#include "media/remoting/proto_utils.h"
#include "media/remoting/renderer_controller.h"
+#include "media/remoting/rpc_broker.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -146,16 +148,7 @@ class RendererClientImpl final : public RendererClient {
class CourierRendererTest : public testing::Test {
public:
- CourierRendererTest()
- : receiver_renderer_handle_(10),
- receiver_audio_demuxer_callback_handle_(11),
- receiver_video_demuxer_callback_handle_(12),
- sender_client_handle_(RpcBroker::kInvalidHandle),
- sender_renderer_callback_handle_(RpcBroker::kInvalidHandle),
- sender_audio_demuxer_handle_(RpcBroker::kInvalidHandle),
- sender_video_demuxer_handle_(RpcBroker::kInvalidHandle),
- received_audio_ds_init_cb_(false),
- received_video_ds_init_cb_(false) {}
+ CourierRendererTest() = default;
~CourierRendererTest() override = default;
// Use this function to mimic receiver to handle RPC message for renderer
@@ -165,40 +158,87 @@ class CourierRendererTest : public testing::Test {
ASSERT_TRUE(rpc->ParseFromArray(message->data(), message->size()));
switch (rpc->proc()) {
case pb::RpcMessage::RPC_ACQUIRE_RENDERER: {
+ DCHECK(rpc->has_integer_value());
+ sender_renderer_handle_ = rpc->integer_value();
// Issues RPC_ACQUIRE_RENDERER_DONE RPC message.
- std::unique_ptr<pb::RpcMessage> acquire_done(new pb::RpcMessage());
- acquire_done->set_handle(rpc->integer_value());
+ auto acquire_done = std::make_unique<pb::RpcMessage>();
+ acquire_done->set_handle(sender_renderer_handle_);
acquire_done->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE);
acquire_done->set_integer_value(receiver_renderer_handle_);
controller_->GetRpcBroker()->ProcessMessageFromRemote(
std::move(acquire_done));
} break;
+ case pb::RpcMessage::RPC_ACQUIRE_DEMUXER: {
+ if (!is_backward_compatible_mode_) {
+ int acquire_demuxer_handle = RpcBroker::kAcquireDemuxerHandle;
+ EXPECT_EQ(rpc->handle(), acquire_demuxer_handle);
+ sender_audio_demuxer_handle_ =
+ rpc->acquire_demuxer_rpc().audio_demuxer_handle();
+ sender_video_demuxer_handle_ =
+ rpc->acquire_demuxer_rpc().video_demuxer_handle();
+
+ // Issues audio RPC_DS_INITIALIZE RPC message.
+ if (sender_audio_demuxer_handle_ != RpcBroker::kInvalidHandle) {
+ auto ds_init = std::make_unique<pb::RpcMessage>();
+ ds_init->set_handle(sender_audio_demuxer_handle_);
+ ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE);
+ ds_init->set_integer_value(receiver_audio_demuxer_callback_handle_);
+ controller_->GetRpcBroker()->ProcessMessageFromRemote(
+ std::move(ds_init));
+ }
+
+ // Issues video RPC_DS_INITIALIZE RPC message.
+ if (sender_video_demuxer_handle_ != RpcBroker::kInvalidHandle) {
+ auto ds_init = std::make_unique<pb::RpcMessage>();
+ ds_init->set_handle(sender_video_demuxer_handle_);
+ ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE);
+ ds_init->set_integer_value(receiver_video_demuxer_callback_handle_);
+ controller_->GetRpcBroker()->ProcessMessageFromRemote(
+ std::move(ds_init));
+ }
+ }
+ } break;
case pb::RpcMessage::RPC_R_INITIALIZE: {
- EXPECT_EQ(rpc->handle(), receiver_renderer_handle_);
sender_renderer_callback_handle_ =
rpc->renderer_initialize_rpc().callback_handle();
sender_client_handle_ = rpc->renderer_initialize_rpc().client_handle();
- sender_audio_demuxer_handle_ =
- rpc->renderer_initialize_rpc().audio_demuxer_handle();
- sender_video_demuxer_handle_ =
- rpc->renderer_initialize_rpc().video_demuxer_handle();
-
- // Issues audio RPC_DS_INITIALIZE RPC message.
- if (sender_audio_demuxer_handle_ != RpcBroker::kInvalidHandle) {
- std::unique_ptr<pb::RpcMessage> ds_init(new pb::RpcMessage());
- ds_init->set_handle(sender_audio_demuxer_handle_);
- ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE);
- ds_init->set_integer_value(receiver_audio_demuxer_callback_handle_);
- controller_->GetRpcBroker()->ProcessMessageFromRemote(
- std::move(ds_init));
- }
- if (sender_video_demuxer_handle_ != RpcBroker::kInvalidHandle) {
- std::unique_ptr<pb::RpcMessage> ds_init(new pb::RpcMessage());
- ds_init->set_handle(sender_video_demuxer_handle_);
- ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE);
- ds_init->set_integer_value(receiver_video_demuxer_callback_handle_);
+
+ if (is_backward_compatible_mode_) {
+ EXPECT_EQ(rpc->handle(), receiver_renderer_handle_);
+
+ sender_audio_demuxer_handle_ =
+ rpc->renderer_initialize_rpc().audio_demuxer_handle();
+ sender_video_demuxer_handle_ =
+ rpc->renderer_initialize_rpc().video_demuxer_handle();
+
+ // Issues audio RPC_DS_INITIALIZE RPC message.
+ if (sender_audio_demuxer_handle_ != RpcBroker::kInvalidHandle) {
+ auto ds_init = std::make_unique<pb::RpcMessage>();
+ ds_init->set_handle(sender_audio_demuxer_handle_);
+ ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE);
+ ds_init->set_integer_value(receiver_audio_demuxer_callback_handle_);
+ controller_->GetRpcBroker()->ProcessMessageFromRemote(
+ std::move(ds_init));
+ }
+
+ // Issues video RPC_DS_INITIALIZE RPC message.
+ if (sender_video_demuxer_handle_ != RpcBroker::kInvalidHandle) {
+ auto ds_init = std::make_unique<pb::RpcMessage>();
+ ds_init->set_handle(sender_video_demuxer_handle_);
+ ds_init->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE);
+ ds_init->set_integer_value(receiver_video_demuxer_callback_handle_);
+ controller_->GetRpcBroker()->ProcessMessageFromRemote(
+ std::move(ds_init));
+ }
+ } else {
+ // Issues RPC_R_INITIALIZE_CALLBACK RPC message when receiving
+ // RPC_R_INITIALIZE.
+ auto init_cb = std::make_unique<pb::RpcMessage>();
+ init_cb->set_handle(sender_renderer_callback_handle_);
+ init_cb->set_proc(pb::RpcMessage::RPC_R_INITIALIZE_CALLBACK);
+ init_cb->set_boolean_value(is_successfully_initialized_);
controller_->GetRpcBroker()->ProcessMessageFromRemote(
- std::move(ds_init));
+ std::move(init_cb));
}
} break;
case pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK: {
@@ -207,20 +247,24 @@ class CourierRendererTest : public testing::Test {
if (rpc->handle() == receiver_video_demuxer_callback_handle_)
received_video_ds_init_cb_ = true;
- // Issues RPC_R_INITIALIZE_CALLBACK RPC message when receiving
- // RPC_DS_INITIALIZE_CALLBACK on available streams.
+ // Check whether the demuxer at the receiver end is initialized.
if (received_audio_ds_init_cb_ ==
(sender_audio_demuxer_handle_ != RpcBroker::kInvalidHandle) &&
received_video_ds_init_cb_ ==
(sender_video_demuxer_handle_ != RpcBroker::kInvalidHandle)) {
- std::unique_ptr<pb::RpcMessage> init_cb(new pb::RpcMessage());
+ is_receiver_demuxer_initialized_ = true;
+ }
+
+ if (is_backward_compatible_mode_ && is_receiver_demuxer_initialized_) {
+ // Issues RPC_R_INITIALIZE_CALLBACK RPC message when receiving
+ // RPC_DS_INITIALIZE_CALLBACK on available streams.
+ auto init_cb = std::make_unique<pb::RpcMessage>();
init_cb->set_handle(sender_renderer_callback_handle_);
init_cb->set_proc(pb::RpcMessage::RPC_R_INITIALIZE_CALLBACK);
init_cb->set_boolean_value(is_successfully_initialized_);
controller_->GetRpcBroker()->ProcessMessageFromRemote(
std::move(init_cb));
}
-
} break;
case pb::RpcMessage::RPC_R_FLUSHUNTIL: {
// Issues RPC_R_FLUSHUNTIL_CALLBACK RPC message.
@@ -269,8 +313,16 @@ class CourierRendererTest : public testing::Test {
RunPendingTasks();
}
+ void InitializeRendererBackwardsCompatible() {
+ is_backward_compatible_mode_ = true;
+ InitializeRenderer();
+ }
+
bool IsRendererInitialized() const {
- return renderer_->state_ == CourierRenderer::STATE_PLAYING;
+ EXPECT_TRUE(received_audio_ds_init_cb_);
+ EXPECT_TRUE(received_video_ds_init_cb_);
+ return renderer_->state_ == CourierRenderer::STATE_PLAYING &&
+ is_receiver_demuxer_initialized_;
}
bool DidEncounterFatalError() const {
@@ -402,17 +454,24 @@ class CourierRendererTest : public testing::Test {
base::SimpleTestTickClock clock_;
// RPC handles.
- const int receiver_renderer_handle_;
- const int receiver_audio_demuxer_callback_handle_;
- const int receiver_video_demuxer_callback_handle_;
- int sender_client_handle_;
- int sender_renderer_callback_handle_;
- int sender_audio_demuxer_handle_;
- int sender_video_demuxer_handle_;
+ const int receiver_renderer_handle_{10};
+ const int receiver_audio_demuxer_callback_handle_{11};
+ const int receiver_video_demuxer_callback_handle_{12};
+ int sender_renderer_handle_;
+ int sender_client_handle_{RpcBroker::kInvalidHandle};
+ int sender_renderer_callback_handle_{RpcBroker::kInvalidHandle};
+ int sender_audio_demuxer_handle_{RpcBroker::kInvalidHandle};
+ int sender_video_demuxer_handle_{RpcBroker::kInvalidHandle};
+
+ // Indicates whether the test runs in backward-compatible mode.
+ bool is_backward_compatible_mode_ = false;
+
+ // Indicates whether the demuxer at receiver is initialized or not.
+ bool is_receiver_demuxer_initialized_ = false;
// Indicate whether RPC_DS_INITIALIZE_CALLBACK RPC messages are received.
- bool received_audio_ds_init_cb_;
- bool received_video_ds_init_cb_;
+ bool received_audio_ds_init_cb_ = false;
+ bool received_video_ds_init_cb_ = false;
// Indicates whether the test wants to simulate successful initialization in
// the renderer on the receiver side.
@@ -433,6 +492,14 @@ TEST_F(CourierRendererTest, Initialize) {
ASSERT_EQ(render_client_->status(), PIPELINE_OK);
}
+TEST_F(CourierRendererTest, InitializeBackwardCompatible) {
+ InitializeRendererBackwardsCompatible();
+ RunPendingTasks();
+
+ ASSERT_TRUE(IsRendererInitialized());
+ ASSERT_EQ(render_client_->status(), PIPELINE_OK);
+}
+
TEST_F(CourierRendererTest, InitializeFailed) {
is_successfully_initialized_ = false;
InitializeRenderer();
diff --git a/chromium/media/remoting/demuxer_stream_adapter.cc b/chromium/media/remoting/demuxer_stream_adapter.cc
index 2efbf3873e1..63b7b9d201d 100644
--- a/chromium/media/remoting/demuxer_stream_adapter.cc
+++ b/chromium/media/remoting/demuxer_stream_adapter.cc
@@ -128,7 +128,9 @@ void DemuxerStreamAdapter::OnReceivedRpc(
case pb::RpcMessage::RPC_DS_ENABLEBITSTREAMCONVERTER:
EnableBitstreamConverter();
break;
-
+ case pb::RpcMessage::RPC_DS_ONERROR:
+ OnFatalError(UNEXPECTED_FAILURE);
+ break;
default:
DEMUXER_VLOG(1) << "Unknown RPC: " << message->proc();
}
diff --git a/chromium/media/remoting/end2end_test_renderer.cc b/chromium/media/remoting/end2end_test_renderer.cc
index f08c848d0c4..932e0ab2fdc 100644
--- a/chromium/media/remoting/end2end_test_renderer.cc
+++ b/chromium/media/remoting/end2end_test_renderer.cc
@@ -4,23 +4,29 @@
#include "media/remoting/end2end_test_renderer.h"
-#include <memory>
-
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/callback.h"
+#include "base/check.h"
+#include "base/notreached.h"
#include "base/threading/thread_task_runner_handle.h"
+#include "media/base/demuxer_stream.h"
#include "media/mojo/common/mojo_data_pipe_read_write.h"
+#include "media/mojo/common/mojo_decoder_buffer_converter.h"
#include "media/mojo/mojom/remoting.mojom.h"
#include "media/remoting/courier_renderer.h"
#include "media/remoting/proto_utils.h"
#include "media/remoting/receiver.h"
+#include "media/remoting/receiver_controller.h"
#include "media/remoting/renderer_controller.h"
+#include "media/remoting/stream_provider.h"
+#include "media/remoting/test_utils.h"
#include "mojo/public/cpp/bindings/pending_receiver.h"
#include "mojo/public/cpp/bindings/pending_remote.h"
#include "mojo/public/cpp/bindings/receiver.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "mojo/public/cpp/bindings/self_owned_receiver.h"
+#include "mojo/public/cpp/system/data_pipe.h"
namespace media {
namespace remoting {
@@ -30,7 +36,8 @@ namespace {
class TestStreamSender final : public mojom::RemotingDataStreamSender {
public:
using SendFrameToSinkCallback =
- base::RepeatingCallback<void(const std::vector<uint8_t>& data,
+ base::RepeatingCallback<void(uint32_t frame_count,
+ const std::vector<uint8_t>& data,
DemuxerStream::Type type)>;
TestStreamSender(
mojo::PendingReceiver<mojom::RemotingDataStreamSender> receiver,
@@ -49,19 +56,21 @@ class TestStreamSender final : public mojom::RemotingDataStreamSender {
next_frame_data_.resize(frame_size);
data_pipe_reader_.Read(
next_frame_data_.data(), frame_size,
- base::BindOnce(&TestStreamSender::OnFrameRead, base::Unretained(this)));
+ base::BindOnce(&TestStreamSender::OnFrameRead, base::Unretained(this),
+ frame_count_++));
}
void CancelInFlightData() override { next_frame_data_.resize(0); }
private:
- void OnFrameRead(bool success) {
+ void OnFrameRead(uint32_t count, bool success) {
DCHECK(success);
if (send_frame_to_sink_cb_)
- send_frame_to_sink_cb_.Run(next_frame_data_, type_);
+ send_frame_to_sink_cb_.Run(count, next_frame_data_, type_);
next_frame_data_.resize(0);
}
+ uint32_t frame_count_ = 0;
mojo::Receiver<RemotingDataStreamSender> receiver_;
MojoDataPipeReader data_pipe_reader_;
const DemuxerStream::Type type_;
@@ -153,33 +162,201 @@ std::unique_ptr<RendererController> CreateController(
} // namespace
+class End2EndTestRenderer::TestRemotee : public mojom::Remotee {
+ public:
+ explicit TestRemotee(RendererController* controller)
+ : controller_(controller) {}
+
+ ~TestRemotee() override = default;
+
+ void OnAudioFrame(uint32_t frame_count,
+ scoped_refptr<DecoderBuffer> decoder_buffer) {
+ ::media::mojom::DecoderBufferPtr mojo_buffer =
+ audio_buffer_writer_->WriteDecoderBuffer(std::move(decoder_buffer));
+ audio_stream_->ReceiveFrame(frame_count, std::move(mojo_buffer));
+ }
+
+ void OnVideoFrame(uint32_t frame_count,
+ scoped_refptr<DecoderBuffer> decoder_buffer) {
+ ::media::mojom::DecoderBufferPtr mojo_buffer =
+ video_buffer_writer_->WriteDecoderBuffer(std::move(decoder_buffer));
+ video_stream_->ReceiveFrame(frame_count, std::move(mojo_buffer));
+ }
+
+ void BindMojoReceiver(mojo::PendingReceiver<mojom::Remotee> receiver) {
+ mojo_receiver_.Bind(std::move(receiver));
+ }
+
+ void OnMessage(const std::vector<uint8_t>& message) {
+ receiver_controller_->OnMessageFromSource(message);
+ }
+
+ // mojom::Remotee implementation
+ void OnRemotingSinkReady(
+ mojo::PendingRemote<::media::mojom::RemotingSink> sink) override {
+ receiver_controller_.Bind(std::move(sink));
+ }
+
+ void SendMessageToSource(const std::vector<uint8_t>& message) override {
+ controller_->OnMessageFromSink(message);
+ }
+
+ void StartDataStreams(
+ mojo::PendingRemote<::media::mojom::RemotingDataStreamReceiver>
+ audio_stream,
+ mojo::PendingRemote<::media::mojom::RemotingDataStreamReceiver>
+ video_stream) override {
+ if (audio_stream.is_valid()) {
+ // initialize data pipe for audio data stream receiver
+ mojo::ScopedDataPipeConsumerHandle audio_data_pipe;
+ audio_stream_.Bind(std::move(audio_stream));
+ audio_buffer_writer_ = ::media::MojoDecoderBufferWriter::Create(
+ GetDefaultDecoderBufferConverterCapacity(
+ ::media::DemuxerStream::AUDIO),
+ &audio_data_pipe);
+ audio_stream_->InitializeDataPipe(std::move(audio_data_pipe));
+ }
+
+ if (video_stream.is_valid()) {
+ // initialize data pipe for video data stream receiver
+ mojo::ScopedDataPipeConsumerHandle video_data_pipe;
+ video_stream_.Bind(std::move(video_stream));
+ video_buffer_writer_ = ::media::MojoDecoderBufferWriter::Create(
+ GetDefaultDecoderBufferConverterCapacity(
+ ::media::DemuxerStream::VIDEO),
+ &video_data_pipe);
+ video_stream_->InitializeDataPipe(std::move(video_data_pipe));
+ }
+ }
+
+ void OnFlushUntil(uint32_t audio_frame_count,
+ uint32_t video_frame_count) override {}
+
+ void OnVideoNaturalSizeChange(const gfx::Size& size) override {}
+
+ private:
+ RendererController* controller_;
+
+ std::unique_ptr<MojoDecoderBufferWriter> audio_buffer_writer_;
+ std::unique_ptr<MojoDecoderBufferWriter> video_buffer_writer_;
+
+ mojo::Remote<mojom::RemotingDataStreamReceiver> audio_stream_;
+ mojo::Remote<mojom::RemotingDataStreamReceiver> video_stream_;
+
+ mojo::Remote<mojom::RemotingSink> receiver_controller_;
+ mojo::Receiver<mojom::Remotee> mojo_receiver_{this};
+};
+
End2EndTestRenderer::End2EndTestRenderer(std::unique_ptr<Renderer> renderer)
- : receiver_rpc_broker_(
- base::BindRepeating(&End2EndTestRenderer::OnMessageFromSink,
- base::Unretained(this))),
- receiver_(new Receiver(std::move(renderer), &receiver_rpc_broker_)) {
+ : courier_renderer_initialized_(false), receiver_initialized_(false) {
+ // create sender components
controller_ = CreateController(
base::BindRepeating(&End2EndTestRenderer::SendMessageToSink,
weak_factory_.GetWeakPtr()),
base::BindRepeating(&End2EndTestRenderer::SendFrameToSink,
weak_factory_.GetWeakPtr()));
- courier_renderer_.reset(new CourierRenderer(
- base::ThreadTaskRunnerHandle::Get(), controller_->GetWeakPtr(), nullptr));
+ courier_renderer_ = std::make_unique<CourierRenderer>(
+ base::ThreadTaskRunnerHandle::Get(), controller_->GetWeakPtr(), nullptr);
+
+ // create receiver components
+ media_remotee_ = std::make_unique<TestRemotee>(controller_.get());
+
+ receiver_controller_ = ReceiverController::GetInstance();
+ ResetForTesting(receiver_controller_);
+
+ receiver_rpc_broker_ = receiver_controller_->rpc_broker();
+ receiver_renderer_handle_ = receiver_rpc_broker_->GetUniqueHandle();
+
+ receiver_rpc_broker_->RegisterMessageReceiverCallback(
+ RpcBroker::kAcquireRendererHandle,
+ base::BindRepeating(&End2EndTestRenderer::OnReceivedRpc,
+ weak_factory_.GetWeakPtr()));
+
+ receiver_ = std::make_unique<Receiver>(
+ receiver_renderer_handle_, sender_renderer_handle_, receiver_controller_,
+ base::ThreadTaskRunnerHandle::Get(), std::move(renderer),
+ base::BindOnce(&End2EndTestRenderer::OnAcquireRendererDone,
+ weak_factory_.GetWeakPtr()));
+
+ mojo::PendingRemote<media::mojom::Remotee> remotee;
+ media_remotee_->BindMojoReceiver(remotee.InitWithNewPipeAndPassReceiver());
+ receiver_controller_->Initialize(std::move(remotee));
+ stream_provider_ = std::make_unique<StreamProvider>(
+ receiver_controller_, base::ThreadTaskRunnerHandle::Get());
}
-End2EndTestRenderer::~End2EndTestRenderer() = default;
+End2EndTestRenderer::~End2EndTestRenderer() {
+ receiver_rpc_broker_->UnregisterMessageReceiverCallback(
+ RpcBroker::kAcquireRendererHandle);
+}
void End2EndTestRenderer::Initialize(MediaResource* media_resource,
RendererClient* client,
PipelineStatusCallback init_cb) {
- courier_renderer_->Initialize(media_resource, client, std::move(init_cb));
+ init_cb_ = std::move(init_cb);
+
+ stream_provider_->Initialize(
+ nullptr, base::BindOnce(&End2EndTestRenderer::InitializeReceiverRenderer,
+ weak_factory_.GetWeakPtr()));
+
+ courier_renderer_->Initialize(
+ media_resource, client,
+ base::BindOnce(&End2EndTestRenderer::OnCourierRendererInitialized,
+ weak_factory_.GetWeakPtr()));
+}
+
+void End2EndTestRenderer::InitializeReceiverRenderer(PipelineStatus status) {
+ DCHECK_EQ(PIPELINE_OK, status);
+ receiver_->Initialize(
+ stream_provider_.get(), nullptr,
+ base::BindOnce(&End2EndTestRenderer::OnReceiverInitalized,
+ weak_factory_.GetWeakPtr()));
}
-void End2EndTestRenderer::SetCdm(CdmContext* cdm_context,
- CdmAttachedCB cdc_attached_cb) {
- // TODO(xjz): Add the implementation when media remoting starts supporting
- // encrypted contents.
- NOTIMPLEMENTED() << "Media Remoting doesn't support EME for now.";
+void End2EndTestRenderer::OnCourierRendererInitialized(PipelineStatus status) {
+ DCHECK_EQ(PIPELINE_OK, status);
+ courier_renderer_initialized_ = true;
+ CompleteInitialize();
+}
+
+void End2EndTestRenderer::OnReceiverInitalized(PipelineStatus status) {
+ DCHECK_EQ(PIPELINE_OK, status);
+ receiver_initialized_ = true;
+ CompleteInitialize();
+}
+void End2EndTestRenderer::CompleteInitialize() {
+ if (!courier_renderer_initialized_ || !receiver_initialized_)
+ return;
+
+ DCHECK(init_cb_);
+ std::move(init_cb_).Run(PIPELINE_OK);
+}
+
+void End2EndTestRenderer::OnReceivedRpc(
+ std::unique_ptr<media::remoting::pb::RpcMessage> message) {
+ DCHECK(message);
+ DCHECK_EQ(message->proc(),
+ media::remoting::pb::RpcMessage::RPC_ACQUIRE_RENDERER);
+ OnAcquireRenderer(std::move(message));
+}
+
+void End2EndTestRenderer::OnAcquireRenderer(
+ std::unique_ptr<media::remoting::pb::RpcMessage> message) {
+ DCHECK(message->has_integer_value());
+ DCHECK(message->integer_value() != RpcBroker::kInvalidHandle);
+
+ if (sender_renderer_handle_ == RpcBroker::kInvalidHandle) {
+ sender_renderer_handle_ = message->integer_value();
+ receiver_->SetRemoteHandle(sender_renderer_handle_);
+ }
+}
+
+void End2EndTestRenderer::OnAcquireRendererDone(int receiver_renderer_handle) {
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(sender_renderer_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE);
+ rpc->set_integer_value(receiver_renderer_handle);
+ receiver_rpc_broker_->SendMessageToRemote(std::move(rpc));
}
void End2EndTestRenderer::SetLatencyHint(
@@ -187,6 +364,10 @@ void End2EndTestRenderer::SetLatencyHint(
courier_renderer_->SetLatencyHint(latency_hint);
}
+void End2EndTestRenderer::SetPreservesPitch(bool preserves_pitch) {
+ courier_renderer_->SetPreservesPitch(preserves_pitch);
+}
+
void End2EndTestRenderer::Flush(base::OnceClosure flush_cb) {
courier_renderer_->Flush(std::move(flush_cb));
}
@@ -209,19 +390,21 @@ base::TimeDelta End2EndTestRenderer::GetMediaTime() {
void End2EndTestRenderer::SendMessageToSink(
const std::vector<uint8_t>& message) {
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
- if (!rpc->ParseFromArray(message.data(), message.size())) {
- VLOG(1) << __func__ << ": Received corrupted Rpc message.";
- return;
- }
- receiver_rpc_broker_.ProcessMessageFromRemote(std::move(rpc));
+ media_remotee_->OnMessage(message);
}
-void End2EndTestRenderer::SendFrameToSink(const std::vector<uint8_t>& frame,
+void End2EndTestRenderer::SendFrameToSink(uint32_t frame_count,
+ const std::vector<uint8_t>& frame,
DemuxerStream::Type type) {
scoped_refptr<DecoderBuffer> decoder_buffer =
ByteArrayToDecoderBuffer(frame.data(), frame.size());
- receiver_->OnReceivedBuffer(type, decoder_buffer);
+ if (type == DemuxerStream::Type::AUDIO) {
+ media_remotee_->OnAudioFrame(frame_count, decoder_buffer);
+ } else if (type == DemuxerStream::Type::VIDEO) {
+ media_remotee_->OnVideoFrame(frame_count, decoder_buffer);
+ } else {
+ NOTREACHED();
+ }
}
void End2EndTestRenderer::OnMessageFromSink(
diff --git a/chromium/media/remoting/end2end_test_renderer.h b/chromium/media/remoting/end2end_test_renderer.h
index 1e8e635f878..905e27f5776 100644
--- a/chromium/media/remoting/end2end_test_renderer.h
+++ b/chromium/media/remoting/end2end_test_renderer.h
@@ -5,12 +5,14 @@
#ifndef MEDIA_REMOTING_END2END_RENDERER_H_
#define MEDIA_REMOTING_END2END_RENDERER_H_
+#include <memory>
#include <vector>
#include "base/memory/weak_ptr.h"
#include "media/base/demuxer_stream.h"
#include "media/base/renderer.h"
#include "media/remoting/rpc_broker.h"
+#include "media/remoting/stream_provider.h"
namespace media {
namespace remoting {
@@ -18,6 +20,7 @@ namespace remoting {
class RendererController;
class CourierRenderer;
class Receiver;
+class ReceiverController;
// Simulates the media remoting pipeline.
class End2EndTestRenderer final : public Renderer {
@@ -29,8 +32,8 @@ class End2EndTestRenderer final : public Renderer {
void Initialize(MediaResource* media_resource,
RendererClient* client,
PipelineStatusCallback init_cb) override;
- void SetCdm(CdmContext* cdm_context, CdmAttachedCB cdm_attached_cb) override;
void SetLatencyHint(base::Optional<base::TimeDelta> latency_hint) override;
+ void SetPreservesPitch(bool preserves_pitch) override;
void Flush(base::OnceClosure flush_cb) override;
void StartPlayingFrom(base::TimeDelta time) override;
void SetPlaybackRate(double playback_rate) override;
@@ -46,28 +49,55 @@ class End2EndTestRenderer final : public Renderer {
base::OnceClosure change_completed_cb) override;
private:
+ class TestRemotee;
+
+ void InitTestApi();
+
// Called to send RPC messages to |receiver_|.
void SendMessageToSink(const std::vector<uint8_t>& message);
// Called to send frame data to |receiver_|.
- void SendFrameToSink(const std::vector<uint8_t>& data,
+ void SendFrameToSink(uint32_t frame_count,
+ const std::vector<uint8_t>& data,
DemuxerStream::Type type);
// Called when receives RPC messages from |receiver_|.
void OnMessageFromSink(std::unique_ptr<std::vector<uint8_t>> message);
+ void InitializeReceiverRenderer(PipelineStatus status);
+ void OnCourierRendererInitialized(PipelineStatus status);
+ void OnReceiverInitalized(PipelineStatus status);
+ void CompleteInitialize();
+
+ // Callback function when RPC message is received.
+ void OnReceivedRpc(std::unique_ptr<media::remoting::pb::RpcMessage> message);
+ void OnAcquireRenderer(
+ std::unique_ptr<media::remoting::pb::RpcMessage> message);
+ void OnAcquireRendererDone(int receiver_renderer_handle);
+
+ PipelineStatusCallback init_cb_;
+
+ bool courier_renderer_initialized_;
+ bool receiver_initialized_;
+
+ // Sender components.
std::unique_ptr<RendererController> controller_;
std::unique_ptr<CourierRenderer> courier_renderer_;
- // The RpcBroker to handle the RPC messages to/from |receiver_|.
- RpcBroker receiver_rpc_broker_;
-
- // A receiver that renders media streams.
+ // Receiver components.
+ std::unique_ptr<TestRemotee> media_remotee_;
+ ReceiverController* receiver_controller_;
std::unique_ptr<Receiver> receiver_;
+ std::unique_ptr<StreamProvider> stream_provider_;
+ RpcBroker* receiver_rpc_broker_;
- base::WeakPtrFactory<End2EndTestRenderer> weak_factory_{this};
+ // Handle of |receiver_|
+ int receiver_renderer_handle_ = RpcBroker::kInvalidHandle;
+ // Handle of |courier_renderer_|, it would be sent with AcquireRenderer
+ // message.
+ int sender_renderer_handle_ = RpcBroker::kInvalidHandle;
- DISALLOW_COPY_AND_ASSIGN(End2EndTestRenderer);
+ base::WeakPtrFactory<End2EndTestRenderer> weak_factory_{this};
};
} // namespace remoting
diff --git a/chromium/media/remoting/fake_media_resource.cc b/chromium/media/remoting/fake_media_resource.cc
index e77cf87e10c..710b84a8931 100644
--- a/chromium/media/remoting/fake_media_resource.cc
+++ b/chromium/media/remoting/fake_media_resource.cc
@@ -101,13 +101,15 @@ void FakeDemuxerStream::CreateFakeFrame(size_t size,
}
FakeMediaResource::FakeMediaResource()
- : demuxer_stream_(new FakeDemuxerStream(true)) {}
+ : audio_stream_(new FakeDemuxerStream(true)),
+ video_stream_(new FakeDemuxerStream(false)) {}
FakeMediaResource::~FakeMediaResource() = default;
std::vector<DemuxerStream*> FakeMediaResource::GetAllStreams() {
std::vector<DemuxerStream*> streams;
- streams.push_back(demuxer_stream_.get());
+ streams.push_back(audio_stream_.get());
+ streams.push_back(video_stream_.get());
return streams;
}
diff --git a/chromium/media/remoting/fake_media_resource.h b/chromium/media/remoting/fake_media_resource.h
index 93b53180a05..f6391f8a033 100644
--- a/chromium/media/remoting/fake_media_resource.h
+++ b/chromium/media/remoting/fake_media_resource.h
@@ -5,6 +5,8 @@
#ifndef MEDIA_REMOTING_FAKE_MEDIA_RESOURCE_H_
#define MEDIA_REMOTING_FAKE_MEDIA_RESOURCE_H_
+#include <memory>
+
#include "base/containers/circular_deque.h"
#include "media/base/audio_decoder_config.h"
#include "media/base/demuxer_stream.h"
@@ -54,7 +56,8 @@ class FakeMediaResource : public MediaResource {
std::vector<DemuxerStream*> GetAllStreams() override;
private:
- std::unique_ptr<FakeDemuxerStream> demuxer_stream_;
+ std::unique_ptr<FakeDemuxerStream> audio_stream_;
+ std::unique_ptr<FakeDemuxerStream> video_stream_;
DISALLOW_COPY_AND_ASSIGN(FakeMediaResource);
};
diff --git a/chromium/media/remoting/integration_test.cc b/chromium/media/remoting/integration_test.cc
index bc0aa888909..bbac6f6a8a8 100644
--- a/chromium/media/remoting/integration_test.cc
+++ b/chromium/media/remoting/integration_test.cc
@@ -72,8 +72,7 @@ TEST_F(MediaRemotingIntegrationTest, MediaSource_ConfigChange_WebM) {
Stop();
}
-// Flaky: http://crbug.com/1043812.
-TEST_F(MediaRemotingIntegrationTest, DISABLED_SeekWhilePlaying) {
+TEST_F(MediaRemotingIntegrationTest, SeekWhilePlaying) {
ASSERT_EQ(PIPELINE_OK, Start("bear-320x240.webm"));
base::TimeDelta duration(pipeline_->GetMediaDuration());
diff --git a/chromium/media/remoting/media_remoting_rpc.proto b/chromium/media/remoting/media_remoting_rpc.proto
index bea8599a538..a7b584f07ac 100644
--- a/chromium/media/remoting/media_remoting_rpc.proto
+++ b/chromium/media/remoting/media_remoting_rpc.proto
@@ -337,6 +337,11 @@ enum CdmSessionType {
kPersistentUsageRecord = 2;
};
+message AcquireDemuxer {
+ optional int32 audio_demuxer_handle = 1;
+ optional int32 video_demuxer_handle = 2;
+}
+
message RendererInitialize {
optional int32 client_handle = 1;
optional int32 audio_demuxer_handle = 2;
@@ -495,6 +500,7 @@ message RpcMessage {
RPC_ACQUIRE_RENDERER_DONE = 2;
RPC_ACQUIRE_CDM = 3;
RPC_ACQUIRE_CDM_DONE = 4;
+ RPC_ACQUIRE_DEMUXER = 5;
// Renderer message
RPC_R_INITIALIZE = 1000;
RPC_R_FLUSHUNTIL = 1001;
@@ -522,6 +528,7 @@ message RpcMessage {
RPC_DS_INITIALIZE = 3000;
RPC_DS_READUNTIL = 3001;
RPC_DS_ENABLEBITSTREAMCONVERTER = 3002;
+ RPC_DS_ONERROR = 3003;
// DemuxerStream callbacks
RPC_DS_INITIALIZE_CALLBACK = 3100;
RPC_DS_READUNTIL_CALLBACK = 3101;
@@ -594,6 +601,9 @@ message RpcMessage {
// RPC_R_SETCDM
RendererSetCdm renderer_set_cdm_rpc = 102;
+ // RPC_ACQUIRE_DEMUXER
+ AcquireDemuxer acquire_demuxer_rpc = 103;
+
// RPC_RC_ONTIMEUPDATE
RendererClientOnTimeUpdate rendererclient_ontimeupdate_rpc = 200;
// RPC_RC_ONVIDEONATURALSIZECHANGE
diff --git a/chromium/media/remoting/mock_receiver_controller.cc b/chromium/media/remoting/mock_receiver_controller.cc
new file mode 100644
index 00000000000..b0e742d7635
--- /dev/null
+++ b/chromium/media/remoting/mock_receiver_controller.cc
@@ -0,0 +1,118 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/remoting/mock_receiver_controller.h"
+
+#include "base/check.h"
+#include "media/mojo/common/mojo_decoder_buffer_converter.h"
+#include "media/remoting/test_utils.h"
+#include "mojo/public/cpp/system/data_pipe.h"
+
+namespace media {
+namespace remoting {
+
+MockRemotee::MockRemotee() = default;
+
+MockRemotee::~MockRemotee() = default;
+
+void MockRemotee::BindMojoReceiver(mojo::PendingReceiver<Remotee> receiver) {
+ DCHECK(receiver);
+ receiver_.Bind(std::move(receiver));
+}
+
+void MockRemotee::SendAudioFrame(uint32_t frame_count,
+ scoped_refptr<DecoderBuffer> buffer) {
+ mojom::DecoderBufferPtr mojo_buffer =
+ audio_buffer_writer_->WriteDecoderBuffer(std::move(buffer));
+ audio_stream_->ReceiveFrame(frame_count, std::move(mojo_buffer));
+}
+
+void MockRemotee::SendVideoFrame(uint32_t frame_count,
+ scoped_refptr<DecoderBuffer> buffer) {
+ mojom::DecoderBufferPtr mojo_buffer =
+ video_buffer_writer_->WriteDecoderBuffer(std::move(buffer));
+ video_stream_->ReceiveFrame(frame_count, std::move(mojo_buffer));
+}
+
+void MockRemotee::OnRemotingSinkReady(
+ mojo::PendingRemote<mojom::RemotingSink> remoting_sink) {
+ DCHECK(remoting_sink);
+ remoting_sink_.Bind(std::move(remoting_sink));
+}
+
+void MockRemotee::SendMessageToSource(const std::vector<uint8_t>& message) {}
+
+void MockRemotee::StartDataStreams(
+ mojo::PendingRemote<mojom::RemotingDataStreamReceiver> audio_stream,
+ mojo::PendingRemote<mojom::RemotingDataStreamReceiver> video_stream) {
+ if (audio_stream.is_valid()) {
+ // Initialize data pipe for audio data stream receiver.
+ audio_stream_.Bind(std::move(audio_stream));
+ mojo::ScopedDataPipeConsumerHandle audio_data_pipe;
+ audio_buffer_writer_ = MojoDecoderBufferWriter::Create(
+ GetDefaultDecoderBufferConverterCapacity(DemuxerStream::AUDIO),
+ &audio_data_pipe);
+ audio_stream_->InitializeDataPipe(std::move(audio_data_pipe));
+ }
+
+ if (video_stream.is_valid()) {
+ // Initialize data pipe for video data stream receiver.
+ video_stream_.Bind(std::move(video_stream));
+ mojo::ScopedDataPipeConsumerHandle video_data_pipe;
+ video_buffer_writer_ = MojoDecoderBufferWriter::Create(
+ GetDefaultDecoderBufferConverterCapacity(DemuxerStream::VIDEO),
+ &video_data_pipe);
+ video_stream_->InitializeDataPipe(std::move(video_data_pipe));
+ }
+}
+
+void MockRemotee::OnFlushUntil(uint32_t audio_count, uint32_t video_count) {
+ flush_audio_count_ = audio_count;
+ flush_video_count_ = video_count;
+
+ if (audio_stream_.is_bound()) {
+ audio_stream_->FlushUntil(audio_count);
+ }
+ if (video_stream_.is_bound()) {
+ video_stream_->FlushUntil(video_count);
+ }
+}
+
+void MockRemotee::OnVideoNaturalSizeChange(const gfx::Size& size) {
+ DCHECK(!size.IsEmpty());
+ changed_size_ = size;
+}
+
+void MockRemotee::Reset() {
+ audio_stream_.reset();
+ video_stream_.reset();
+ receiver_.reset();
+ remoting_sink_.reset();
+}
+
+// static
+MockReceiverController* MockReceiverController::GetInstance() {
+ static base::NoDestructor<MockReceiverController> controller;
+ ResetForTesting(controller.get());
+ controller->mock_remotee_->Reset();
+ return controller.get();
+}
+
+MockReceiverController::MockReceiverController()
+ : mock_remotee_(new MockRemotee()) {
+ // Overwrites |rpc_broker_|.
+ rpc_broker_.SetMessageCallbackForTesting(base::BindRepeating(
+ &MockReceiverController::OnSendRpc, base::Unretained(this)));
+}
+
+MockReceiverController::~MockReceiverController() = default;
+
+void MockReceiverController::OnSendRpc(
+ std::unique_ptr<std::vector<uint8_t>> message) {
+ std::vector<uint8_t> binary_message = *message;
+ ReceiverController::OnMessageFromSource(binary_message);
+}
+
+} // namespace remoting
+} // namespace media
diff --git a/chromium/media/remoting/mock_receiver_controller.h b/chromium/media/remoting/mock_receiver_controller.h
new file mode 100644
index 00000000000..e4c6da6c37b
--- /dev/null
+++ b/chromium/media/remoting/mock_receiver_controller.h
@@ -0,0 +1,96 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MEDIA_REMOTING_MOCK_RECEIVER_CONTROLLER_H_
+#define MEDIA_REMOTING_MOCK_RECEIVER_CONTROLLER_H_
+
+#include <memory>
+#include <vector>
+
+#include "base/memory/scoped_refptr.h"
+#include "base/no_destructor.h"
+#include "media/mojo/mojom/remoting.mojom.h"
+#include "media/remoting/receiver_controller.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "ui/gfx/geometry/size.h"
+
+namespace media {
+
+class MojoDecoderBufferWriter;
+
+namespace remoting {
+
+class MockRemotee : public mojom::Remotee {
+ public:
+ MockRemotee();
+ ~MockRemotee() override;
+
+ void BindMojoReceiver(mojo::PendingReceiver<Remotee> receiver);
+
+ void SendAudioFrame(uint32_t frame_count,
+ scoped_refptr<DecoderBuffer> buffer);
+ void SendVideoFrame(uint32_t frame_count,
+ scoped_refptr<DecoderBuffer> buffer);
+
+ // mojom::Remotee implementation
+ void OnRemotingSinkReady(
+ mojo::PendingRemote<mojom::RemotingSink> remoting_sink) override;
+ void SendMessageToSource(const std::vector<uint8_t>& message) override;
+ void StartDataStreams(
+ mojo::PendingRemote<mojom::RemotingDataStreamReceiver> audio_stream,
+ mojo::PendingRemote<mojom::RemotingDataStreamReceiver> video_stream)
+ override;
+ void OnFlushUntil(uint32_t audio_count, uint32_t video_count) override;
+ void OnVideoNaturalSizeChange(const gfx::Size& size) override;
+
+ void Reset();
+
+ gfx::Size changed_size() { return changed_size_; }
+ uint32_t flush_audio_count() { return flush_audio_count_; }
+ uint32_t flush_video_count() { return flush_video_count_; }
+
+ mojo::PendingRemote<mojom::Remotee> BindNewPipeAndPassRemote() {
+ return receiver_.BindNewPipeAndPassRemote();
+ }
+
+ mojo::Remote<mojom::RemotingDataStreamReceiver> audio_stream_;
+ mojo::Remote<mojom::RemotingDataStreamReceiver> video_stream_;
+
+ private:
+ gfx::Size changed_size_;
+
+ uint32_t flush_audio_count_{0};
+ uint32_t flush_video_count_{0};
+
+ std::unique_ptr<MojoDecoderBufferWriter> audio_buffer_writer_;
+ std::unique_ptr<MojoDecoderBufferWriter> video_buffer_writer_;
+
+ mojo::Remote<mojom::RemotingSink> remoting_sink_;
+ mojo::Receiver<mojom::Remotee> receiver_{this};
+};
+
+class MockReceiverController : public ReceiverController {
+ public:
+ static MockReceiverController* GetInstance();
+
+ MockRemotee* mock_remotee() { return mock_remotee_.get(); }
+
+ private:
+ friend base::NoDestructor<MockReceiverController>;
+ friend testing::StrictMock<MockReceiverController>;
+ friend testing::NiceMock<MockReceiverController>;
+
+ MockReceiverController();
+ ~MockReceiverController() override;
+
+ void OnSendRpc(std::unique_ptr<std::vector<uint8_t>> message);
+
+ std::unique_ptr<MockRemotee> mock_remotee_;
+};
+
+} // namespace remoting
+} // namespace media
+
+#endif // MEDIA_REMOTING_MOCK_RECEIVER_CONTROLLER_H_
diff --git a/chromium/media/remoting/receiver.cc b/chromium/media/remoting/receiver.cc
index 6db0c7d3b9d..40bffe10dd3 100644
--- a/chromium/media/remoting/receiver.cc
+++ b/chromium/media/remoting/receiver.cc
@@ -6,10 +6,14 @@
#include "base/bind.h"
#include "base/callback.h"
+#include "base/single_thread_task_runner.h"
+#include "base/threading/thread_task_runner_handle.h"
+#include "media/base/bind_to_current_loop.h"
#include "media/base/decoder_buffer.h"
#include "media/base/renderer.h"
#include "media/remoting/proto_enum_utils.h"
#include "media/remoting/proto_utils.h"
+#include "media/remoting/receiver_controller.h"
#include "media/remoting/stream_provider.h"
namespace media {
@@ -23,116 +27,168 @@ constexpr base::TimeDelta kTimeUpdateInterval =
} // namespace
-Receiver::Receiver(std::unique_ptr<Renderer> renderer, RpcBroker* rpc_broker)
- : renderer_(std::move(renderer)),
- rpc_broker_(rpc_broker),
- rpc_handle_(rpc_broker_->GetUniqueHandle()) {
- DCHECK(renderer_);
+Receiver::Receiver(
+ int rpc_handle,
+ int remote_handle,
+ ReceiverController* receiver_controller,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner,
+ std::unique_ptr<Renderer> renderer,
+ base::OnceCallback<void(int)> acquire_renderer_done_cb)
+ : rpc_handle_(rpc_handle),
+ remote_handle_(remote_handle),
+ receiver_controller_(receiver_controller),
+ rpc_broker_(receiver_controller_->rpc_broker()),
+ main_task_runner_(base::ThreadTaskRunnerHandle::Get()),
+ media_task_runner_(media_task_runner),
+ renderer_(std::move(renderer)),
+ acquire_renderer_done_cb_(std::move(acquire_renderer_done_cb)) {
+ DCHECK(rpc_handle_ != RpcBroker::kInvalidHandle);
+ DCHECK(receiver_controller_);
DCHECK(rpc_broker_);
- rpc_broker_->RegisterMessageReceiverCallback(
- rpc_handle_, base::BindRepeating(&Receiver::OnReceivedRpc,
- weak_factory_.GetWeakPtr()));
- rpc_broker_->RegisterMessageReceiverCallback(
- RpcBroker::kAcquireHandle,
- base::BindRepeating(&Receiver::OnReceivedRpc,
- weak_factory_.GetWeakPtr()));
+ DCHECK(renderer_);
+
+ // Note: The constructor is running on the main thread, but will be destroyed
+ // on the media thread. Therefore, all weak pointers must be dereferenced on
+ // the media thread.
+ const RpcBroker::ReceiveMessageCallback receive_callback = BindToLoop(
+ media_task_runner_,
+ BindRepeating(&Receiver::OnReceivedRpc, weak_factory_.GetWeakPtr()));
+
+ // Listening all renderer rpc messages.
+ rpc_broker_->RegisterMessageReceiverCallback(rpc_handle_, receive_callback);
+ VerifyAcquireRendererDone();
}
Receiver::~Receiver() {
rpc_broker_->UnregisterMessageReceiverCallback(rpc_handle_);
- rpc_broker_->UnregisterMessageReceiverCallback(RpcBroker::kAcquireHandle);
+ rpc_broker_->UnregisterMessageReceiverCallback(
+ RpcBroker::kAcquireRendererHandle);
+}
+
+// Receiver::Initialize() will be called by the local pipeline, it would only
+// keep the |init_cb| in order to continue the initialization once it receives
+// RPC_R_INITIALIZE, which means Receiver::RpcInitialize() is called.
+void Receiver::Initialize(MediaResource* media_resource,
+ RendererClient* client,
+ PipelineStatusCallback init_cb) {
+ demuxer_ = media_resource;
+ init_cb_ = std::move(init_cb);
+ ShouldInitializeRenderer();
+}
+
+/* CDM is not supported for remoting media */
+void Receiver::SetCdm(CdmContext* cdm_context, CdmAttachedCB cdm_attached_cb) {
+ NOTREACHED();
+}
+
+// No-op. Controlled by sender via RPC calls instead.
+void Receiver::SetLatencyHint(base::Optional<base::TimeDelta> latency_hint) {}
+
+// No-op. Controlled by sender via RPC calls instead.
+void Receiver::Flush(base::OnceClosure flush_cb) {}
+
+// No-op. Controlled by sender via RPC calls instead.
+void Receiver::StartPlayingFrom(base::TimeDelta time) {}
+
+// No-op. Controlled by sender via RPC calls instead.
+void Receiver::SetPlaybackRate(double playback_rate) {}
+
+// No-op. Controlled by sender via RPC calls instead.
+void Receiver::SetVolume(float volume) {}
+
+// No-op. Controlled by sender via RPC calls instead.
+base::TimeDelta Receiver::GetMediaTime() {
+ return base::TimeDelta();
+}
+
+void Receiver::SendRpcMessageOnMainThread(
+ std::unique_ptr<pb::RpcMessage> message) {
+ // |rpc_broker_| is owned by |receiver_controller_| which is a singleton per
+ // process, so it's safe to use Unretained() here.
+ main_task_runner_->PostTask(
+ FROM_HERE,
+ base::BindOnce(&RpcBroker::SendMessageToRemote,
+ base::Unretained(rpc_broker_), std::move(message)));
}
void Receiver::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message);
switch (message->proc()) {
- case pb::RpcMessage::RPC_ACQUIRE_RENDERER:
- AcquireRenderer(std::move(message));
+ case pb::RpcMessage::RPC_R_INITIALIZE:
+ RpcInitialize(std::move(message));
break;
case pb::RpcMessage::RPC_R_FLUSHUNTIL:
- FlushUntil(std::move(message));
+ RpcFlushUntil(std::move(message));
break;
case pb::RpcMessage::RPC_R_STARTPLAYINGFROM:
- StartPlayingFrom(std::move(message));
+ RpcStartPlayingFrom(std::move(message));
break;
case pb::RpcMessage::RPC_R_SETPLAYBACKRATE:
- SetPlaybackRate(std::move(message));
+ RpcSetPlaybackRate(std::move(message));
break;
case pb::RpcMessage::RPC_R_SETVOLUME:
- SetVolume(std::move(message));
- break;
- case pb::RpcMessage::RPC_R_INITIALIZE:
- Initialize(std::move(message));
+ RpcSetVolume(std::move(message));
break;
default:
- VLOG(1) << __func__ << ": Unknow RPC message. proc=" << message->proc();
+ VLOG(1) << __func__ << ": Unknown RPC message. proc=" << message->proc();
}
}
-void Receiver::AcquireRenderer(std::unique_ptr<pb::RpcMessage> message) {
- DVLOG(3) << __func__ << ": Receives RPC_ACQUIRE_RENDERER with remote_handle= "
- << message->integer_value();
+void Receiver::SetRemoteHandle(int remote_handle) {
+ DCHECK_NE(remote_handle, RpcBroker::kInvalidHandle);
+ DCHECK_EQ(remote_handle_, RpcBroker::kInvalidHandle);
+ remote_handle_ = remote_handle;
+ VerifyAcquireRendererDone();
+}
- remote_handle_ = message->integer_value();
- if (stream_provider_) {
- VLOG(1) << "Acquire renderer error: Already acquired.";
- OnError(PipelineStatus::PIPELINE_ERROR_DECODE);
+void Receiver::VerifyAcquireRendererDone() {
+ if (remote_handle_ == RpcBroker::kInvalidHandle)
return;
- }
-
- stream_provider_.reset(new StreamProvider(
- rpc_broker_,
- base::BindOnce(&Receiver::OnError, weak_factory_.GetWeakPtr(),
- PipelineStatus::PIPELINE_ERROR_DECODE)));
- DVLOG(3) << __func__
- << ": Issues RPC_ACQUIRE_RENDERER_DONE RPC message. remote_handle="
- << remote_handle_ << " rpc_handle=" << rpc_handle_;
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
- rpc->set_handle(remote_handle_);
- rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE);
- rpc->set_integer_value(rpc_handle_);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ DCHECK(acquire_renderer_done_cb_);
+ std::move(acquire_renderer_done_cb_).Run(rpc_handle_);
}
-void Receiver::Initialize(std::unique_ptr<pb::RpcMessage> message) {
- DCHECK(stream_provider_);
- DVLOG(3) << __func__ << ": Receives RPC_R_INITIALIZE with callback handle= "
- << message->renderer_initialize_rpc().callback_handle();
- DCHECK(message->renderer_initialize_rpc().callback_handle() ==
- remote_handle_);
- if (!stream_provider_)
- OnRendererInitialized(PipelineStatus::PIPELINE_ERROR_INITIALIZATION_FAILED);
-
- stream_provider_->Initialize(
- message->renderer_initialize_rpc().audio_demuxer_handle(),
- message->renderer_initialize_rpc().video_demuxer_handle(),
- base::BindOnce(&Receiver::OnStreamInitialized,
- weak_factory_.GetWeakPtr()));
+void Receiver::RpcInitialize(std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(renderer_);
+ rpc_initialize_received_ = true;
+ ShouldInitializeRenderer();
}
-void Receiver::OnStreamInitialized() {
- DCHECK(stream_provider_);
- renderer_->Initialize(stream_provider_.get(), this,
+void Receiver::ShouldInitializeRenderer() {
+ // ShouldInitializeRenderer() will be called from Initialize() and
+ // RpcInitialize() in different orders.
+ //
+ // |renderer_| must be initialized when both Initialize() and
+ // RpcInitialize() are called.
+ if (!rpc_initialize_received_ || !init_cb_)
+ return;
+
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ DCHECK(renderer_);
+ DCHECK(demuxer_);
+ renderer_->Initialize(demuxer_, this,
base::BindOnce(&Receiver::OnRendererInitialized,
weak_factory_.GetWeakPtr()));
}
void Receiver::OnRendererInitialized(PipelineStatus status) {
- DVLOG(3) << __func__ << ": Issues RPC_R_INITIALIZE_CALLBACK RPC message."
- << "remote_handle=" << remote_handle_;
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ DCHECK(init_cb_);
+ std::move(init_cb_).Run(status);
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_INITIALIZE_CALLBACK);
rpc->set_boolean_value(status == PIPELINE_OK);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
}
-void Receiver::SetPlaybackRate(std::unique_ptr<pb::RpcMessage> message) {
+void Receiver::RpcSetPlaybackRate(std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
const double playback_rate = message->double_value();
- DVLOG(3) << __func__
- << ": Receives RPC_R_SETPLAYBACKRATE with rate=" << playback_rate;
renderer_->SetPlaybackRate(playback_rate);
if (playback_rate == 0.0) {
@@ -147,38 +203,32 @@ void Receiver::SetPlaybackRate(std::unique_ptr<pb::RpcMessage> message) {
}
}
-void Receiver::FlushUntil(std::unique_ptr<pb::RpcMessage> message) {
- DVLOG(3) << __func__ << ": Receives RPC_R_FLUSHUNTIL RPC message.";
+void Receiver::RpcFlushUntil(std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ DCHECK(message->has_renderer_flushuntil_rpc());
const pb::RendererFlushUntil flush_message =
message->renderer_flushuntil_rpc();
DCHECK_EQ(flush_message.callback_handle(), remote_handle_);
- if (stream_provider_) {
- if (flush_message.has_audio_count()) {
- stream_provider_->FlushUntil(DemuxerStream::AUDIO,
- flush_message.audio_count());
- }
- if (flush_message.has_video_count()) {
- stream_provider_->FlushUntil(DemuxerStream::VIDEO,
- flush_message.video_count());
- }
- }
+
+ receiver_controller_->OnRendererFlush(flush_message.audio_count(),
+ flush_message.video_count());
+
time_update_timer_.Stop();
renderer_->Flush(
base::BindOnce(&Receiver::OnFlushDone, weak_factory_.GetWeakPtr()));
}
void Receiver::OnFlushDone() {
- DVLOG(3) << __func__ << ": Issues RPC_R_FLUSHUNTIL_CALLBACK RPC message.";
-
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_R_FLUSHUNTIL_CALLBACK);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
}
-void Receiver::StartPlayingFrom(std::unique_ptr<pb::RpcMessage> message) {
- DVLOG(3) << __func__ << ": Receives RPC_R_STARTPLAYINGFROM message.";
+void Receiver::RpcStartPlayingFrom(std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
base::TimeDelta time =
base::TimeDelta::FromMicroseconds(message->integer64_value());
renderer_->StartPlayingFrom(time);
@@ -194,14 +244,14 @@ void Receiver::ScheduleMediaTimeUpdates() {
weak_factory_.GetWeakPtr()));
}
-void Receiver::SetVolume(std::unique_ptr<pb::RpcMessage> message) {
- DVLOG(3) << __func__ << ": Receives RPC_R_SETVOLUME message.";
+void Receiver::RpcSetVolume(std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
renderer_->SetVolume(message->double_value());
}
void Receiver::SendMediaTimeUpdate() {
// Issues RPC_RC_ONTIMEUPDATE RPC message.
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_RC_ONTIMEUPDATE);
auto* message = rpc->mutable_rendererclient_ontimeupdate_rpc();
@@ -209,40 +259,26 @@ void Receiver::SendMediaTimeUpdate() {
message->set_time_usec(media_time.InMicroseconds());
base::TimeDelta max_time = media_time;
message->set_max_time_usec(max_time.InMicroseconds());
- DVLOG(3) << __func__ << ": Issues RPC_RC_ONTIMEUPDATE message."
- << " media_time = " << media_time.InMicroseconds()
- << " max_time= " << max_time.InMicroseconds();
- rpc_broker_->SendMessageToRemote(std::move(rpc));
-}
-
-void Receiver::OnReceivedBuffer(DemuxerStream::Type type,
- scoped_refptr<DecoderBuffer> buffer) {
- DVLOG(3) << __func__
- << ": type=" << (type == DemuxerStream::AUDIO ? "Audio" : "Video");
- DCHECK(stream_provider_);
- stream_provider_->AppendBuffer(type, buffer);
+ SendRpcMessageOnMainThread(std::move(rpc));
}
void Receiver::OnError(PipelineStatus status) {
- VLOG(1) << __func__ << ": Issues RPC_RC_ONERROR message.";
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_RC_ONERROR);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
}
void Receiver::OnEnded() {
- DVLOG(3) << __func__ << ": Issues RPC_RC_ONENDED message.";
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_RC_ONENDED);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
time_update_timer_.Stop();
}
void Receiver::OnStatisticsUpdate(const PipelineStatistics& stats) {
- DVLOG(3) << __func__ << ": Issues RPC_RC_ONSTATISTICSUPDATE message.";
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_RC_ONSTATISTICSUPDATE);
auto* message = rpc->mutable_rendererclient_onstatisticsupdate_rpc();
@@ -252,77 +288,68 @@ void Receiver::OnStatisticsUpdate(const PipelineStatistics& stats) {
message->set_video_frames_dropped(stats.video_frames_dropped);
message->set_audio_memory_usage(stats.audio_memory_usage);
message->set_video_memory_usage(stats.video_memory_usage);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
}
void Receiver::OnBufferingStateChange(BufferingState state,
BufferingStateChangeReason reason) {
- DVLOG(3) << __func__
- << ": Issues RPC_RC_ONBUFFERINGSTATECHANGE message: state=" << state;
-
- // The |reason| is determined on the other side of the RPC in CourierRenderer.
- // For now, there is no reason to provide this in the |message| below.
- DCHECK_EQ(reason, BUFFERING_CHANGE_REASON_UNKNOWN);
-
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_RC_ONBUFFERINGSTATECHANGE);
auto* message = rpc->mutable_rendererclient_onbufferingstatechange_rpc();
message->set_state(ToProtoMediaBufferingState(state).value());
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
}
// TODO: Passes |reason| over.
void Receiver::OnWaiting(WaitingReason reason) {
- DVLOG(3) << __func__ << ": Issues RPC_RC_ONWAITINGFORDECRYPTIONKEY message.";
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_RC_ONWAITINGFORDECRYPTIONKEY);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
}
void Receiver::OnAudioConfigChange(const AudioDecoderConfig& config) {
- DVLOG(3) << __func__ << ": Issues RPC_RC_ONAUDIOCONFIGCHANGE message.";
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_RC_ONAUDIOCONFIGCHANGE);
auto* message = rpc->mutable_rendererclient_onaudioconfigchange_rpc();
pb::AudioDecoderConfig* proto_audio_config =
message->mutable_audio_decoder_config();
ConvertAudioDecoderConfigToProto(config, proto_audio_config);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
}
void Receiver::OnVideoConfigChange(const VideoDecoderConfig& config) {
- DVLOG(3) << __func__ << ": Issues RPC_RC_ONVIDEOCONFIGCHANGE message.";
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_RC_ONVIDEOCONFIGCHANGE);
auto* message = rpc->mutable_rendererclient_onvideoconfigchange_rpc();
pb::VideoDecoderConfig* proto_video_config =
message->mutable_video_decoder_config();
ConvertVideoDecoderConfigToProto(config, proto_video_config);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
}
void Receiver::OnVideoNaturalSizeChange(const gfx::Size& size) {
- DVLOG(3) << __func__ << ": Issues RPC_RC_ONVIDEONATURALSIZECHANGE message.";
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_RC_ONVIDEONATURALSIZECHANGE);
auto* message = rpc->mutable_rendererclient_onvideonatualsizechange_rpc();
message->set_width(size.width());
message->set_height(size.height());
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
+
+ // Notify the host.
+ receiver_controller_->OnVideoNaturalSizeChange(size);
}
void Receiver::OnVideoOpacityChange(bool opaque) {
- DVLOG(3) << __func__ << ": Issues RPC_RC_ONVIDEOOPACITYCHANGE message.";
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_RC_ONVIDEOOPACITYCHANGE);
rpc->set_boolean_value(opaque);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
}
void Receiver::OnVideoFrameRateChange(base::Optional<int>) {}
diff --git a/chromium/media/remoting/receiver.h b/chromium/media/remoting/receiver.h
index 38d5f18f91b..8f0ac1154b2 100644
--- a/chromium/media/remoting/receiver.h
+++ b/chromium/media/remoting/receiver.h
@@ -5,30 +5,61 @@
#ifndef MEDIA_REMOTING_RECEIVER_H_
#define MEDIA_REMOTING_RECEIVER_H_
+#include <memory>
+
+#include "base/callback_forward.h"
+#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
+#include "base/single_thread_task_runner.h"
#include "base/timer/timer.h"
#include "media/base/buffering_state.h"
#include "media/base/demuxer_stream.h"
+#include "media/base/renderer.h"
#include "media/base/renderer_client.h"
+#include "media/remoting/media_remoting_rpc.pb.h"
#include "media/remoting/rpc_broker.h"
-namespace media {
-class Renderer;
-class DecoderBuffer;
-} // namespace media
+namespace base {
+class SingleThreadTaskRunner;
+} // namespace base
namespace media {
namespace remoting {
+class ReceiverController;
class RpcBroker;
-class StreamProvider;
-// Media remoting receiver. Media streams are rendered by |renderer|.
-// |rpc_broker| outlives this class.
-class Receiver final : public RendererClient {
+// Receiver runs on a remote device, and forwards the information sent from a
+// CourierRenderer to |renderer_|, which actually renders the media.
+//
+// Receiver implements media::Renderer to be able to work with
+// WebMediaPlayerImpl. However, most of the APIs of media::Renderer are dummy
+// functions, because the media playback of the remoting media is not controlled
+// by the local pipeline of WMPI. It should be controlled by the remoting sender
+// via RPC calls. When Receiver receives RPC calls, it will call the
+// corresponding functions of |renderer_| to control the media playback of
+// the remoting media.
+class Receiver final : public Renderer, public RendererClient {
public:
- Receiver(std::unique_ptr<Renderer> renderer, RpcBroker* rpc_broker);
- ~Receiver();
+ Receiver(int rpc_handle,
+ int remote_handle,
+ ReceiverController* receiver_controller,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner,
+ std::unique_ptr<Renderer> renderer,
+ base::OnceCallback<void(int)> acquire_renderer_done_cb);
+ ~Receiver() override;
+
+ // Renderer implementation
+ void Initialize(MediaResource* media_resource,
+ RendererClient* client,
+ PipelineStatusCallback init_cb) override;
+ void SetCdm(CdmContext* cdm_context, CdmAttachedCB cdm_attached_cb) override;
+ void SetLatencyHint(base::Optional<base::TimeDelta> latency_hint) override;
+ void Flush(base::OnceClosure flush_cb) override;
+ void StartPlayingFrom(base::TimeDelta time) override;
+ void SetPlaybackRate(double playback_rate) override;
+ void SetVolume(float volume) override;
+ base::TimeDelta GetMediaTime() override;
// RendererClient implementation.
void OnError(PipelineStatus status) override;
@@ -43,46 +74,70 @@ class Receiver final : public RendererClient {
void OnVideoOpacityChange(bool opaque) override;
void OnVideoFrameRateChange(base::Optional<int>) override;
- void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message);
- void OnReceivedBuffer(DemuxerStream::Type type,
- scoped_refptr<DecoderBuffer> buffer);
+ // Used to set |remote_handle_| after Receiver is created, because the remote
+ // handle might be received after Receiver is created.
+ void SetRemoteHandle(int remote_handle);
+
+ base::WeakPtr<Receiver> GetWeakPtr() { return weak_factory_.GetWeakPtr(); }
private:
+ // Send RPC message on |main_task_runner_|.
+ void SendRpcMessageOnMainThread(std::unique_ptr<pb::RpcMessage> message);
+
+ // Callback function when RPC message is received.
+ void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message);
+
// RPC message handlers.
- void AcquireRenderer(std::unique_ptr<pb::RpcMessage> message);
- void Initialize(std::unique_ptr<pb::RpcMessage> message);
- void SetPlaybackRate(std::unique_ptr<pb::RpcMessage> message);
- void FlushUntil(std::unique_ptr<pb::RpcMessage> message);
- void StartPlayingFrom(std::unique_ptr<pb::RpcMessage> message);
- void SetVolume(std::unique_ptr<pb::RpcMessage> message);
-
- // Initialization callbacks.
- void OnStreamInitialized();
- void OnRendererInitialized(PipelineStatus status);
+ void RpcInitialize(std::unique_ptr<pb::RpcMessage> message);
+ void RpcSetPlaybackRate(std::unique_ptr<pb::RpcMessage> message);
+ void RpcFlushUntil(std::unique_ptr<pb::RpcMessage> message);
+ void RpcStartPlayingFrom(std::unique_ptr<pb::RpcMessage> message);
+ void RpcSetVolume(std::unique_ptr<pb::RpcMessage> message);
+ void ShouldInitializeRenderer();
+ void OnRendererInitialized(PipelineStatus status);
+ void VerifyAcquireRendererDone();
void OnFlushDone();
// Periodically send the UpdateTime RPC message to update the media time.
void ScheduleMediaTimeUpdates();
void SendMediaTimeUpdate();
- const std::unique_ptr<Renderer> renderer_;
- RpcBroker* const rpc_broker_; // Outlives this class.
+ // The callback function to call when |this| is initialized.
+ PipelineStatusCallback init_cb_;
+
+ // Indicates whether |this| received RPC_R_INITIALIZE message or not.
+ bool rpc_initialize_received_ = false;
+
+ // Owns by the WebMediaPlayerImpl instance.
+ MediaResource* demuxer_ = nullptr;
+
+ // The handle of |this| for listening RPC messages.
+ const int rpc_handle_;
- // The CourierRenderer handle on sender side. Set when AcauireRenderer() is
- // called.
- int remote_handle_ = RpcBroker::kInvalidHandle;
+ // The CourierRenderer handle on sender side. |remote_handle_| could be set
+ // through the ctor or SetRemoteHandle().
+ int remote_handle_;
- int rpc_handle_ = RpcBroker::kInvalidHandle;
+ ReceiverController* const receiver_controller_; // Outlives this class.
+ RpcBroker* const rpc_broker_; // Outlives this class.
- std::unique_ptr<StreamProvider> stream_provider_;
+ // Calling SendMessageCallback() of |rpc_broker_| should be on main thread.
+ const scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_;
+
+ // Media tasks should run on media thread.
+ const scoped_refptr<base::SingleThreadTaskRunner> media_task_runner_;
+
+ // |renderer_| is the real renderer to render media.
+ std::unique_ptr<Renderer> renderer_;
+
+ // The callback function to send RPC_ACQUIRE_RENDERER_DONE.
+ base::OnceCallback<void(int)> acquire_renderer_done_cb_;
// The timer to periodically update the media time.
base::RepeatingTimer time_update_timer_;
base::WeakPtrFactory<Receiver> weak_factory_{this};
-
- DISALLOW_COPY_AND_ASSIGN(Receiver);
};
} // namespace remoting
diff --git a/chromium/media/remoting/receiver_controller.cc b/chromium/media/remoting/receiver_controller.cc
new file mode 100644
index 00000000000..549087cf391
--- /dev/null
+++ b/chromium/media/remoting/receiver_controller.cc
@@ -0,0 +1,116 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/remoting/receiver_controller.h"
+
+#include "base/single_thread_task_runner.h"
+#include "base/threading/thread_task_runner_handle.h"
+
+namespace media {
+namespace remoting {
+
+// static
+ReceiverController* ReceiverController::GetInstance() {
+ static base::NoDestructor<ReceiverController> controller;
+ return controller.get();
+}
+
+ReceiverController::ReceiverController()
+ : rpc_broker_(base::BindRepeating(&ReceiverController::OnSendRpc,
+ base::Unretained(this))),
+ main_task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
+
+ReceiverController::~ReceiverController() = default;
+
+void ReceiverController::Initialize(
+ mojo::PendingRemote<mojom::Remotee> remotee) {
+ DCHECK(main_task_runner_->BelongsToCurrentThread());
+ DCHECK(!media_remotee_.is_bound());
+ media_remotee_.Bind(std::move(remotee));
+
+ // Calling NotifyRemotingSinkReady() to notify the host that RemotingSink is
+ // ready.
+ media_remotee_->OnRemotingSinkReady(receiver_.BindNewPipeAndPassRemote());
+}
+
+void ReceiverController::OnRendererFlush(uint32_t audio_count,
+ uint32_t video_count) {
+ if (!main_task_runner_->BelongsToCurrentThread()) {
+ // |this| is a singleton per process, it would be safe to use
+ // base::Unretained() here.
+ main_task_runner_->PostTask(
+ FROM_HERE,
+ base::BindOnce(&ReceiverController::OnRendererFlush,
+ base::Unretained(this), audio_count, video_count));
+ return;
+ }
+
+ if (media_remotee_.is_bound())
+ media_remotee_->OnFlushUntil(audio_count, video_count);
+}
+
+void ReceiverController::OnVideoNaturalSizeChange(const gfx::Size& size) {
+ if (!main_task_runner_->BelongsToCurrentThread()) {
+ // |this| is a singleton per process, it would be safe to use
+ // base::Unretained() here.
+ main_task_runner_->PostTask(
+ FROM_HERE, base::BindOnce(&ReceiverController::OnVideoNaturalSizeChange,
+ base::Unretained(this), size));
+ return;
+ }
+
+ if (media_remotee_.is_bound())
+ media_remotee_->OnVideoNaturalSizeChange(size);
+}
+
+void ReceiverController::StartDataStreams(
+ mojo::PendingRemote<::media::mojom::RemotingDataStreamReceiver>
+ audio_stream,
+ mojo::PendingRemote<::media::mojom::RemotingDataStreamReceiver>
+ video_stream) {
+ if (!main_task_runner_->BelongsToCurrentThread()) {
+ // |this| is a singleton per process, it would be safe to use
+ // base::Unretained() here.
+ main_task_runner_->PostTask(
+ FROM_HERE,
+ base::BindOnce(&ReceiverController::StartDataStreams,
+ base::Unretained(this), std::move(audio_stream),
+ std::move(video_stream)));
+ return;
+ }
+ if (media_remotee_.is_bound()) {
+ media_remotee_->StartDataStreams(std::move(audio_stream),
+ std::move(video_stream));
+ }
+}
+
+void ReceiverController::OnMessageFromSource(
+ const std::vector<uint8_t>& message) {
+ DCHECK(main_task_runner_->BelongsToCurrentThread());
+ auto rpc_message = std::make_unique<pb::RpcMessage>(pb::RpcMessage());
+ if (!rpc_message->ParseFromArray(message.data(), message.size()))
+ return;
+
+ rpc_broker_.ProcessMessageFromRemote(std::move(rpc_message));
+}
+
+void ReceiverController::OnSendRpc(
+ std::unique_ptr<std::vector<uint8_t>> message) {
+ if (!main_task_runner_->BelongsToCurrentThread()) {
+ // |this| is a singleton per process, it would be safe to use
+ // base::Unretained() here.
+ main_task_runner_->PostTask(
+ FROM_HERE, base::BindOnce(&ReceiverController::OnSendRpc,
+ base::Unretained(this), std::move(message)));
+ return;
+ }
+
+ DCHECK(media_remotee_.is_bound());
+ std::vector<uint8_t> binary_message = *message;
+ if (media_remotee_.is_bound())
+ media_remotee_->SendMessageToSource(binary_message);
+}
+
+} // namespace remoting
+} // namespace media
diff --git a/chromium/media/remoting/receiver_controller.h b/chromium/media/remoting/receiver_controller.h
new file mode 100644
index 00000000000..1071de2760a
--- /dev/null
+++ b/chromium/media/remoting/receiver_controller.h
@@ -0,0 +1,70 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MEDIA_REMOTING_RECEIVER_CONTROLLER_H_
+#define MEDIA_REMOTING_RECEIVER_CONTROLLER_H_
+
+#include <memory>
+
+#include "base/no_destructor.h"
+#include "media/mojo/mojom/remoting.mojom.h"
+#include "media/remoting/rpc_broker.h"
+#include "mojo/public/cpp/bindings/pending_remote.h"
+#include "mojo/public/cpp/bindings/receiver.h"
+#include "mojo/public/cpp/bindings/remote.h"
+
+namespace media {
+namespace remoting {
+
+// ReceiverController is the bridge that owns |rpc_broker_| to allow Receivers
+// and StreamProvider::MediaStreams to communicate with the sender via RPC
+// calls.
+//
+// It also forwards calls to a |media_remotee_| instance, which will be
+// implemented the browser process. Currently, the only use case will be on
+// Chromecast, the Remotee implementation will be implemented in the browser
+// code on Chromecast.
+//
+// NOTE: ReceiverController is a singleton per process.
+class ReceiverController : mojom::RemotingSink {
+ public:
+ static ReceiverController* GetInstance();
+ void Initialize(mojo::PendingRemote<mojom::Remotee> remotee);
+
+ // Proxy functions to |media_remotee_|.
+ void OnRendererFlush(uint32_t audio_count, uint32_t video_count);
+ void OnVideoNaturalSizeChange(const gfx::Size& size);
+ void StartDataStreams(
+ mojo::PendingRemote<mojom::RemotingDataStreamReceiver> audio_stream,
+ mojo::PendingRemote<mojom::RemotingDataStreamReceiver> video_stream);
+
+ // The reference of |rpc_broker_|.
+ media::remoting::RpcBroker* rpc_broker() { return &rpc_broker_; }
+
+ private:
+ friend base::NoDestructor<ReceiverController>;
+ friend class MockReceiverController;
+ friend void ResetForTesting(ReceiverController* controller);
+
+ ReceiverController();
+ ~ReceiverController() override;
+
+ // media::mojom::RemotingSink implementation.
+ void OnMessageFromSource(const std::vector<uint8_t>& message) override;
+
+ // Callback for |rpc_broker_| to send messages.
+ void OnSendRpc(std::unique_ptr<std::vector<uint8_t>> message);
+
+ RpcBroker rpc_broker_;
+
+ const scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_;
+
+ mojo::Remote<media::mojom::Remotee> media_remotee_;
+ mojo::Receiver<media::mojom::RemotingSink> receiver_{this};
+};
+
+} // namespace remoting
+} // namespace media
+
+#endif // MEDIA_REMOTING_RECEIVER_CONTROLLER_H_
diff --git a/chromium/media/remoting/receiver_unittest.cc b/chromium/media/remoting/receiver_unittest.cc
new file mode 100644
index 00000000000..94cb8cc50ef
--- /dev/null
+++ b/chromium/media/remoting/receiver_unittest.cc
@@ -0,0 +1,471 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/remoting/receiver.h"
+
+#include "base/check.h"
+#include "base/optional.h"
+#include "base/test/gmock_callback_support.h"
+#include "base/test/task_environment.h"
+#include "media/base/audio_decoder_config.h"
+#include "media/base/media_util.h"
+#include "media/base/mock_filters.h"
+#include "media/base/renderer.h"
+#include "media/base/test_helpers.h"
+#include "media/base/video_decoder_config.h"
+#include "media/remoting/mock_receiver_controller.h"
+#include "media/remoting/proto_enum_utils.h"
+#include "media/remoting/proto_utils.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using base::test::RunOnceCallback;
+using testing::_;
+using testing::AtLeast;
+using testing::NiceMock;
+using testing::StrictMock;
+
+namespace media {
+namespace remoting {
+
+class MockSender {
+ public:
+ MockSender(RpcBroker* rpc_broker, int remote_handle)
+ : rpc_broker_(rpc_broker),
+ rpc_handle_(rpc_broker->GetUniqueHandle()),
+ remote_handle_(remote_handle) {
+ rpc_broker_->RegisterMessageReceiverCallback(
+ rpc_handle_, base::BindRepeating(&MockSender::OnReceivedRpc,
+ base::Unretained(this)));
+ }
+
+ MOCK_METHOD(void, AcquireRendererDone, ());
+ MOCK_METHOD(void, InitializeCallback, (bool));
+ MOCK_METHOD(void, FlushUntilCallback, ());
+ MOCK_METHOD(void, OnTimeUpdate, (int64_t, int64_t));
+ MOCK_METHOD(void, OnBufferingStateChange, (BufferingState));
+ MOCK_METHOD(void, OnEnded, ());
+ MOCK_METHOD(void, OnFatalError, ());
+ MOCK_METHOD(void, OnAudioConfigChange, (AudioDecoderConfig));
+ MOCK_METHOD(void, OnVideoConfigChange, (VideoDecoderConfig));
+ MOCK_METHOD(void, OnVideoNaturalSizeChange, (gfx::Size));
+ MOCK_METHOD(void, OnVideoOpacityChange, (bool));
+ MOCK_METHOD(void, OnStatisticsUpdate, (PipelineStatistics));
+ MOCK_METHOD(void, OnWaiting, ());
+
+ void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(message);
+ switch (message->proc()) {
+ case pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE:
+ AcquireRendererDone();
+ break;
+ case pb::RpcMessage::RPC_R_INITIALIZE_CALLBACK:
+ InitializeCallback(message->boolean_value());
+ break;
+ case pb::RpcMessage::RPC_R_FLUSHUNTIL_CALLBACK:
+ FlushUntilCallback();
+ break;
+ case pb::RpcMessage::RPC_RC_ONTIMEUPDATE: {
+ DCHECK(message->has_rendererclient_ontimeupdate_rpc());
+ const int64_t time_usec =
+ message->rendererclient_ontimeupdate_rpc().time_usec();
+ const int64_t max_time_usec =
+ message->rendererclient_ontimeupdate_rpc().max_time_usec();
+ OnTimeUpdate(time_usec, max_time_usec);
+ break;
+ }
+ case pb::RpcMessage::RPC_RC_ONBUFFERINGSTATECHANGE: {
+ base::Optional<BufferingState> state = ToMediaBufferingState(
+ message->rendererclient_onbufferingstatechange_rpc().state());
+ if (state.has_value())
+ OnBufferingStateChange(state.value());
+ break;
+ }
+ case pb::RpcMessage::RPC_RC_ONENDED:
+ OnEnded();
+ break;
+ case pb::RpcMessage::RPC_RC_ONERROR:
+ OnFatalError();
+ break;
+ case pb::RpcMessage::RPC_RC_ONAUDIOCONFIGCHANGE: {
+ DCHECK(message->has_rendererclient_onaudioconfigchange_rpc());
+ const auto* audio_config_message =
+ message->mutable_rendererclient_onaudioconfigchange_rpc();
+ const pb::AudioDecoderConfig pb_audio_config =
+ audio_config_message->audio_decoder_config();
+ AudioDecoderConfig out_audio_config;
+ ConvertProtoToAudioDecoderConfig(pb_audio_config, &out_audio_config);
+ DCHECK(out_audio_config.IsValidConfig());
+ OnAudioConfigChange(out_audio_config);
+ break;
+ }
+ case pb::RpcMessage::RPC_RC_ONVIDEOCONFIGCHANGE: {
+ DCHECK(message->has_rendererclient_onvideoconfigchange_rpc());
+ const auto* video_config_message =
+ message->mutable_rendererclient_onvideoconfigchange_rpc();
+ const pb::VideoDecoderConfig pb_video_config =
+ video_config_message->video_decoder_config();
+ VideoDecoderConfig out_video_config;
+ ConvertProtoToVideoDecoderConfig(pb_video_config, &out_video_config);
+ DCHECK(out_video_config.IsValidConfig());
+
+ OnVideoConfigChange(out_video_config);
+ break;
+ }
+ case pb::RpcMessage::RPC_RC_ONVIDEONATURALSIZECHANGE: {
+ DCHECK(message->has_rendererclient_onvideonatualsizechange_rpc());
+
+ gfx::Size size(
+ message->rendererclient_onvideonatualsizechange_rpc().width(),
+ message->rendererclient_onvideonatualsizechange_rpc().height());
+ OnVideoNaturalSizeChange(size);
+ break;
+ }
+ case pb::RpcMessage::RPC_RC_ONVIDEOOPACITYCHANGE:
+ OnVideoOpacityChange(message->boolean_value());
+ break;
+ case pb::RpcMessage::RPC_RC_ONSTATISTICSUPDATE: {
+ DCHECK(message->has_rendererclient_onstatisticsupdate_rpc());
+ auto rpc_message = message->rendererclient_onstatisticsupdate_rpc();
+ PipelineStatistics statistics;
+ statistics.audio_bytes_decoded = rpc_message.audio_bytes_decoded();
+ statistics.video_bytes_decoded = rpc_message.video_bytes_decoded();
+ statistics.video_frames_decoded = rpc_message.video_frames_decoded();
+ statistics.video_frames_dropped = rpc_message.video_frames_dropped();
+ statistics.audio_memory_usage = rpc_message.audio_memory_usage();
+ statistics.video_memory_usage = rpc_message.video_memory_usage();
+ OnStatisticsUpdate(statistics);
+ break;
+ }
+ case pb::RpcMessage::RPC_RC_ONWAITINGFORDECRYPTIONKEY:
+ OnWaiting();
+ break;
+
+ default:
+ VLOG(1) << "Unknown RPC: " << message->proc();
+ }
+ }
+
+ void SendRpcAcquireRenderer() {
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(RpcBroker::kAcquireRendererHandle);
+ rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER);
+ rpc->set_integer_value(rpc_handle_);
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+ }
+
+ void SendRpcInitialize() {
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(remote_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_R_INITIALIZE);
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+ }
+
+ void SendRpcSetPlaybackRate(double playback_rate) {
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(remote_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_R_SETPLAYBACKRATE);
+ rpc->set_double_value(playback_rate);
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+ }
+
+ void SendRpcFlushUntil(uint32_t audio_count, uint32_t video_count) {
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(remote_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_R_FLUSHUNTIL);
+ pb::RendererFlushUntil* message = rpc->mutable_renderer_flushuntil_rpc();
+ message->set_audio_count(audio_count);
+ message->set_video_count(video_count);
+ message->set_callback_handle(rpc_handle_);
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+ }
+
+ void SendRpcStartPlayingFrom(base::TimeDelta time) {
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(remote_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_R_STARTPLAYINGFROM);
+ rpc->set_integer64_value(time.InMicroseconds());
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+ }
+
+ void SendRpcSetVolume(float volume) {
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(remote_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_R_SETVOLUME);
+ rpc->set_double_value(volume);
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+ }
+
+ private:
+ RpcBroker* const rpc_broker_;
+ const int rpc_handle_;
+ const int remote_handle_;
+};
+
+class ReceiverTest : public ::testing::Test {
+ public:
+ ReceiverTest() = default;
+
+ void SetUp() override {
+ mock_controller_ = MockReceiverController::GetInstance();
+ mock_controller_->Initialize(
+ mock_controller_->mock_remotee()->BindNewPipeAndPassRemote());
+ mock_remotee_ = mock_controller_->mock_remotee();
+
+ rpc_broker_ = mock_controller_->rpc_broker();
+ receiver_renderer_handle_ = rpc_broker_->GetUniqueHandle();
+
+ mock_sender_ = std::make_unique<StrictMock<MockSender>>(
+ rpc_broker_, receiver_renderer_handle_);
+
+ rpc_broker_->RegisterMessageReceiverCallback(
+ RpcBroker::kAcquireRendererHandle,
+ base::BindRepeating(&ReceiverTest::OnReceivedRpc,
+ weak_factory_.GetWeakPtr()));
+ }
+
+ void TearDown() override {
+ rpc_broker_->UnregisterMessageReceiverCallback(
+ RpcBroker::kAcquireRendererHandle);
+ }
+
+ void OnReceivedRpc(std::unique_ptr<media::remoting::pb::RpcMessage> message) {
+ DCHECK(message);
+ EXPECT_EQ(message->proc(),
+ media::remoting::pb::RpcMessage::RPC_ACQUIRE_RENDERER);
+ OnAcquireRenderer(std::move(message));
+ }
+
+ void OnAcquireRenderer(
+ std::unique_ptr<media::remoting::pb::RpcMessage> message) {
+ DCHECK(message->has_integer_value());
+ DCHECK(message->integer_value() != RpcBroker::kInvalidHandle);
+
+ if (sender_renderer_handle_ == RpcBroker::kInvalidHandle) {
+ sender_renderer_handle_ = message->integer_value();
+ SetRemoteHandle();
+ }
+ }
+
+ void OnAcquireRendererDone(int receiver_renderer_handle) {
+ DVLOG(3) << __func__
+ << ": Issues RPC_ACQUIRE_RENDERER_DONE RPC message. remote_handle="
+ << sender_renderer_handle_
+ << " rpc_handle=" << receiver_renderer_handle;
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(sender_renderer_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE);
+ rpc->set_integer_value(receiver_renderer_handle);
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+ }
+
+ void CreateReceiver() {
+ auto renderer = std::make_unique<NiceMock<MockRenderer>>();
+ mock_renderer_ = renderer.get();
+ receiver_ = std::make_unique<Receiver>(
+ receiver_renderer_handle_, sender_renderer_handle_, mock_controller_,
+ base::ThreadTaskRunnerHandle::Get(), std::move(renderer),
+ base::BindOnce(&ReceiverTest::OnAcquireRendererDone,
+ weak_factory_.GetWeakPtr()));
+ }
+
+ void SetRemoteHandle() {
+ if (!receiver_)
+ return;
+ receiver_->SetRemoteHandle(sender_renderer_handle_);
+ }
+
+ void InitializeReceiver() {
+ receiver_->Initialize(&mock_media_resource_, nullptr,
+ base::BindOnce(&ReceiverTest::OnRendererInitialized,
+ weak_factory_.GetWeakPtr()));
+ }
+
+ MOCK_METHOD(void, OnRendererInitialized, (PipelineStatus));
+
+ base::test::TaskEnvironment task_environment_;
+
+ int sender_renderer_handle_ = RpcBroker::kInvalidHandle;
+ int receiver_renderer_handle_ = RpcBroker::kInvalidHandle;
+
+ MockMediaResource mock_media_resource_;
+ MockRenderer* mock_renderer_;
+ std::unique_ptr<MockSender> mock_sender_;
+
+ RpcBroker* rpc_broker_;
+ MockRemotee* mock_remotee_;
+ MockReceiverController* mock_controller_;
+ std::unique_ptr<Receiver> receiver_;
+
+ base::WeakPtrFactory<ReceiverTest> weak_factory_{this};
+};
+
+TEST_F(ReceiverTest, AcquireRendererBeforeCreateReceiver) {
+ mock_sender_->SendRpcAcquireRenderer();
+ EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1);
+ CreateReceiver();
+ task_environment_.RunUntilIdle();
+}
+
+TEST_F(ReceiverTest, AcquireRendererAfterCreateReceiver) {
+ CreateReceiver();
+ EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1);
+ mock_sender_->SendRpcAcquireRenderer();
+ task_environment_.RunUntilIdle();
+}
+
+// |Receiver::Initialize| will be called by the local pipeline, and the
+// |Receiver::RpcInitialize| will be called once it received the
+// RPC_R_INITIALIZE messages, so these two initialization functions are possible
+// to be called in difference orders.
+//
+// Call |Receiver::Initialize| first, then send RPC_R_INITIALIZE.
+TEST_F(ReceiverTest, InitializeBeforeRpcInitialize) {
+ EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1);
+ mock_sender_->SendRpcAcquireRenderer();
+ CreateReceiver();
+
+ EXPECT_CALL(*mock_renderer_,
+ OnInitialize(&mock_media_resource_, receiver_.get(), _))
+ .WillOnce(RunOnceCallback<2>(PipelineStatus::PIPELINE_OK));
+ EXPECT_CALL(*this, OnRendererInitialized(PipelineStatus::PIPELINE_OK))
+ .Times(1);
+ EXPECT_CALL(*mock_sender_, InitializeCallback(true)).Times(1);
+
+ InitializeReceiver();
+ mock_sender_->SendRpcInitialize();
+ task_environment_.RunUntilIdle();
+}
+
+// Send RPC_R_INITIALIZE first, then call |Receiver::Initialize|.
+TEST_F(ReceiverTest, InitializeAfterRpcInitialize) {
+ EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1);
+ mock_sender_->SendRpcAcquireRenderer();
+ CreateReceiver();
+
+ EXPECT_CALL(*mock_renderer_,
+ OnInitialize(&mock_media_resource_, receiver_.get(), _))
+ .WillOnce(RunOnceCallback<2>(PipelineStatus::PIPELINE_OK));
+ EXPECT_CALL(*this, OnRendererInitialized(PipelineStatus::PIPELINE_OK))
+ .Times(1);
+ EXPECT_CALL(*mock_sender_, InitializeCallback(true)).Times(1);
+
+ mock_sender_->SendRpcInitialize();
+ InitializeReceiver();
+ task_environment_.RunUntilIdle();
+}
+
+TEST_F(ReceiverTest, RpcRendererMessages) {
+ EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1);
+ mock_sender_->SendRpcAcquireRenderer();
+ CreateReceiver();
+ mock_sender_->SendRpcInitialize();
+ InitializeReceiver();
+ task_environment_.RunUntilIdle();
+
+ // SetVolume
+ const float volume = 0.5;
+ EXPECT_CALL(*mock_renderer_, SetVolume(volume)).Times(1);
+ mock_sender_->SendRpcSetVolume(volume);
+ task_environment_.RunUntilIdle();
+
+ EXPECT_CALL(*mock_sender_, OnTimeUpdate(_, _)).Times(AtLeast(1));
+
+ // SetPlaybackRate
+ const double playback_rate = 1.2;
+ EXPECT_CALL(*mock_renderer_, SetPlaybackRate(playback_rate)).Times(1);
+ mock_sender_->SendRpcSetPlaybackRate(playback_rate);
+ task_environment_.RunUntilIdle();
+
+ // Flush
+ const uint32_t flush_audio_count = 10;
+ const uint32_t flush_video_count = 20;
+ EXPECT_CALL(*mock_renderer_, OnFlush(_)).WillOnce(RunOnceCallback<0>());
+ EXPECT_CALL(*mock_sender_, FlushUntilCallback()).Times(1);
+ mock_sender_->SendRpcFlushUntil(flush_audio_count, flush_video_count);
+ task_environment_.RunUntilIdle();
+ EXPECT_EQ(flush_audio_count, mock_remotee_->flush_audio_count());
+ EXPECT_EQ(flush_video_count, mock_remotee_->flush_video_count());
+
+ // StartPlayingFrom
+ const base::TimeDelta time = base::TimeDelta::FromSeconds(100);
+ EXPECT_CALL(*mock_renderer_, StartPlayingFrom(time)).Times(1);
+ mock_sender_->SendRpcStartPlayingFrom(time);
+ task_environment_.RunUntilIdle();
+}
+
+TEST_F(ReceiverTest, RendererClientInterface) {
+ EXPECT_CALL(*mock_sender_, AcquireRendererDone()).Times(1);
+ mock_sender_->SendRpcAcquireRenderer();
+ CreateReceiver();
+ mock_sender_->SendRpcInitialize();
+ InitializeReceiver();
+ task_environment_.RunUntilIdle();
+
+ // OnBufferingStateChange
+ EXPECT_CALL(*mock_sender_, OnBufferingStateChange(BUFFERING_HAVE_ENOUGH))
+ .Times(1);
+ receiver_->OnBufferingStateChange(BUFFERING_HAVE_ENOUGH,
+ BUFFERING_CHANGE_REASON_UNKNOWN);
+ task_environment_.RunUntilIdle();
+
+ // OnEnded
+ EXPECT_CALL(*mock_sender_, OnEnded()).Times(1);
+ receiver_->OnEnded();
+ task_environment_.RunUntilIdle();
+
+ // OnError
+ EXPECT_CALL(*mock_sender_, OnFatalError()).Times(1);
+ receiver_->OnError(PipelineStatus::AUDIO_RENDERER_ERROR);
+ task_environment_.RunUntilIdle();
+
+ // OnAudioConfigChange
+ const auto kNewAudioConfig = TestAudioConfig::Normal();
+ EXPECT_CALL(*mock_sender_,
+ OnAudioConfigChange(DecoderConfigEq(kNewAudioConfig)))
+ .Times(1);
+ receiver_->OnAudioConfigChange(kNewAudioConfig);
+ task_environment_.RunUntilIdle();
+
+ // OnVideoConfigChange
+ const auto kNewVideoConfig = TestVideoConfig::Normal();
+ EXPECT_CALL(*mock_sender_,
+ OnVideoConfigChange(DecoderConfigEq(kNewVideoConfig)))
+ .Times(1);
+ receiver_->OnVideoConfigChange(kNewVideoConfig);
+ task_environment_.RunUntilIdle();
+
+ // OnVideoNaturalSizeChange
+ const gfx::Size size(100, 200);
+ EXPECT_CALL(*mock_sender_, OnVideoNaturalSizeChange(size)).Times(1);
+ receiver_->OnVideoNaturalSizeChange(size);
+ task_environment_.RunUntilIdle();
+ EXPECT_EQ(size, mock_remotee_->changed_size());
+
+ // OnVideoOpacityChange
+ const bool opaque = true;
+ EXPECT_CALL(*mock_sender_, OnVideoOpacityChange(opaque)).Times(1);
+ receiver_->OnVideoOpacityChange(opaque);
+ task_environment_.RunUntilIdle();
+
+ // OnStatisticsUpdate
+ PipelineStatistics statistics;
+ statistics.audio_bytes_decoded = 100;
+ statistics.video_bytes_decoded = 200;
+ statistics.video_frames_decoded = 300;
+ statistics.video_frames_dropped = 400;
+ statistics.audio_memory_usage = 500;
+ statistics.video_memory_usage = 600;
+ EXPECT_CALL(*mock_sender_, OnStatisticsUpdate(statistics)).Times(1);
+ receiver_->OnStatisticsUpdate(statistics);
+ task_environment_.RunUntilIdle();
+
+ // OnWaiting
+ EXPECT_CALL(*mock_sender_, OnWaiting()).Times(1);
+ receiver_->OnWaiting(WaitingReason::kNoDecryptionKey);
+ task_environment_.RunUntilIdle();
+}
+
+} // namespace remoting
+} // namespace media
diff --git a/chromium/media/remoting/remoting_constants.h b/chromium/media/remoting/remoting_constants.h
new file mode 100644
index 00000000000..4dd35dd21f5
--- /dev/null
+++ b/chromium/media/remoting/remoting_constants.h
@@ -0,0 +1,18 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MEDIA_REMOTING_REMOTING_CONSTANTS_H_
+#define MEDIA_REMOTING_REMOTING_CONSTANTS_H_
+
+namespace media {
+namespace remoting {
+
+// The src attribute for remoting media should use the URL with this scheme.
+// The URL format is "media-remoting:<id>", e.g. "media-remoting:test".
+constexpr char kRemotingScheme[] = "media-remoting";
+
+} // namespace remoting
+} // namespace media
+
+#endif // MEDIA_REMOTING_REMOTING_CONSTANTS_H_
diff --git a/chromium/media/remoting/remoting_renderer_factory.cc b/chromium/media/remoting/remoting_renderer_factory.cc
new file mode 100644
index 00000000000..ea3051e715b
--- /dev/null
+++ b/chromium/media/remoting/remoting_renderer_factory.cc
@@ -0,0 +1,122 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/remoting/remoting_renderer_factory.h"
+
+#include "media/base/demuxer.h"
+#include "media/remoting/receiver.h"
+#include "media/remoting/receiver_controller.h"
+#include "media/remoting/stream_provider.h"
+
+namespace media {
+namespace remoting {
+
+RemotingRendererFactory::RemotingRendererFactory(
+ mojo::PendingRemote<mojom::Remotee> remotee,
+ std::unique_ptr<RendererFactory> renderer_factory,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner)
+ : receiver_controller_(ReceiverController::GetInstance()),
+ rpc_broker_(receiver_controller_->rpc_broker()),
+ renderer_handle_(rpc_broker_->GetUniqueHandle()),
+ waiting_for_remote_handle_receiver_(nullptr),
+ real_renderer_factory_(std::move(renderer_factory)),
+ media_task_runner_(media_task_runner) {
+ DVLOG(2) << __func__;
+ DCHECK(receiver_controller_);
+
+ // Register the callback to listen RPC_ACQUIRE_RENDERER message.
+ rpc_broker_->RegisterMessageReceiverCallback(
+ RpcBroker::kAcquireRendererHandle,
+ base::BindRepeating(&RemotingRendererFactory::OnAcquireRenderer,
+ weak_factory_.GetWeakPtr()));
+ receiver_controller_->Initialize(std::move(remotee));
+}
+
+RemotingRendererFactory::~RemotingRendererFactory() {
+ rpc_broker_->UnregisterMessageReceiverCallback(
+ RpcBroker::kAcquireRendererHandle);
+}
+
+std::unique_ptr<Renderer> RemotingRendererFactory::CreateRenderer(
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner,
+ const scoped_refptr<base::TaskRunner>& worker_task_runner,
+ AudioRendererSink* audio_renderer_sink,
+ VideoRendererSink* video_renderer_sink,
+ RequestOverlayInfoCB request_overlay_info_cb,
+ const gfx::ColorSpace& target_color_space) {
+ DVLOG(2) << __func__;
+
+ auto receiver = std::make_unique<Receiver>(
+ renderer_handle_, remote_renderer_handle_, receiver_controller_,
+ media_task_runner,
+ real_renderer_factory_->CreateRenderer(
+ media_task_runner, worker_task_runner, audio_renderer_sink,
+ video_renderer_sink, request_overlay_info_cb, target_color_space),
+ base::BindOnce(&RemotingRendererFactory::OnAcquireRendererDone,
+ base::Unretained(this)));
+
+ // If we haven't received a RPC_ACQUIRE_RENDERER yet, keep a reference to
+ // |receiver|, and set its remote handle when we get the call to
+ // OnAcquireRenderer().
+ if (remote_renderer_handle_ == RpcBroker::kInvalidHandle)
+ waiting_for_remote_handle_receiver_ = receiver->GetWeakPtr();
+
+ return std::move(receiver);
+}
+
+void RemotingRendererFactory::OnReceivedRpc(
+ std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(message);
+ if (message->proc() == pb::RpcMessage::RPC_ACQUIRE_RENDERER)
+ OnAcquireRenderer(std::move(message));
+ else
+ VLOG(1) << __func__ << ": Unknow RPC message. proc=" << message->proc();
+}
+
+void RemotingRendererFactory::OnAcquireRenderer(
+ std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(message->has_integer_value());
+ DCHECK(message->integer_value() != RpcBroker::kInvalidHandle);
+
+ remote_renderer_handle_ = message->integer_value();
+
+ // If CreateRenderer() was called before we had a valid
+ // |remote_renderer_handle_|, set it on the already created Receiver.
+ if (waiting_for_remote_handle_receiver_) {
+ // |waiting_for_remote_handle_receiver_| is the WeakPtr of the Receiver
+ // instance and should be deref in the media thread.
+ media_task_runner_->PostTask(
+ FROM_HERE, base::BindOnce(&Receiver::SetRemoteHandle,
+ waiting_for_remote_handle_receiver_,
+ remote_renderer_handle_));
+ }
+}
+
+void RemotingRendererFactory::OnAcquireRendererDone(int receiver_rpc_handle) {
+ // RPC_ACQUIRE_RENDERER_DONE should be sent only once.
+ //
+ // WebMediaPlayerImpl might destroy and re-create the Receiver instance
+ // several times for saving resources. However, RPC_ACQUIRE_RENDERER_DONE
+ // shouldn't be sent multiple times whenever a Receiver instance is created.
+ if (is_acquire_renderer_done_sent_)
+ return;
+
+ DVLOG(3) << __func__
+ << ": Issues RPC_ACQUIRE_RENDERER_DONE RPC message. remote_handle="
+ << remote_renderer_handle_ << " rpc_handle=" << receiver_rpc_handle;
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(remote_renderer_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_RENDERER_DONE);
+ rpc->set_integer_value(receiver_rpc_handle);
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+
+ // Once RPC_ACQUIRE_RENDERER_DONE is sent, it implies there is no Receiver
+ // instance that is waiting the remote handle.
+ waiting_for_remote_handle_receiver_ = nullptr;
+
+ is_acquire_renderer_done_sent_ = true;
+}
+
+} // namespace remoting
+} // namespace media
diff --git a/chromium/media/remoting/remoting_renderer_factory.h b/chromium/media/remoting/remoting_renderer_factory.h
new file mode 100644
index 00000000000..cf74f57efcb
--- /dev/null
+++ b/chromium/media/remoting/remoting_renderer_factory.h
@@ -0,0 +1,72 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MEDIA_REMOTING_REMOTING_RENDERER_FACTORY_H_
+#define MEDIA_REMOTING_REMOTING_RENDERER_FACTORY_H_
+
+#include "media/base/renderer_factory.h"
+#include "media/mojo/mojom/remoting.mojom.h"
+#include "media/remoting/rpc_broker.h"
+#include "mojo/public/cpp/bindings/pending_remote.h"
+
+namespace media {
+namespace remoting {
+
+class Receiver;
+class ReceiverController;
+
+class RemotingRendererFactory : public RendererFactory {
+ public:
+ RemotingRendererFactory(
+ mojo::PendingRemote<mojom::Remotee> remotee,
+ std::unique_ptr<RendererFactory> renderer_factory,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner);
+ ~RemotingRendererFactory() override;
+
+ // RendererFactory implementation
+ std::unique_ptr<Renderer> CreateRenderer(
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner,
+ const scoped_refptr<base::TaskRunner>& worker_task_runner,
+ AudioRendererSink* audio_renderer_sink,
+ VideoRendererSink* video_renderer_sink,
+ RequestOverlayInfoCB request_overlay_info_cb,
+ const gfx::ColorSpace& target_color_space) override;
+
+ private:
+ // Callback function when RPC message is received.
+ void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message);
+ void OnAcquireRenderer(std::unique_ptr<pb::RpcMessage> message);
+ void OnAcquireRendererDone(int receiver_rpc_handle);
+
+ // Indicates whether RPC_ACQUIRE_RENDERER_DONE is sent or not.
+ bool is_acquire_renderer_done_sent_ = false;
+
+ ReceiverController* receiver_controller_;
+
+ RpcBroker* rpc_broker_; // Outlives this class.
+
+ // The RPC handle used by all Receiver instances created by |this|. Sent only
+ // once to the sender side, through RPC_ACQUIRE_RENDERER_DONE, regardless of
+ // how many times CreateRenderer() is called."
+ const int renderer_handle_ = RpcBroker::kInvalidHandle;
+
+ // The RPC handle of the CourierRenderer on the sender side. Will be received
+ // once, via an RPC_ACQUIRE_RENDERER message"
+ int remote_renderer_handle_ = RpcBroker::kInvalidHandle;
+
+ // Used to set remote handle if receiving RPC_ACQUIRE_RENDERER after
+ // CreateRenderer() is called.
+ base::WeakPtr<Receiver> waiting_for_remote_handle_receiver_;
+ std::unique_ptr<RendererFactory> real_renderer_factory_;
+
+ // Used to instantiate |receiver_|.
+ const scoped_refptr<base::SingleThreadTaskRunner> media_task_runner_;
+
+ base::WeakPtrFactory<RemotingRendererFactory> weak_factory_{this};
+};
+
+} // namespace remoting
+} // namespace media
+
+#endif // MEDIA_REMOTING_REMOTING_RENDERER_FACTORY_H_
diff --git a/chromium/media/remoting/rpc_broker.h b/chromium/media/remoting/rpc_broker.h
index 280d63fa112..912e3b61ad4 100644
--- a/chromium/media/remoting/rpc_broker.h
+++ b/chromium/media/remoting/rpc_broker.h
@@ -46,6 +46,7 @@ class RpcBroker {
// Get unique handle value (larger than 0) for RPC message handles.
int GetUniqueHandle();
+ // TODO(chkuo): Change the parameter to accept const ref of RpcMessage.
using ReceiveMessageCallback =
base::RepeatingCallback<void(std::unique_ptr<pb::RpcMessage>)>;
// Register a component to receive messages via the given
@@ -77,10 +78,11 @@ class RpcBroker {
// Predefined handle value for RPC messages related to initialization (before
// the receiver handle(s) are known).
- static constexpr int kAcquireHandle = 0;
+ static constexpr int kAcquireRendererHandle = 0;
+ static constexpr int kAcquireDemuxerHandle = 1;
// The first handle to return from GetUniqueHandle().
- static constexpr int kFirstHandle = 1;
+ static constexpr int kFirstHandle = 100;
private:
// Checks that all method calls occur on the same thread.
diff --git a/chromium/media/remoting/stream_provider.cc b/chromium/media/remoting/stream_provider.cc
index 7ff69c808c5..198cf6c3f14 100644
--- a/chromium/media/remoting/stream_provider.cc
+++ b/chromium/media/remoting/stream_provider.cc
@@ -3,16 +3,23 @@
// found in the LICENSE file.
#include "media/remoting/stream_provider.h"
+#include <vector>
#include "base/bind.h"
#include "base/callback.h"
#include "base/callback_helpers.h"
#include "base/containers/circular_deque.h"
#include "base/logging.h"
+#include "base/single_thread_task_runner.h"
+#include "base/threading/thread_task_runner_handle.h"
+#include "media/base/bind_to_current_loop.h"
#include "media/base/decoder_buffer.h"
#include "media/base/video_transformation.h"
+#include "media/mojo/common/mojo_decoder_buffer_converter.h"
#include "media/remoting/proto_enum_utils.h"
#include "media/remoting/proto_utils.h"
+#include "media/remoting/receiver_controller.h"
+#include "media/remoting/rpc_broker.h"
namespace media {
namespace remoting {
@@ -22,131 +29,140 @@ namespace {
constexpr int kNumFramesInEachReadUntil = 10;
}
-// An implementation of media::DemuxerStream on Media Remoting receiver.
-// Receives data from mojo data pipe, and returns one frame or/and status when
-// Read() is called.
-class MediaStream final : public DemuxerStream {
- public:
- MediaStream(RpcBroker* rpc_broker,
- Type type,
- int remote_handle,
- base::OnceClosure error_callback);
- ~MediaStream() override;
-
- // DemuxerStream implementation.
- void Read(ReadCB read_cb) override;
- bool IsReadPending() const override;
- AudioDecoderConfig audio_decoder_config() override;
- VideoDecoderConfig video_decoder_config() override;
- DemuxerStream::Type type() const override;
- Liveness liveness() const override;
- bool SupportsConfigChanges() override;
-
- void Initialize(base::OnceClosure init_done_cb);
- void FlushUntil(int count);
- void AppendBuffer(scoped_refptr<DecoderBuffer> buffer);
-
- private:
- // RPC messages handlers.
- void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message);
- void OnInitializeCallback(std::unique_ptr<pb::RpcMessage> message);
- void OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message);
-
- // Issues the ReadUntil RPC message when read is pending and buffer is empty.
- void SendReadUntil();
-
- // Run and reset the read callback.
- void CompleteRead(DemuxerStream::Status status);
-
- // Update the audio/video decoder config When config changes in the mid
- // stream, the new config will be stored in
- // |next_audio/video_decoder_config_|. Old config will be droped when all
- // associated frames are consumed.
- void UpdateConfig(const pb::AudioDecoderConfig* audio_message,
- const pb::VideoDecoderConfig* video_message);
-
- // Called when any error occurs.
- void OnError(const std::string& error);
-
- RpcBroker* const rpc_broker_; // Outlives this class.
- const Type type_;
- const int remote_handle_;
- const int rpc_handle_;
-
- // Set when Initialize() is called, and will be run after initialization is
- // done.
- base::OnceClosure init_done_callback_;
-
- // The read until count in the last ReadUntil RPC message.
- int last_read_until_count_ = 0;
-
- // Indicates whether Audio/VideoDecoderConfig changed and the frames with the
- // old config are not yet consumed. The new config is stored in the end of
- // |audio/video_decoder_config_|;
- bool config_changed_ = false;
-
- // Indicates whether a ReadUntil RPC message was sent without receiving the
- // ReadUntilCallback message yet.
- bool read_until_sent_ = false;
-
- // Set when Read() is called. Run only once when read completes.
- ReadCB read_complete_callback_;
-
- base::OnceClosure error_callback_; // Called when first error occurs.
-
- base::circular_deque<scoped_refptr<DecoderBuffer>> buffers_;
-
- // Current audio/video config.
- AudioDecoderConfig audio_decoder_config_;
- VideoDecoderConfig video_decoder_config_;
-
- // Stores the new auido/video config when config changes.
- AudioDecoderConfig next_audio_decoder_config_;
- VideoDecoderConfig next_video_decoder_config_;
-
- base::WeakPtrFactory<MediaStream> weak_factory_{this};
-
- DISALLOW_COPY_AND_ASSIGN(MediaStream);
-};
-
-MediaStream::MediaStream(RpcBroker* rpc_broker,
- Type type,
- int remote_handle,
- base::OnceClosure error_callback)
- : rpc_broker_(rpc_broker),
+// static
+void StreamProvider::MediaStream::CreateOnMainThread(
+ RpcBroker* rpc_broker,
+ Type type,
+ int32_t handle,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner,
+ base::OnceCallback<void(MediaStream::UniquePtr)> callback) {
+ MediaStream::UniquePtr stream(
+ new MediaStream(rpc_broker, type, handle, media_task_runner),
+ &DestructionHelper);
+ std::move(callback).Run(std::move(stream));
+}
+
+// static
+void StreamProvider::MediaStream::DestructionHelper(MediaStream* stream) {
+ stream->Destroy();
+}
+
+StreamProvider::MediaStream::MediaStream(
+ RpcBroker* rpc_broker,
+ Type type,
+ int remote_handle,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner)
+ : main_task_runner_(base::ThreadTaskRunnerHandle::Get()),
+ media_task_runner_(media_task_runner),
+ rpc_broker_(rpc_broker),
type_(type),
remote_handle_(remote_handle),
- rpc_handle_(rpc_broker_->GetUniqueHandle()),
- error_callback_(std::move(error_callback)) {
+ rpc_handle_(rpc_broker_->GetUniqueHandle()) {
DCHECK(remote_handle_ != RpcBroker::kInvalidHandle);
- rpc_broker_->RegisterMessageReceiverCallback(
- rpc_handle_, base::BindRepeating(&MediaStream::OnReceivedRpc,
- weak_factory_.GetWeakPtr()));
+
+ media_weak_this_ = media_weak_factory_.GetWeakPtr();
+
+ const RpcBroker::ReceiveMessageCallback receive_callback =
+ BindToLoop(media_task_runner_,
+ BindRepeating(&MediaStream::OnReceivedRpc, media_weak_this_));
+ rpc_broker_->RegisterMessageReceiverCallback(rpc_handle_, receive_callback);
}
-MediaStream::~MediaStream() {
+StreamProvider::MediaStream::~MediaStream() {
+ DCHECK(main_task_runner_->BelongsToCurrentThread());
rpc_broker_->UnregisterMessageReceiverCallback(rpc_handle_);
}
-void MediaStream::Initialize(base::OnceClosure init_done_cb) {
- DCHECK(init_done_cb);
- if (!init_done_callback_.is_null()) {
+void StreamProvider::MediaStream::Destroy() {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ // Invalid weak pointers to prevent |this| from receiving RPC calls on the
+ // media thread.
+ media_weak_factory_.InvalidateWeakPtrs();
+
+ // Unbind all mojo pipes and bindings.
+ receiver_.reset();
+ decoder_buffer_reader_.reset();
+
+ // After invalidating all weak ptrs of |media_weak_factory_|, MediaStream
+ // won't be access anymore, so using |this| here is safe.
+ main_task_runner_->DeleteSoon(FROM_HERE, this);
+}
+
+void StreamProvider::MediaStream::SendRpcMessageOnMainThread(
+ std::unique_ptr<pb::RpcMessage> message) {
+ // |rpc_broker_| is owned by |receiver_controller_| which is a singleton per
+ // process, so it's safe to use Unretained() here.
+ main_task_runner_->PostTask(
+ FROM_HERE,
+ base::BindOnce(&RpcBroker::SendMessageToRemote,
+ base::Unretained(rpc_broker_), std::move(message)));
+}
+
+void StreamProvider::MediaStream::Initialize(
+ base::OnceClosure init_done_callback) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ DCHECK(init_done_callback);
+
+ if (init_done_callback_) {
OnError("Duplicate initialization");
return;
}
- init_done_callback_ = std::move(init_done_cb);
- DVLOG(3) << __func__ << "Issues RpcMessage::RPC_DS_INITIALIZE with "
- << "remote_handle=" << remote_handle_
- << " rpc_handle=" << rpc_handle_;
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ init_done_callback_ = std::move(init_done_callback);
+
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE);
rpc->set_integer_value(rpc_handle_);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
+}
+
+void StreamProvider::MediaStream::InitializeDataPipe(
+ mojo::ScopedDataPipeConsumerHandle data_pipe) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ decoder_buffer_reader_ =
+ std::make_unique<MojoDecoderBufferReader>(std::move(data_pipe));
+ CompleteInitialize();
+}
+
+void StreamProvider::MediaStream::ReceiveFrame(uint32_t count,
+ mojom::DecoderBufferPtr buffer) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ DCHECK(decoder_buffer_reader_);
+
+ auto callback = BindToCurrentLoop(
+ base::BindOnce(&MediaStream::AppendBuffer, media_weak_this_, count));
+ decoder_buffer_reader_->ReadDecoderBuffer(std::move(buffer),
+ std::move(callback));
+}
+
+void StreamProvider::MediaStream::FlushUntil(uint32_t count) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ if (count < current_frame_count_)
+ return;
+
+ uint32_t buffers_to_erase = count - current_frame_count_;
+
+ if (buffers_to_erase > buffers_.size()) {
+ buffers_.clear();
+ } else {
+ buffers_.erase(buffers_.begin(), buffers_.begin() + buffers_to_erase);
+ }
+
+ current_frame_count_ = count;
+
+ if (!read_complete_callback_.is_null())
+ CompleteRead(DemuxerStream::kAborted);
+
+ read_until_sent_ = false;
}
-void MediaStream::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
+void StreamProvider::MediaStream::OnReceivedRpc(
+ std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message->handle() == rpc_handle_);
switch (message->proc()) {
@@ -161,24 +177,21 @@ void MediaStream::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
}
}
-void MediaStream::OnInitializeCallback(
+void StreamProvider::MediaStream::OnInitializeCallback(
std::unique_ptr<pb::RpcMessage> message) {
- DVLOG(3) << __func__ << "Receives RPC_DS_INITIALIZE_CALLBACK message.";
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
const pb::DemuxerStreamInitializeCallback callback_message =
message->demuxerstream_initializecb_rpc();
if (callback_message.type() != type_) {
OnError("Wrong type");
return;
}
+
if ((type_ == DemuxerStream::AUDIO &&
audio_decoder_config_.IsValidConfig()) ||
(type_ == DemuxerStream::VIDEO &&
video_decoder_config_.IsValidConfig())) {
- OnError("Duplicate Iniitialize");
- return;
- }
- if (init_done_callback_.is_null()) {
- OnError("Iniitialize callback missing");
+ OnError("Duplicate initialization");
return;
}
@@ -186,21 +199,41 @@ void MediaStream::OnInitializeCallback(
callback_message.has_audio_decoder_config()) {
const pb::AudioDecoderConfig audio_message =
callback_message.audio_decoder_config();
- UpdateConfig(&audio_message, nullptr);
+ UpdateAudioConfig(audio_message);
} else if (type_ == DemuxerStream::VIDEO &&
callback_message.has_video_decoder_config()) {
const pb::VideoDecoderConfig video_message =
callback_message.video_decoder_config();
- UpdateConfig(nullptr, &video_message);
+ UpdateVideoConfig(video_message);
} else {
- OnError("config missing");
+ OnError("Config missing");
return;
}
+
+ rpc_initialized_ = true;
+ CompleteInitialize();
+}
+
+void StreamProvider::MediaStream::CompleteInitialize() {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ // Initialization finished when received RPC_DS_INITIALIZE_CALLBACK and
+ // |decoder_buffer_reader_| is created.
+ if (!rpc_initialized_ || !decoder_buffer_reader_)
+ return;
+
+ if (!init_done_callback_) {
+ OnError("Initialize callback missing");
+ return;
+ }
+
std::move(init_done_callback_).Run();
}
-void MediaStream::OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message) {
- DVLOG(3) << __func__ << ": Receives RPC_DS_READUNTIL_CALLBACK message.";
+void StreamProvider::MediaStream::OnReadUntilCallback(
+ std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
if (!read_until_sent_) {
OnError("Unexpected ReadUntilCallback");
return;
@@ -208,98 +241,90 @@ void MediaStream::OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message) {
read_until_sent_ = false;
const pb::DemuxerStreamReadUntilCallback callback_message =
message->demuxerstream_readuntilcb_rpc();
- last_read_until_count_ = callback_message.count();
+ total_received_frame_count_ = callback_message.count();
+
if (ToDemuxerStreamStatus(callback_message.status()) == kConfigChanged) {
config_changed_ = true;
+
if (callback_message.has_audio_decoder_config()) {
const pb::AudioDecoderConfig audio_message =
callback_message.audio_decoder_config();
- UpdateConfig(&audio_message, nullptr);
+ UpdateAudioConfig(audio_message);
}
+
if (callback_message.has_video_decoder_config()) {
const pb::VideoDecoderConfig video_message =
callback_message.video_decoder_config();
- UpdateConfig(nullptr, &video_message);
+ UpdateVideoConfig(video_message);
}
+
if (buffers_.empty() && !read_complete_callback_.is_null())
CompleteRead(DemuxerStream::kConfigChanged);
+
return;
}
+
if (buffers_.empty() && !read_complete_callback_.is_null())
SendReadUntil();
}
-void MediaStream::UpdateConfig(const pb::AudioDecoderConfig* audio_message,
- const pb::VideoDecoderConfig* video_message) {
- if (type_ == AUDIO) {
- DCHECK(audio_message && !video_message);
- AudioDecoderConfig audio_config;
- ConvertProtoToAudioDecoderConfig(*audio_message, &audio_config);
- if (!audio_config.IsValidConfig()) {
- OnError("Invalid audio config");
- return;
- }
- if (config_changed_) {
- DCHECK(audio_decoder_config_.IsValidConfig());
- DCHECK(!next_audio_decoder_config_.IsValidConfig());
- next_audio_decoder_config_ = audio_config;
- } else {
- DCHECK(!audio_decoder_config_.IsValidConfig());
- audio_decoder_config_ = audio_config;
- }
- } else if (type_ == VIDEO) {
- DCHECK(video_message && !audio_message);
- VideoDecoderConfig video_config;
- ConvertProtoToVideoDecoderConfig(*video_message, &video_config);
- if (!video_config.IsValidConfig()) {
- OnError("Invalid video config");
- return;
- }
- if (config_changed_) {
- DCHECK(video_decoder_config_.IsValidConfig());
- DCHECK(!next_video_decoder_config_.IsValidConfig());
- next_video_decoder_config_ = video_config;
- } else {
- DCHECK(!video_decoder_config_.IsValidConfig());
- video_decoder_config_ = video_config;
- }
+void StreamProvider::MediaStream::UpdateAudioConfig(
+ const pb::AudioDecoderConfig& audio_message) {
+ DCHECK(type_ == AUDIO);
+ AudioDecoderConfig audio_config;
+ ConvertProtoToAudioDecoderConfig(audio_message, &audio_config);
+ if (!audio_config.IsValidConfig()) {
+ OnError("Invalid audio config");
+ return;
+ }
+ if (config_changed_) {
+ DCHECK(audio_decoder_config_.IsValidConfig());
+ DCHECK(!next_audio_decoder_config_.IsValidConfig());
+ next_audio_decoder_config_ = audio_config;
} else {
- NOTREACHED() << ": Only supports video or audio stream.";
+ DCHECK(!audio_decoder_config_.IsValidConfig());
+ audio_decoder_config_ = audio_config;
}
}
-void MediaStream::SendReadUntil() {
+void StreamProvider::MediaStream::UpdateVideoConfig(
+ const pb::VideoDecoderConfig& video_message) {
+ DCHECK(type_ == VIDEO);
+ VideoDecoderConfig video_config;
+ ConvertProtoToVideoDecoderConfig(video_message, &video_config);
+ if (!video_config.IsValidConfig()) {
+ OnError("Invalid video config");
+ return;
+ }
+ if (config_changed_) {
+ DCHECK(video_decoder_config_.IsValidConfig());
+ DCHECK(!next_video_decoder_config_.IsValidConfig());
+ next_video_decoder_config_ = video_config;
+ } else {
+ DCHECK(!video_decoder_config_.IsValidConfig());
+ video_decoder_config_ = video_config;
+ }
+}
+
+void StreamProvider::MediaStream::SendReadUntil() {
if (read_until_sent_)
return;
- DVLOG(3) << "Issues RPC_DS_READUNTIL RPC message to remote_handle_="
- << remote_handle_ << " with callback handle=" << rpc_handle_
- << " count=" << last_read_until_count_;
std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL);
auto* message = rpc->mutable_demuxerstream_readuntil_rpc();
- last_read_until_count_ += kNumFramesInEachReadUntil;
- message->set_count(last_read_until_count_);
+ message->set_count(total_received_frame_count_ + kNumFramesInEachReadUntil);
message->set_callback_handle(rpc_handle_);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
read_until_sent_ = true;
}
-void MediaStream::FlushUntil(int count) {
- while (!buffers_.empty()) {
- buffers_.pop_front();
- }
-
- last_read_until_count_ = count;
- if (!read_complete_callback_.is_null())
- CompleteRead(DemuxerStream::kAborted);
- read_until_sent_ = false;
-}
-
-void MediaStream::Read(ReadCB read_cb) {
+void StreamProvider::MediaStream::Read(ReadCB read_cb) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(read_complete_callback_.is_null());
DCHECK(read_cb);
+
read_complete_callback_ = std::move(read_cb);
if (buffers_.empty() && config_changed_) {
CompleteRead(DemuxerStream::kConfigChanged);
@@ -315,26 +340,21 @@ void MediaStream::Read(ReadCB read_cb) {
CompleteRead(DemuxerStream::kOk);
}
-bool MediaStream::IsReadPending() const {
+bool StreamProvider::MediaStream::IsReadPending() const {
return !read_complete_callback_.is_null();
}
-void MediaStream::CompleteRead(DemuxerStream::Status status) {
- DVLOG(3) << __func__ << ": " << status;
+void StreamProvider::MediaStream::CompleteRead(DemuxerStream::Status status) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
switch (status) {
case DemuxerStream::kConfigChanged:
if (type_ == AUDIO) {
DCHECK(next_audio_decoder_config_.IsValidConfig());
audio_decoder_config_ = next_audio_decoder_config_;
-#if DCHECK_IS_ON()
- next_audio_decoder_config_ = AudioDecoderConfig();
-#endif // DCHECK_IS_ON()
} else {
DCHECK(next_video_decoder_config_.IsValidConfig());
video_decoder_config_ = next_video_decoder_config_;
-#if DCHECK_IS_ON()
- next_video_decoder_config_ = VideoDecoderConfig();
-#endif // DCHECK_IS_ON()
}
config_changed_ = false;
std::move(read_complete_callback_).Run(status, nullptr);
@@ -344,111 +364,263 @@ void MediaStream::CompleteRead(DemuxerStream::Status status) {
std::move(read_complete_callback_).Run(status, nullptr);
return;
case DemuxerStream::kOk:
+ DCHECK(read_complete_callback_);
DCHECK(!buffers_.empty());
+ DCHECK_LT(current_frame_count_, buffered_frame_count_);
scoped_refptr<DecoderBuffer> frame_data = buffers_.front();
buffers_.pop_front();
+ ++current_frame_count_;
std::move(read_complete_callback_).Run(status, frame_data);
return;
}
}
-AudioDecoderConfig MediaStream::audio_decoder_config() {
- DVLOG(3) << __func__;
+AudioDecoderConfig StreamProvider::MediaStream::audio_decoder_config() {
DCHECK(type_ == DemuxerStream::AUDIO);
return audio_decoder_config_;
}
-VideoDecoderConfig MediaStream::video_decoder_config() {
- DVLOG(3) << __func__;
+VideoDecoderConfig StreamProvider::MediaStream::video_decoder_config() {
DCHECK(type_ == DemuxerStream::VIDEO);
return video_decoder_config_;
}
-DemuxerStream::Type MediaStream::type() const {
+DemuxerStream::Type StreamProvider::MediaStream::type() const {
return type_;
}
-DemuxerStream::Liveness MediaStream::liveness() const {
+DemuxerStream::Liveness StreamProvider::MediaStream::liveness() const {
return DemuxerStream::LIVENESS_LIVE;
}
-bool MediaStream::SupportsConfigChanges() {
+bool StreamProvider::MediaStream::SupportsConfigChanges() {
return true;
}
-void MediaStream::AppendBuffer(scoped_refptr<DecoderBuffer> buffer) {
- DVLOG(3) << __func__;
+void StreamProvider::MediaStream::AppendBuffer(
+ uint32_t count,
+ scoped_refptr<DecoderBuffer> buffer) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ // Drop flushed frame.
+ if (count < current_frame_count_)
+ return;
+
+ // Continuity check.
+ DCHECK(buffers_.empty() || buffered_frame_count_ == count);
+
buffers_.push_back(buffer);
+ buffered_frame_count_ = count + 1;
+
if (!read_complete_callback_.is_null())
CompleteRead(DemuxerStream::kOk);
}
-void MediaStream::OnError(const std::string& error) {
- VLOG(1) << __func__ << ": " << error;
- if (error_callback_.is_null())
- return;
- std::move(error_callback_).Run();
+void StreamProvider::MediaStream::OnError(const std::string& error) {
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(remote_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_DS_ONERROR);
+ SendRpcMessageOnMainThread(std::move(rpc));
}
-StreamProvider::StreamProvider(RpcBroker* rpc_broker,
- base::OnceClosure error_callback)
- : rpc_broker_(rpc_broker), error_callback_(std::move(error_callback)) {}
+StreamProvider::StreamProvider(
+ ReceiverController* receiver_controller,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner)
+ : main_task_runner_(base::ThreadTaskRunnerHandle::Get()),
+ media_task_runner_(media_task_runner),
+ receiver_controller_(receiver_controller),
+ rpc_broker_(receiver_controller_->rpc_broker()) {
+ DCHECK(receiver_controller_);
+ DCHECK(rpc_broker_);
+
+ media_weak_this_ = media_weak_factory_.GetWeakPtr();
+
+ auto callback = BindToLoop(
+ media_task_runner_,
+ base::BindRepeating(&StreamProvider::OnReceivedRpc, media_weak_this_));
+ rpc_broker_->RegisterMessageReceiverCallback(RpcBroker::kAcquireDemuxerHandle,
+ callback);
+}
-StreamProvider::~StreamProvider() = default;
+StreamProvider::~StreamProvider() {
+ DCHECK(main_task_runner_->BelongsToCurrentThread());
+ rpc_broker_->UnregisterMessageReceiverCallback(
+ RpcBroker::kAcquireDemuxerHandle);
+}
-void StreamProvider::Initialize(int remote_audio_handle,
- int remote_video_handle,
- base::OnceClosure callback) {
- DVLOG(3) << __func__ << ": remote_audio_handle=" << remote_audio_handle
- << " remote_video_handle=" << remote_video_handle;
- if (!init_done_callback_.is_null()) {
- OnError("Duplicate initialization.");
- return;
- }
- if (remote_audio_handle == RpcBroker::kInvalidHandle &&
- remote_video_handle == RpcBroker::kInvalidHandle) {
- OnError("Invalid handle.");
- return;
+std::string StreamProvider::GetDisplayName() const {
+ return "media::remoting::StreamProvider";
+}
+
+void StreamProvider::Initialize(DemuxerHost* host,
+ PipelineStatusCallback status_cb) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ init_done_callback_ = std::move(status_cb);
+ CompleteInitialize();
+}
+
+void StreamProvider::AbortPendingReads() {}
+
+void StreamProvider::StartWaitingForSeek(base::TimeDelta seek_time) {}
+
+void StreamProvider::CancelPendingSeek(base::TimeDelta seek_time) {}
+
+void StreamProvider::Seek(base::TimeDelta time,
+ PipelineStatusCallback seek_cb) {
+ media_task_runner_->PostTask(
+ FROM_HERE,
+ base::BindOnce(std::move(seek_cb), PipelineStatus::PIPELINE_OK));
+}
+
+void StreamProvider::Stop() {}
+
+base::TimeDelta StreamProvider::GetStartTime() const {
+ return base::TimeDelta();
+}
+
+base::Time StreamProvider::GetTimelineOffset() const {
+ return base::Time();
+}
+
+int64_t StreamProvider::GetMemoryUsage() const {
+ return 0;
+}
+
+base::Optional<container_names::MediaContainerName>
+StreamProvider::GetContainerForMetrics() const {
+ return base::Optional<container_names::MediaContainerName>();
+}
+
+void StreamProvider::OnEnabledAudioTracksChanged(
+ const std::vector<MediaTrack::Id>& track_ids,
+ base::TimeDelta curr_time,
+ TrackChangeCB change_completed_cb) {
+ std::vector<DemuxerStream*> streams;
+ std::move(change_completed_cb).Run(DemuxerStream::AUDIO, streams);
+ DVLOG(1) << "Track changes are not supported.";
+}
+
+void StreamProvider::OnSelectedVideoTrackChanged(
+ const std::vector<MediaTrack::Id>& track_ids,
+ base::TimeDelta curr_time,
+ TrackChangeCB change_completed_cb) {
+ std::vector<DemuxerStream*> streams;
+ std::move(change_completed_cb).Run(DemuxerStream::VIDEO, streams);
+ DVLOG(1) << "Track changes are not supported.";
+}
+
+void StreamProvider::Destroy() {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ if (init_done_callback_)
+ std::move(init_done_callback_).Run(PIPELINE_ERROR_ABORT);
+
+ // Invalid weak pointers to prevent |this| from receiving RPC calls on the
+ // media thread.
+ media_weak_factory_.InvalidateWeakPtrs();
+
+ audio_stream_.reset();
+ video_stream_.reset();
+
+ // After invalidating all weak ptrs of |media_weak_factory_|, StreamProvider
+ // won't be access anymore, so using |this| here is safe.
+ main_task_runner_->DeleteSoon(FROM_HERE, this);
+}
+
+void StreamProvider::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
+ switch (message->proc()) {
+ case pb::RpcMessage::RPC_ACQUIRE_DEMUXER:
+ OnAcquireDemuxer(std::move(message));
+ break;
+ default:
+ VLOG(3) << __func__ << "Unknown RPC message.";
}
+}
- init_done_callback_ = std::move(callback);
- if (remote_audio_handle != RpcBroker::kInvalidHandle) {
- audio_stream_.reset(new MediaStream(
- rpc_broker_, DemuxerStream::AUDIO, remote_audio_handle,
- base::BindOnce(&StreamProvider::OnError, weak_factory_.GetWeakPtr(),
- "Media stream error")));
- audio_stream_->Initialize(base::BindOnce(
- &StreamProvider::AudioStreamInitialized, weak_factory_.GetWeakPtr()));
+void StreamProvider::OnAcquireDemuxer(std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ DCHECK(message->has_acquire_demuxer_rpc());
+
+ int32_t audio_demuxer_handle =
+ message->acquire_demuxer_rpc().audio_demuxer_handle();
+ int32_t video_demuxer_handle =
+ message->acquire_demuxer_rpc().video_demuxer_handle();
+ has_audio_ = audio_demuxer_handle != RpcBroker::kInvalidHandle;
+ has_video_ = video_demuxer_handle != RpcBroker::kInvalidHandle;
+
+ DCHECK(has_audio_ || has_video_);
+
+ if (has_audio_) {
+ auto callback = BindToCurrentLoop(base::BindOnce(
+ &StreamProvider::OnAudioStreamCreated, media_weak_this_));
+ main_task_runner_->PostTask(
+ FROM_HERE, base::BindOnce(&MediaStream::CreateOnMainThread, rpc_broker_,
+ DemuxerStream::AUDIO, audio_demuxer_handle,
+ media_task_runner_, std::move(callback)));
}
- if (remote_video_handle != RpcBroker::kInvalidHandle) {
- video_stream_.reset(new MediaStream(
- rpc_broker_, DemuxerStream::VIDEO, remote_video_handle,
- base::BindOnce(&StreamProvider::OnError, weak_factory_.GetWeakPtr(),
- "Media stream error")));
- video_stream_->Initialize(base::BindOnce(
- &StreamProvider::VideoStreamInitialized, weak_factory_.GetWeakPtr()));
+
+ if (has_video_) {
+ auto callback = BindToCurrentLoop(base::BindOnce(
+ &StreamProvider::OnVideoStreamCreated, media_weak_this_));
+ main_task_runner_->PostTask(
+ FROM_HERE, base::BindOnce(&MediaStream::CreateOnMainThread, rpc_broker_,
+ DemuxerStream::VIDEO, video_demuxer_handle,
+ media_task_runner_, std::move(callback)));
}
}
-void StreamProvider::OnError(const std::string& error) {
- VLOG(1) << __func__ << ": " << error;
- if (error_callback_.is_null())
+void StreamProvider::OnAudioStreamCreated(MediaStream::UniquePtr stream) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ audio_stream_ = std::move(stream);
+ audio_stream_->Initialize(base::BindOnce(
+ &StreamProvider::OnAudioStreamInitialized, media_weak_this_));
+ InitializeDataPipe();
+}
+
+void StreamProvider::OnVideoStreamCreated(MediaStream::UniquePtr stream) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ video_stream_ = std::move(stream);
+ video_stream_->Initialize(base::BindOnce(
+ &StreamProvider::OnVideoStreamInitialized, media_weak_this_));
+ InitializeDataPipe();
+}
+
+void StreamProvider::InitializeDataPipe() {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ if ((has_audio_ && !audio_stream_) || (has_video_ && !video_stream_))
return;
- std::move(error_callback_).Run();
+
+ receiver_controller_->StartDataStreams(
+ has_audio_ ? audio_stream_->BindNewPipeAndPassRemote()
+ : mojo::NullRemote(),
+ has_video_ ? video_stream_->BindNewPipeAndPassRemote()
+ : mojo::NullRemote());
}
-void StreamProvider::AudioStreamInitialized() {
- DCHECK(!init_done_callback_.is_null());
+void StreamProvider::OnAudioStreamInitialized() {
audio_stream_initialized_ = true;
- if (video_stream_initialized_ || !video_stream_)
- std::move(init_done_callback_).Run();
+ CompleteInitialize();
}
-void StreamProvider::VideoStreamInitialized() {
- DCHECK(!init_done_callback_.is_null());
+void StreamProvider::OnVideoStreamInitialized() {
video_stream_initialized_ = true;
- if (audio_stream_initialized_ || !audio_stream_)
- std::move(init_done_callback_).Run();
+ CompleteInitialize();
+}
+
+void StreamProvider::CompleteInitialize() {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ // Haven't receive RpcAcquireRenderer message
+ if (!has_audio_ && !has_video_)
+ return;
+
+ if ((has_audio_ && !audio_stream_initialized_) ||
+ (has_video_ && !video_stream_initialized_) || !init_done_callback_)
+ return;
+
+ // |init_done_callback_| should be called on |media_task_runner_|.
+ std::move(init_done_callback_).Run(PipelineStatus::PIPELINE_OK);
}
std::vector<DemuxerStream*> StreamProvider::GetAllStreams() {
@@ -460,25 +632,14 @@ std::vector<DemuxerStream*> StreamProvider::GetAllStreams() {
return streams;
}
-void StreamProvider::AppendBuffer(DemuxerStream::Type type,
- scoped_refptr<DecoderBuffer> buffer) {
- if (type == DemuxerStream::AUDIO)
- audio_stream_->AppendBuffer(buffer);
- else if (type == DemuxerStream::VIDEO)
- video_stream_->AppendBuffer(buffer);
- else
- NOTREACHED() << ": Only supports video or audio stream.";
-}
+} // namespace remoting
+} // namespace media
+
+namespace std {
-void StreamProvider::FlushUntil(DemuxerStream::Type type, int count) {
- DVLOG(3) << __func__ << ": type=" << type << " count=" << count;
- if (type == DemuxerStream::AUDIO)
- audio_stream_->FlushUntil(count);
- else if (type == DemuxerStream::VIDEO)
- video_stream_->FlushUntil(count);
- else
- NOTREACHED() << ": Only supports video or audio stream.";
+void default_delete<media::remoting::StreamProvider>::operator()(
+ media::remoting::StreamProvider* ptr) const {
+ ptr->Destroy();
}
-} // namespace remoting
-} // namespace media
+} // namespace std
diff --git a/chromium/media/remoting/stream_provider.h b/chromium/media/remoting/stream_provider.h
index f6c7802008b..95d2c214279 100644
--- a/chromium/media/remoting/stream_provider.h
+++ b/chromium/media/remoting/stream_provider.h
@@ -5,62 +5,280 @@
#ifndef MEDIA_REMOTING_STREAM_PROVIDER_H_
#define MEDIA_REMOTING_STREAM_PROVIDER_H_
+#include "base/callback_forward.h"
+#include "base/containers/circular_deque.h"
+#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
+#include "base/sequenced_task_runner_helpers.h"
+#include "base/single_thread_task_runner.h"
#include "media/base/audio_decoder_config.h"
+#include "media/base/demuxer.h"
#include "media/base/demuxer_stream.h"
-#include "media/base/media_resource.h"
#include "media/base/video_decoder_config.h"
-#include "media/remoting/rpc_broker.h"
+#include "media/mojo/mojom/remoting.mojom.h"
+#include "media/remoting/media_remoting_rpc.pb.h"
+#include "mojo/public/cpp/bindings/receiver.h"
+#include "mojo/public/cpp/bindings/remote.h"
+
+namespace base {
+class SingleThreadTaskRunner;
+} // namespace base
namespace media {
+
+class MojoDecoderBufferReader;
+
namespace remoting {
-class MediaStream;
+class ReceiverController;
+class RpcBroker;
// The media stream provider for Media Remoting receiver.
-class StreamProvider final : public MediaResource {
+class StreamProvider final : public Demuxer {
public:
- StreamProvider(RpcBroker* rpc_broker, base::OnceClosure error_callback);
+ StreamProvider(
+ ReceiverController* receiver_controller,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner);
- ~StreamProvider() override;
-
- // MediaResource implemenation.
+ // Demuxer implementation.
std::vector<DemuxerStream*> GetAllStreams() override;
+ std::string GetDisplayName() const override;
+ void Initialize(DemuxerHost* host, PipelineStatusCallback status_cb) override;
+ void AbortPendingReads() override;
+ void StartWaitingForSeek(base::TimeDelta seek_time) override;
+ void CancelPendingSeek(base::TimeDelta seek_time) override;
+ void Seek(base::TimeDelta time, PipelineStatusCallback status_cb) override;
+ void Stop() override;
+ base::TimeDelta GetStartTime() const override;
+ base::Time GetTimelineOffset() const override;
+ int64_t GetMemoryUsage() const override;
+ base::Optional<container_names::MediaContainerName> GetContainerForMetrics()
+ const override;
+ void OnEnabledAudioTracksChanged(const std::vector<MediaTrack::Id>& track_ids,
+ base::TimeDelta curr_time,
+ TrackChangeCB change_completed_cb) override;
+ void OnSelectedVideoTrackChanged(const std::vector<MediaTrack::Id>& track_ids,
+ base::TimeDelta curr_time,
+ TrackChangeCB change_completed_cb) override;
- void Initialize(int remote_audio_handle,
- int remote_video_handle,
- base::OnceClosure callback);
- void AppendBuffer(DemuxerStream::Type type,
- scoped_refptr<DecoderBuffer> buffer);
- void FlushUntil(DemuxerStream::Type type, int count);
+ protected:
+ // Deletion is only allowed via Destroy().
+ ~StreamProvider() override;
private:
- // Called when audio/video stream is initialized.
- void AudioStreamInitialized();
- void VideoStreamInitialized();
+ // An implementation of media::DemuxerStream on Media Remoting receiver.
+ // Receives data from mojo data pipe, and returns one frame or/and status when
+ // Read() is called.
+ class MediaStream final : public DemuxerStream,
+ public mojom::RemotingDataStreamReceiver {
+ public:
+ using UniquePtr =
+ std::unique_ptr<MediaStream, std::function<void(MediaStream*)>>;
- // Called when any error occurs.
- void OnError(const std::string& error);
+ // MediaStream should be created on the main thread to be able to get unique
+ // handle ID from |rpc_broker_|.
+ static void CreateOnMainThread(
+ RpcBroker* rpc_broker,
+ Type type,
+ int32_t handle,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner,
+ base::OnceCallback<void(MediaStream::UniquePtr)> callback);
- RpcBroker* const rpc_broker_; // Outlives this class.
- std::unique_ptr<MediaStream> video_stream_;
- std::unique_ptr<MediaStream> audio_stream_;
- bool audio_stream_initialized_ = false;
- bool video_stream_initialized_ = false;
+ // In order to destroy members in the right thread, MediaStream has to use
+ // DestructionHelper() to destroy itself.
+ static void DestructionHelper(MediaStream* stream);
- // Set when Initialize() is called, and will run when both video and audio
- // streams are initialized or error occurs.
- base::OnceClosure init_done_callback_;
+ MediaStream(
+ RpcBroker* rpc_broker,
+ Type type,
+ int32_t remote_handle,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner);
+
+ // DemuxerStream implementation.
+ void Read(ReadCB read_cb) override;
+ bool IsReadPending() const override;
+ AudioDecoderConfig audio_decoder_config() override;
+ VideoDecoderConfig video_decoder_config() override;
+ DemuxerStream::Type type() const override;
+ Liveness liveness() const override;
+ bool SupportsConfigChanges() override;
+
+ void Initialize(base::OnceClosure init_done_cb);
+
+ mojo::PendingRemote<mojom::RemotingDataStreamReceiver>
+ BindNewPipeAndPassRemote() {
+ return receiver_.BindNewPipeAndPassRemote();
+ }
+
+ private:
+ friend class base::DeleteHelper<MediaStream>; // For using DeleteSoon().
+ // For testing.
+ friend class StreamProviderTest;
+
+ // Prevent from unique_ptr using ~MediaStream() to destroy MediaStream
+ // instances. Use DestructionHelper() as the custom deleter with unique_ptr
+ // to destroy MediaStream instances.
+ ~MediaStream() override;
+
+ void Destroy();
+
+ // Send RPC message on |main_task_runner_|.
+ void SendRpcMessageOnMainThread(std::unique_ptr<pb::RpcMessage> message);
+
+ // mojom::RemotingDataStreamReceiver implementation.
+ void InitializeDataPipe(
+ mojo::ScopedDataPipeConsumerHandle data_pipe) override;
+ void ReceiveFrame(uint32_t count, mojom::DecoderBufferPtr buffer) override;
+ void FlushUntil(uint32_t count) override;
+
+ // RPC messages handlers.
+ void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message);
+ void OnInitializeCallback(std::unique_ptr<pb::RpcMessage> message);
+ void OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message);
+
+ // Issues the ReadUntil RPC message when read is pending and buffer is
+ // empty.
+ void SendReadUntil();
+
+ // Run |init_done_callback_| when MojoDecoderBufferReader is created and
+ // received RPC_DS_INITIALIZE_CALLBACK
+ void CompleteInitialize();
+
+ // Append a frame into |buffers_|.
+ void AppendBuffer(uint32_t count, scoped_refptr<DecoderBuffer> buffer);
+
+ // Run and reset the read callback.
+ void CompleteRead(DemuxerStream::Status status);
+
+ // Update the audio/video decoder config. When config changes in the mid
+ // stream, the new config will be stored in |next_audio_decoder_config_|.
+ // Old config will be dropped when all associated frames are consumed.
+ void UpdateAudioConfig(const pb::AudioDecoderConfig& audio_message);
+ void UpdateVideoConfig(const pb::VideoDecoderConfig& video_message);
+
+ // Called when any error occurs.
+ void OnError(const std::string& error);
+
+ scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_;
+ scoped_refptr<base::SingleThreadTaskRunner> media_task_runner_;
+
+ RpcBroker* const rpc_broker_; // Outlives this class.
+ const Type type_;
+ const int remote_handle_;
+ const int rpc_handle_;
+
+ // Set when Initialize() is called.
+ base::OnceClosure init_done_callback_;
- // Run when first error occurs;
- base::OnceClosure error_callback_;
+ // The frame count of the frame to be returned on the next Read call. It
+ // will be increased whenever a frame is read. It will be updated when
+ // FlushUntil() is called.
+ uint32_t current_frame_count_ = 0;
- base::WeakPtrFactory<StreamProvider> weak_factory_{this};
+ // One plus the last frame count received over RTP. Used for continuity
+ // check.
+ uint32_t buffered_frame_count_ = 0;
- DISALLOW_COPY_AND_ASSIGN(StreamProvider);
+ // The total number of frames received from the sender side. It will be used
+ // as the base value for sending ReadUntil() to request more frames and be
+ // updated in OnReadUntilCallback() which would get the message that
+ // contains how many frames are sent.
+ uint32_t total_received_frame_count_ = 0;
+
+ // Indicates whether Audio/VideoDecoderConfig changed and the frames with
+ // the old config are not yet consumed. The new config is stored in the end
+ // of |audio/video_decoder_config_|.
+ bool config_changed_ = false;
+
+ // Indicates whether a ReadUntil RPC message was sent without receiving the
+ // ReadUntilCallback message yet.
+ bool read_until_sent_ = false;
+
+ // Indicates whether RPC_DS_INITIALIZE_CALLBACK received.
+ bool rpc_initialized_ = false;
+
+ // Set when Read() is called. Run only once when read completes.
+ ReadCB read_complete_callback_;
+
+ // The frame data would be sent via Mojo IPC as MojoDecoderBuffer. When a
+ // frame is sent to |this| from host by calling ReceiveFrame(),
+ // |decoder_buffer_reader_| is used to read the frame date from data pipe.
+ std::unique_ptr<MojoDecoderBufferReader> decoder_buffer_reader_;
+
+ base::circular_deque<scoped_refptr<DecoderBuffer>> buffers_;
+
+ // Current audio/video config.
+ AudioDecoderConfig audio_decoder_config_;
+ VideoDecoderConfig video_decoder_config_;
+
+ // Stores the new audio/video config when config changes.
+ AudioDecoderConfig next_audio_decoder_config_;
+ VideoDecoderConfig next_video_decoder_config_;
+
+ mojo::Receiver<mojom::RemotingDataStreamReceiver> receiver_{this};
+
+ base::WeakPtr<MediaStream> media_weak_this_;
+ base::WeakPtrFactory<MediaStream> media_weak_factory_{this};
+ };
+
+ friend std::default_delete<StreamProvider>;
+ friend class base::DeleteHelper<StreamProvider>; // For using DeleteSoon().
+
+ // For testing.
+ friend class StreamProviderTest;
+
+ void Destroy();
+
+ // RPC messages handlers.
+ void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message);
+ void OnAcquireDemuxer(std::unique_ptr<pb::RpcMessage> message);
+
+ // Called when audio/video stream is created and initialized.
+ void InitializeDataPipe();
+ void OnAudioStreamCreated(MediaStream::UniquePtr stream);
+ void OnVideoStreamCreated(MediaStream::UniquePtr stream);
+ void OnAudioStreamInitialized();
+ void OnVideoStreamInitialized();
+ void CompleteInitialize();
+
+ scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_;
+ scoped_refptr<base::SingleThreadTaskRunner> media_task_runner_;
+ ReceiverController* const receiver_controller_; // Outlives this class
+ RpcBroker* const rpc_broker_; // Outlives this class
+ MediaStream::UniquePtr audio_stream_;
+ MediaStream::UniquePtr video_stream_;
+ bool has_audio_{false};
+ bool has_video_{false};
+ bool audio_stream_initialized_{false};
+ bool video_stream_initialized_{false};
+
+ // Set when Initialize() is called, and will run when both video and audio
+ // streams are initialized or error occurs.
+ PipelineStatusCallback init_done_callback_;
+
+ base::WeakPtr<StreamProvider> media_weak_this_;
+ base::WeakPtrFactory<StreamProvider> media_weak_factory_{this};
};
} // namespace remoting
} // namespace media
+namespace std {
+
+// Specialize std::default_delete to call Destroy().
+template <>
+struct default_delete<media::remoting::StreamProvider> {
+ constexpr default_delete() = default;
+
+ template <typename U,
+ typename = typename std::enable_if<std::is_convertible<
+ U*,
+ media::remoting::StreamProvider*>::value>::type>
+ explicit default_delete(const default_delete<U>& d) {}
+
+ void operator()(media::remoting::StreamProvider* ptr) const;
+};
+
+} // namespace std
+
#endif // MEDIA_REMOTING_STREAM_PROVIDER_H_
diff --git a/chromium/media/remoting/stream_provider_unittest.cc b/chromium/media/remoting/stream_provider_unittest.cc
new file mode 100644
index 00000000000..7ad43222db0
--- /dev/null
+++ b/chromium/media/remoting/stream_provider_unittest.cc
@@ -0,0 +1,316 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/remoting/stream_provider.h"
+
+#include "base/test/task_environment.h"
+#include "media/base/audio_decoder_config.h"
+#include "media/base/demuxer_stream.h"
+#include "media/base/media_util.h"
+#include "media/base/test_helpers.h"
+#include "media/base/video_decoder_config.h"
+#include "media/remoting/mock_receiver_controller.h"
+#include "media/remoting/proto_enum_utils.h"
+#include "media/remoting/proto_utils.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using testing::NiceMock;
+
+namespace {
+constexpr int kBufferSize = 10;
+} // namespace
+
+namespace media {
+namespace remoting {
+
+class StreamProviderTest : public testing::Test {
+ public:
+ StreamProviderTest()
+ : audio_config_(TestAudioConfig::Normal()),
+ video_config_(TestVideoConfig::Normal()),
+ audio_buffer_(new DecoderBuffer(kBufferSize)),
+ video_buffer_(DecoderBuffer::CreateEOSBuffer()) {}
+
+ void SetUp() override {
+ mock_controller_ = MockReceiverController::GetInstance();
+ mock_controller_->Initialize(
+ mock_controller_->mock_remotee()->BindNewPipeAndPassRemote());
+ mock_remotee_ = mock_controller_->mock_remotee();
+ stream_provider_ = std::make_unique<StreamProvider>(
+ mock_controller_, base::ThreadTaskRunnerHandle::Get());
+
+ rpc_broker_ = mock_controller_->rpc_broker();
+ sender_audio_demuxer_stream_handle_ = rpc_broker_->GetUniqueHandle();
+ sender_video_demuxer_stream_handle_ = rpc_broker_->GetUniqueHandle();
+ rpc_broker_->RegisterMessageReceiverCallback(
+ sender_audio_demuxer_stream_handle_,
+ base::BindRepeating(&StreamProviderTest::OnDemuxerStreamReceivedRpc,
+ base::Unretained(this),
+ DemuxerStream::Type::AUDIO));
+ rpc_broker_->RegisterMessageReceiverCallback(
+ sender_video_demuxer_stream_handle_,
+ base::BindRepeating(&StreamProviderTest::OnDemuxerStreamReceivedRpc,
+ base::Unretained(this),
+ DemuxerStream::Type::VIDEO));
+ }
+
+ void TearDown() override {
+ stream_provider_.reset();
+ task_environment_.RunUntilIdle();
+ }
+
+ void OnDemuxerStreamReceivedRpc(DemuxerStream::Type type,
+ std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(message);
+ switch (message->proc()) {
+ case pb::RpcMessage::RPC_DS_INITIALIZE:
+ if (type == DemuxerStream::Type::AUDIO) {
+ receiver_audio_demuxer_stream_handle_ = message->integer_value();
+ } else if (type == DemuxerStream::Type::VIDEO) {
+ receiver_video_demuxer_stream_handle_ = message->integer_value();
+ } else {
+ NOTREACHED();
+ }
+
+ RpcInitializeCallback(type);
+ break;
+
+ case pb::RpcMessage::RPC_DS_READUNTIL:
+ ReadUntil(type);
+ break;
+
+ default:
+ DVLOG(1) << __func__ << "Unknown supported message.";
+ }
+ }
+
+ void RpcInitializeCallback(DemuxerStream::Type type) {
+ // Issues RPC_DS_INITIALIZE_CALLBACK RPC message.
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(type == DemuxerStream::Type::AUDIO
+ ? receiver_audio_demuxer_stream_handle_
+ : receiver_video_demuxer_stream_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK);
+ auto* init_cb_message = rpc->mutable_demuxerstream_initializecb_rpc();
+ init_cb_message->set_type(type);
+
+ switch (type) {
+ case DemuxerStream::Type::AUDIO: {
+ pb::AudioDecoderConfig* audio_message =
+ init_cb_message->mutable_audio_decoder_config();
+ ConvertAudioDecoderConfigToProto(audio_config_, audio_message);
+ break;
+ }
+
+ case DemuxerStream::Type::VIDEO: {
+ pb::VideoDecoderConfig* video_message =
+ init_cb_message->mutable_video_decoder_config();
+ ConvertVideoDecoderConfigToProto(video_config_, video_message);
+ break;
+ }
+
+ default:
+ NOTREACHED();
+ }
+
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+ }
+
+ void ReadUntil(DemuxerStream::Type type) {
+ switch (type) {
+ case DemuxerStream::Type::AUDIO:
+ SendAudioFrame();
+ break;
+ case DemuxerStream::Type::VIDEO:
+ SendVideoFrame();
+ break;
+ default:
+ NOTREACHED();
+ }
+ }
+
+ void SendRpcAcquireDemuxer() {
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(RpcBroker::kAcquireDemuxerHandle);
+ rpc->set_proc(pb::RpcMessage::RPC_ACQUIRE_DEMUXER);
+ pb::AcquireDemuxer* message = rpc->mutable_acquire_demuxer_rpc();
+ message->set_audio_demuxer_handle(sender_audio_demuxer_stream_handle_);
+ message->set_video_demuxer_handle(sender_video_demuxer_stream_handle_);
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+ }
+
+ void OnStreamProviderInitialized(PipelineStatus status) {
+ EXPECT_EQ(PipelineStatus::PIPELINE_OK, status);
+ stream_provider_initialized_ = true;
+ audio_stream_ =
+ stream_provider_->GetFirstStream(DemuxerStream::Type::AUDIO);
+ video_stream_ =
+ stream_provider_->GetFirstStream(DemuxerStream::Type::VIDEO);
+
+ EXPECT_TRUE(audio_stream_);
+ EXPECT_TRUE(video_stream_);
+ }
+
+ void InitializeDemuxer() {
+ DCHECK(stream_provider_);
+ stream_provider_->Initialize(
+ nullptr,
+ base::BindOnce(&StreamProviderTest::OnStreamProviderInitialized,
+ base::Unretained(this)));
+ }
+
+ void SendAudioFrame() {
+ mock_remotee_->SendAudioFrame(0, audio_buffer_);
+ SendRpcReadUntilCallback(DemuxerStream::Type::AUDIO);
+ }
+
+ void SendVideoFrame() {
+ mock_remotee_->SendVideoFrame(0, video_buffer_);
+ SendRpcReadUntilCallback(DemuxerStream::Type::VIDEO);
+ }
+
+ void SendRpcReadUntilCallback(DemuxerStream::Type type) {
+ // Issues RPC_DS_READUNTIL_CALLBACK RPC message.
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(type == DemuxerStream::Type::AUDIO
+ ? receiver_audio_demuxer_stream_handle_
+ : receiver_video_demuxer_stream_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK);
+ auto* message = rpc->mutable_demuxerstream_readuntilcb_rpc();
+ message->set_count(0);
+ message->set_status(
+ ToProtoDemuxerStreamStatus(DemuxerStream::Status::kOk).value());
+ rpc_broker_->SendMessageToRemote(std::move(rpc));
+ }
+
+ void FlushUntil(uint32_t flush_audio_count, uint32_t flush_video_count) {
+ mock_remotee_->OnFlushUntil(flush_audio_count, flush_video_count);
+ }
+
+ uint32_t GetAudioCurrentFrameCount() {
+ return stream_provider_->audio_stream_->current_frame_count_;
+ }
+
+ uint32_t GetVideoCurrentFrameCount() {
+ return stream_provider_->video_stream_->current_frame_count_;
+ }
+
+ void OnBufferReadFromDemuxerStream(DemuxerStream::Type type,
+ DemuxerStream::Status status,
+ scoped_refptr<DecoderBuffer> buffer) {
+ EXPECT_EQ(status, DemuxerStream::Status::kOk);
+ switch (type) {
+ case DemuxerStream::Type::AUDIO:
+ received_audio_buffer_ = buffer;
+ break;
+ case DemuxerStream::Type::VIDEO:
+ received_video_buffer_ = buffer;
+ break;
+ default:
+ NOTREACHED();
+ }
+ }
+
+ base::test::TaskEnvironment task_environment_;
+
+ AudioDecoderConfig audio_config_;
+ VideoDecoderConfig video_config_;
+
+ DemuxerStream* audio_stream_;
+ DemuxerStream* video_stream_;
+
+ scoped_refptr<DecoderBuffer> audio_buffer_;
+ scoped_refptr<DecoderBuffer> video_buffer_;
+
+ bool stream_provider_initialized_{false};
+ scoped_refptr<DecoderBuffer> received_audio_buffer_;
+ scoped_refptr<DecoderBuffer> received_video_buffer_;
+
+ int sender_audio_demuxer_stream_handle_ = RpcBroker::kInvalidHandle;
+ int sender_video_demuxer_stream_handle_ = RpcBroker::kInvalidHandle;
+ int receiver_audio_demuxer_stream_handle_ = RpcBroker::kInvalidHandle;
+ int receiver_video_demuxer_stream_handle_ = RpcBroker::kInvalidHandle;
+
+ RpcBroker* rpc_broker_;
+ MockReceiverController* mock_controller_;
+ MockRemotee* mock_remotee_;
+ std::unique_ptr<StreamProvider> stream_provider_;
+};
+
+TEST_F(StreamProviderTest, InitializeBeforeRpcAcquireDemuxer) {
+ InitializeDemuxer();
+ EXPECT_FALSE(stream_provider_initialized_);
+
+ SendRpcAcquireDemuxer();
+ task_environment_.RunUntilIdle();
+
+ EXPECT_TRUE(mock_remotee_->audio_stream_.is_bound());
+ EXPECT_TRUE(mock_remotee_->video_stream_.is_bound());
+ EXPECT_TRUE(stream_provider_initialized_);
+
+ // 1 audio stream and 1 video stream
+ EXPECT_EQ(size_t(2), stream_provider_->GetAllStreams().size());
+}
+
+TEST_F(StreamProviderTest, InitializeAfterRpcAcquireDemuxer) {
+ SendRpcAcquireDemuxer();
+ EXPECT_FALSE(stream_provider_initialized_);
+
+ InitializeDemuxer();
+ task_environment_.RunUntilIdle();
+
+ EXPECT_TRUE(mock_remotee_->audio_stream_.is_bound());
+ EXPECT_TRUE(mock_remotee_->video_stream_.is_bound());
+ EXPECT_TRUE(stream_provider_initialized_);
+
+ // 1 audio stream and 1 video stream
+ EXPECT_EQ(size_t(2), stream_provider_->GetAllStreams().size());
+}
+
+TEST_F(StreamProviderTest, ReadBuffer) {
+ InitializeDemuxer();
+ SendRpcAcquireDemuxer();
+ task_environment_.RunUntilIdle();
+ EXPECT_TRUE(mock_remotee_->audio_stream_.is_bound());
+ EXPECT_TRUE(mock_remotee_->video_stream_.is_bound());
+ EXPECT_TRUE(stream_provider_initialized_);
+
+ audio_stream_->Read(
+ base::BindOnce(&StreamProviderTest::OnBufferReadFromDemuxerStream,
+ base::Unretained(this), DemuxerStream::Type::AUDIO));
+ task_environment_.RunUntilIdle();
+ EXPECT_EQ(audio_buffer_->data_size(), received_audio_buffer_->data_size());
+ EXPECT_EQ(audio_buffer_->end_of_stream(),
+ received_audio_buffer_->end_of_stream());
+ EXPECT_EQ(audio_buffer_->is_key_frame(),
+ received_audio_buffer_->is_key_frame());
+
+ video_stream_->Read(
+ base::BindOnce(&StreamProviderTest::OnBufferReadFromDemuxerStream,
+ base::Unretained(this), DemuxerStream::Type::VIDEO));
+ task_environment_.RunUntilIdle();
+ EXPECT_EQ(video_buffer_->end_of_stream(),
+ received_video_buffer_->end_of_stream());
+}
+
+TEST_F(StreamProviderTest, FlushUntil) {
+ InitializeDemuxer();
+ SendRpcAcquireDemuxer();
+ task_environment_.RunUntilIdle();
+ EXPECT_TRUE(mock_remotee_->audio_stream_.is_bound());
+ EXPECT_TRUE(mock_remotee_->video_stream_.is_bound());
+ EXPECT_TRUE(stream_provider_initialized_);
+
+ uint32_t flush_audio_count = 10;
+ uint32_t flush_video_count = 20;
+ FlushUntil(flush_audio_count, flush_video_count);
+ task_environment_.RunUntilIdle();
+
+ EXPECT_EQ(GetAudioCurrentFrameCount(), flush_audio_count);
+ EXPECT_EQ(GetVideoCurrentFrameCount(), flush_video_count);
+}
+
+} // namespace remoting
+} // namespace media
diff --git a/chromium/media/remoting/test_utils.cc b/chromium/media/remoting/test_utils.cc
new file mode 100644
index 00000000000..d3ec82254d4
--- /dev/null
+++ b/chromium/media/remoting/test_utils.cc
@@ -0,0 +1,17 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/remoting/test_utils.h"
+#include "media/remoting/receiver_controller.h"
+
+namespace media {
+namespace remoting {
+
+void ResetForTesting(ReceiverController* controller) {
+ controller->receiver_.reset();
+ controller->media_remotee_.reset();
+}
+
+} // namespace remoting
+} // namespace media
diff --git a/chromium/media/remoting/test_utils.h b/chromium/media/remoting/test_utils.h
new file mode 100644
index 00000000000..2b5a3e77de3
--- /dev/null
+++ b/chromium/media/remoting/test_utils.h
@@ -0,0 +1,19 @@
+// Copyright 2020 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MEDIA_REMOTING_TEST_UTILS_H_
+#define MEDIA_REMOTING_TEST_UTILS_H_
+
+namespace media {
+namespace remoting {
+
+class ReceiverController;
+
+// Friend function for resetting the mojo binding in ReceiverController.
+void ResetForTesting(ReceiverController* controller);
+
+} // namespace remoting
+} // namespace media
+
+#endif // MEDIA_REMOTING_TEST_UTILS_H_