1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
|
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MEDIA_REMOTING_STREAM_PROVIDER_H_
#define MEDIA_REMOTING_STREAM_PROVIDER_H_
#include "base/callback_forward.h"
#include "base/containers/circular_deque.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/sequenced_task_runner_helpers.h"
#include "base/single_thread_task_runner.h"
#include "media/base/audio_decoder_config.h"
#include "media/base/demuxer.h"
#include "media/base/demuxer_stream.h"
#include "media/base/video_decoder_config.h"
#include "media/mojo/mojom/remoting.mojom.h"
#include "media/remoting/media_remoting_rpc.pb.h"
#include "mojo/public/cpp/bindings/receiver.h"
#include "mojo/public/cpp/bindings/remote.h"
namespace base {
class SingleThreadTaskRunner;
} // namespace base
namespace media {
class MojoDecoderBufferReader;
namespace remoting {
class ReceiverController;
class RpcBroker;
// The media stream provider for Media Remoting receiver.
class StreamProvider final : public Demuxer {
public:
StreamProvider(
ReceiverController* receiver_controller,
const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner);
// Demuxer implementation.
std::vector<DemuxerStream*> GetAllStreams() override;
std::string GetDisplayName() const override;
void Initialize(DemuxerHost* host, PipelineStatusCallback status_cb) override;
void AbortPendingReads() override;
void StartWaitingForSeek(base::TimeDelta seek_time) override;
void CancelPendingSeek(base::TimeDelta seek_time) override;
void Seek(base::TimeDelta time, PipelineStatusCallback status_cb) override;
void Stop() override;
base::TimeDelta GetStartTime() const override;
base::Time GetTimelineOffset() const override;
int64_t GetMemoryUsage() const override;
base::Optional<container_names::MediaContainerName> GetContainerForMetrics()
const override;
void OnEnabledAudioTracksChanged(const std::vector<MediaTrack::Id>& track_ids,
base::TimeDelta curr_time,
TrackChangeCB change_completed_cb) override;
void OnSelectedVideoTrackChanged(const std::vector<MediaTrack::Id>& track_ids,
base::TimeDelta curr_time,
TrackChangeCB change_completed_cb) override;
protected:
// Deletion is only allowed via Destroy().
~StreamProvider() override;
private:
// An implementation of media::DemuxerStream on Media Remoting receiver.
// Receives data from mojo data pipe, and returns one frame or/and status when
// Read() is called.
class MediaStream final : public DemuxerStream,
public mojom::RemotingDataStreamReceiver {
public:
using UniquePtr =
std::unique_ptr<MediaStream, std::function<void(MediaStream*)>>;
// MediaStream should be created on the main thread to be able to get unique
// handle ID from |rpc_broker_|.
static void CreateOnMainThread(
RpcBroker* rpc_broker,
Type type,
int32_t handle,
const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner,
base::OnceCallback<void(MediaStream::UniquePtr)> callback);
// In order to destroy members in the right thread, MediaStream has to use
// DestructionHelper() to destroy itself.
static void DestructionHelper(MediaStream* stream);
MediaStream(
RpcBroker* rpc_broker,
Type type,
int32_t remote_handle,
const scoped_refptr<base::SingleThreadTaskRunner>& media_task_runner);
// DemuxerStream implementation.
void Read(ReadCB read_cb) override;
bool IsReadPending() const override;
AudioDecoderConfig audio_decoder_config() override;
VideoDecoderConfig video_decoder_config() override;
DemuxerStream::Type type() const override;
Liveness liveness() const override;
bool SupportsConfigChanges() override;
void Initialize(base::OnceClosure init_done_cb);
mojo::PendingRemote<mojom::RemotingDataStreamReceiver>
BindNewPipeAndPassRemote() {
return receiver_.BindNewPipeAndPassRemote();
}
private:
friend class base::DeleteHelper<MediaStream>; // For using DeleteSoon().
// For testing.
friend class StreamProviderTest;
// Prevent from unique_ptr using ~MediaStream() to destroy MediaStream
// instances. Use DestructionHelper() as the custom deleter with unique_ptr
// to destroy MediaStream instances.
~MediaStream() override;
void Destroy();
// Send RPC message on |main_task_runner_|.
void SendRpcMessageOnMainThread(std::unique_ptr<pb::RpcMessage> message);
// mojom::RemotingDataStreamReceiver implementation.
void InitializeDataPipe(
mojo::ScopedDataPipeConsumerHandle data_pipe) override;
void ReceiveFrame(uint32_t count, mojom::DecoderBufferPtr buffer) override;
void FlushUntil(uint32_t count) override;
// RPC messages handlers.
void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message);
void OnInitializeCallback(std::unique_ptr<pb::RpcMessage> message);
void OnReadUntilCallback(std::unique_ptr<pb::RpcMessage> message);
// Issues the ReadUntil RPC message when read is pending and buffer is
// empty.
void SendReadUntil();
// Run |init_done_callback_| when MojoDecoderBufferReader is created and
// received RPC_DS_INITIALIZE_CALLBACK
void CompleteInitialize();
// Append a frame into |buffers_|.
void AppendBuffer(uint32_t count, scoped_refptr<DecoderBuffer> buffer);
// Run and reset the read callback.
void CompleteRead(DemuxerStream::Status status);
// Update the audio/video decoder config. When config changes in the mid
// stream, the new config will be stored in |next_audio_decoder_config_|.
// Old config will be dropped when all associated frames are consumed.
void UpdateAudioConfig(const pb::AudioDecoderConfig& audio_message);
void UpdateVideoConfig(const pb::VideoDecoderConfig& video_message);
// Called when any error occurs.
void OnError(const std::string& error);
scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_;
scoped_refptr<base::SingleThreadTaskRunner> media_task_runner_;
RpcBroker* const rpc_broker_; // Outlives this class.
const Type type_;
const int remote_handle_;
const int rpc_handle_;
// Set when Initialize() is called.
base::OnceClosure init_done_callback_;
// The frame count of the frame to be returned on the next Read call. It
// will be increased whenever a frame is read. It will be updated when
// FlushUntil() is called.
uint32_t current_frame_count_ = 0;
// One plus the last frame count received over RTP. Used for continuity
// check.
uint32_t buffered_frame_count_ = 0;
// The total number of frames received from the sender side. It will be used
// as the base value for sending ReadUntil() to request more frames and be
// updated in OnReadUntilCallback() which would get the message that
// contains how many frames are sent.
uint32_t total_received_frame_count_ = 0;
// Indicates whether Audio/VideoDecoderConfig changed and the frames with
// the old config are not yet consumed. The new config is stored in the end
// of |audio/video_decoder_config_|.
bool config_changed_ = false;
// Indicates whether a ReadUntil RPC message was sent without receiving the
// ReadUntilCallback message yet.
bool read_until_sent_ = false;
// Indicates whether RPC_DS_INITIALIZE_CALLBACK received.
bool rpc_initialized_ = false;
// Set when Read() is called. Run only once when read completes.
ReadCB read_complete_callback_;
// The frame data would be sent via Mojo IPC as MojoDecoderBuffer. When a
// frame is sent to |this| from host by calling ReceiveFrame(),
// |decoder_buffer_reader_| is used to read the frame date from data pipe.
std::unique_ptr<MojoDecoderBufferReader> decoder_buffer_reader_;
base::circular_deque<scoped_refptr<DecoderBuffer>> buffers_;
// Current audio/video config.
AudioDecoderConfig audio_decoder_config_;
VideoDecoderConfig video_decoder_config_;
// Stores the new audio/video config when config changes.
AudioDecoderConfig next_audio_decoder_config_;
VideoDecoderConfig next_video_decoder_config_;
mojo::Receiver<mojom::RemotingDataStreamReceiver> receiver_{this};
base::WeakPtr<MediaStream> media_weak_this_;
base::WeakPtrFactory<MediaStream> media_weak_factory_{this};
};
friend std::default_delete<StreamProvider>;
friend class base::DeleteHelper<StreamProvider>; // For using DeleteSoon().
// For testing.
friend class StreamProviderTest;
void Destroy();
// RPC messages handlers.
void OnReceivedRpc(std::unique_ptr<pb::RpcMessage> message);
void OnAcquireDemuxer(std::unique_ptr<pb::RpcMessage> message);
// Called when audio/video stream is created and initialized.
void InitializeDataPipe();
void OnAudioStreamCreated(MediaStream::UniquePtr stream);
void OnVideoStreamCreated(MediaStream::UniquePtr stream);
void OnAudioStreamInitialized();
void OnVideoStreamInitialized();
void CompleteInitialize();
scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_;
scoped_refptr<base::SingleThreadTaskRunner> media_task_runner_;
ReceiverController* const receiver_controller_; // Outlives this class
RpcBroker* const rpc_broker_; // Outlives this class
MediaStream::UniquePtr audio_stream_;
MediaStream::UniquePtr video_stream_;
bool has_audio_{false};
bool has_video_{false};
bool audio_stream_initialized_{false};
bool video_stream_initialized_{false};
// Set when Initialize() is called, and will run when both video and audio
// streams are initialized or error occurs.
PipelineStatusCallback init_done_callback_;
base::WeakPtr<StreamProvider> media_weak_this_;
base::WeakPtrFactory<StreamProvider> media_weak_factory_{this};
};
} // namespace remoting
} // namespace media
namespace std {
// Specialize std::default_delete to call Destroy().
template <>
struct default_delete<media::remoting::StreamProvider> {
constexpr default_delete() = default;
template <typename U,
typename = typename std::enable_if<std::is_convertible<
U*,
media::remoting::StreamProvider*>::value>::type>
explicit default_delete(const default_delete<U>& d) {}
void operator()(media::remoting::StreamProvider* ptr) const;
};
} // namespace std
#endif // MEDIA_REMOTING_STREAM_PROVIDER_H_
|