// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "media/blink/multibuffer_data_source.h" #include #include "base/bind.h" #include "base/callback_helpers.h" #include "base/location.h" #include "base/macros.h" #include "base/numerics/ranges.h" #include "base/numerics/safe_conversions.h" #include "base/single_thread_task_runner.h" #include "media/base/media_log.h" #include "media/blink/buffered_data_source_host_impl.h" #include "media/blink/multibuffer_reader.h" #include "net/base/net_errors.h" namespace { // Minimum preload buffer. const int64_t kMinBufferPreload = 2 << 20; // 2 Mb // Maxmimum preload buffer. const int64_t kMaxBufferPreload = 50 << 20; // 50 Mb // If preload_ == METADATA, preloading size will be // shifted down this many bits. This shift turns // one Mb into one 32k block. // This seems to be the smallest amount of preload we can do without // ending up repeatedly closing and re-opening the connection // due to read calls after OnBufferingHaveEnough have been called. const int64_t kMetadataShift = 6; // Preload this much extra, then stop preloading until we fall below the // kTargetSecondsBufferedAhead. const int64_t kPreloadHighExtra = 1 << 20; // 1 Mb // Default pin region size. // Note that we go over this if preload is calculated high enough. const int64_t kDefaultPinSize = 25 << 20; // 25 Mb // If bitrate is not known, use this. const int64_t kDefaultBitrate = 200 * 8 << 10; // 200 Kbps. // Maximum bitrate for buffer calculations. const int64_t kMaxBitrate = 20 * 8 << 20; // 20 Mbps. // Maximum playback rate for buffer calculations. const double kMaxPlaybackRate = 25.0; // Preload this many seconds of data by default. const int64_t kTargetSecondsBufferedAhead = 10; // Keep this many seconds of data for going back by default. const int64_t kTargetSecondsBufferedBehind = 2; // Extra buffer accumulation speed, in terms of download buffer. const int kSlowPreloadPercentage = 10; // Update buffer sizes every 32 progress updates. const int kUpdateBufferSizeFrequency = 32; // How long to we delay a seek after a read? constexpr base::TimeDelta kSeekDelay = base::TimeDelta::FromMilliseconds(20); } // namespace namespace media { class MultibufferDataSource::ReadOperation { public: ReadOperation(int64_t position, int size, uint8_t* data, DataSource::ReadCB callback); ~ReadOperation(); // Runs |callback_| with the given |result|, deleting the operation // afterwards. static void Run(std::unique_ptr read_op, int result); int64_t position() { return position_; } int size() { return size_; } uint8_t* data() { return data_; } private: const int64_t position_; const int size_; uint8_t* data_; DataSource::ReadCB callback_; DISALLOW_IMPLICIT_CONSTRUCTORS(ReadOperation); }; MultibufferDataSource::ReadOperation::ReadOperation(int64_t position, int size, uint8_t* data, DataSource::ReadCB callback) : position_(position), size_(size), data_(data), callback_(std::move(callback)) { DCHECK(!callback_.is_null()); } MultibufferDataSource::ReadOperation::~ReadOperation() { DCHECK(callback_.is_null()); } // static void MultibufferDataSource::ReadOperation::Run( std::unique_ptr read_op, int result) { std::move(read_op->callback_).Run(result); } MultibufferDataSource::MultibufferDataSource( const scoped_refptr& task_runner, scoped_refptr url_data_arg, MediaLog* media_log, BufferedDataSourceHost* host, DownloadingCB downloading_cb) : total_bytes_(kPositionNotSpecified), streaming_(false), loading_(false), failed_(false), render_task_runner_(task_runner), url_data_(std::move(url_data_arg)), stop_signal_received_(false), media_has_played_(false), single_origin_(true), cancel_on_defer_(false), preload_(AUTO), bitrate_(0), playback_rate_(0.0), media_log_(media_log), host_(host), downloading_cb_(std::move(downloading_cb)) { weak_ptr_ = weak_factory_.GetWeakPtr(); DCHECK(host_); DCHECK(downloading_cb_); DCHECK(render_task_runner_->BelongsToCurrentThread()); DCHECK(url_data_.get()); url_data_->Use(); url_data_->OnRedirect( base::BindOnce(&MultibufferDataSource::OnRedirect, weak_ptr_)); } MultibufferDataSource::~MultibufferDataSource() { DCHECK(render_task_runner_->BelongsToCurrentThread()); } bool MultibufferDataSource::media_has_played() const { return media_has_played_; } bool MultibufferDataSource::AssumeFullyBuffered() const { DCHECK(url_data_); return !url_data_->url().SchemeIsHTTPOrHTTPS(); } void MultibufferDataSource::SetReader(MultiBufferReader* reader) { DCHECK(render_task_runner_->BelongsToCurrentThread()); base::AutoLock auto_lock(lock_); reader_.reset(reader); } void MultibufferDataSource::CreateResourceLoader(int64_t first_byte_position, int64_t last_byte_position) { DCHECK(render_task_runner_->BelongsToCurrentThread()); SetReader(new MultiBufferReader( url_data_->multibuffer(), first_byte_position, last_byte_position, base::BindRepeating(&MultibufferDataSource::ProgressCallback, weak_ptr_))); reader_->SetIsClientAudioElement(is_client_audio_element_); UpdateBufferSizes(); } void MultibufferDataSource::CreateResourceLoader_Locked( int64_t first_byte_position, int64_t last_byte_position) { DCHECK(render_task_runner_->BelongsToCurrentThread()); lock_.AssertAcquired(); reader_ = std::make_unique( url_data_->multibuffer(), first_byte_position, last_byte_position, base::BindRepeating(&MultibufferDataSource::ProgressCallback, weak_ptr_)); UpdateBufferSizes(); } void MultibufferDataSource::Initialize(InitializeCB init_cb) { DCHECK(render_task_runner_->BelongsToCurrentThread()); DCHECK(init_cb); DCHECK(!reader_.get()); init_cb_ = std::move(init_cb); CreateResourceLoader(0, kPositionNotSpecified); // We're not allowed to call Wait() if data is already available. if (reader_->Available()) { render_task_runner_->PostTask( FROM_HERE, base::BindOnce(&MultibufferDataSource::StartCallback, weak_ptr_)); // When the entire file is already in the cache, we won't get any more // progress callbacks, which breaks some expectations. Post a task to // make sure that the client gets at least one call each for the progress // and loading callbacks. render_task_runner_->PostTask( FROM_HERE, base::BindOnce(&MultibufferDataSource::UpdateProgress, weak_factory_.GetWeakPtr())); } else { reader_->Wait( 1, base::BindOnce(&MultibufferDataSource::StartCallback, weak_ptr_)); } } void MultibufferDataSource::OnRedirect( const scoped_refptr& destination) { if (!destination) { // A failure occured. failed_ = true; if (init_cb_) { render_task_runner_->PostTask( FROM_HERE, base::BindOnce(&MultibufferDataSource::StartCallback, weak_ptr_)); } else { base::AutoLock auto_lock(lock_); StopInternal_Locked(); } StopLoader(); return; } if (url_data_->url().GetOrigin() != destination->url().GetOrigin()) { single_origin_ = false; } SetReader(nullptr); url_data_ = std::move(destination); if (url_data_) { url_data_->OnRedirect( base::BindOnce(&MultibufferDataSource::OnRedirect, weak_ptr_)); if (init_cb_) { CreateResourceLoader(0, kPositionNotSpecified); if (reader_->Available()) { render_task_runner_->PostTask( FROM_HERE, base::BindOnce(&MultibufferDataSource::StartCallback, weak_ptr_)); } else { reader_->Wait(1, base::BindOnce(&MultibufferDataSource::StartCallback, weak_ptr_)); } } else if (read_op_) { CreateResourceLoader(read_op_->position(), kPositionNotSpecified); if (reader_->Available()) { render_task_runner_->PostTask( FROM_HERE, base::BindOnce(&MultibufferDataSource::ReadTask, weak_ptr_)); } else { reader_->Wait( 1, base::BindOnce(&MultibufferDataSource::ReadTask, weak_ptr_)); } } } } void MultibufferDataSource::SetPreload(Preload preload) { DVLOG(1) << __func__ << "(" << preload << ")"; DCHECK(render_task_runner_->BelongsToCurrentThread()); preload_ = preload; UpdateBufferSizes(); } bool MultibufferDataSource::HasSingleOrigin() { DCHECK(render_task_runner_->BelongsToCurrentThread()); // Before initialization completes there is no risk of leaking data. Callers // are required to order checks such that this isn't a race. return single_origin_; } bool MultibufferDataSource::IsCorsCrossOrigin() const { return url_data_->is_cors_cross_origin(); } bool MultibufferDataSource::HasAccessControl() const { return url_data_->has_access_control(); } UrlData::CorsMode MultibufferDataSource::cors_mode() const { return url_data_->cors_mode(); } void MultibufferDataSource::MediaPlaybackRateChanged(double playback_rate) { DCHECK(render_task_runner_->BelongsToCurrentThread()); if (playback_rate < 0 || playback_rate == playback_rate_) return; playback_rate_ = playback_rate; cancel_on_defer_ = false; UpdateBufferSizes(); } void MultibufferDataSource::MediaIsPlaying() { DCHECK(render_task_runner_->BelongsToCurrentThread()); // Always clear this since it can be set by OnBufferingHaveEnough() calls at // any point in time. cancel_on_defer_ = false; if (media_has_played_) return; media_has_played_ = true; // Once we start playing, we need preloading. preload_ = AUTO; UpdateBufferSizes(); } ///////////////////////////////////////////////////////////////////////////// // DataSource implementation. void MultibufferDataSource::Stop() { { base::AutoLock auto_lock(lock_); StopInternal_Locked(); // Cleanup resources immediately if we're already on the right thread. if (render_task_runner_->BelongsToCurrentThread()) { reader_.reset(); url_data_.reset(); return; } } render_task_runner_->PostTask( FROM_HERE, base::BindOnce(&MultibufferDataSource::StopLoader, weak_factory_.GetWeakPtr())); } void MultibufferDataSource::Abort() { base::AutoLock auto_lock(lock_); DCHECK(!init_cb_); if (read_op_) ReadOperation::Run(std::move(read_op_), kAborted); // Abort does not call StopLoader() since it is typically called prior to a // seek or suspend. Let the loader logic make the decision about whether a new // loader is necessary upon the seek or resume. } void MultibufferDataSource::SetBitrate(int bitrate) { render_task_runner_->PostTask( FROM_HERE, base::BindOnce(&MultibufferDataSource::SetBitrateTask, weak_factory_.GetWeakPtr(), bitrate)); } void MultibufferDataSource::OnBufferingHaveEnough(bool always_cancel) { DCHECK(render_task_runner_->BelongsToCurrentThread()); if (reader_ && (always_cancel || (preload_ == METADATA && !media_has_played_ && !IsStreaming()))) { cancel_on_defer_ = true; if (!loading_) { base::AutoLock auto_lock(lock_); if (read_op_) { // We can't destroy the reader if a read operation is pending. // UpdateLoadingState_Locked will take care of it after the // operation is done. return; } // Already locked, no need to use SetReader(). reader_.reset(nullptr); } } } int64_t MultibufferDataSource::GetMemoryUsage() { // TODO(hubbe): Make more accurate when url_data_ is shared. return base::checked_cast(url_data_->CachedSize()) << url_data_->multibuffer()->block_size_shift(); } GURL MultibufferDataSource::GetUrlAfterRedirects() const { return url_data_->url(); } void MultibufferDataSource::Read(int64_t position, int size, uint8_t* data, DataSource::ReadCB read_cb) { DVLOG(1) << "Read: " << position << " offset, " << size << " bytes"; // Reading is not allowed until after initialization. DCHECK(!init_cb_); DCHECK(read_cb); { base::AutoLock auto_lock(lock_); DCHECK(!read_op_); if (stop_signal_received_) { std::move(read_cb).Run(kReadError); return; } // Optimization: Try reading from the cache here to get back to // muxing as soon as possible. This works because TryReadAt is // thread-safe. if (reader_) { int bytes_read = reader_->TryReadAt(position, data, size); if (bytes_read > 0) { bytes_read_ += bytes_read; seek_positions_.push_back(position + bytes_read); if (seek_positions_.size() == 1) { render_task_runner_->PostDelayedTask( FROM_HERE, base::BindOnce(&MultibufferDataSource::SeekTask, weak_factory_.GetWeakPtr()), kSeekDelay); } std::move(read_cb).Run(bytes_read); return; } } read_op_ = std::make_unique(position, size, data, std::move(read_cb)); } render_task_runner_->PostTask(FROM_HERE, base::BindOnce(&MultibufferDataSource::ReadTask, weak_factory_.GetWeakPtr())); } bool MultibufferDataSource::GetSize(int64_t* size_out) { base::AutoLock auto_lock(lock_); if (total_bytes_ != kPositionNotSpecified) { *size_out = total_bytes_; return true; } *size_out = 0; return false; } bool MultibufferDataSource::IsStreaming() { return streaming_; } ///////////////////////////////////////////////////////////////////////////// // This method is the place where actual read happens, void MultibufferDataSource::ReadTask() { DCHECK(render_task_runner_->BelongsToCurrentThread()); base::AutoLock auto_lock(lock_); int bytes_read = 0; if (stop_signal_received_ || !read_op_) return; DCHECK(read_op_->size()); if (!reader_) CreateResourceLoader_Locked(read_op_->position(), kPositionNotSpecified); int64_t available = reader_->AvailableAt(read_op_->position()); if (available < 0) { // A failure has occured. ReadOperation::Run(std::move(read_op_), kReadError); return; } if (available) { bytes_read = static_cast(std::min(available, read_op_->size())); bytes_read = reader_->TryReadAt(read_op_->position(), read_op_->data(), bytes_read); bytes_read_ += bytes_read; seek_positions_.push_back(read_op_->position() + bytes_read); if (bytes_read == 0 && total_bytes_ == kPositionNotSpecified) { // We've reached the end of the file and we didn't know the total size // before. Update the total size so Read()s past the end of the file will // fail like they would if we had known the file size at the beginning. total_bytes_ = read_op_->position() + bytes_read; if (total_bytes_ != kPositionNotSpecified) host_->SetTotalBytes(total_bytes_); } ReadOperation::Run(std::move(read_op_), bytes_read); SeekTask_Locked(); } else { reader_->Seek(read_op_->position()); reader_->Wait(1, base::BindOnce(&MultibufferDataSource::ReadTask, weak_factory_.GetWeakPtr())); UpdateLoadingState_Locked(false); } } void MultibufferDataSource::SeekTask() { DCHECK(render_task_runner_->BelongsToCurrentThread()); base::AutoLock auto_lock(lock_); SeekTask_Locked(); } void MultibufferDataSource::SeekTask_Locked() { DCHECK(render_task_runner_->BelongsToCurrentThread()); lock_.AssertAcquired(); if (stop_signal_received_) return; // A read operation is pending, which will call SeekTask_Locked when // it's done. We'll defer any seeking until the read op is done. if (read_op_) return; url_data_->AddBytesRead(bytes_read_); bytes_read_ = 0; if (reader_) { // If we're seeking to a new location, (not just slightly further // in the file) and we have more data buffered in that new location // than in our current location, then we don't actually seek anywhere. // Instead we keep preloading at the old location a while longer. int64_t pos = reader_->Tell(); int64_t available = reader_->Available(); // Iterate backwards, because if two positions have the same // amount of buffered data, we probably want to prefer the latest // one in the array. for (auto i = seek_positions_.rbegin(); i != seek_positions_.rend(); ++i) { int64_t new_pos = *i; int64_t available_at_new_pos = reader_->AvailableAt(new_pos); if (total_bytes_ != kPositionNotSpecified) { if (new_pos + available_at_new_pos >= total_bytes_) { // Buffer reaches end of file, no need to seek here. continue; } } if (available_at_new_pos < available) { pos = new_pos; available = available_at_new_pos; } } reader_->Seek(pos); } seek_positions_.clear(); UpdateLoadingState_Locked(false); } void MultibufferDataSource::StopInternal_Locked() { lock_.AssertAcquired(); if (stop_signal_received_) return; stop_signal_received_ = true; // Initialize() isn't part of the DataSource interface so don't call it in // response to Stop(). init_cb_.Reset(); if (read_op_) ReadOperation::Run(std::move(read_op_), kReadError); } void MultibufferDataSource::StopLoader() { DCHECK(render_task_runner_->BelongsToCurrentThread()); SetReader(nullptr); } void MultibufferDataSource::SetBitrateTask(int bitrate) { DCHECK(render_task_runner_->BelongsToCurrentThread()); bitrate_ = bitrate; UpdateBufferSizes(); } ///////////////////////////////////////////////////////////////////////////// // BufferedResourceLoader callback methods. void MultibufferDataSource::StartCallback() { DCHECK(render_task_runner_->BelongsToCurrentThread()); if (!init_cb_) { SetReader(nullptr); return; } // All responses must be successful. Resources that are assumed to be fully // buffered must have a known content length. bool success = reader_ && reader_->Available() > 0 && url_data_ && (!AssumeFullyBuffered() || url_data_->length() != kPositionNotSpecified); if (success) { { base::AutoLock auto_lock(lock_); total_bytes_ = url_data_->length(); } streaming_ = !AssumeFullyBuffered() && (total_bytes_ == kPositionNotSpecified || !url_data_->range_supported()); media_log_->SetProperty(total_bytes_); media_log_->SetProperty(streaming_); } else { SetReader(nullptr); } // TODO(scherkus): we shouldn't have to lock to signal host(), see // http://crbug.com/113712 for details. base::AutoLock auto_lock(lock_); if (stop_signal_received_) return; if (success) { if (total_bytes_ != kPositionNotSpecified) { host_->SetTotalBytes(total_bytes_); if (AssumeFullyBuffered()) host_->AddBufferedByteRange(0, total_bytes_); } // Progress callback might be called after the start callback, // make sure that we update single_origin_ now. media_log_->SetProperty(single_origin_); media_log_->SetProperty( url_data_->range_supported()); } render_task_runner_->PostTask(FROM_HERE, base::BindOnce(std::move(init_cb_), success)); UpdateBufferSizes(); // Even if data is cached, say that we're loading at this point for // compatibility. UpdateLoadingState_Locked(true); } void MultibufferDataSource::ProgressCallback(int64_t begin, int64_t end) { DVLOG(1) << __func__ << "(" << begin << ", " << end << ")"; DCHECK(render_task_runner_->BelongsToCurrentThread()); base::AutoLock auto_lock(lock_); if (stop_signal_received_) return; if (AssumeFullyBuffered()) return; if (end > begin) host_->AddBufferedByteRange(begin, end); if (buffer_size_update_counter_ > 0) buffer_size_update_counter_--; else UpdateBufferSizes(); UpdateLoadingState_Locked(false); } void MultibufferDataSource::UpdateLoadingState_Locked(bool force_loading) { DVLOG(1) << __func__; lock_.AssertAcquired(); if (AssumeFullyBuffered()) return; // Update loading state. bool is_loading = !!reader_ && reader_->IsLoading(); if (force_loading || is_loading != loading_) { bool loading = is_loading || force_loading; if (!loading && cancel_on_defer_) { if (read_op_) { // We can't destroy the reader if a read operation is pending. // UpdateLoadingState_Locked will be called again when the read // operation is done. return; } // Already locked, no need to use SetReader(). reader_.reset(nullptr); } loading_ = loading; downloading_cb_.Run(loading_); } } void MultibufferDataSource::UpdateProgress() { DCHECK(render_task_runner_->BelongsToCurrentThread()); if (reader_) { uint64_t available = reader_->Available(); uint64_t pos = reader_->Tell(); ProgressCallback(pos, pos + available); } } void MultibufferDataSource::UpdateBufferSizes() { DVLOG(1) << __func__; if (!reader_) return; buffer_size_update_counter_ = kUpdateBufferSizeFrequency; // Use a default bit rate if unknown and clamp to prevent overflow. int64_t bitrate = base::ClampToRange(bitrate_, 0, kMaxBitrate); if (bitrate == 0) bitrate = kDefaultBitrate; // Only scale the buffer window for playback rates greater than 1.0 in // magnitude and clamp to prevent overflow. double playback_rate = playback_rate_; playback_rate = std::max(playback_rate, 1.0); playback_rate = std::min(playback_rate, kMaxPlaybackRate); int64_t bytes_per_second = (bitrate / 8.0) * playback_rate; // Preload 10 seconds of data, clamped to some min/max value. int64_t preload = base::ClampToRange(kTargetSecondsBufferedAhead * bytes_per_second, kMinBufferPreload, kMaxBufferPreload); // Increase buffering slowly at a rate of 10% of data downloaded so // far, maxing out at the preload size. int64_t extra_buffer = std::min( preload, url_data_->BytesReadFromCache() * kSlowPreloadPercentage / 100); // Add extra buffer to preload. preload += extra_buffer; // We preload this much, then we stop unil we read |preload| before resuming. int64_t preload_high = preload + kPreloadHighExtra; // We pin a few seconds of data behind the current reading position. int64_t pin_backward = base::ClampToRange(kTargetSecondsBufferedBehind * bytes_per_second, kMinBufferPreload, kMaxBufferPreload); // We always pin at least kDefaultPinSize ahead of the read position. // Normally, the extra space between preload_high and kDefaultPinSize will // not actually have any data in it, but if it does, we don't want to throw it // away right before we need it. int64_t pin_forward = std::max(preload_high, kDefaultPinSize); // Note that the buffer size is advisory as only non-pinned data is allowed // to be thrown away. Most of the time we pin a region that is larger than // |buffer_size|, which only makes sense because most of the time, some of // the data in pinned region is not present in the cache. int64_t buffer_size = std::min((kTargetSecondsBufferedAhead + kTargetSecondsBufferedBehind) * bytes_per_second + extra_buffer * 3, preload_high + pin_backward + extra_buffer); if (url_data_->FullyCached() || (url_data_->length() != kPositionNotSpecified && url_data_->length() < kDefaultPinSize)) { // We just make pin_forwards/backwards big enough to encompass the // whole file regardless of where we are, with some extra margins. pin_forward = std::max(pin_forward, url_data_->length() * 2); pin_backward = std::max(pin_backward, url_data_->length() * 2); buffer_size = url_data_->length(); } reader_->SetMaxBuffer(buffer_size); reader_->SetPinRange(pin_backward, pin_forward); if (preload_ == METADATA) { preload_high >>= kMetadataShift; preload >>= kMetadataShift; } reader_->SetPreload(preload_high, preload); } } // namespace media