// Copyright 2017 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "media/remoting/stream_provider.h" #include "base/bind.h" #include "base/callback.h" #include "base/callback_helpers.h" #include "base/logging.h" #include "media/base/decoder_buffer.h" #include "media/base/video_rotation.h" #include "media/remoting/proto_enum_utils.h" #include "media/remoting/proto_utils.h" namespace media { namespace remoting { namespace { // The number of frames requested in each ReadUntil RPC message. constexpr int kNumFramesInEachReadUntil = 10; } // An implementation of media::DemuxerStream on Media Remoting receiver. // Receives data from mojo data pipe, and returns one frame or/and status when // Read() is called. class MediaStream final : public DemuxerStream { public: MediaStream(RpcBroker* rpc_broker, Type type, int remote_handle, const base::Closure& error_callback); ~MediaStream() override; // DemuxerStream implementation. void Read(const ReadCB& read_cb) override; AudioDecoderConfig audio_decoder_config() override; VideoDecoderConfig video_decoder_config() override; DemuxerStream::Type type() const override; Liveness liveness() const override; bool SupportsConfigChanges() override; VideoRotation video_rotation() override; void Initialize(const base::Closure& init_done_cb); void FlushUntil(int count); void AppendBuffer(scoped_refptr buffer); private: // RPC messages handlers. void OnReceivedRpc(std::unique_ptr message); void OnInitializeCallback(std::unique_ptr message); void OnReadUntilCallback(std::unique_ptr message); // Issues the ReadUntil RPC message when read is pending and buffer is empty. void SendReadUntil(); // Run and reset the read callback. void CompleteRead(DemuxerStream::Status status); // Update the audio/video decoder config When config changes in the mid // stream, the new config will be stored in // |next_audio/video_decoder_config_|. Old config will be droped when all // associated frames are consumed. void UpdateConfig(const pb::AudioDecoderConfig* audio_message, const pb::VideoDecoderConfig* video_message); // Called when any error occurs. void OnError(const std::string& error); RpcBroker* const rpc_broker_; // Outlives this class. const Type type_; const int remote_handle_; const int rpc_handle_; // Set when Initialize() is called, and will be run only once after // initialization is done. base::Closure init_done_callback_; // The read until count in the last ReadUntil RPC message. int last_read_until_count_ = 0; // Indicates whether Audio/VideoDecoderConfig changed and the frames with the // old config are not yet consumed. The new config is stored in the end of // |audio/video_decoder_config_|; bool config_changed_ = false; // Indicates whether a ReadUntil RPC message was sent without receiving the // ReadUntilCallback message yet. bool read_until_sent_ = false; // Set when Read() is called. Run only once when read completes. ReadCB read_complete_callback_; base::Closure error_callback_; // Called only once when first error occurs. std::deque> buffers_; // Current audio/video config. AudioDecoderConfig audio_decoder_config_; VideoDecoderConfig video_decoder_config_; // Stores the new auido/video config when config changes. AudioDecoderConfig next_audio_decoder_config_; VideoDecoderConfig next_video_decoder_config_; base::WeakPtrFactory weak_factory_; DISALLOW_COPY_AND_ASSIGN(MediaStream); }; MediaStream::MediaStream(RpcBroker* rpc_broker, Type type, int remote_handle, const base::Closure& error_callback) : rpc_broker_(rpc_broker), type_(type), remote_handle_(remote_handle), rpc_handle_(rpc_broker_->GetUniqueHandle()), error_callback_(error_callback), weak_factory_(this) { DCHECK(remote_handle_ != RpcBroker::kInvalidHandle); rpc_broker_->RegisterMessageReceiverCallback( rpc_handle_, base::Bind(&MediaStream::OnReceivedRpc, weak_factory_.GetWeakPtr())); } MediaStream::~MediaStream() { rpc_broker_->UnregisterMessageReceiverCallback(rpc_handle_); } void MediaStream::Initialize(const base::Closure& init_done_cb) { DCHECK(!init_done_cb.is_null()); if (!init_done_callback_.is_null()) { OnError("Duplicate initialization"); return; } init_done_callback_ = init_done_cb; DVLOG(3) << __func__ << "Issues RpcMessage::RPC_DS_INITIALIZE with " << "remote_handle=" << remote_handle_ << " rpc_handle=" << rpc_handle_; std::unique_ptr rpc(new pb::RpcMessage()); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_DS_INITIALIZE); rpc->set_integer_value(rpc_handle_); rpc_broker_->SendMessageToRemote(std::move(rpc)); } void MediaStream::OnReceivedRpc(std::unique_ptr message) { DCHECK(message->handle() == rpc_handle_); switch (message->proc()) { case pb::RpcMessage::RPC_DS_INITIALIZE_CALLBACK: OnInitializeCallback(std::move(message)); break; case pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK: OnReadUntilCallback(std::move(message)); break; default: VLOG(3) << __func__ << "Unknow RPC message."; } } void MediaStream::OnInitializeCallback( std::unique_ptr message) { DVLOG(3) << __func__ << "Receives RPC_DS_INITIALIZE_CALLBACK message."; const pb::DemuxerStreamInitializeCallback callback_message = message->demuxerstream_initializecb_rpc(); if (callback_message.type() != type_) { OnError("Wrong type"); return; } if ((type_ == DemuxerStream::AUDIO && audio_decoder_config_.IsValidConfig()) || (type_ == DemuxerStream::VIDEO && video_decoder_config_.IsValidConfig())) { OnError("Duplicate Iniitialize"); return; } if (init_done_callback_.is_null()) { OnError("Iniitialize callback missing"); return; } if (type_ == DemuxerStream::AUDIO && callback_message.has_audio_decoder_config()) { const pb::AudioDecoderConfig audio_message = callback_message.audio_decoder_config(); UpdateConfig(&audio_message, nullptr); } else if (type_ == DemuxerStream::VIDEO && callback_message.has_video_decoder_config()) { const pb::VideoDecoderConfig video_message = callback_message.video_decoder_config(); UpdateConfig(nullptr, &video_message); } else { OnError("config missing"); return; } base::ResetAndReturn(&init_done_callback_).Run(); } void MediaStream::OnReadUntilCallback(std::unique_ptr message) { DVLOG(3) << __func__ << ": Receives RPC_DS_READUNTIL_CALLBACK message."; if (!read_until_sent_) { OnError("Unexpected ReadUntilCallback"); return; } read_until_sent_ = false; const pb::DemuxerStreamReadUntilCallback callback_message = message->demuxerstream_readuntilcb_rpc(); last_read_until_count_ = callback_message.count(); if (ToDemuxerStreamStatus(callback_message.status()) == kConfigChanged) { config_changed_ = true; if (callback_message.has_audio_decoder_config()) { const pb::AudioDecoderConfig audio_message = callback_message.audio_decoder_config(); UpdateConfig(&audio_message, nullptr); } if (callback_message.has_video_decoder_config()) { const pb::VideoDecoderConfig video_message = callback_message.video_decoder_config(); UpdateConfig(nullptr, &video_message); } if (buffers_.empty() && !read_complete_callback_.is_null()) CompleteRead(DemuxerStream::kConfigChanged); return; } if (buffers_.empty() && !read_complete_callback_.is_null()) SendReadUntil(); } void MediaStream::UpdateConfig(const pb::AudioDecoderConfig* audio_message, const pb::VideoDecoderConfig* video_message) { if (type_ == AUDIO) { DCHECK(audio_message && !video_message); AudioDecoderConfig audio_config; ConvertProtoToAudioDecoderConfig(*audio_message, &audio_config); if (!audio_config.IsValidConfig()) { OnError("Invalid audio config"); return; } if (config_changed_) { DCHECK(audio_decoder_config_.IsValidConfig()); DCHECK(!next_audio_decoder_config_.IsValidConfig()); next_audio_decoder_config_ = audio_config; } else { DCHECK(!audio_decoder_config_.IsValidConfig()); audio_decoder_config_ = audio_config; } } else if (type_ == VIDEO) { DCHECK(video_message && !audio_message); VideoDecoderConfig video_config; ConvertProtoToVideoDecoderConfig(*video_message, &video_config); if (!video_config.IsValidConfig()) { OnError("Invalid video config"); return; } if (config_changed_) { DCHECK(video_decoder_config_.IsValidConfig()); DCHECK(!next_video_decoder_config_.IsValidConfig()); next_video_decoder_config_ = video_config; } else { DCHECK(!video_decoder_config_.IsValidConfig()); video_decoder_config_ = video_config; } } else { NOTREACHED() << ": Only supports video or audio stream."; } } void MediaStream::SendReadUntil() { if (read_until_sent_) return; DVLOG(3) << "Issues RPC_DS_READUNTIL RPC message to remote_handle_=" << remote_handle_ << " with callback handle=" << rpc_handle_ << " count=" << last_read_until_count_; std::unique_ptr rpc(new pb::RpcMessage()); rpc->set_handle(remote_handle_); rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL); auto* message = rpc->mutable_demuxerstream_readuntil_rpc(); last_read_until_count_ += kNumFramesInEachReadUntil; message->set_count(last_read_until_count_); message->set_callback_handle(rpc_handle_); rpc_broker_->SendMessageToRemote(std::move(rpc)); read_until_sent_ = true; } void MediaStream::FlushUntil(int count) { while (!buffers_.empty()) { buffers_.pop_front(); } last_read_until_count_ = count; if (!read_complete_callback_.is_null()) CompleteRead(DemuxerStream::kAborted); read_until_sent_ = false; } void MediaStream::Read(const ReadCB& read_cb) { DCHECK(read_complete_callback_.is_null()); DCHECK(!read_cb.is_null()); read_complete_callback_ = read_cb; if (buffers_.empty() && config_changed_) { CompleteRead(DemuxerStream::kConfigChanged); return; } // Wait for more data. if (buffers_.empty()) { SendReadUntil(); return; } CompleteRead(DemuxerStream::kOk); } void MediaStream::CompleteRead(DemuxerStream::Status status) { DVLOG(3) << __func__ << ": " << status; switch (status) { case DemuxerStream::kConfigChanged: if (type_ == AUDIO) { DCHECK(next_audio_decoder_config_.IsValidConfig()); audio_decoder_config_ = next_audio_decoder_config_; #if DCHECK_IS_ON() next_audio_decoder_config_ = AudioDecoderConfig(); #endif // DCHECK_IS_ON() } else { DCHECK(next_video_decoder_config_.IsValidConfig()); video_decoder_config_ = next_video_decoder_config_; #if DCHECK_IS_ON() next_video_decoder_config_ = VideoDecoderConfig(); #endif // DCHECK_IS_ON() } config_changed_ = false; base::ResetAndReturn(&read_complete_callback_).Run(status, nullptr); return; case DemuxerStream::kAborted: base::ResetAndReturn(&read_complete_callback_).Run(status, nullptr); return; case DemuxerStream::kOk: DCHECK(!buffers_.empty()); scoped_refptr frame_data = buffers_.front(); buffers_.pop_front(); base::ResetAndReturn(&read_complete_callback_).Run(status, frame_data); return; } } AudioDecoderConfig MediaStream::audio_decoder_config() { DVLOG(3) << __func__; DCHECK(type_ == DemuxerStream::AUDIO); return audio_decoder_config_; } VideoDecoderConfig MediaStream::video_decoder_config() { DVLOG(3) << __func__; DCHECK(type_ == DemuxerStream::VIDEO); return video_decoder_config_; } DemuxerStream::Type MediaStream::type() const { return type_; } DemuxerStream::Liveness MediaStream::liveness() const { return DemuxerStream::LIVENESS_LIVE; } bool MediaStream::SupportsConfigChanges() { return true; } VideoRotation MediaStream::video_rotation() { return VideoRotation::VIDEO_ROTATION_0; } void MediaStream::AppendBuffer(scoped_refptr buffer) { DVLOG(3) << __func__; buffers_.push_back(buffer); if (!read_complete_callback_.is_null()) CompleteRead(DemuxerStream::kOk); } void MediaStream::OnError(const std::string& error) { VLOG(1) << __func__ << ": " << error; if (error_callback_.is_null()) return; base::ResetAndReturn(&error_callback_).Run(); } StreamProvider::StreamProvider(RpcBroker* rpc_broker, const base::Closure& error_callback) : rpc_broker_(rpc_broker), error_callback_(error_callback), weak_factory_(this) {} StreamProvider::~StreamProvider() {} void StreamProvider::Initialize(int remote_audio_handle, int remote_video_handle, const base::Closure& callback) { DVLOG(3) << __func__ << ": remote_audio_handle=" << remote_audio_handle << " remote_video_handle=" << remote_video_handle; if (!init_done_callback_.is_null()) { OnError("Duplicate initialization."); return; } if (remote_audio_handle == RpcBroker::kInvalidHandle && remote_video_handle == RpcBroker::kInvalidHandle) { OnError("Invalid handle."); return; } init_done_callback_ = callback; if (remote_audio_handle != RpcBroker::kInvalidHandle) { audio_stream_.reset(new MediaStream( rpc_broker_, DemuxerStream::AUDIO, remote_audio_handle, base::Bind(&StreamProvider::OnError, weak_factory_.GetWeakPtr(), "Media stream error"))); audio_stream_->Initialize(base::Bind( &StreamProvider::AudioStreamInitialized, weak_factory_.GetWeakPtr())); } if (remote_video_handle != RpcBroker::kInvalidHandle) { video_stream_.reset(new MediaStream( rpc_broker_, DemuxerStream::VIDEO, remote_video_handle, base::Bind(&StreamProvider::OnError, weak_factory_.GetWeakPtr(), "Media stream error"))); video_stream_->Initialize(base::Bind( &StreamProvider::VideoStreamInitialized, weak_factory_.GetWeakPtr())); } } void StreamProvider::OnError(const std::string& error) { VLOG(1) << __func__ << ": " << error; if (error_callback_.is_null()) return; base::ResetAndReturn(&error_callback_).Run(); } void StreamProvider::AudioStreamInitialized() { DCHECK(!init_done_callback_.is_null()); audio_stream_initialized_ = true; if (video_stream_initialized_ || !video_stream_) base::ResetAndReturn(&init_done_callback_).Run(); } void StreamProvider::VideoStreamInitialized() { DCHECK(!init_done_callback_.is_null()); video_stream_initialized_ = true; if (audio_stream_initialized_ || !audio_stream_) base::ResetAndReturn(&init_done_callback_).Run(); } std::vector StreamProvider::GetAllStreams() { std::vector streams; if (audio_stream_) streams.push_back(audio_stream_.get()); if (video_stream_) streams.push_back(video_stream_.get()); return streams; } void StreamProvider::AppendBuffer(DemuxerStream::Type type, scoped_refptr buffer) { if (type == DemuxerStream::AUDIO) audio_stream_->AppendBuffer(buffer); else if (type == DemuxerStream::VIDEO) video_stream_->AppendBuffer(buffer); else NOTREACHED() << ": Only supports video or audio stream."; } void StreamProvider::FlushUntil(DemuxerStream::Type type, int count) { DVLOG(3) << __func__ << ": type=" << type << " count=" << count; if (type == DemuxerStream::AUDIO) audio_stream_->FlushUntil(count); else if (type == DemuxerStream::VIDEO) video_stream_->FlushUntil(count); else NOTREACHED() << ": Only supports video or audio stream."; } } // namespace remoting } // namespace media