diff options
author | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2020-10-12 14:27:29 +0200 |
---|---|---|
committer | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2020-10-13 09:35:20 +0000 |
commit | c30a6232df03e1efbd9f3b226777b07e087a1122 (patch) | |
tree | e992f45784689f373bcc38d1b79a239ebe17ee23 /chromium/third_party/webrtc/call/call.cc | |
parent | 7b5b123ac58f58ffde0f4f6e488bcd09aa4decd3 (diff) | |
download | qtwebengine-chromium-85-based.tar.gz |
BASELINE: Update Chromium to 85.0.4183.14085-based
Change-Id: Iaa42f4680837c57725b1344f108c0196741f6057
Reviewed-by: Allan Sandfeld Jensen <allan.jensen@qt.io>
Diffstat (limited to 'chromium/third_party/webrtc/call/call.cc')
-rw-r--r-- | chromium/third_party/webrtc/call/call.cc | 665 |
1 files changed, 377 insertions, 288 deletions
diff --git a/chromium/third_party/webrtc/call/call.cc b/chromium/third_party/webrtc/call/call.cc index 4068db9f007..0ef2a3a4bc0 100644 --- a/chromium/third_party/webrtc/call/call.cc +++ b/chromium/third_party/webrtc/call/call.cc @@ -25,6 +25,7 @@ #include "audio/audio_receive_stream.h" #include "audio/audio_send_stream.h" #include "audio/audio_state.h" +#include "call/adaptation/broadcast_resource_listener.h" #include "call/bitrate_allocator.h" #include "call/flexfec_receive_stream_impl.h" #include "call/receive_time_calculator.h" @@ -49,8 +50,8 @@ #include "rtc_base/location.h" #include "rtc_base/logging.h" #include "rtc_base/strings/string_builder.h" -#include "rtc_base/synchronization/rw_lock_wrapper.h" #include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/time_utils.h" #include "rtc_base/trace_event.h" @@ -168,6 +169,47 @@ TaskQueueBase* GetCurrentTaskQueueOrThread() { namespace internal { +// Wraps an injected resource in a BroadcastResourceListener and handles adding +// and removing adapter resources to individual VideoSendStreams. +class ResourceVideoSendStreamForwarder { + public: + ResourceVideoSendStreamForwarder( + rtc::scoped_refptr<webrtc::Resource> resource) + : broadcast_resource_listener_(resource) { + broadcast_resource_listener_.StartListening(); + } + ~ResourceVideoSendStreamForwarder() { + RTC_DCHECK(adapter_resources_.empty()); + broadcast_resource_listener_.StopListening(); + } + + rtc::scoped_refptr<webrtc::Resource> Resource() const { + return broadcast_resource_listener_.SourceResource(); + } + + void OnCreateVideoSendStream(VideoSendStream* video_send_stream) { + RTC_DCHECK(adapter_resources_.find(video_send_stream) == + adapter_resources_.end()); + auto adapter_resource = + broadcast_resource_listener_.CreateAdapterResource(); + video_send_stream->AddAdaptationResource(adapter_resource); + adapter_resources_.insert( + std::make_pair(video_send_stream, adapter_resource)); + } + + void OnDestroyVideoSendStream(VideoSendStream* video_send_stream) { + auto it = adapter_resources_.find(video_send_stream); + RTC_DCHECK(it != adapter_resources_.end()); + broadcast_resource_listener_.RemoveAdapterResource(it->second); + adapter_resources_.erase(it); + } + + private: + BroadcastResourceListener broadcast_resource_listener_; + std::map<VideoSendStream*, rtc::scoped_refptr<webrtc::Resource>> + adapter_resources_; +}; + class Call final : public webrtc::Call, public PacketReceiver, public RecoveredPacketReceiver, @@ -177,7 +219,7 @@ class Call final : public webrtc::Call, Call(Clock* clock, const Call::Config& config, std::unique_ptr<RtpTransportControllerSendInterface> transport_send, - std::unique_ptr<ProcessThread> module_process_thread, + rtc::scoped_refptr<SharedModuleThread> module_process_thread, TaskQueueFactory* task_queue_factory); ~Call() override; @@ -212,6 +254,8 @@ class Call final : public webrtc::Call, void DestroyFlexfecReceiveStream( FlexfecReceiveStream* receive_stream) override; + void AddAdaptationResource(rtc::scoped_refptr<Resource> resource) override; + RtpTransportControllerSendInterface* GetTransportControllerSend() override; Stats GetStats() const override; @@ -243,54 +287,54 @@ class Call final : public webrtc::Call, private: DeliveryStatus DeliverRtcp(MediaType media_type, const uint8_t* packet, - size_t length); + size_t length) + RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); DeliveryStatus DeliverRtp(MediaType media_type, rtc::CopyOnWriteBuffer packet, - int64_t packet_time_us); + int64_t packet_time_us) + RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); void ConfigureSync(const std::string& sync_group) - RTC_EXCLUSIVE_LOCKS_REQUIRED(receive_crit_); + RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, MediaType media_type) - RTC_SHARED_LOCKS_REQUIRED(receive_crit_); + RTC_SHARED_LOCKS_REQUIRED(worker_thread_); void UpdateSendHistograms(Timestamp first_sent_packet) - RTC_EXCLUSIVE_LOCKS_REQUIRED(&bitrate_crit_); + RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_); void UpdateReceiveHistograms(); void UpdateHistograms(); void UpdateAggregateNetworkState(); void RegisterRateObserver(); - rtc::TaskQueue* network_queue() const { + rtc::TaskQueue* send_transport_queue() const { return transport_send_ptr_->GetWorkerQueue(); } Clock* const clock_; TaskQueueFactory* const task_queue_factory_; + TaskQueueBase* const worker_thread_; const int num_cpu_cores_; - const std::unique_ptr<ProcessThread> module_process_thread_; + const rtc::scoped_refptr<SharedModuleThread> module_process_thread_; const std::unique_ptr<CallStats> call_stats_; const std::unique_ptr<BitrateAllocator> bitrate_allocator_; Call::Config config_; - SequenceChecker configuration_sequence_checker_; - SequenceChecker worker_sequence_checker_; NetworkState audio_network_state_; NetworkState video_network_state_; - bool aggregate_network_up_ RTC_GUARDED_BY(configuration_sequence_checker_); + bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_); - std::unique_ptr<RWLockWrapper> receive_crit_; // Audio, Video, and FlexFEC receive streams are owned by the client that // creates them. std::set<AudioReceiveStream*> audio_receive_streams_ - RTC_GUARDED_BY(receive_crit_); + RTC_GUARDED_BY(worker_thread_); std::set<VideoReceiveStream2*> video_receive_streams_ - RTC_GUARDED_BY(receive_crit_); + RTC_GUARDED_BY(worker_thread_); std::map<std::string, AudioReceiveStream*> sync_stream_mapping_ - RTC_GUARDED_BY(receive_crit_); + RTC_GUARDED_BY(worker_thread_); // TODO(nisse): Should eventually be injected at creation, // with a single object in the bundled case. @@ -324,25 +368,26 @@ class Call final : public webrtc::Call, const bool use_send_side_bwe; }; std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_ - RTC_GUARDED_BY(receive_crit_); + RTC_GUARDED_BY(worker_thread_); - std::unique_ptr<RWLockWrapper> send_crit_; // Audio and Video send streams are owned by the client that creates them. std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_ - RTC_GUARDED_BY(send_crit_); + RTC_GUARDED_BY(worker_thread_); std::map<uint32_t, VideoSendStream*> video_send_ssrcs_ - RTC_GUARDED_BY(send_crit_); - std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(send_crit_); + RTC_GUARDED_BY(worker_thread_); + std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(worker_thread_); + + // Each forwarder wraps an adaptation resource that was added to the call. + std::vector<std::unique_ptr<ResourceVideoSendStreamForwarder>> + adaptation_resource_forwarders_ RTC_GUARDED_BY(worker_thread_); using RtpStateMap = std::map<uint32_t, RtpState>; - RtpStateMap suspended_audio_send_ssrcs_ - RTC_GUARDED_BY(configuration_sequence_checker_); - RtpStateMap suspended_video_send_ssrcs_ - RTC_GUARDED_BY(configuration_sequence_checker_); + RtpStateMap suspended_audio_send_ssrcs_ RTC_GUARDED_BY(worker_thread_); + RtpStateMap suspended_video_send_ssrcs_ RTC_GUARDED_BY(worker_thread_); using RtpPayloadStateMap = std::map<uint32_t, RtpPayloadState>; RtpPayloadStateMap suspended_video_payload_states_ - RTC_GUARDED_BY(configuration_sequence_checker_); + RTC_GUARDED_BY(worker_thread_); webrtc::RtcEventLog* event_log_; @@ -358,17 +403,14 @@ class Call final : public webrtc::Call, absl::optional<int64_t> first_received_rtp_video_ms_; absl::optional<int64_t> last_received_rtp_video_ms_; - rtc::CriticalSection last_bandwidth_bps_crit_; - uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(&last_bandwidth_bps_crit_); + uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(worker_thread_); // TODO(holmer): Remove this lock once BitrateController no longer calls // OnNetworkChanged from multiple threads. - rtc::CriticalSection bitrate_crit_; - uint32_t min_allocated_send_bitrate_bps_ - RTC_GUARDED_BY(&worker_sequence_checker_); - uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(&bitrate_crit_); + uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(worker_thread_); + uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(worker_thread_); AvgCounter estimated_send_bitrate_kbps_counter_ - RTC_GUARDED_BY(&bitrate_crit_); - AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(&bitrate_crit_); + RTC_GUARDED_BY(worker_thread_); + AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(worker_thread_); ReceiveSideCongestionController receive_side_cc_; @@ -377,6 +419,11 @@ class Call final : public webrtc::Call, const std::unique_ptr<SendDelayStats> video_send_delay_stats_; const int64_t start_ms_; + // Note that |task_safety_| needs to be at a greater scope than the task queue + // owned by |transport_send_| since calls might arrive on the network thread + // while Call is being deleted and the task queue is being torn down. + ScopedTaskSafety task_safety_; + // Caches transport_send_.get(), to avoid racing with destructor. // Note that this is declared before transport_send_ to ensure that it is not // invalidated until no more tasks can be running on the transport_send_ task @@ -386,8 +433,8 @@ class Call final : public webrtc::Call, // last ensures that it is destroyed first and any running tasks are finished. std::unique_ptr<RtpTransportControllerSendInterface> transport_send_; - bool is_target_rate_observer_registered_ - RTC_GUARDED_BY(&configuration_sequence_checker_) = false; + bool is_target_rate_observer_registered_ RTC_GUARDED_BY(worker_thread_) = + false; RTC_DISALLOW_COPY_AND_ASSIGN(Call); }; @@ -407,14 +454,20 @@ std::string Call::Stats::ToString(int64_t time_ms) const { } Call* Call::Create(const Call::Config& config) { - return Create(config, Clock::GetRealTimeClock(), - ProcessThread::Create("ModuleProcessThread"), + rtc::scoped_refptr<SharedModuleThread> call_thread = + SharedModuleThread::Create("ModuleProcessThread", nullptr); + return Create(config, std::move(call_thread)); +} + +Call* Call::Create(const Call::Config& config, + rtc::scoped_refptr<SharedModuleThread> call_thread) { + return Create(config, Clock::GetRealTimeClock(), std::move(call_thread), ProcessThread::Create("PacerThread")); } Call* Call::Create(const Call::Config& config, Clock* clock, - std::unique_ptr<ProcessThread> call_thread, + rtc::scoped_refptr<SharedModuleThread> call_thread, std::unique_ptr<ProcessThread> pacer_thread) { RTC_DCHECK(config.task_queue_factory); return new internal::Call( @@ -426,6 +479,104 @@ Call* Call::Create(const Call::Config& config, std::move(call_thread), config.task_queue_factory); } +class SharedModuleThread::Impl { + public: + Impl(std::unique_ptr<ProcessThread> process_thread, + std::function<void()> on_one_ref_remaining) + : module_thread_(std::move(process_thread)), + on_one_ref_remaining_(std::move(on_one_ref_remaining)) {} + + void EnsureStarted() { + RTC_DCHECK_RUN_ON(&sequence_checker_); + if (started_) + return; + started_ = true; + module_thread_->Start(); + } + + ProcessThread* process_thread() { + RTC_DCHECK_RUN_ON(&sequence_checker_); + return module_thread_.get(); + } + + void AddRef() const { + RTC_DCHECK_RUN_ON(&sequence_checker_); + ++ref_count_; + } + + rtc::RefCountReleaseStatus Release() const { + RTC_DCHECK_RUN_ON(&sequence_checker_); + --ref_count_; + + if (ref_count_ == 0) { + module_thread_->Stop(); + return rtc::RefCountReleaseStatus::kDroppedLastRef; + } + + if (ref_count_ == 1 && on_one_ref_remaining_) { + auto moved_fn = std::move(on_one_ref_remaining_); + // NOTE: after this function returns, chances are that |this| has been + // deleted - do not touch any member variables. + // If the owner of the last reference implements a lambda that releases + // that last reference inside of the callback (which is legal according + // to this implementation), we will recursively enter Release() above, + // call Stop() and release the last reference. + moved_fn(); + } + + return rtc::RefCountReleaseStatus::kOtherRefsRemained; + } + + private: + SequenceChecker sequence_checker_; + mutable int ref_count_ RTC_GUARDED_BY(sequence_checker_) = 0; + std::unique_ptr<ProcessThread> const module_thread_; + std::function<void()> const on_one_ref_remaining_; + bool started_ = false; +}; + +SharedModuleThread::SharedModuleThread( + std::unique_ptr<ProcessThread> process_thread, + std::function<void()> on_one_ref_remaining) + : impl_(std::make_unique<Impl>(std::move(process_thread), + std::move(on_one_ref_remaining))) {} + +SharedModuleThread::~SharedModuleThread() = default; + +// static +rtc::scoped_refptr<SharedModuleThread> SharedModuleThread::Create( + const char* name, + std::function<void()> on_one_ref_remaining) { + return new SharedModuleThread(ProcessThread::Create(name), + std::move(on_one_ref_remaining)); +} + +rtc::scoped_refptr<SharedModuleThread> SharedModuleThread::Create( + std::unique_ptr<ProcessThread> process_thread, + std::function<void()> on_one_ref_remaining) { + return new SharedModuleThread(std::move(process_thread), + std::move(on_one_ref_remaining)); +} + +void SharedModuleThread::EnsureStarted() { + impl_->EnsureStarted(); +} + +ProcessThread* SharedModuleThread::process_thread() { + return impl_->process_thread(); +} + +void SharedModuleThread::AddRef() const { + impl_->AddRef(); +} + +rtc::RefCountReleaseStatus SharedModuleThread::Release() const { + auto ret = impl_->Release(); + if (ret == rtc::RefCountReleaseStatus::kDroppedLastRef) + delete this; + return ret; +} + // This method here to avoid subclasses has to implement this method. // Call perf test will use Internal::Call::CreateVideoSendStream() to inject // FecController. @@ -441,20 +592,19 @@ namespace internal { Call::Call(Clock* clock, const Call::Config& config, std::unique_ptr<RtpTransportControllerSendInterface> transport_send, - std::unique_ptr<ProcessThread> module_process_thread, + rtc::scoped_refptr<SharedModuleThread> module_process_thread, TaskQueueFactory* task_queue_factory) : clock_(clock), task_queue_factory_(task_queue_factory), + worker_thread_(GetCurrentTaskQueueOrThread()), num_cpu_cores_(CpuInfo::DetectNumberOfCores()), module_process_thread_(std::move(module_process_thread)), - call_stats_(new CallStats(clock_, GetCurrentTaskQueueOrThread())), + call_stats_(new CallStats(clock_, worker_thread_)), bitrate_allocator_(new BitrateAllocator(this)), config_(config), audio_network_state_(kNetworkDown), video_network_state_(kNetworkDown), aggregate_network_up_(false), - receive_crit_(RWLockWrapper::CreateRWLock()), - send_crit_(RWLockWrapper::CreateRWLock()), event_log_(config.event_log), received_bytes_per_second_counter_(clock_, nullptr, true), received_audio_bytes_per_second_counter_(clock_, nullptr, true), @@ -473,17 +623,18 @@ Call::Call(Clock* clock, transport_send_(std::move(transport_send)) { RTC_DCHECK(config.event_log != nullptr); RTC_DCHECK(config.trials != nullptr); - worker_sequence_checker_.Detach(); + RTC_DCHECK(worker_thread_->IsCurrent()); call_stats_->RegisterStatsObserver(&receive_side_cc_); - module_process_thread_->RegisterModule( + module_process_thread_->process_thread()->RegisterModule( receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE); - module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE); + module_process_thread_->process_thread()->RegisterModule(&receive_side_cc_, + RTC_FROM_HERE); } Call::~Call() { - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_CHECK(audio_send_ssrcs_.empty()); RTC_CHECK(video_send_ssrcs_.empty()); @@ -491,10 +642,9 @@ Call::~Call() { RTC_CHECK(audio_receive_streams_.empty()); RTC_CHECK(video_receive_streams_.empty()); - module_process_thread_->Stop(); - module_process_thread_->DeRegisterModule( + module_process_thread_->process_thread()->DeRegisterModule( receive_side_cc_.GetRemoteBitrateEstimator(true)); - module_process_thread_->DeRegisterModule(&receive_side_cc_); + module_process_thread_->process_thread()->DeRegisterModule(&receive_side_cc_); call_stats_->DeregisterStatsObserver(&receive_side_cc_); absl::optional<Timestamp> first_sent_packet_ms = @@ -503,7 +653,6 @@ Call::~Call() { // Only update histograms after process threads have been shut down, so that // they won't try to concurrently update stats. if (first_sent_packet_ms) { - rtc::CritScope lock(&bitrate_crit_); UpdateSendHistograms(*first_sent_packet_ms); } @@ -512,7 +661,7 @@ Call::~Call() { } void Call::RegisterRateObserver() { - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); if (is_target_rate_observer_registered_) return; @@ -523,11 +672,11 @@ void Call::RegisterRateObserver() { // off being kicked off on request rather than in the ctor. transport_send_ptr_->RegisterTargetTransferRateObserver(this); - module_process_thread_->Start(); + module_process_thread_->EnsureStarted(); } void Call::SetClientBitratePreferences(const BitrateSettings& preferences) { - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); GetTransportControllerSend()->SetClientBitratePreferences(preferences); } @@ -609,14 +758,14 @@ void Call::UpdateReceiveHistograms() { } PacketReceiver* Call::Receiver() { - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); return this; } webrtc::AudioSendStream* Call::CreateAudioSendStream( const webrtc::AudioSendStream::Config& config) { TRACE_EVENT0("webrtc", "Call::CreateAudioSendStream"); - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); RegisterRateObserver(); @@ -632,30 +781,26 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( AudioSendStream* send_stream = new AudioSendStream( clock_, config, config_.audio_state, task_queue_factory_, - module_process_thread_.get(), transport_send_ptr_, + module_process_thread_->process_thread(), transport_send_ptr_, bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(), suspended_rtp_state); - { - WriteLockScoped write_lock(*send_crit_); - RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == - audio_send_ssrcs_.end()); - audio_send_ssrcs_[config.rtp.ssrc] = send_stream; - } - { - ReadLockScoped read_lock(*receive_crit_); - for (AudioReceiveStream* stream : audio_receive_streams_) { - if (stream->config().rtp.local_ssrc == config.rtp.ssrc) { - stream->AssociateSendStream(send_stream); - } + RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == + audio_send_ssrcs_.end()); + audio_send_ssrcs_[config.rtp.ssrc] = send_stream; + + for (AudioReceiveStream* stream : audio_receive_streams_) { + if (stream->config().rtp.local_ssrc == config.rtp.ssrc) { + stream->AssociateSendStream(send_stream); } } + UpdateAggregateNetworkState(); return send_stream; } void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream"); - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_DCHECK(send_stream != nullptr); send_stream->Stop(); @@ -664,19 +809,16 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { webrtc::internal::AudioSendStream* audio_send_stream = static_cast<webrtc::internal::AudioSendStream*>(send_stream); suspended_audio_send_ssrcs_[ssrc] = audio_send_stream->GetRtpState(); - { - WriteLockScoped write_lock(*send_crit_); - size_t num_deleted = audio_send_ssrcs_.erase(ssrc); - RTC_DCHECK_EQ(1, num_deleted); - } - { - ReadLockScoped read_lock(*receive_crit_); - for (AudioReceiveStream* stream : audio_receive_streams_) { - if (stream->config().rtp.local_ssrc == ssrc) { - stream->AssociateSendStream(nullptr); - } + + size_t num_deleted = audio_send_ssrcs_.erase(ssrc); + RTC_DCHECK_EQ(1, num_deleted); + + for (AudioReceiveStream* stream : audio_receive_streams_) { + if (stream->config().rtp.local_ssrc == ssrc) { + stream->AssociateSendStream(nullptr); } } + UpdateAggregateNetworkState(); delete send_stream; } @@ -684,29 +826,25 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( const webrtc::AudioReceiveStream::Config& config) { TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream"); - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); RegisterRateObserver(); event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>( CreateRtcLogStreamConfig(config))); AudioReceiveStream* receive_stream = new AudioReceiveStream( clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(), - module_process_thread_.get(), config_.neteq_factory, config, + module_process_thread_->process_thread(), config_.neteq_factory, config, config_.audio_state, event_log_); - { - WriteLockScoped write_lock(*receive_crit_); - receive_rtp_config_.emplace(config.rtp.remote_ssrc, - ReceiveRtpConfig(config)); - audio_receive_streams_.insert(receive_stream); - ConfigureSync(config.sync_group); - } - { - ReadLockScoped read_lock(*send_crit_); - auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc); - if (it != audio_send_ssrcs_.end()) { - receive_stream->AssociateSendStream(it->second); - } + receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config)); + audio_receive_streams_.insert(receive_stream); + + ConfigureSync(config.sync_group); + + auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc); + if (it != audio_send_ssrcs_.end()) { + receive_stream->AssociateSendStream(it->second); } + UpdateAggregateNetworkState(); return receive_stream; } @@ -714,26 +852,24 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( void Call::DestroyAudioReceiveStream( webrtc::AudioReceiveStream* receive_stream) { TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream"); - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_DCHECK(receive_stream != nullptr); webrtc::internal::AudioReceiveStream* audio_receive_stream = static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream); - { - WriteLockScoped write_lock(*receive_crit_); - const AudioReceiveStream::Config& config = audio_receive_stream->config(); - uint32_t ssrc = config.rtp.remote_ssrc; - receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) - ->RemoveStream(ssrc); - audio_receive_streams_.erase(audio_receive_stream); - const std::string& sync_group = audio_receive_stream->config().sync_group; - const auto it = sync_stream_mapping_.find(sync_group); - if (it != sync_stream_mapping_.end() && - it->second == audio_receive_stream) { - sync_stream_mapping_.erase(it); - ConfigureSync(sync_group); - } - receive_rtp_config_.erase(ssrc); + + const AudioReceiveStream::Config& config = audio_receive_stream->config(); + uint32_t ssrc = config.rtp.remote_ssrc; + receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) + ->RemoveStream(ssrc); + audio_receive_streams_.erase(audio_receive_stream); + const std::string& sync_group = audio_receive_stream->config().sync_group; + const auto it = sync_stream_mapping_.find(sync_group); + if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) { + sync_stream_mapping_.erase(it); + ConfigureSync(sync_group); } + receive_rtp_config_.erase(ssrc); + UpdateAggregateNetworkState(); delete audio_receive_stream; } @@ -744,7 +880,7 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( VideoEncoderConfig encoder_config, std::unique_ptr<FecController> fec_controller) { TRACE_EVENT0("webrtc", "Call::CreateVideoSendStream"); - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); RegisterRateObserver(); @@ -761,20 +897,22 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( std::vector<uint32_t> ssrcs = config.rtp.ssrcs; VideoSendStream* send_stream = new VideoSendStream( - clock_, num_cpu_cores_, module_process_thread_.get(), task_queue_factory_, - call_stats_->AsRtcpRttStats(), transport_send_ptr_, + clock_, num_cpu_cores_, module_process_thread_->process_thread(), + task_queue_factory_, call_stats_->AsRtcpRttStats(), transport_send_ptr_, bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_, std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_, suspended_video_payload_states_, std::move(fec_controller)); - { - WriteLockScoped write_lock(*send_crit_); - for (uint32_t ssrc : ssrcs) { - RTC_DCHECK(video_send_ssrcs_.find(ssrc) == video_send_ssrcs_.end()); - video_send_ssrcs_[ssrc] = send_stream; - } - video_send_streams_.insert(send_stream); + for (uint32_t ssrc : ssrcs) { + RTC_DCHECK(video_send_ssrcs_.find(ssrc) == video_send_ssrcs_.end()); + video_send_ssrcs_[ssrc] = send_stream; + } + video_send_streams_.insert(send_stream); + // Forward resources that were previously added to the call to the new stream. + for (const auto& resource_forwarder : adaptation_resource_forwarders_) { + resource_forwarder->OnCreateVideoSendStream(send_stream); } + UpdateAggregateNetworkState(); return send_stream; @@ -797,24 +935,27 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream( void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) { TRACE_EVENT0("webrtc", "Call::DestroyVideoSendStream"); RTC_DCHECK(send_stream != nullptr); - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); send_stream->Stop(); VideoSendStream* send_stream_impl = nullptr; - { - WriteLockScoped write_lock(*send_crit_); - auto it = video_send_ssrcs_.begin(); - while (it != video_send_ssrcs_.end()) { - if (it->second == static_cast<VideoSendStream*>(send_stream)) { - send_stream_impl = it->second; - video_send_ssrcs_.erase(it++); - } else { - ++it; - } + + auto it = video_send_ssrcs_.begin(); + while (it != video_send_ssrcs_.end()) { + if (it->second == static_cast<VideoSendStream*>(send_stream)) { + send_stream_impl = it->second; + video_send_ssrcs_.erase(it++); + } else { + ++it; } - video_send_streams_.erase(send_stream_impl); } + // Stop forwarding resources to the stream being destroyed. + for (const auto& resource_forwarder : adaptation_resource_forwarders_) { + resource_forwarder->OnDestroyVideoSendStream(send_stream_impl); + } + video_send_streams_.erase(send_stream_impl); + RTC_CHECK(send_stream_impl != nullptr); VideoSendStream::RtpStateMap rtp_states; @@ -835,7 +976,7 @@ void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) { webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( webrtc::VideoReceiveStream::Config configuration) { TRACE_EVENT0("webrtc", "Call::CreateVideoReceiveStream"); - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); receive_side_cc_.SetSendPeriodicFeedback( SendPeriodicFeedback(configuration.rtp.extensions)); @@ -847,25 +988,21 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( VideoReceiveStream2* receive_stream = new VideoReceiveStream2( task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_, transport_send_ptr_->packet_router(), std::move(configuration), - module_process_thread_.get(), call_stats_.get(), clock_, + module_process_thread_->process_thread(), call_stats_.get(), clock_, new VCMTiming(clock_)); const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); - { - WriteLockScoped write_lock(*receive_crit_); - if (config.rtp.rtx_ssrc) { - // We record identical config for the rtx stream as for the main - // stream. Since the transport_send_cc negotiation is per payload - // type, we may get an incorrect value for the rtx stream, but - // that is unlikely to matter in practice. - receive_rtp_config_.emplace(config.rtp.rtx_ssrc, - ReceiveRtpConfig(config)); - } - receive_rtp_config_.emplace(config.rtp.remote_ssrc, - ReceiveRtpConfig(config)); - video_receive_streams_.insert(receive_stream); - ConfigureSync(config.sync_group); + if (config.rtp.rtx_ssrc) { + // We record identical config for the rtx stream as for the main + // stream. Since the transport_send_cc negotiation is per payload + // type, we may get an incorrect value for the rtx stream, but + // that is unlikely to matter in practice. + receive_rtp_config_.emplace(config.rtp.rtx_ssrc, ReceiveRtpConfig(config)); } + receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config)); + video_receive_streams_.insert(receive_stream); + ConfigureSync(config.sync_group); + receive_stream->SignalNetworkState(video_network_state_); UpdateAggregateNetworkState(); event_log_->Log(std::make_unique<RtcEventVideoReceiveStreamConfig>( @@ -876,22 +1013,20 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( void Call::DestroyVideoReceiveStream( webrtc::VideoReceiveStream* receive_stream) { TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream"); - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_DCHECK(receive_stream != nullptr); VideoReceiveStream2* receive_stream_impl = static_cast<VideoReceiveStream2*>(receive_stream); const VideoReceiveStream::Config& config = receive_stream_impl->config(); - { - WriteLockScoped write_lock(*receive_crit_); - // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a - // separate SSRC there can be either one or two. - receive_rtp_config_.erase(config.rtp.remote_ssrc); - if (config.rtp.rtx_ssrc) { - receive_rtp_config_.erase(config.rtp.rtx_ssrc); - } - video_receive_streams_.erase(receive_stream_impl); - ConfigureSync(config.sync_group); + + // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a + // separate SSRC there can be either one or two. + receive_rtp_config_.erase(config.rtp.remote_ssrc); + if (config.rtp.rtx_ssrc) { + receive_rtp_config_.erase(config.rtp.rtx_ssrc); } + video_receive_streams_.erase(receive_stream_impl); + ConfigureSync(config.sync_group); receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) ->RemoveStream(config.rtp.remote_ssrc); @@ -903,30 +1038,25 @@ void Call::DestroyVideoReceiveStream( FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( const FlexfecReceiveStream::Config& config) { TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream"); - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); RecoveredPacketReceiver* recovered_packet_receiver = this; FlexfecReceiveStreamImpl* receive_stream; - { - WriteLockScoped write_lock(*receive_crit_); - // Unlike the video and audio receive streams, - // FlexfecReceiveStream implements RtpPacketSinkInterface itself, - // and hence its constructor passes its |this| pointer to - // video_receiver_controller_->CreateStream(). Calling the - // constructor while holding |receive_crit_| ensures that we don't - // call OnRtpPacket until the constructor is finished and the - // object is in a valid state. - // TODO(nisse): Fix constructor so that it can be moved outside of - // this locked scope. - receive_stream = new FlexfecReceiveStreamImpl( - clock_, &video_receiver_controller_, config, recovered_packet_receiver, - call_stats_->AsRtcpRttStats(), module_process_thread_.get()); - - RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) == - receive_rtp_config_.end()); - receive_rtp_config_.emplace(config.remote_ssrc, ReceiveRtpConfig(config)); - } + + // Unlike the video and audio receive streams, FlexfecReceiveStream implements + // RtpPacketSinkInterface itself, and hence its constructor passes its |this| + // pointer to video_receiver_controller_->CreateStream(). Calling the + // constructor while on the worker thread ensures that we don't call + // OnRtpPacket until the constructor is finished and the object is + // in a valid state, since OnRtpPacket runs on the same thread. + receive_stream = new FlexfecReceiveStreamImpl( + clock_, &video_receiver_controller_, config, recovered_packet_receiver, + call_stats_->AsRtcpRttStats(), module_process_thread_->process_thread()); + + RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) == + receive_rtp_config_.end()); + receive_rtp_config_.emplace(config.remote_ssrc, ReceiveRtpConfig(config)); // TODO(brandtr): Store config in RtcEventLog here. @@ -935,39 +1065,37 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) { TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream"); - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); RTC_DCHECK(receive_stream != nullptr); - { - WriteLockScoped write_lock(*receive_crit_); + const FlexfecReceiveStream::Config& config = receive_stream->GetConfig(); + uint32_t ssrc = config.remote_ssrc; + receive_rtp_config_.erase(ssrc); - const FlexfecReceiveStream::Config& config = receive_stream->GetConfig(); - uint32_t ssrc = config.remote_ssrc; - receive_rtp_config_.erase(ssrc); - - // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be - // destroyed. - receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) - ->RemoveStream(ssrc); - } + // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be + // destroyed. + receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) + ->RemoveStream(ssrc); delete receive_stream; } +void Call::AddAdaptationResource(rtc::scoped_refptr<Resource> resource) { + RTC_DCHECK_RUN_ON(worker_thread_); + adaptation_resource_forwarders_.push_back( + std::make_unique<ResourceVideoSendStreamForwarder>(resource)); + const auto& resource_forwarder = adaptation_resource_forwarders_.back(); + for (VideoSendStream* send_stream : video_send_streams_) { + resource_forwarder->OnCreateVideoSendStream(send_stream); + } +} + RtpTransportControllerSendInterface* Call::GetTransportControllerSend() { return transport_send_ptr_; } Call::Stats Call::GetStats() const { - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); - - // TODO(tommi): The following stats are managed on the process thread: - // - pacer_delay_ms (PacedSender::Process) - // - rtt_ms - // - recv_bandwidth_bps - // These are delivered on the network TQ: - // - send_bandwidth_bps (see OnTargetTransferRate) - // - max_padding_bitrate_bps (see OnAllocationLimitsChanged) + RTC_DCHECK_RUN_ON(worker_thread_); Stats stats; // TODO(srte): It is unclear if we only want to report queues if network is @@ -983,22 +1111,14 @@ Call::Stats Call::GetStats() const { receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate( &ssrcs, &recv_bandwidth); stats.recv_bandwidth_bps = recv_bandwidth; - - { - rtc::CritScope cs(&last_bandwidth_bps_crit_); - stats.send_bandwidth_bps = last_bandwidth_bps_; - } - - { - rtc::CritScope cs(&bitrate_crit_); - stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_; - } + stats.send_bandwidth_bps = last_bandwidth_bps_; + stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_; return stats; } void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); switch (media) { case MediaType::AUDIO: audio_network_state_ = state; @@ -1013,40 +1133,25 @@ void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { } UpdateAggregateNetworkState(); - { - ReadLockScoped read_lock(*receive_crit_); - for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) { - video_receive_stream->SignalNetworkState(video_network_state_); - } + for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) { + video_receive_stream->SignalNetworkState(video_network_state_); } } void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) { - ReadLockScoped read_lock(*send_crit_); + RTC_DCHECK_RUN_ON(worker_thread_); for (auto& kv : audio_send_ssrcs_) { kv.second->SetTransportOverhead(transport_overhead_per_packet); } } void Call::UpdateAggregateNetworkState() { - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); - bool have_audio = false; - bool have_video = false; - { - ReadLockScoped read_lock(*send_crit_); - if (!audio_send_ssrcs_.empty()) - have_audio = true; - if (!video_send_ssrcs_.empty()) - have_video = true; - } - { - ReadLockScoped read_lock(*receive_crit_); - if (!audio_receive_streams_.empty()) - have_audio = true; - if (!video_receive_streams_.empty()) - have_video = true; - } + bool have_audio = + !audio_send_ssrcs_.empty() || !audio_receive_streams_.empty(); + bool have_video = + !video_send_ssrcs_.empty() || !video_receive_streams_.empty(); bool aggregate_network_up = ((have_video && video_network_state_ == kNetworkUp) || @@ -1073,61 +1178,50 @@ void Call::OnSentPacket(const rtc::SentPacket& sent_packet) { } void Call::OnStartRateUpdate(DataRate start_rate) { - RTC_DCHECK(network_queue()->IsCurrent()); + RTC_DCHECK_RUN_ON(send_transport_queue()); bitrate_allocator_->UpdateStartRate(start_rate.bps<uint32_t>()); } void Call::OnTargetTransferRate(TargetTransferRate msg) { - RTC_DCHECK(network_queue()->IsCurrent()); - RTC_DCHECK_RUN_ON(&worker_sequence_checker_); - { - rtc::CritScope cs(&last_bandwidth_bps_crit_); - last_bandwidth_bps_ = msg.target_rate.bps(); - } + RTC_DCHECK_RUN_ON(send_transport_queue()); uint32_t target_bitrate_bps = msg.target_rate.bps(); // For controlling the rate of feedback messages. receive_side_cc_.OnBitrateChanged(target_bitrate_bps); bitrate_allocator_->OnNetworkEstimateChanged(msg); - // Ignore updates if bitrate is zero (the aggregate network state is down). - if (target_bitrate_bps == 0) { - rtc::CritScope lock(&bitrate_crit_); - estimated_send_bitrate_kbps_counter_.ProcessAndPause(); - pacer_bitrate_kbps_counter_.ProcessAndPause(); - return; - } - - bool sending_video; - { - ReadLockScoped read_lock(*send_crit_); - sending_video = !video_send_streams_.empty(); - } + worker_thread_->PostTask( + ToQueuedTask(task_safety_, [this, target_bitrate_bps]() { + RTC_DCHECK_RUN_ON(worker_thread_); + last_bandwidth_bps_ = target_bitrate_bps; + + // Ignore updates if bitrate is zero (the aggregate network state is + // down) or if we're not sending video. + if (target_bitrate_bps == 0 || video_send_streams_.empty()) { + estimated_send_bitrate_kbps_counter_.ProcessAndPause(); + pacer_bitrate_kbps_counter_.ProcessAndPause(); + return; + } - rtc::CritScope lock(&bitrate_crit_); - if (!sending_video) { - // Do not update the stats if we are not sending video. - estimated_send_bitrate_kbps_counter_.ProcessAndPause(); - pacer_bitrate_kbps_counter_.ProcessAndPause(); - return; - } - estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000); - // Pacer bitrate may be higher than bitrate estimate if enforcing min bitrate. - uint32_t pacer_bitrate_bps = - std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_); - pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000); + estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000); + // Pacer bitrate may be higher than bitrate estimate if enforcing min + // bitrate. + uint32_t pacer_bitrate_bps = + std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_); + pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000); + })); } void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) { - RTC_DCHECK(network_queue()->IsCurrent()); - RTC_DCHECK_RUN_ON(&worker_sequence_checker_); + RTC_DCHECK_RUN_ON(send_transport_queue()); transport_send_ptr_->SetAllocatedSendBitrateLimits(limits); - min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps(); - - rtc::CritScope lock(&bitrate_crit_); - configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps(); + worker_thread_->PostTask(ToQueuedTask(task_safety_, [this, limits]() { + RTC_DCHECK_RUN_ON(worker_thread_); + min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps(); + configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps(); + })); } void Call::ConfigureSync(const std::string& sync_group) { @@ -1194,28 +1288,24 @@ PacketReceiver::DeliveryStatus Call::DeliverRtcp(MediaType media_type, } bool rtcp_delivered = false; if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) { - ReadLockScoped read_lock(*receive_crit_); for (VideoReceiveStream2* stream : video_receive_streams_) { if (stream->DeliverRtcp(packet, length)) rtcp_delivered = true; } } if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) { - ReadLockScoped read_lock(*receive_crit_); for (AudioReceiveStream* stream : audio_receive_streams_) { stream->DeliverRtcp(packet, length); rtcp_delivered = true; } } if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) { - ReadLockScoped read_lock(*send_crit_); for (VideoSendStream* stream : video_send_streams_) { stream->DeliverRtcp(packet, length); rtcp_delivered = true; } } if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) { - ReadLockScoped read_lock(*send_crit_); for (auto& kv : audio_send_ssrcs_) { kv.second->DeliverRtcp(packet, length); rtcp_delivered = true; @@ -1259,17 +1349,15 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO || is_keep_alive_packet); - ReadLockScoped read_lock(*receive_crit_); auto it = receive_rtp_config_.find(parsed_packet.Ssrc()); if (it == receive_rtp_config_.end()) { RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc " << parsed_packet.Ssrc(); // Destruction of the receive stream, including deregistering from the - // RtpDemuxer, is not protected by the |receive_crit_| lock. But - // deregistering in the |receive_rtp_config_| map is protected by that lock. - // So by not passing the packet on to demuxing in this case, we prevent - // incoming packets to be passed on via the demuxer to a receive stream - // which is being torned down. + // RtpDemuxer, is not protected by the |worker_thread_|. + // But deregistering in the |receive_rtp_config_| map is. So by not passing + // the packet on to demuxing in this case, we prevent incoming packets to be + // passed on via the demuxer to a receive stream which is being torned down. return DELIVERY_UNKNOWN_SSRC; } @@ -1315,7 +1403,8 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket( MediaType media_type, rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) { - RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); + RTC_DCHECK_RUN_ON(worker_thread_); + if (IsRtcp(packet.cdata(), packet.size())) return DeliverRtcp(media_type, packet.cdata(), packet.size()); @@ -1323,20 +1412,20 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket( } void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { + RTC_DCHECK_RUN_ON(worker_thread_); RtpPacketReceived parsed_packet; if (!parsed_packet.Parse(packet, length)) return; parsed_packet.set_recovered(true); - ReadLockScoped read_lock(*receive_crit_); auto it = receive_rtp_config_.find(parsed_packet.Ssrc()); if (it == receive_rtp_config_.end()) { RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc " << parsed_packet.Ssrc(); // Destruction of the receive stream, including deregistering from the - // RtpDemuxer, is not protected by the |receive_crit_| lock. But - // deregistering in the |receive_rtp_config_| map is protected by that lock. + // RtpDemuxer, is not protected by the |worker_thread_|. + // But deregistering in the |receive_rtp_config_| map is. // So by not passing the packet on to demuxing in this case, we prevent // incoming packets to be passed on via the demuxer to a receive stream // which is being torn down. |