summaryrefslogtreecommitdiff
path: root/chromium/third_party/webrtc/call/call.cc
diff options
context:
space:
mode:
authorAllan Sandfeld Jensen <allan.jensen@qt.io>2020-10-12 14:27:29 +0200
committerAllan Sandfeld Jensen <allan.jensen@qt.io>2020-10-13 09:35:20 +0000
commitc30a6232df03e1efbd9f3b226777b07e087a1122 (patch)
treee992f45784689f373bcc38d1b79a239ebe17ee23 /chromium/third_party/webrtc/call/call.cc
parent7b5b123ac58f58ffde0f4f6e488bcd09aa4decd3 (diff)
downloadqtwebengine-chromium-85-based.tar.gz
BASELINE: Update Chromium to 85.0.4183.14085-based
Change-Id: Iaa42f4680837c57725b1344f108c0196741f6057 Reviewed-by: Allan Sandfeld Jensen <allan.jensen@qt.io>
Diffstat (limited to 'chromium/third_party/webrtc/call/call.cc')
-rw-r--r--chromium/third_party/webrtc/call/call.cc665
1 files changed, 377 insertions, 288 deletions
diff --git a/chromium/third_party/webrtc/call/call.cc b/chromium/third_party/webrtc/call/call.cc
index 4068db9f007..0ef2a3a4bc0 100644
--- a/chromium/third_party/webrtc/call/call.cc
+++ b/chromium/third_party/webrtc/call/call.cc
@@ -25,6 +25,7 @@
#include "audio/audio_receive_stream.h"
#include "audio/audio_send_stream.h"
#include "audio/audio_state.h"
+#include "call/adaptation/broadcast_resource_listener.h"
#include "call/bitrate_allocator.h"
#include "call/flexfec_receive_stream_impl.h"
#include "call/receive_time_calculator.h"
@@ -49,8 +50,8 @@
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/string_builder.h"
-#include "rtc_base/synchronization/rw_lock_wrapper.h"
#include "rtc_base/synchronization/sequence_checker.h"
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h"
@@ -168,6 +169,47 @@ TaskQueueBase* GetCurrentTaskQueueOrThread() {
namespace internal {
+// Wraps an injected resource in a BroadcastResourceListener and handles adding
+// and removing adapter resources to individual VideoSendStreams.
+class ResourceVideoSendStreamForwarder {
+ public:
+ ResourceVideoSendStreamForwarder(
+ rtc::scoped_refptr<webrtc::Resource> resource)
+ : broadcast_resource_listener_(resource) {
+ broadcast_resource_listener_.StartListening();
+ }
+ ~ResourceVideoSendStreamForwarder() {
+ RTC_DCHECK(adapter_resources_.empty());
+ broadcast_resource_listener_.StopListening();
+ }
+
+ rtc::scoped_refptr<webrtc::Resource> Resource() const {
+ return broadcast_resource_listener_.SourceResource();
+ }
+
+ void OnCreateVideoSendStream(VideoSendStream* video_send_stream) {
+ RTC_DCHECK(adapter_resources_.find(video_send_stream) ==
+ adapter_resources_.end());
+ auto adapter_resource =
+ broadcast_resource_listener_.CreateAdapterResource();
+ video_send_stream->AddAdaptationResource(adapter_resource);
+ adapter_resources_.insert(
+ std::make_pair(video_send_stream, adapter_resource));
+ }
+
+ void OnDestroyVideoSendStream(VideoSendStream* video_send_stream) {
+ auto it = adapter_resources_.find(video_send_stream);
+ RTC_DCHECK(it != adapter_resources_.end());
+ broadcast_resource_listener_.RemoveAdapterResource(it->second);
+ adapter_resources_.erase(it);
+ }
+
+ private:
+ BroadcastResourceListener broadcast_resource_listener_;
+ std::map<VideoSendStream*, rtc::scoped_refptr<webrtc::Resource>>
+ adapter_resources_;
+};
+
class Call final : public webrtc::Call,
public PacketReceiver,
public RecoveredPacketReceiver,
@@ -177,7 +219,7 @@ class Call final : public webrtc::Call,
Call(Clock* clock,
const Call::Config& config,
std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
- std::unique_ptr<ProcessThread> module_process_thread,
+ rtc::scoped_refptr<SharedModuleThread> module_process_thread,
TaskQueueFactory* task_queue_factory);
~Call() override;
@@ -212,6 +254,8 @@ class Call final : public webrtc::Call,
void DestroyFlexfecReceiveStream(
FlexfecReceiveStream* receive_stream) override;
+ void AddAdaptationResource(rtc::scoped_refptr<Resource> resource) override;
+
RtpTransportControllerSendInterface* GetTransportControllerSend() override;
Stats GetStats() const override;
@@ -243,54 +287,54 @@ class Call final : public webrtc::Call,
private:
DeliveryStatus DeliverRtcp(MediaType media_type,
const uint8_t* packet,
- size_t length);
+ size_t length)
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
DeliveryStatus DeliverRtp(MediaType media_type,
rtc::CopyOnWriteBuffer packet,
- int64_t packet_time_us);
+ int64_t packet_time_us)
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
void ConfigureSync(const std::string& sync_group)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(receive_crit_);
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
MediaType media_type)
- RTC_SHARED_LOCKS_REQUIRED(receive_crit_);
+ RTC_SHARED_LOCKS_REQUIRED(worker_thread_);
void UpdateSendHistograms(Timestamp first_sent_packet)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(&bitrate_crit_);
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
void UpdateReceiveHistograms();
void UpdateHistograms();
void UpdateAggregateNetworkState();
void RegisterRateObserver();
- rtc::TaskQueue* network_queue() const {
+ rtc::TaskQueue* send_transport_queue() const {
return transport_send_ptr_->GetWorkerQueue();
}
Clock* const clock_;
TaskQueueFactory* const task_queue_factory_;
+ TaskQueueBase* const worker_thread_;
const int num_cpu_cores_;
- const std::unique_ptr<ProcessThread> module_process_thread_;
+ const rtc::scoped_refptr<SharedModuleThread> module_process_thread_;
const std::unique_ptr<CallStats> call_stats_;
const std::unique_ptr<BitrateAllocator> bitrate_allocator_;
Call::Config config_;
- SequenceChecker configuration_sequence_checker_;
- SequenceChecker worker_sequence_checker_;
NetworkState audio_network_state_;
NetworkState video_network_state_;
- bool aggregate_network_up_ RTC_GUARDED_BY(configuration_sequence_checker_);
+ bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_);
- std::unique_ptr<RWLockWrapper> receive_crit_;
// Audio, Video, and FlexFEC receive streams are owned by the client that
// creates them.
std::set<AudioReceiveStream*> audio_receive_streams_
- RTC_GUARDED_BY(receive_crit_);
+ RTC_GUARDED_BY(worker_thread_);
std::set<VideoReceiveStream2*> video_receive_streams_
- RTC_GUARDED_BY(receive_crit_);
+ RTC_GUARDED_BY(worker_thread_);
std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
- RTC_GUARDED_BY(receive_crit_);
+ RTC_GUARDED_BY(worker_thread_);
// TODO(nisse): Should eventually be injected at creation,
// with a single object in the bundled case.
@@ -324,25 +368,26 @@ class Call final : public webrtc::Call,
const bool use_send_side_bwe;
};
std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
- RTC_GUARDED_BY(receive_crit_);
+ RTC_GUARDED_BY(worker_thread_);
- std::unique_ptr<RWLockWrapper> send_crit_;
// Audio and Video send streams are owned by the client that creates them.
std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_
- RTC_GUARDED_BY(send_crit_);
+ RTC_GUARDED_BY(worker_thread_);
std::map<uint32_t, VideoSendStream*> video_send_ssrcs_
- RTC_GUARDED_BY(send_crit_);
- std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(send_crit_);
+ RTC_GUARDED_BY(worker_thread_);
+ std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(worker_thread_);
+
+ // Each forwarder wraps an adaptation resource that was added to the call.
+ std::vector<std::unique_ptr<ResourceVideoSendStreamForwarder>>
+ adaptation_resource_forwarders_ RTC_GUARDED_BY(worker_thread_);
using RtpStateMap = std::map<uint32_t, RtpState>;
- RtpStateMap suspended_audio_send_ssrcs_
- RTC_GUARDED_BY(configuration_sequence_checker_);
- RtpStateMap suspended_video_send_ssrcs_
- RTC_GUARDED_BY(configuration_sequence_checker_);
+ RtpStateMap suspended_audio_send_ssrcs_ RTC_GUARDED_BY(worker_thread_);
+ RtpStateMap suspended_video_send_ssrcs_ RTC_GUARDED_BY(worker_thread_);
using RtpPayloadStateMap = std::map<uint32_t, RtpPayloadState>;
RtpPayloadStateMap suspended_video_payload_states_
- RTC_GUARDED_BY(configuration_sequence_checker_);
+ RTC_GUARDED_BY(worker_thread_);
webrtc::RtcEventLog* event_log_;
@@ -358,17 +403,14 @@ class Call final : public webrtc::Call,
absl::optional<int64_t> first_received_rtp_video_ms_;
absl::optional<int64_t> last_received_rtp_video_ms_;
- rtc::CriticalSection last_bandwidth_bps_crit_;
- uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(&last_bandwidth_bps_crit_);
+ uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(worker_thread_);
// TODO(holmer): Remove this lock once BitrateController no longer calls
// OnNetworkChanged from multiple threads.
- rtc::CriticalSection bitrate_crit_;
- uint32_t min_allocated_send_bitrate_bps_
- RTC_GUARDED_BY(&worker_sequence_checker_);
- uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(&bitrate_crit_);
+ uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
+ uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
AvgCounter estimated_send_bitrate_kbps_counter_
- RTC_GUARDED_BY(&bitrate_crit_);
- AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(&bitrate_crit_);
+ RTC_GUARDED_BY(worker_thread_);
+ AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(worker_thread_);
ReceiveSideCongestionController receive_side_cc_;
@@ -377,6 +419,11 @@ class Call final : public webrtc::Call,
const std::unique_ptr<SendDelayStats> video_send_delay_stats_;
const int64_t start_ms_;
+ // Note that |task_safety_| needs to be at a greater scope than the task queue
+ // owned by |transport_send_| since calls might arrive on the network thread
+ // while Call is being deleted and the task queue is being torn down.
+ ScopedTaskSafety task_safety_;
+
// Caches transport_send_.get(), to avoid racing with destructor.
// Note that this is declared before transport_send_ to ensure that it is not
// invalidated until no more tasks can be running on the transport_send_ task
@@ -386,8 +433,8 @@ class Call final : public webrtc::Call,
// last ensures that it is destroyed first and any running tasks are finished.
std::unique_ptr<RtpTransportControllerSendInterface> transport_send_;
- bool is_target_rate_observer_registered_
- RTC_GUARDED_BY(&configuration_sequence_checker_) = false;
+ bool is_target_rate_observer_registered_ RTC_GUARDED_BY(worker_thread_) =
+ false;
RTC_DISALLOW_COPY_AND_ASSIGN(Call);
};
@@ -407,14 +454,20 @@ std::string Call::Stats::ToString(int64_t time_ms) const {
}
Call* Call::Create(const Call::Config& config) {
- return Create(config, Clock::GetRealTimeClock(),
- ProcessThread::Create("ModuleProcessThread"),
+ rtc::scoped_refptr<SharedModuleThread> call_thread =
+ SharedModuleThread::Create("ModuleProcessThread", nullptr);
+ return Create(config, std::move(call_thread));
+}
+
+Call* Call::Create(const Call::Config& config,
+ rtc::scoped_refptr<SharedModuleThread> call_thread) {
+ return Create(config, Clock::GetRealTimeClock(), std::move(call_thread),
ProcessThread::Create("PacerThread"));
}
Call* Call::Create(const Call::Config& config,
Clock* clock,
- std::unique_ptr<ProcessThread> call_thread,
+ rtc::scoped_refptr<SharedModuleThread> call_thread,
std::unique_ptr<ProcessThread> pacer_thread) {
RTC_DCHECK(config.task_queue_factory);
return new internal::Call(
@@ -426,6 +479,104 @@ Call* Call::Create(const Call::Config& config,
std::move(call_thread), config.task_queue_factory);
}
+class SharedModuleThread::Impl {
+ public:
+ Impl(std::unique_ptr<ProcessThread> process_thread,
+ std::function<void()> on_one_ref_remaining)
+ : module_thread_(std::move(process_thread)),
+ on_one_ref_remaining_(std::move(on_one_ref_remaining)) {}
+
+ void EnsureStarted() {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ if (started_)
+ return;
+ started_ = true;
+ module_thread_->Start();
+ }
+
+ ProcessThread* process_thread() {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ return module_thread_.get();
+ }
+
+ void AddRef() const {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ ++ref_count_;
+ }
+
+ rtc::RefCountReleaseStatus Release() const {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ --ref_count_;
+
+ if (ref_count_ == 0) {
+ module_thread_->Stop();
+ return rtc::RefCountReleaseStatus::kDroppedLastRef;
+ }
+
+ if (ref_count_ == 1 && on_one_ref_remaining_) {
+ auto moved_fn = std::move(on_one_ref_remaining_);
+ // NOTE: after this function returns, chances are that |this| has been
+ // deleted - do not touch any member variables.
+ // If the owner of the last reference implements a lambda that releases
+ // that last reference inside of the callback (which is legal according
+ // to this implementation), we will recursively enter Release() above,
+ // call Stop() and release the last reference.
+ moved_fn();
+ }
+
+ return rtc::RefCountReleaseStatus::kOtherRefsRemained;
+ }
+
+ private:
+ SequenceChecker sequence_checker_;
+ mutable int ref_count_ RTC_GUARDED_BY(sequence_checker_) = 0;
+ std::unique_ptr<ProcessThread> const module_thread_;
+ std::function<void()> const on_one_ref_remaining_;
+ bool started_ = false;
+};
+
+SharedModuleThread::SharedModuleThread(
+ std::unique_ptr<ProcessThread> process_thread,
+ std::function<void()> on_one_ref_remaining)
+ : impl_(std::make_unique<Impl>(std::move(process_thread),
+ std::move(on_one_ref_remaining))) {}
+
+SharedModuleThread::~SharedModuleThread() = default;
+
+// static
+rtc::scoped_refptr<SharedModuleThread> SharedModuleThread::Create(
+ const char* name,
+ std::function<void()> on_one_ref_remaining) {
+ return new SharedModuleThread(ProcessThread::Create(name),
+ std::move(on_one_ref_remaining));
+}
+
+rtc::scoped_refptr<SharedModuleThread> SharedModuleThread::Create(
+ std::unique_ptr<ProcessThread> process_thread,
+ std::function<void()> on_one_ref_remaining) {
+ return new SharedModuleThread(std::move(process_thread),
+ std::move(on_one_ref_remaining));
+}
+
+void SharedModuleThread::EnsureStarted() {
+ impl_->EnsureStarted();
+}
+
+ProcessThread* SharedModuleThread::process_thread() {
+ return impl_->process_thread();
+}
+
+void SharedModuleThread::AddRef() const {
+ impl_->AddRef();
+}
+
+rtc::RefCountReleaseStatus SharedModuleThread::Release() const {
+ auto ret = impl_->Release();
+ if (ret == rtc::RefCountReleaseStatus::kDroppedLastRef)
+ delete this;
+ return ret;
+}
+
// This method here to avoid subclasses has to implement this method.
// Call perf test will use Internal::Call::CreateVideoSendStream() to inject
// FecController.
@@ -441,20 +592,19 @@ namespace internal {
Call::Call(Clock* clock,
const Call::Config& config,
std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
- std::unique_ptr<ProcessThread> module_process_thread,
+ rtc::scoped_refptr<SharedModuleThread> module_process_thread,
TaskQueueFactory* task_queue_factory)
: clock_(clock),
task_queue_factory_(task_queue_factory),
+ worker_thread_(GetCurrentTaskQueueOrThread()),
num_cpu_cores_(CpuInfo::DetectNumberOfCores()),
module_process_thread_(std::move(module_process_thread)),
- call_stats_(new CallStats(clock_, GetCurrentTaskQueueOrThread())),
+ call_stats_(new CallStats(clock_, worker_thread_)),
bitrate_allocator_(new BitrateAllocator(this)),
config_(config),
audio_network_state_(kNetworkDown),
video_network_state_(kNetworkDown),
aggregate_network_up_(false),
- receive_crit_(RWLockWrapper::CreateRWLock()),
- send_crit_(RWLockWrapper::CreateRWLock()),
event_log_(config.event_log),
received_bytes_per_second_counter_(clock_, nullptr, true),
received_audio_bytes_per_second_counter_(clock_, nullptr, true),
@@ -473,17 +623,18 @@ Call::Call(Clock* clock,
transport_send_(std::move(transport_send)) {
RTC_DCHECK(config.event_log != nullptr);
RTC_DCHECK(config.trials != nullptr);
- worker_sequence_checker_.Detach();
+ RTC_DCHECK(worker_thread_->IsCurrent());
call_stats_->RegisterStatsObserver(&receive_side_cc_);
- module_process_thread_->RegisterModule(
+ module_process_thread_->process_thread()->RegisterModule(
receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE);
- module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE);
+ module_process_thread_->process_thread()->RegisterModule(&receive_side_cc_,
+ RTC_FROM_HERE);
}
Call::~Call() {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RTC_CHECK(audio_send_ssrcs_.empty());
RTC_CHECK(video_send_ssrcs_.empty());
@@ -491,10 +642,9 @@ Call::~Call() {
RTC_CHECK(audio_receive_streams_.empty());
RTC_CHECK(video_receive_streams_.empty());
- module_process_thread_->Stop();
- module_process_thread_->DeRegisterModule(
+ module_process_thread_->process_thread()->DeRegisterModule(
receive_side_cc_.GetRemoteBitrateEstimator(true));
- module_process_thread_->DeRegisterModule(&receive_side_cc_);
+ module_process_thread_->process_thread()->DeRegisterModule(&receive_side_cc_);
call_stats_->DeregisterStatsObserver(&receive_side_cc_);
absl::optional<Timestamp> first_sent_packet_ms =
@@ -503,7 +653,6 @@ Call::~Call() {
// Only update histograms after process threads have been shut down, so that
// they won't try to concurrently update stats.
if (first_sent_packet_ms) {
- rtc::CritScope lock(&bitrate_crit_);
UpdateSendHistograms(*first_sent_packet_ms);
}
@@ -512,7 +661,7 @@ Call::~Call() {
}
void Call::RegisterRateObserver() {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
if (is_target_rate_observer_registered_)
return;
@@ -523,11 +672,11 @@ void Call::RegisterRateObserver() {
// off being kicked off on request rather than in the ctor.
transport_send_ptr_->RegisterTargetTransferRateObserver(this);
- module_process_thread_->Start();
+ module_process_thread_->EnsureStarted();
}
void Call::SetClientBitratePreferences(const BitrateSettings& preferences) {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
GetTransportControllerSend()->SetClientBitratePreferences(preferences);
}
@@ -609,14 +758,14 @@ void Call::UpdateReceiveHistograms() {
}
PacketReceiver* Call::Receiver() {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
return this;
}
webrtc::AudioSendStream* Call::CreateAudioSendStream(
const webrtc::AudioSendStream::Config& config) {
TRACE_EVENT0("webrtc", "Call::CreateAudioSendStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RegisterRateObserver();
@@ -632,30 +781,26 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
AudioSendStream* send_stream = new AudioSendStream(
clock_, config, config_.audio_state, task_queue_factory_,
- module_process_thread_.get(), transport_send_ptr_,
+ module_process_thread_->process_thread(), transport_send_ptr_,
bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(),
suspended_rtp_state);
- {
- WriteLockScoped write_lock(*send_crit_);
- RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
- audio_send_ssrcs_.end());
- audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
- }
- {
- ReadLockScoped read_lock(*receive_crit_);
- for (AudioReceiveStream* stream : audio_receive_streams_) {
- if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
- stream->AssociateSendStream(send_stream);
- }
+ RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
+ audio_send_ssrcs_.end());
+ audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
+
+ for (AudioReceiveStream* stream : audio_receive_streams_) {
+ if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
+ stream->AssociateSendStream(send_stream);
}
}
+
UpdateAggregateNetworkState();
return send_stream;
}
void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(send_stream != nullptr);
send_stream->Stop();
@@ -664,19 +809,16 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
webrtc::internal::AudioSendStream* audio_send_stream =
static_cast<webrtc::internal::AudioSendStream*>(send_stream);
suspended_audio_send_ssrcs_[ssrc] = audio_send_stream->GetRtpState();
- {
- WriteLockScoped write_lock(*send_crit_);
- size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
- RTC_DCHECK_EQ(1, num_deleted);
- }
- {
- ReadLockScoped read_lock(*receive_crit_);
- for (AudioReceiveStream* stream : audio_receive_streams_) {
- if (stream->config().rtp.local_ssrc == ssrc) {
- stream->AssociateSendStream(nullptr);
- }
+
+ size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
+ RTC_DCHECK_EQ(1, num_deleted);
+
+ for (AudioReceiveStream* stream : audio_receive_streams_) {
+ if (stream->config().rtp.local_ssrc == ssrc) {
+ stream->AssociateSendStream(nullptr);
}
}
+
UpdateAggregateNetworkState();
delete send_stream;
}
@@ -684,29 +826,25 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
const webrtc::AudioReceiveStream::Config& config) {
TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RegisterRateObserver();
event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>(
CreateRtcLogStreamConfig(config)));
AudioReceiveStream* receive_stream = new AudioReceiveStream(
clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(),
- module_process_thread_.get(), config_.neteq_factory, config,
+ module_process_thread_->process_thread(), config_.neteq_factory, config,
config_.audio_state, event_log_);
- {
- WriteLockScoped write_lock(*receive_crit_);
- receive_rtp_config_.emplace(config.rtp.remote_ssrc,
- ReceiveRtpConfig(config));
- audio_receive_streams_.insert(receive_stream);
- ConfigureSync(config.sync_group);
- }
- {
- ReadLockScoped read_lock(*send_crit_);
- auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc);
- if (it != audio_send_ssrcs_.end()) {
- receive_stream->AssociateSendStream(it->second);
- }
+ receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
+ audio_receive_streams_.insert(receive_stream);
+
+ ConfigureSync(config.sync_group);
+
+ auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc);
+ if (it != audio_send_ssrcs_.end()) {
+ receive_stream->AssociateSendStream(it->second);
}
+
UpdateAggregateNetworkState();
return receive_stream;
}
@@ -714,26 +852,24 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
void Call::DestroyAudioReceiveStream(
webrtc::AudioReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(receive_stream != nullptr);
webrtc::internal::AudioReceiveStream* audio_receive_stream =
static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream);
- {
- WriteLockScoped write_lock(*receive_crit_);
- const AudioReceiveStream::Config& config = audio_receive_stream->config();
- uint32_t ssrc = config.rtp.remote_ssrc;
- receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
- ->RemoveStream(ssrc);
- audio_receive_streams_.erase(audio_receive_stream);
- const std::string& sync_group = audio_receive_stream->config().sync_group;
- const auto it = sync_stream_mapping_.find(sync_group);
- if (it != sync_stream_mapping_.end() &&
- it->second == audio_receive_stream) {
- sync_stream_mapping_.erase(it);
- ConfigureSync(sync_group);
- }
- receive_rtp_config_.erase(ssrc);
+
+ const AudioReceiveStream::Config& config = audio_receive_stream->config();
+ uint32_t ssrc = config.rtp.remote_ssrc;
+ receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
+ ->RemoveStream(ssrc);
+ audio_receive_streams_.erase(audio_receive_stream);
+ const std::string& sync_group = audio_receive_stream->config().sync_group;
+ const auto it = sync_stream_mapping_.find(sync_group);
+ if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) {
+ sync_stream_mapping_.erase(it);
+ ConfigureSync(sync_group);
}
+ receive_rtp_config_.erase(ssrc);
+
UpdateAggregateNetworkState();
delete audio_receive_stream;
}
@@ -744,7 +880,7 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(
VideoEncoderConfig encoder_config,
std::unique_ptr<FecController> fec_controller) {
TRACE_EVENT0("webrtc", "Call::CreateVideoSendStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RegisterRateObserver();
@@ -761,20 +897,22 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(
std::vector<uint32_t> ssrcs = config.rtp.ssrcs;
VideoSendStream* send_stream = new VideoSendStream(
- clock_, num_cpu_cores_, module_process_thread_.get(), task_queue_factory_,
- call_stats_->AsRtcpRttStats(), transport_send_ptr_,
+ clock_, num_cpu_cores_, module_process_thread_->process_thread(),
+ task_queue_factory_, call_stats_->AsRtcpRttStats(), transport_send_ptr_,
bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_,
std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_,
suspended_video_payload_states_, std::move(fec_controller));
- {
- WriteLockScoped write_lock(*send_crit_);
- for (uint32_t ssrc : ssrcs) {
- RTC_DCHECK(video_send_ssrcs_.find(ssrc) == video_send_ssrcs_.end());
- video_send_ssrcs_[ssrc] = send_stream;
- }
- video_send_streams_.insert(send_stream);
+ for (uint32_t ssrc : ssrcs) {
+ RTC_DCHECK(video_send_ssrcs_.find(ssrc) == video_send_ssrcs_.end());
+ video_send_ssrcs_[ssrc] = send_stream;
+ }
+ video_send_streams_.insert(send_stream);
+ // Forward resources that were previously added to the call to the new stream.
+ for (const auto& resource_forwarder : adaptation_resource_forwarders_) {
+ resource_forwarder->OnCreateVideoSendStream(send_stream);
}
+
UpdateAggregateNetworkState();
return send_stream;
@@ -797,24 +935,27 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(
void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyVideoSendStream");
RTC_DCHECK(send_stream != nullptr);
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
send_stream->Stop();
VideoSendStream* send_stream_impl = nullptr;
- {
- WriteLockScoped write_lock(*send_crit_);
- auto it = video_send_ssrcs_.begin();
- while (it != video_send_ssrcs_.end()) {
- if (it->second == static_cast<VideoSendStream*>(send_stream)) {
- send_stream_impl = it->second;
- video_send_ssrcs_.erase(it++);
- } else {
- ++it;
- }
+
+ auto it = video_send_ssrcs_.begin();
+ while (it != video_send_ssrcs_.end()) {
+ if (it->second == static_cast<VideoSendStream*>(send_stream)) {
+ send_stream_impl = it->second;
+ video_send_ssrcs_.erase(it++);
+ } else {
+ ++it;
}
- video_send_streams_.erase(send_stream_impl);
}
+ // Stop forwarding resources to the stream being destroyed.
+ for (const auto& resource_forwarder : adaptation_resource_forwarders_) {
+ resource_forwarder->OnDestroyVideoSendStream(send_stream_impl);
+ }
+ video_send_streams_.erase(send_stream_impl);
+
RTC_CHECK(send_stream_impl != nullptr);
VideoSendStream::RtpStateMap rtp_states;
@@ -835,7 +976,7 @@ void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) {
webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
webrtc::VideoReceiveStream::Config configuration) {
TRACE_EVENT0("webrtc", "Call::CreateVideoReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
receive_side_cc_.SetSendPeriodicFeedback(
SendPeriodicFeedback(configuration.rtp.extensions));
@@ -847,25 +988,21 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
VideoReceiveStream2* receive_stream = new VideoReceiveStream2(
task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_,
transport_send_ptr_->packet_router(), std::move(configuration),
- module_process_thread_.get(), call_stats_.get(), clock_,
+ module_process_thread_->process_thread(), call_stats_.get(), clock_,
new VCMTiming(clock_));
const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
- {
- WriteLockScoped write_lock(*receive_crit_);
- if (config.rtp.rtx_ssrc) {
- // We record identical config for the rtx stream as for the main
- // stream. Since the transport_send_cc negotiation is per payload
- // type, we may get an incorrect value for the rtx stream, but
- // that is unlikely to matter in practice.
- receive_rtp_config_.emplace(config.rtp.rtx_ssrc,
- ReceiveRtpConfig(config));
- }
- receive_rtp_config_.emplace(config.rtp.remote_ssrc,
- ReceiveRtpConfig(config));
- video_receive_streams_.insert(receive_stream);
- ConfigureSync(config.sync_group);
+ if (config.rtp.rtx_ssrc) {
+ // We record identical config for the rtx stream as for the main
+ // stream. Since the transport_send_cc negotiation is per payload
+ // type, we may get an incorrect value for the rtx stream, but
+ // that is unlikely to matter in practice.
+ receive_rtp_config_.emplace(config.rtp.rtx_ssrc, ReceiveRtpConfig(config));
}
+ receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
+ video_receive_streams_.insert(receive_stream);
+ ConfigureSync(config.sync_group);
+
receive_stream->SignalNetworkState(video_network_state_);
UpdateAggregateNetworkState();
event_log_->Log(std::make_unique<RtcEventVideoReceiveStreamConfig>(
@@ -876,22 +1013,20 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
void Call::DestroyVideoReceiveStream(
webrtc::VideoReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(receive_stream != nullptr);
VideoReceiveStream2* receive_stream_impl =
static_cast<VideoReceiveStream2*>(receive_stream);
const VideoReceiveStream::Config& config = receive_stream_impl->config();
- {
- WriteLockScoped write_lock(*receive_crit_);
- // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
- // separate SSRC there can be either one or two.
- receive_rtp_config_.erase(config.rtp.remote_ssrc);
- if (config.rtp.rtx_ssrc) {
- receive_rtp_config_.erase(config.rtp.rtx_ssrc);
- }
- video_receive_streams_.erase(receive_stream_impl);
- ConfigureSync(config.sync_group);
+
+ // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
+ // separate SSRC there can be either one or two.
+ receive_rtp_config_.erase(config.rtp.remote_ssrc);
+ if (config.rtp.rtx_ssrc) {
+ receive_rtp_config_.erase(config.rtp.rtx_ssrc);
}
+ video_receive_streams_.erase(receive_stream_impl);
+ ConfigureSync(config.sync_group);
receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
->RemoveStream(config.rtp.remote_ssrc);
@@ -903,30 +1038,25 @@ void Call::DestroyVideoReceiveStream(
FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
const FlexfecReceiveStream::Config& config) {
TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RecoveredPacketReceiver* recovered_packet_receiver = this;
FlexfecReceiveStreamImpl* receive_stream;
- {
- WriteLockScoped write_lock(*receive_crit_);
- // Unlike the video and audio receive streams,
- // FlexfecReceiveStream implements RtpPacketSinkInterface itself,
- // and hence its constructor passes its |this| pointer to
- // video_receiver_controller_->CreateStream(). Calling the
- // constructor while holding |receive_crit_| ensures that we don't
- // call OnRtpPacket until the constructor is finished and the
- // object is in a valid state.
- // TODO(nisse): Fix constructor so that it can be moved outside of
- // this locked scope.
- receive_stream = new FlexfecReceiveStreamImpl(
- clock_, &video_receiver_controller_, config, recovered_packet_receiver,
- call_stats_->AsRtcpRttStats(), module_process_thread_.get());
-
- RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
- receive_rtp_config_.end());
- receive_rtp_config_.emplace(config.remote_ssrc, ReceiveRtpConfig(config));
- }
+
+ // Unlike the video and audio receive streams, FlexfecReceiveStream implements
+ // RtpPacketSinkInterface itself, and hence its constructor passes its |this|
+ // pointer to video_receiver_controller_->CreateStream(). Calling the
+ // constructor while on the worker thread ensures that we don't call
+ // OnRtpPacket until the constructor is finished and the object is
+ // in a valid state, since OnRtpPacket runs on the same thread.
+ receive_stream = new FlexfecReceiveStreamImpl(
+ clock_, &video_receiver_controller_, config, recovered_packet_receiver,
+ call_stats_->AsRtcpRttStats(), module_process_thread_->process_thread());
+
+ RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
+ receive_rtp_config_.end());
+ receive_rtp_config_.emplace(config.remote_ssrc, ReceiveRtpConfig(config));
// TODO(brandtr): Store config in RtcEventLog here.
@@ -935,39 +1065,37 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(receive_stream != nullptr);
- {
- WriteLockScoped write_lock(*receive_crit_);
+ const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
+ uint32_t ssrc = config.remote_ssrc;
+ receive_rtp_config_.erase(ssrc);
- const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
- uint32_t ssrc = config.remote_ssrc;
- receive_rtp_config_.erase(ssrc);
-
- // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
- // destroyed.
- receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
- ->RemoveStream(ssrc);
- }
+ // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
+ // destroyed.
+ receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
+ ->RemoveStream(ssrc);
delete receive_stream;
}
+void Call::AddAdaptationResource(rtc::scoped_refptr<Resource> resource) {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+ adaptation_resource_forwarders_.push_back(
+ std::make_unique<ResourceVideoSendStreamForwarder>(resource));
+ const auto& resource_forwarder = adaptation_resource_forwarders_.back();
+ for (VideoSendStream* send_stream : video_send_streams_) {
+ resource_forwarder->OnCreateVideoSendStream(send_stream);
+ }
+}
+
RtpTransportControllerSendInterface* Call::GetTransportControllerSend() {
return transport_send_ptr_;
}
Call::Stats Call::GetStats() const {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
-
- // TODO(tommi): The following stats are managed on the process thread:
- // - pacer_delay_ms (PacedSender::Process)
- // - rtt_ms
- // - recv_bandwidth_bps
- // These are delivered on the network TQ:
- // - send_bandwidth_bps (see OnTargetTransferRate)
- // - max_padding_bitrate_bps (see OnAllocationLimitsChanged)
+ RTC_DCHECK_RUN_ON(worker_thread_);
Stats stats;
// TODO(srte): It is unclear if we only want to report queues if network is
@@ -983,22 +1111,14 @@ Call::Stats Call::GetStats() const {
receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate(
&ssrcs, &recv_bandwidth);
stats.recv_bandwidth_bps = recv_bandwidth;
-
- {
- rtc::CritScope cs(&last_bandwidth_bps_crit_);
- stats.send_bandwidth_bps = last_bandwidth_bps_;
- }
-
- {
- rtc::CritScope cs(&bitrate_crit_);
- stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_;
- }
+ stats.send_bandwidth_bps = last_bandwidth_bps_;
+ stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_;
return stats;
}
void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
switch (media) {
case MediaType::AUDIO:
audio_network_state_ = state;
@@ -1013,40 +1133,25 @@ void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
}
UpdateAggregateNetworkState();
- {
- ReadLockScoped read_lock(*receive_crit_);
- for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
- video_receive_stream->SignalNetworkState(video_network_state_);
- }
+ for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
+ video_receive_stream->SignalNetworkState(video_network_state_);
}
}
void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) {
- ReadLockScoped read_lock(*send_crit_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
for (auto& kv : audio_send_ssrcs_) {
kv.second->SetTransportOverhead(transport_overhead_per_packet);
}
}
void Call::UpdateAggregateNetworkState() {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
- bool have_audio = false;
- bool have_video = false;
- {
- ReadLockScoped read_lock(*send_crit_);
- if (!audio_send_ssrcs_.empty())
- have_audio = true;
- if (!video_send_ssrcs_.empty())
- have_video = true;
- }
- {
- ReadLockScoped read_lock(*receive_crit_);
- if (!audio_receive_streams_.empty())
- have_audio = true;
- if (!video_receive_streams_.empty())
- have_video = true;
- }
+ bool have_audio =
+ !audio_send_ssrcs_.empty() || !audio_receive_streams_.empty();
+ bool have_video =
+ !video_send_ssrcs_.empty() || !video_receive_streams_.empty();
bool aggregate_network_up =
((have_video && video_network_state_ == kNetworkUp) ||
@@ -1073,61 +1178,50 @@ void Call::OnSentPacket(const rtc::SentPacket& sent_packet) {
}
void Call::OnStartRateUpdate(DataRate start_rate) {
- RTC_DCHECK(network_queue()->IsCurrent());
+ RTC_DCHECK_RUN_ON(send_transport_queue());
bitrate_allocator_->UpdateStartRate(start_rate.bps<uint32_t>());
}
void Call::OnTargetTransferRate(TargetTransferRate msg) {
- RTC_DCHECK(network_queue()->IsCurrent());
- RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
- {
- rtc::CritScope cs(&last_bandwidth_bps_crit_);
- last_bandwidth_bps_ = msg.target_rate.bps();
- }
+ RTC_DCHECK_RUN_ON(send_transport_queue());
uint32_t target_bitrate_bps = msg.target_rate.bps();
// For controlling the rate of feedback messages.
receive_side_cc_.OnBitrateChanged(target_bitrate_bps);
bitrate_allocator_->OnNetworkEstimateChanged(msg);
- // Ignore updates if bitrate is zero (the aggregate network state is down).
- if (target_bitrate_bps == 0) {
- rtc::CritScope lock(&bitrate_crit_);
- estimated_send_bitrate_kbps_counter_.ProcessAndPause();
- pacer_bitrate_kbps_counter_.ProcessAndPause();
- return;
- }
-
- bool sending_video;
- {
- ReadLockScoped read_lock(*send_crit_);
- sending_video = !video_send_streams_.empty();
- }
+ worker_thread_->PostTask(
+ ToQueuedTask(task_safety_, [this, target_bitrate_bps]() {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+ last_bandwidth_bps_ = target_bitrate_bps;
+
+ // Ignore updates if bitrate is zero (the aggregate network state is
+ // down) or if we're not sending video.
+ if (target_bitrate_bps == 0 || video_send_streams_.empty()) {
+ estimated_send_bitrate_kbps_counter_.ProcessAndPause();
+ pacer_bitrate_kbps_counter_.ProcessAndPause();
+ return;
+ }
- rtc::CritScope lock(&bitrate_crit_);
- if (!sending_video) {
- // Do not update the stats if we are not sending video.
- estimated_send_bitrate_kbps_counter_.ProcessAndPause();
- pacer_bitrate_kbps_counter_.ProcessAndPause();
- return;
- }
- estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000);
- // Pacer bitrate may be higher than bitrate estimate if enforcing min bitrate.
- uint32_t pacer_bitrate_bps =
- std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_);
- pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000);
+ estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000);
+ // Pacer bitrate may be higher than bitrate estimate if enforcing min
+ // bitrate.
+ uint32_t pacer_bitrate_bps =
+ std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_);
+ pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000);
+ }));
}
void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) {
- RTC_DCHECK(network_queue()->IsCurrent());
- RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
+ RTC_DCHECK_RUN_ON(send_transport_queue());
transport_send_ptr_->SetAllocatedSendBitrateLimits(limits);
- min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps();
-
- rtc::CritScope lock(&bitrate_crit_);
- configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps();
+ worker_thread_->PostTask(ToQueuedTask(task_safety_, [this, limits]() {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+ min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps();
+ configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps();
+ }));
}
void Call::ConfigureSync(const std::string& sync_group) {
@@ -1194,28 +1288,24 @@ PacketReceiver::DeliveryStatus Call::DeliverRtcp(MediaType media_type,
}
bool rtcp_delivered = false;
if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
- ReadLockScoped read_lock(*receive_crit_);
for (VideoReceiveStream2* stream : video_receive_streams_) {
if (stream->DeliverRtcp(packet, length))
rtcp_delivered = true;
}
}
if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
- ReadLockScoped read_lock(*receive_crit_);
for (AudioReceiveStream* stream : audio_receive_streams_) {
stream->DeliverRtcp(packet, length);
rtcp_delivered = true;
}
}
if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
- ReadLockScoped read_lock(*send_crit_);
for (VideoSendStream* stream : video_send_streams_) {
stream->DeliverRtcp(packet, length);
rtcp_delivered = true;
}
}
if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
- ReadLockScoped read_lock(*send_crit_);
for (auto& kv : audio_send_ssrcs_) {
kv.second->DeliverRtcp(packet, length);
rtcp_delivered = true;
@@ -1259,17 +1349,15 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
is_keep_alive_packet);
- ReadLockScoped read_lock(*receive_crit_);
auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
if (it == receive_rtp_config_.end()) {
RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
<< parsed_packet.Ssrc();
// Destruction of the receive stream, including deregistering from the
- // RtpDemuxer, is not protected by the |receive_crit_| lock. But
- // deregistering in the |receive_rtp_config_| map is protected by that lock.
- // So by not passing the packet on to demuxing in this case, we prevent
- // incoming packets to be passed on via the demuxer to a receive stream
- // which is being torned down.
+ // RtpDemuxer, is not protected by the |worker_thread_|.
+ // But deregistering in the |receive_rtp_config_| map is. So by not passing
+ // the packet on to demuxing in this case, we prevent incoming packets to be
+ // passed on via the demuxer to a receive stream which is being torned down.
return DELIVERY_UNKNOWN_SSRC;
}
@@ -1315,7 +1403,8 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket(
MediaType media_type,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) {
- RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
+ RTC_DCHECK_RUN_ON(worker_thread_);
+
if (IsRtcp(packet.cdata(), packet.size()))
return DeliverRtcp(media_type, packet.cdata(), packet.size());
@@ -1323,20 +1412,20 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket(
}
void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
+ RTC_DCHECK_RUN_ON(worker_thread_);
RtpPacketReceived parsed_packet;
if (!parsed_packet.Parse(packet, length))
return;
parsed_packet.set_recovered(true);
- ReadLockScoped read_lock(*receive_crit_);
auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
if (it == receive_rtp_config_.end()) {
RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
<< parsed_packet.Ssrc();
// Destruction of the receive stream, including deregistering from the
- // RtpDemuxer, is not protected by the |receive_crit_| lock. But
- // deregistering in the |receive_rtp_config_| map is protected by that lock.
+ // RtpDemuxer, is not protected by the |worker_thread_|.
+ // But deregistering in the |receive_rtp_config_| map is.
// So by not passing the packet on to demuxing in this case, we prevent
// incoming packets to be passed on via the demuxer to a receive stream
// which is being torn down.