summaryrefslogtreecommitdiff
path: root/webrtc/rtc_base/swap_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'webrtc/rtc_base/swap_queue.h')
-rw-r--r--webrtc/rtc_base/swap_queue.h249
1 files changed, 249 insertions, 0 deletions
diff --git a/webrtc/rtc_base/swap_queue.h b/webrtc/rtc_base/swap_queue.h
new file mode 100644
index 0000000..9eac49a
--- /dev/null
+++ b/webrtc/rtc_base/swap_queue.h
@@ -0,0 +1,249 @@
+/*
+ * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef RTC_BASE_SWAP_QUEUE_H_
+#define RTC_BASE_SWAP_QUEUE_H_
+
+#include <stddef.h>
+
+#include <atomic>
+#include <utility>
+#include <vector>
+
+#include "rtc_base/checks.h"
+#include "rtc_base/system/unused.h"
+
+namespace webrtc {
+
+namespace internal {
+
+// (Internal; please don't use outside this file.)
+template <typename T>
+bool NoopSwapQueueItemVerifierFunction(const T&) {
+ return true;
+}
+
+} // namespace internal
+
+// Functor to use when supplying a verifier function for the queue.
+template <typename T,
+ bool (*QueueItemVerifierFunction)(const T&) =
+ internal::NoopSwapQueueItemVerifierFunction>
+class SwapQueueItemVerifier {
+ public:
+ bool operator()(const T& t) const { return QueueItemVerifierFunction(t); }
+};
+
+// This class is a fixed-size queue. A single producer calls Insert() to insert
+// an element of type T at the back of the queue, and a single consumer calls
+// Remove() to remove an element from the front of the queue. It's safe for the
+// producer and the consumer to access the queue concurrently, from different
+// threads.
+//
+// To avoid the construction, copying, and destruction of Ts that a naive
+// queue implementation would require, for each "full" T passed from
+// producer to consumer, SwapQueue<T> passes an "empty" T in the other
+// direction (an "empty" T is one that contains nothing of value for the
+// consumer). This bidirectional movement is implemented with swap().
+//
+// // Create queue:
+// Bottle proto(568); // Prepare an empty Bottle. Heap allocates space for
+// // 568 ml.
+// SwapQueue<Bottle> q(N, proto); // Init queue with N copies of proto.
+// // Each copy allocates on the heap.
+// // Producer pseudo-code:
+// Bottle b(568); // Prepare an empty Bottle. Heap allocates space for 568 ml.
+// loop {
+// b.Fill(amount); // Where amount <= 568 ml.
+// q.Insert(&b); // Swap our full Bottle for an empty one from q.
+// }
+//
+// // Consumer pseudo-code:
+// Bottle b(568); // Prepare an empty Bottle. Heap allocates space for 568 ml.
+// loop {
+// q.Remove(&b); // Swap our empty Bottle for the next-in-line full Bottle.
+// Drink(&b);
+// }
+//
+// For a well-behaved Bottle class, there are no allocations in the
+// producer, since it just fills an empty Bottle that's already large
+// enough; no deallocations in the consumer, since it returns each empty
+// Bottle to the queue after having drunk it; and no copies along the
+// way, since the queue uses swap() everywhere to move full Bottles in
+// one direction and empty ones in the other.
+template <typename T, typename QueueItemVerifier = SwapQueueItemVerifier<T>>
+class SwapQueue {
+ public:
+ // Creates a queue of size size and fills it with default constructed Ts.
+ explicit SwapQueue(size_t size) : queue_(size) {
+ RTC_DCHECK(VerifyQueueSlots());
+ }
+
+ // Same as above and accepts an item verification functor.
+ SwapQueue(size_t size, const QueueItemVerifier& queue_item_verifier)
+ : queue_item_verifier_(queue_item_verifier), queue_(size) {
+ RTC_DCHECK(VerifyQueueSlots());
+ }
+
+ // Creates a queue of size size and fills it with copies of prototype.
+ SwapQueue(size_t size, const T& prototype) : queue_(size, prototype) {
+ RTC_DCHECK(VerifyQueueSlots());
+ }
+
+ // Same as above and accepts an item verification functor.
+ SwapQueue(size_t size,
+ const T& prototype,
+ const QueueItemVerifier& queue_item_verifier)
+ : queue_item_verifier_(queue_item_verifier), queue_(size, prototype) {
+ RTC_DCHECK(VerifyQueueSlots());
+ }
+
+ // Resets the queue to have zero content while maintaining the queue size.
+ // Just like Remove(), this can only be called (safely) from the
+ // consumer.
+ void Clear() {
+ // Drop all non-empty elements by resetting num_elements_ and incrementing
+ // next_read_index_ by the previous value of num_elements_. Relaxed memory
+ // ordering is sufficient since the dropped elements are not accessed.
+ next_read_index_ += std::atomic_exchange_explicit(
+ &num_elements_, size_t{0}, std::memory_order_relaxed);
+ if (next_read_index_ >= queue_.size()) {
+ next_read_index_ -= queue_.size();
+ }
+
+ RTC_DCHECK_LT(next_read_index_, queue_.size());
+ }
+
+ // Inserts a "full" T at the back of the queue by swapping *input with an
+ // "empty" T from the queue.
+ // Returns true if the item was inserted or false if not (the queue was full).
+ // When specified, the T given in *input must pass the ItemVerifier() test.
+ // The contents of *input after the call are then also guaranteed to pass the
+ // ItemVerifier() test.
+ bool Insert(T* input) RTC_WARN_UNUSED_RESULT {
+ RTC_DCHECK(input);
+
+ RTC_DCHECK(queue_item_verifier_(*input));
+
+ // Load the value of num_elements_. Acquire memory ordering prevents reads
+ // and writes to queue_[next_write_index_] to be reordered to before the
+ // load. (That element might be accessed by a concurrent call to Remove()
+ // until the load finishes.)
+ if (std::atomic_load_explicit(&num_elements_, std::memory_order_acquire) ==
+ queue_.size()) {
+ return false;
+ }
+
+ using std::swap;
+ swap(*input, queue_[next_write_index_]);
+
+ // Increment the value of num_elements_ to account for the inserted element.
+ // Release memory ordering prevents the reads and writes to
+ // queue_[next_write_index_] to be reordered to after the increment. (Once
+ // the increment has finished, Remove() might start accessing that element.)
+ const size_t old_num_elements = std::atomic_fetch_add_explicit(
+ &num_elements_, size_t{1}, std::memory_order_release);
+
+ ++next_write_index_;
+ if (next_write_index_ == queue_.size()) {
+ next_write_index_ = 0;
+ }
+
+ RTC_DCHECK_LT(next_write_index_, queue_.size());
+ RTC_DCHECK_LT(old_num_elements, queue_.size());
+
+ return true;
+ }
+
+ // Removes the frontmost "full" T from the queue by swapping it with
+ // the "empty" T in *output.
+ // Returns true if an item could be removed or false if not (the queue was
+ // empty). When specified, The T given in *output must pass the ItemVerifier()
+ // test and the contents of *output after the call are then also guaranteed to
+ // pass the ItemVerifier() test.
+ bool Remove(T* output) RTC_WARN_UNUSED_RESULT {
+ RTC_DCHECK(output);
+
+ RTC_DCHECK(queue_item_verifier_(*output));
+
+ // Load the value of num_elements_. Acquire memory ordering prevents reads
+ // and writes to queue_[next_read_index_] to be reordered to before the
+ // load. (That element might be accessed by a concurrent call to Insert()
+ // until the load finishes.)
+ if (std::atomic_load_explicit(&num_elements_, std::memory_order_acquire) ==
+ 0) {
+ return false;
+ }
+
+ using std::swap;
+ swap(*output, queue_[next_read_index_]);
+
+ // Decrement the value of num_elements_ to account for the removed element.
+ // Release memory ordering prevents the reads and writes to
+ // queue_[next_write_index_] to be reordered to after the decrement. (Once
+ // the decrement has finished, Insert() might start accessing that element.)
+ std::atomic_fetch_sub_explicit(&num_elements_, size_t{1},
+ std::memory_order_release);
+
+ ++next_read_index_;
+ if (next_read_index_ == queue_.size()) {
+ next_read_index_ = 0;
+ }
+
+ RTC_DCHECK_LT(next_read_index_, queue_.size());
+
+ return true;
+ }
+
+ // Returns the current number of elements in the queue. Since elements may be
+ // concurrently added to the queue, the caller must treat this as a lower
+ // bound, not an exact count.
+ // May only be called by the consumer.
+ size_t SizeAtLeast() const {
+ // Acquire memory ordering ensures that we wait for the producer to finish
+ // inserting any element in progress.
+ return std::atomic_load_explicit(&num_elements_, std::memory_order_acquire);
+ }
+
+ private:
+ // Verify that the queue slots complies with the ItemVerifier test. This
+ // function is not thread-safe and can only be used in the constructors.
+ bool VerifyQueueSlots() {
+ for (const auto& v : queue_) {
+ RTC_DCHECK(queue_item_verifier_(v));
+ }
+ return true;
+ }
+
+ // TODO(peah): Change this to use std::function() once we can use C++11 std
+ // lib.
+ QueueItemVerifier queue_item_verifier_;
+
+ // Only accessed by the single producer.
+ size_t next_write_index_ = 0;
+
+ // Only accessed by the single consumer.
+ size_t next_read_index_ = 0;
+
+ // Accessed by both the producer and the consumer and used for synchronization
+ // between them.
+ std::atomic<size_t> num_elements_{0};
+
+ // The elements of the queue are acced by both the producer and the consumer,
+ // mediated by num_elements_. queue_.size() is constant.
+ std::vector<T> queue_;
+
+ SwapQueue(const SwapQueue&) = delete;
+ SwapQueue& operator=(const SwapQueue&) = delete;
+};
+
+} // namespace webrtc
+
+#endif // RTC_BASE_SWAP_QUEUE_H_