summaryrefslogtreecommitdiff
path: root/chromium/media/remoting/stream_provider.cc
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/media/remoting/stream_provider.cc')
-rw-r--r--chromium/media/remoting/stream_provider.cc687
1 files changed, 424 insertions, 263 deletions
diff --git a/chromium/media/remoting/stream_provider.cc b/chromium/media/remoting/stream_provider.cc
index 7ff69c808c5..198cf6c3f14 100644
--- a/chromium/media/remoting/stream_provider.cc
+++ b/chromium/media/remoting/stream_provider.cc
@@ -3,16 +3,23 @@
// found in the LICENSE file.
#include "media/remoting/stream_provider.h"
+#include <vector>
#include "base/bind.h"
#include "base/callback.h"
#include "base/callback_helpers.h"
#include "base/containers/circular_deque.h"
#include "base/logging.h"
+#include "base/single_thread_task_runner.h"
+#include "base/threading/thread_task_runner_handle.h"
+#include "media/base/bind_to_current_loop.h"
#include "media/base/decoder_buffer.h"
#include "media/base/video_transformation.h"
+#include "media/mojo/common/mojo_decoder_buffer_converter.h"
#include "media/remoting/proto_enum_utils.h"
#include "media/remoting/proto_utils.h"
+#include "media/remoting/receiver_controller.h"
+#include "media/remoting/rpc_broker.h"
namespace media {
namespace remoting {
@@ -22,131 +29,140 @@ namespace {
constexpr int kNumFramesInEachReadUntil = 10;
}
-// An implementation of media::DemuxerStream on Media Remoting receiver.
-// Receives data from mojo data pipe, and returns one frame or/and status when
-// Read() is called.
-class MediaStream final : public DemuxerStream {
- public:
- MediaStream(RpcBroker* rpc_broker,
- Type type,
- int remote_handle,
- base::OnceClosure error_callback);
- ~MediaStream() override;
-
- // DemuxerStream implementation.
- void Read(ReadCB read_cb) override;
- bool IsReadPending() const override;
- AudioDecoderConfig audio_decoder_config() override;
- VideoDecoderConfig video_decoder_config() override;
- DemuxerStream::Type type() const override;
- Liveness liveness() const override;
- bool SupportsConfigChanges() override;
-
- void Initialize(base::OnceClosure init_done_cb);
- void FlushUntil(int count);
- void AppendBuffer(scoped_refptr<DecoderBuffer> buffer);
-
- private:
- // RPC messages handlers.
- void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message);
- void OnInitializeCallback(std::unique_ptr<pb::RpcMessage> message);
- void OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message);
-
- // Issues the ReadUntil RPC message when read is pending and buffer is empty.
- void SendReadUntil();
-
- // Run and reset the read callback.
- void CompleteRead(DemuxerStream::Status status);
-
- // Update the audio/video decoder config When config changes in the mid
- // stream, the new config will be stored in
- // |next_audio/video_decoder_config_|. Old config will be droped when all
- // associated frames are consumed.
- void UpdateConfig(const pb::AudioDecoderConfig* audio_message,
- const pb::VideoDecoderConfig* video_message);
-
- // Called when any error occurs.
- void OnError(const std::string& error);
-
- RpcBroker* const rpc_broker_; // Outlives this class.
- const Type type_;
- const int remote_handle_;
- const int rpc_handle_;
-
- // Set when Initialize() is called, and will be run after initialization is
- // done.
- base::OnceClosure init_done_callback_;
-
- // The read until count in the last ReadUntil RPC message.
- int last_read_until_count_ = 0;
-
- // Indicates whether Audio/VideoDecoderConfig changed and the frames with the
- // old config are not yet consumed. The new config is stored in the end of
- // |audio/video_decoder_config_|;
- bool config_changed_ = false;
-
- // Indicates whether a ReadUntil RPC message was sent without receiving the
- // ReadUntilCallback message yet.
- bool read_until_sent_ = false;
-
- // Set when Read() is called. Run only once when read completes.
- ReadCB read_complete_callback_;
-
- base::OnceClosure error_callback_; // Called when first error occurs.
-
- base::circular_deque<scoped_refptr<DecoderBuffer>> buffers_;
-
- // Current audio/video config.
- AudioDecoderConfig audio_decoder_config_;
- VideoDecoderConfig video_decoder_config_;
-
- // Stores the new auido/video config when config changes.
- AudioDecoderConfig next_audio_decoder_config_;
- VideoDecoderConfig next_video_decoder_config_;
-
- base::WeakPtrFactory<MediaStream> weak_factory_{this};
-
- DISALLOW_COPY_AND_ASSIGN(MediaStream);
-};
-
-MediaStream::MediaStream(RpcBroker* rpc_broker,
- Type type,
- int remote_handle,
- base::OnceClosure error_callback)
- : rpc_broker_(rpc_broker),
+// static
+void StreamProvider::MediaStream::CreateOnMainThread(
+ RpcBroker* rpc_broker,
+ Type type,
+ int32_t handle,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner,
+ base::OnceCallback<void(MediaStream::UniquePtr)> callback) {
+ MediaStream::UniquePtr stream(
+ new MediaStream(rpc_broker, type, handle, media_task_runner),
+ &DestructionHelper);
+ std::move(callback).Run(std::move(stream));
+}
+
+// static
+void StreamProvider::MediaStream::DestructionHelper(MediaStream* stream) {
+ stream->Destroy();
+}
+
+StreamProvider::MediaStream::MediaStream(
+ RpcBroker* rpc_broker,
+ Type type,
+ int remote_handle,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner)
+ : main_task_runner_(base::ThreadTaskRunnerHandle::Get()),
+ media_task_runner_(media_task_runner),
+ rpc_broker_(rpc_broker),
type_(type),
remote_handle_(remote_handle),
- rpc_handle_(rpc_broker_->GetUniqueHandle()),
- error_callback_(std::move(error_callback)) {
+ rpc_handle_(rpc_broker_->GetUniqueHandle()) {
DCHECK(remote_handle_ != RpcBroker::kInvalidHandle);
- rpc_broker_->RegisterMessageReceiverCallback(
- rpc_handle_, base::BindRepeating(&MediaStream::OnReceivedRpc,
- weak_factory_.GetWeakPtr()));
+
+ media_weak_this_ = media_weak_factory_.GetWeakPtr();
+
+ const RpcBroker::ReceiveMessageCallback receive_callback =
+ BindToLoop(media_task_runner_,
+ BindRepeating(&MediaStream::OnReceivedRpc, media_weak_this_));
+ rpc_broker_->RegisterMessageReceiverCallback(rpc_handle_, receive_callback);
}
-MediaStream::~MediaStream() {
+StreamProvider::MediaStream::~MediaStream() {
+ DCHECK(main_task_runner_->BelongsToCurrentThread());
rpc_broker_->UnregisterMessageReceiverCallback(rpc_handle_);
}
-void MediaStream::Initialize(base::OnceClosure init_done_cb) {
- DCHECK(init_done_cb);
- if (!init_done_callback_.is_null()) {
+void StreamProvider::MediaStream::Destroy() {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ // Invalid weak pointers to prevent |this| from receiving RPC calls on the
+ // media thread.
+ media_weak_factory_.InvalidateWeakPtrs();
+
+ // Unbind all mojo pipes and bindings.
+ receiver_.reset();
+ decoder_buffer_reader_.reset();
+
+ // After invalidating all weak ptrs of |media_weak_factory_|, MediaStream
+ // won't be access anymore, so using |this| here is safe.
+ main_task_runner_->DeleteSoon(FROM_HERE, this);
+}
+
+void StreamProvider::MediaStream::SendRpcMessageOnMainThread(
+ std::unique_ptr<pb::RpcMessage> message) {
+ // |rpc_broker_| is owned by |receiver_controller_| which is a singleton per
+ // process, so it's safe to use Unretained() here.
+ main_task_runner_->PostTask(
+ FROM_HERE,
+ base::BindOnce(&RpcBroker::SendMessageToRemote,
+ base::Unretained(rpc_broker_), std::move(message)));
+}
+
+void StreamProvider::MediaStream::Initialize(
+ base::OnceClosure init_done_callback) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ DCHECK(init_done_callback);
+
+ if (init_done_callback_) {
OnError("Duplicate initialization");
return;
}
- init_done_callback_ = std::move(init_done_cb);
- DVLOG(3) << __func__ << "Issues RpcMessage::RPC_DS_INITIALIZE with "
- << "remote_handle=" << remote_handle_
- << " rpc_handle=" << rpc_handle_;
- std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
+ init_done_callback_ = std::move(init_done_callback);
+
+ auto rpc = std::make_unique<pb::RpcMessage>();
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE);
rpc->set_integer_value(rpc_handle_);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
+}
+
+void StreamProvider::MediaStream::InitializeDataPipe(
+ mojo::ScopedDataPipeConsumerHandle data_pipe) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ decoder_buffer_reader_ =
+ std::make_unique<MojoDecoderBufferReader>(std::move(data_pipe));
+ CompleteInitialize();
+}
+
+void StreamProvider::MediaStream::ReceiveFrame(uint32_t count,
+ mojom::DecoderBufferPtr buffer) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ DCHECK(decoder_buffer_reader_);
+
+ auto callback = BindToCurrentLoop(
+ base::BindOnce(&MediaStream::AppendBuffer, media_weak_this_, count));
+ decoder_buffer_reader_->ReadDecoderBuffer(std::move(buffer),
+ std::move(callback));
+}
+
+void StreamProvider::MediaStream::FlushUntil(uint32_t count) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ if (count < current_frame_count_)
+ return;
+
+ uint32_t buffers_to_erase = count - current_frame_count_;
+
+ if (buffers_to_erase > buffers_.size()) {
+ buffers_.clear();
+ } else {
+ buffers_.erase(buffers_.begin(), buffers_.begin() + buffers_to_erase);
+ }
+
+ current_frame_count_ = count;
+
+ if (!read_complete_callback_.is_null())
+ CompleteRead(DemuxerStream::kAborted);
+
+ read_until_sent_ = false;
}
-void MediaStream::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
+void StreamProvider::MediaStream::OnReceivedRpc(
+ std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(message->handle() == rpc_handle_);
switch (message->proc()) {
@@ -161,24 +177,21 @@ void MediaStream::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
}
}
-void MediaStream::OnInitializeCallback(
+void StreamProvider::MediaStream::OnInitializeCallback(
std::unique_ptr<pb::RpcMessage> message) {
- DVLOG(3) << __func__ << "Receives RPC_DS_INITIALIZE_CALLBACK message.";
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
const pb::DemuxerStreamInitializeCallback callback_message =
message->demuxerstream_initializecb_rpc();
if (callback_message.type() != type_) {
OnError("Wrong type");
return;
}
+
if ((type_ == DemuxerStream::AUDIO &&
audio_decoder_config_.IsValidConfig()) ||
(type_ == DemuxerStream::VIDEO &&
video_decoder_config_.IsValidConfig())) {
- OnError("Duplicate Iniitialize");
- return;
- }
- if (init_done_callback_.is_null()) {
- OnError("Iniitialize callback missing");
+ OnError("Duplicate initialization");
return;
}
@@ -186,21 +199,41 @@ void MediaStream::OnInitializeCallback(
callback_message.has_audio_decoder_config()) {
const pb::AudioDecoderConfig audio_message =
callback_message.audio_decoder_config();
- UpdateConfig(&audio_message, nullptr);
+ UpdateAudioConfig(audio_message);
} else if (type_ == DemuxerStream::VIDEO &&
callback_message.has_video_decoder_config()) {
const pb::VideoDecoderConfig video_message =
callback_message.video_decoder_config();
- UpdateConfig(nullptr, &video_message);
+ UpdateVideoConfig(video_message);
} else {
- OnError("config missing");
+ OnError("Config missing");
return;
}
+
+ rpc_initialized_ = true;
+ CompleteInitialize();
+}
+
+void StreamProvider::MediaStream::CompleteInitialize() {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ // Initialization finished when received RPC_DS_INITIALIZE_CALLBACK and
+ // |decoder_buffer_reader_| is created.
+ if (!rpc_initialized_ || !decoder_buffer_reader_)
+ return;
+
+ if (!init_done_callback_) {
+ OnError("Initialize callback missing");
+ return;
+ }
+
std::move(init_done_callback_).Run();
}
-void MediaStream::OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message) {
- DVLOG(3) << __func__ << ": Receives RPC_DS_READUNTIL_CALLBACK message.";
+void StreamProvider::MediaStream::OnReadUntilCallback(
+ std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
if (!read_until_sent_) {
OnError("Unexpected ReadUntilCallback");
return;
@@ -208,98 +241,90 @@ void MediaStream::OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message) {
read_until_sent_ = false;
const pb::DemuxerStreamReadUntilCallback callback_message =
message->demuxerstream_readuntilcb_rpc();
- last_read_until_count_ = callback_message.count();
+ total_received_frame_count_ = callback_message.count();
+
if (ToDemuxerStreamStatus(callback_message.status()) == kConfigChanged) {
config_changed_ = true;
+
if (callback_message.has_audio_decoder_config()) {
const pb::AudioDecoderConfig audio_message =
callback_message.audio_decoder_config();
- UpdateConfig(&audio_message, nullptr);
+ UpdateAudioConfig(audio_message);
}
+
if (callback_message.has_video_decoder_config()) {
const pb::VideoDecoderConfig video_message =
callback_message.video_decoder_config();
- UpdateConfig(nullptr, &video_message);
+ UpdateVideoConfig(video_message);
}
+
if (buffers_.empty() && !read_complete_callback_.is_null())
CompleteRead(DemuxerStream::kConfigChanged);
+
return;
}
+
if (buffers_.empty() && !read_complete_callback_.is_null())
SendReadUntil();
}
-void MediaStream::UpdateConfig(const pb::AudioDecoderConfig* audio_message,
- const pb::VideoDecoderConfig* video_message) {
- if (type_ == AUDIO) {
- DCHECK(audio_message && !video_message);
- AudioDecoderConfig audio_config;
- ConvertProtoToAudioDecoderConfig(*audio_message, &audio_config);
- if (!audio_config.IsValidConfig()) {
- OnError("Invalid audio config");
- return;
- }
- if (config_changed_) {
- DCHECK(audio_decoder_config_.IsValidConfig());
- DCHECK(!next_audio_decoder_config_.IsValidConfig());
- next_audio_decoder_config_ = audio_config;
- } else {
- DCHECK(!audio_decoder_config_.IsValidConfig());
- audio_decoder_config_ = audio_config;
- }
- } else if (type_ == VIDEO) {
- DCHECK(video_message && !audio_message);
- VideoDecoderConfig video_config;
- ConvertProtoToVideoDecoderConfig(*video_message, &video_config);
- if (!video_config.IsValidConfig()) {
- OnError("Invalid video config");
- return;
- }
- if (config_changed_) {
- DCHECK(video_decoder_config_.IsValidConfig());
- DCHECK(!next_video_decoder_config_.IsValidConfig());
- next_video_decoder_config_ = video_config;
- } else {
- DCHECK(!video_decoder_config_.IsValidConfig());
- video_decoder_config_ = video_config;
- }
+void StreamProvider::MediaStream::UpdateAudioConfig(
+ const pb::AudioDecoderConfig& audio_message) {
+ DCHECK(type_ == AUDIO);
+ AudioDecoderConfig audio_config;
+ ConvertProtoToAudioDecoderConfig(audio_message, &audio_config);
+ if (!audio_config.IsValidConfig()) {
+ OnError("Invalid audio config");
+ return;
+ }
+ if (config_changed_) {
+ DCHECK(audio_decoder_config_.IsValidConfig());
+ DCHECK(!next_audio_decoder_config_.IsValidConfig());
+ next_audio_decoder_config_ = audio_config;
} else {
- NOTREACHED() << ": Only supports video or audio stream.";
+ DCHECK(!audio_decoder_config_.IsValidConfig());
+ audio_decoder_config_ = audio_config;
}
}
-void MediaStream::SendReadUntil() {
+void StreamProvider::MediaStream::UpdateVideoConfig(
+ const pb::VideoDecoderConfig& video_message) {
+ DCHECK(type_ == VIDEO);
+ VideoDecoderConfig video_config;
+ ConvertProtoToVideoDecoderConfig(video_message, &video_config);
+ if (!video_config.IsValidConfig()) {
+ OnError("Invalid video config");
+ return;
+ }
+ if (config_changed_) {
+ DCHECK(video_decoder_config_.IsValidConfig());
+ DCHECK(!next_video_decoder_config_.IsValidConfig());
+ next_video_decoder_config_ = video_config;
+ } else {
+ DCHECK(!video_decoder_config_.IsValidConfig());
+ video_decoder_config_ = video_config;
+ }
+}
+
+void StreamProvider::MediaStream::SendReadUntil() {
if (read_until_sent_)
return;
- DVLOG(3) << "Issues RPC_DS_READUNTIL RPC message to remote_handle_="
- << remote_handle_ << " with callback handle=" << rpc_handle_
- << " count=" << last_read_until_count_;
std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage());
rpc->set_handle(remote_handle_);
rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL);
auto* message = rpc->mutable_demuxerstream_readuntil_rpc();
- last_read_until_count_ += kNumFramesInEachReadUntil;
- message->set_count(last_read_until_count_);
+ message->set_count(total_received_frame_count_ + kNumFramesInEachReadUntil);
message->set_callback_handle(rpc_handle_);
- rpc_broker_->SendMessageToRemote(std::move(rpc));
+ SendRpcMessageOnMainThread(std::move(rpc));
read_until_sent_ = true;
}
-void MediaStream::FlushUntil(int count) {
- while (!buffers_.empty()) {
- buffers_.pop_front();
- }
-
- last_read_until_count_ = count;
- if (!read_complete_callback_.is_null())
- CompleteRead(DemuxerStream::kAborted);
- read_until_sent_ = false;
-}
-
-void MediaStream::Read(ReadCB read_cb) {
+void StreamProvider::MediaStream::Read(ReadCB read_cb) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
DCHECK(read_complete_callback_.is_null());
DCHECK(read_cb);
+
read_complete_callback_ = std::move(read_cb);
if (buffers_.empty() && config_changed_) {
CompleteRead(DemuxerStream::kConfigChanged);
@@ -315,26 +340,21 @@ void MediaStream::Read(ReadCB read_cb) {
CompleteRead(DemuxerStream::kOk);
}
-bool MediaStream::IsReadPending() const {
+bool StreamProvider::MediaStream::IsReadPending() const {
return !read_complete_callback_.is_null();
}
-void MediaStream::CompleteRead(DemuxerStream::Status status) {
- DVLOG(3) << __func__ << ": " << status;
+void StreamProvider::MediaStream::CompleteRead(DemuxerStream::Status status) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
switch (status) {
case DemuxerStream::kConfigChanged:
if (type_ == AUDIO) {
DCHECK(next_audio_decoder_config_.IsValidConfig());
audio_decoder_config_ = next_audio_decoder_config_;
-#if DCHECK_IS_ON()
- next_audio_decoder_config_ = AudioDecoderConfig();
-#endif // DCHECK_IS_ON()
} else {
DCHECK(next_video_decoder_config_.IsValidConfig());
video_decoder_config_ = next_video_decoder_config_;
-#if DCHECK_IS_ON()
- next_video_decoder_config_ = VideoDecoderConfig();
-#endif // DCHECK_IS_ON()
}
config_changed_ = false;
std::move(read_complete_callback_).Run(status, nullptr);
@@ -344,111 +364,263 @@ void MediaStream::CompleteRead(DemuxerStream::Status status) {
std::move(read_complete_callback_).Run(status, nullptr);
return;
case DemuxerStream::kOk:
+ DCHECK(read_complete_callback_);
DCHECK(!buffers_.empty());
+ DCHECK_LT(current_frame_count_, buffered_frame_count_);
scoped_refptr<DecoderBuffer> frame_data = buffers_.front();
buffers_.pop_front();
+ ++current_frame_count_;
std::move(read_complete_callback_).Run(status, frame_data);
return;
}
}
-AudioDecoderConfig MediaStream::audio_decoder_config() {
- DVLOG(3) << __func__;
+AudioDecoderConfig StreamProvider::MediaStream::audio_decoder_config() {
DCHECK(type_ == DemuxerStream::AUDIO);
return audio_decoder_config_;
}
-VideoDecoderConfig MediaStream::video_decoder_config() {
- DVLOG(3) << __func__;
+VideoDecoderConfig StreamProvider::MediaStream::video_decoder_config() {
DCHECK(type_ == DemuxerStream::VIDEO);
return video_decoder_config_;
}
-DemuxerStream::Type MediaStream::type() const {
+DemuxerStream::Type StreamProvider::MediaStream::type() const {
return type_;
}
-DemuxerStream::Liveness MediaStream::liveness() const {
+DemuxerStream::Liveness StreamProvider::MediaStream::liveness() const {
return DemuxerStream::LIVENESS_LIVE;
}
-bool MediaStream::SupportsConfigChanges() {
+bool StreamProvider::MediaStream::SupportsConfigChanges() {
return true;
}
-void MediaStream::AppendBuffer(scoped_refptr<DecoderBuffer> buffer) {
- DVLOG(3) << __func__;
+void StreamProvider::MediaStream::AppendBuffer(
+ uint32_t count,
+ scoped_refptr<DecoderBuffer> buffer) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ // Drop flushed frame.
+ if (count < current_frame_count_)
+ return;
+
+ // Continuity check.
+ DCHECK(buffers_.empty() || buffered_frame_count_ == count);
+
buffers_.push_back(buffer);
+ buffered_frame_count_ = count + 1;
+
if (!read_complete_callback_.is_null())
CompleteRead(DemuxerStream::kOk);
}
-void MediaStream::OnError(const std::string& error) {
- VLOG(1) << __func__ << ": " << error;
- if (error_callback_.is_null())
- return;
- std::move(error_callback_).Run();
+void StreamProvider::MediaStream::OnError(const std::string& error) {
+ auto rpc = std::make_unique<pb::RpcMessage>();
+ rpc->set_handle(remote_handle_);
+ rpc->set_proc(pb::RpcMessage::RPC_DS_ONERROR);
+ SendRpcMessageOnMainThread(std::move(rpc));
}
-StreamProvider::StreamProvider(RpcBroker* rpc_broker,
- base::OnceClosure error_callback)
- : rpc_broker_(rpc_broker), error_callback_(std::move(error_callback)) {}
+StreamProvider::StreamProvider(
+ ReceiverController* receiver_controller,
+ const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner)
+ : main_task_runner_(base::ThreadTaskRunnerHandle::Get()),
+ media_task_runner_(media_task_runner),
+ receiver_controller_(receiver_controller),
+ rpc_broker_(receiver_controller_->rpc_broker()) {
+ DCHECK(receiver_controller_);
+ DCHECK(rpc_broker_);
+
+ media_weak_this_ = media_weak_factory_.GetWeakPtr();
+
+ auto callback = BindToLoop(
+ media_task_runner_,
+ base::BindRepeating(&StreamProvider::OnReceivedRpc, media_weak_this_));
+ rpc_broker_->RegisterMessageReceiverCallback(RpcBroker::kAcquireDemuxerHandle,
+ callback);
+}
-StreamProvider::~StreamProvider() = default;
+StreamProvider::~StreamProvider() {
+ DCHECK(main_task_runner_->BelongsToCurrentThread());
+ rpc_broker_->UnregisterMessageReceiverCallback(
+ RpcBroker::kAcquireDemuxerHandle);
+}
-void StreamProvider::Initialize(int remote_audio_handle,
- int remote_video_handle,
- base::OnceClosure callback) {
- DVLOG(3) << __func__ << ": remote_audio_handle=" << remote_audio_handle
- << " remote_video_handle=" << remote_video_handle;
- if (!init_done_callback_.is_null()) {
- OnError("Duplicate initialization.");
- return;
- }
- if (remote_audio_handle == RpcBroker::kInvalidHandle &&
- remote_video_handle == RpcBroker::kInvalidHandle) {
- OnError("Invalid handle.");
- return;
+std::string StreamProvider::GetDisplayName() const {
+ return "media::remoting::StreamProvider";
+}
+
+void StreamProvider::Initialize(DemuxerHost* host,
+ PipelineStatusCallback status_cb) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ init_done_callback_ = std::move(status_cb);
+ CompleteInitialize();
+}
+
+void StreamProvider::AbortPendingReads() {}
+
+void StreamProvider::StartWaitingForSeek(base::TimeDelta seek_time) {}
+
+void StreamProvider::CancelPendingSeek(base::TimeDelta seek_time) {}
+
+void StreamProvider::Seek(base::TimeDelta time,
+ PipelineStatusCallback seek_cb) {
+ media_task_runner_->PostTask(
+ FROM_HERE,
+ base::BindOnce(std::move(seek_cb), PipelineStatus::PIPELINE_OK));
+}
+
+void StreamProvider::Stop() {}
+
+base::TimeDelta StreamProvider::GetStartTime() const {
+ return base::TimeDelta();
+}
+
+base::Time StreamProvider::GetTimelineOffset() const {
+ return base::Time();
+}
+
+int64_t StreamProvider::GetMemoryUsage() const {
+ return 0;
+}
+
+base::Optional<container_names::MediaContainerName>
+StreamProvider::GetContainerForMetrics() const {
+ return base::Optional<container_names::MediaContainerName>();
+}
+
+void StreamProvider::OnEnabledAudioTracksChanged(
+ const std::vector<MediaTrack::Id>& track_ids,
+ base::TimeDelta curr_time,
+ TrackChangeCB change_completed_cb) {
+ std::vector<DemuxerStream*> streams;
+ std::move(change_completed_cb).Run(DemuxerStream::AUDIO, streams);
+ DVLOG(1) << "Track changes are not supported.";
+}
+
+void StreamProvider::OnSelectedVideoTrackChanged(
+ const std::vector<MediaTrack::Id>& track_ids,
+ base::TimeDelta curr_time,
+ TrackChangeCB change_completed_cb) {
+ std::vector<DemuxerStream*> streams;
+ std::move(change_completed_cb).Run(DemuxerStream::VIDEO, streams);
+ DVLOG(1) << "Track changes are not supported.";
+}
+
+void StreamProvider::Destroy() {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ if (init_done_callback_)
+ std::move(init_done_callback_).Run(PIPELINE_ERROR_ABORT);
+
+ // Invalid weak pointers to prevent |this| from receiving RPC calls on the
+ // media thread.
+ media_weak_factory_.InvalidateWeakPtrs();
+
+ audio_stream_.reset();
+ video_stream_.reset();
+
+ // After invalidating all weak ptrs of |media_weak_factory_|, StreamProvider
+ // won't be access anymore, so using |this| here is safe.
+ main_task_runner_->DeleteSoon(FROM_HERE, this);
+}
+
+void StreamProvider::OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message) {
+ switch (message->proc()) {
+ case pb::RpcMessage::RPC_ACQUIRE_DEMUXER:
+ OnAcquireDemuxer(std::move(message));
+ break;
+ default:
+ VLOG(3) << __func__ << "Unknown RPC message.";
}
+}
- init_done_callback_ = std::move(callback);
- if (remote_audio_handle != RpcBroker::kInvalidHandle) {
- audio_stream_.reset(new MediaStream(
- rpc_broker_, DemuxerStream::AUDIO, remote_audio_handle,
- base::BindOnce(&StreamProvider::OnError, weak_factory_.GetWeakPtr(),
- "Media stream error")));
- audio_stream_->Initialize(base::BindOnce(
- &StreamProvider::AudioStreamInitialized, weak_factory_.GetWeakPtr()));
+void StreamProvider::OnAcquireDemuxer(std::unique_ptr<pb::RpcMessage> message) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ DCHECK(message->has_acquire_demuxer_rpc());
+
+ int32_t audio_demuxer_handle =
+ message->acquire_demuxer_rpc().audio_demuxer_handle();
+ int32_t video_demuxer_handle =
+ message->acquire_demuxer_rpc().video_demuxer_handle();
+ has_audio_ = audio_demuxer_handle != RpcBroker::kInvalidHandle;
+ has_video_ = video_demuxer_handle != RpcBroker::kInvalidHandle;
+
+ DCHECK(has_audio_ || has_video_);
+
+ if (has_audio_) {
+ auto callback = BindToCurrentLoop(base::BindOnce(
+ &StreamProvider::OnAudioStreamCreated, media_weak_this_));
+ main_task_runner_->PostTask(
+ FROM_HERE, base::BindOnce(&MediaStream::CreateOnMainThread, rpc_broker_,
+ DemuxerStream::AUDIO, audio_demuxer_handle,
+ media_task_runner_, std::move(callback)));
}
- if (remote_video_handle != RpcBroker::kInvalidHandle) {
- video_stream_.reset(new MediaStream(
- rpc_broker_, DemuxerStream::VIDEO, remote_video_handle,
- base::BindOnce(&StreamProvider::OnError, weak_factory_.GetWeakPtr(),
- "Media stream error")));
- video_stream_->Initialize(base::BindOnce(
- &StreamProvider::VideoStreamInitialized, weak_factory_.GetWeakPtr()));
+
+ if (has_video_) {
+ auto callback = BindToCurrentLoop(base::BindOnce(
+ &StreamProvider::OnVideoStreamCreated, media_weak_this_));
+ main_task_runner_->PostTask(
+ FROM_HERE, base::BindOnce(&MediaStream::CreateOnMainThread, rpc_broker_,
+ DemuxerStream::VIDEO, video_demuxer_handle,
+ media_task_runner_, std::move(callback)));
}
}
-void StreamProvider::OnError(const std::string& error) {
- VLOG(1) << __func__ << ": " << error;
- if (error_callback_.is_null())
+void StreamProvider::OnAudioStreamCreated(MediaStream::UniquePtr stream) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ audio_stream_ = std::move(stream);
+ audio_stream_->Initialize(base::BindOnce(
+ &StreamProvider::OnAudioStreamInitialized, media_weak_this_));
+ InitializeDataPipe();
+}
+
+void StreamProvider::OnVideoStreamCreated(MediaStream::UniquePtr stream) {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+ video_stream_ = std::move(stream);
+ video_stream_->Initialize(base::BindOnce(
+ &StreamProvider::OnVideoStreamInitialized, media_weak_this_));
+ InitializeDataPipe();
+}
+
+void StreamProvider::InitializeDataPipe() {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ if ((has_audio_ && !audio_stream_) || (has_video_ && !video_stream_))
return;
- std::move(error_callback_).Run();
+
+ receiver_controller_->StartDataStreams(
+ has_audio_ ? audio_stream_->BindNewPipeAndPassRemote()
+ : mojo::NullRemote(),
+ has_video_ ? video_stream_->BindNewPipeAndPassRemote()
+ : mojo::NullRemote());
}
-void StreamProvider::AudioStreamInitialized() {
- DCHECK(!init_done_callback_.is_null());
+void StreamProvider::OnAudioStreamInitialized() {
audio_stream_initialized_ = true;
- if (video_stream_initialized_ || !video_stream_)
- std::move(init_done_callback_).Run();
+ CompleteInitialize();
}
-void StreamProvider::VideoStreamInitialized() {
- DCHECK(!init_done_callback_.is_null());
+void StreamProvider::OnVideoStreamInitialized() {
video_stream_initialized_ = true;
- if (audio_stream_initialized_ || !audio_stream_)
- std::move(init_done_callback_).Run();
+ CompleteInitialize();
+}
+
+void StreamProvider::CompleteInitialize() {
+ DCHECK(media_task_runner_->BelongsToCurrentThread());
+
+ // Haven't receive RpcAcquireRenderer message
+ if (!has_audio_ && !has_video_)
+ return;
+
+ if ((has_audio_ && !audio_stream_initialized_) ||
+ (has_video_ && !video_stream_initialized_) || !init_done_callback_)
+ return;
+
+ // |init_done_callback_| should be called on |media_task_runner_|.
+ std::move(init_done_callback_).Run(PipelineStatus::PIPELINE_OK);
}
std::vector<DemuxerStream*> StreamProvider::GetAllStreams() {
@@ -460,25 +632,14 @@ std::vector<DemuxerStream*> StreamProvider::GetAllStreams() {
return streams;
}
-void StreamProvider::AppendBuffer(DemuxerStream::Type type,
- scoped_refptr<DecoderBuffer> buffer) {
- if (type == DemuxerStream::AUDIO)
- audio_stream_->AppendBuffer(buffer);
- else if (type == DemuxerStream::VIDEO)
- video_stream_->AppendBuffer(buffer);
- else
- NOTREACHED() << ": Only supports video or audio stream.";
-}
+} // namespace remoting
+} // namespace media
+
+namespace std {
-void StreamProvider::FlushUntil(DemuxerStream::Type type, int count) {
- DVLOG(3) << __func__ << ": type=" << type << " count=" << count;
- if (type == DemuxerStream::AUDIO)
- audio_stream_->FlushUntil(count);
- else if (type == DemuxerStream::VIDEO)
- video_stream_->FlushUntil(count);
- else
- NOTREACHED() << ": Only supports video or audio stream.";
+void default_delete<media::remoting::StreamProvider>::operator()(
+ media::remoting::StreamProvider* ptr) const {
+ ptr->Destroy();
}
-} // namespace remoting
-} // namespace media
+} // namespace std