summaryrefslogtreecommitdiff
path: root/lib/mpsc-queue.h
blob: 70c2d7a01ecffe7c737b7790f0923e69b147d5c0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
/*
 * Copyright (c) 2021 NVIDIA Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MPSC_QUEUE_H
#define MPSC_QUEUE_H 1

#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>

#include <openvswitch/thread.h>
#include <openvswitch/util.h>

#include "ovs-atomic.h"

/* Multi-producer, single-consumer queue
 * =====================================
 *
 * This data structure is a lockless queue implementation with
 * the following properties:
 *
 *  * Multi-producer: multiple threads can write concurrently.
 *    Insertion in the queue is thread-safe, no inter-thread
 *    synchronization is necessary.
 *
 *  * Single-consumer: only a single thread can safely remove
 *    nodes from the queue.  The queue must be 'acquired' using
 *    'mpsc_queue_acquire()' before removing nodes.
 *
 *  * Unbounded: the queue is backed by a linked-list and is not
 *    limited in number of elements.
 *
 *  * Intrusive: queue elements are allocated as part of larger
 *    objects.  Objects are retrieved by offset manipulation.
 *
 *  * per-producer FIFO: Elements in the queue are kept in the
 *    order their producer inserted them.  The consumer retrieves
 *    them in in the same insertion order.  When multiple
 *    producers insert at the same time, either will proceed.
 *
 * This queue is well-suited for message passing between threads,
 * where any number of thread can insert a message and a single
 * thread is meant to receive and process it.
 *
 * Thread-safety
 * =============
 *
 *  The consumer thread must acquire the queue using 'mpsc_queue_acquire()'.
 *  Once the queue is protected against concurrent reads, the thread can call
 *  the consumer API:
 *
 *      * mpsc_queue_poll() to peek and return the tail of the queue
 *      * mpsc_queue_pop() to remove the tail of the queue
 *      * mpsc_queue_tail() to read the current tail
 *      * mpsc_queue_push_front() to enqueue an element safely at the tail
 *      * MPSC_QUEUE_FOR_EACH() to iterate over the current elements,
 *        without removing them.
 *      * MPSC_QUEUE_FOR_EACH_POP() to iterate over the elements while
 *        removing them.
 *
 *  When a thread is finished with reading the queue, it can release the
 *  reader lock using 'mpsc_queue_release()'.
 *
 *  Producers can always insert elements in the queue, even if no consumer
 *  acquired the reader lock.  No inter-producer synchronization is needed.
 *
 *  The consumer thread is also allowed to insert elements while it holds the
 *  reader lock.
 *
 *  Producer threads must never be cancelled while writing to the queue.
 *  This will block the consumer, that will then lose any subsequent elements
 *  in the queue.  Producers should ideally be cooperatively managed or
 *  the queue insertion should be within non-cancellable sections.
 *
 * Queue state
 * ===========
 *
 *  When polling the queue, three states can be observed: 'empty', 'non-empty',
 *  and 'inconsistent'.  Three polling results are defined, respectively:
 *
 *   * MPSC_QUEUE_EMPTY: the queue is empty.
 *   * MPSC_QUEUE_ITEM: an item was available and has been removed.
 *   * MPSC_QUEUE_RETRY: the queue is inconsistent.
 *
 *  If 'MPSC_QUEUE_RETRY' is returned, then a producer has not yet finished
 *  writing to the queue and the list of nodes is not coherent.  The consumer
 *  can retry shortly to check if the producer has finished.
 *
 *  This behavior is the reason the removal function is called
 *  'mpsc_queue_poll()'.
 *
 */

struct mpsc_queue_node {
    ATOMIC(struct mpsc_queue_node *) next;
};

struct mpsc_queue {
    ATOMIC(struct mpsc_queue_node *) head;
    ATOMIC(struct mpsc_queue_node *) tail;
    struct mpsc_queue_node stub;
    struct ovs_mutex read_lock;
};

#define MPSC_QUEUE_INITIALIZER(Q) { \
    .head = &(Q)->stub, \
    .tail = &(Q)->stub, \
    .stub = { .next = NULL }, \
    .read_lock = OVS_MUTEX_INITIALIZER, \
}

/* Consumer API. */

/* Initialize the queue. Not necessary is 'MPSC_QUEUE_INITIALIZER' was used. */
void mpsc_queue_init(struct mpsc_queue *queue);
/* The reader lock must be released prior to destroying the queue. */
void mpsc_queue_destroy(struct mpsc_queue *queue);

/* Acquire and release the consumer lock. */
#define mpsc_queue_acquire(q) do { \
        ovs_mutex_lock(&(q)->read_lock); \
    } while (0)
#define mpsc_queue_release(q) do { \
        ovs_mutex_unlock(&(q)->read_lock); \
    } while (0)

enum mpsc_queue_poll_result {
    /* Queue is empty. */
    MPSC_QUEUE_EMPTY,
    /* Polling the queue returned an item. */
    MPSC_QUEUE_ITEM,
    /* Data has been enqueued but one or more producer thread have not
     * finished writing it. The queue is in an inconsistent state.
     * Retrying shortly, if the producer threads are still active, will
     * return the data.
     */
    MPSC_QUEUE_RETRY,
};

/* Set 'node' to a removed item from the queue if 'MPSC_QUEUE_ITEM' is
 * returned, otherwise 'node' is not set.
 */
enum mpsc_queue_poll_result mpsc_queue_poll(struct mpsc_queue *queue,
                                            struct mpsc_queue_node **node)
    OVS_REQUIRES(queue->read_lock);

/* Pop an element if there is any in the queue. */
struct mpsc_queue_node *mpsc_queue_pop(struct mpsc_queue *queue)
    OVS_REQUIRES(queue->read_lock);

/* Insert at the front of the queue. Only the consumer can do it. */
void mpsc_queue_push_front(struct mpsc_queue *queue,
                           struct mpsc_queue_node *node)
    OVS_REQUIRES(queue->read_lock);

/* Get the current queue tail. */
struct mpsc_queue_node *mpsc_queue_tail(struct mpsc_queue *queue)
    OVS_REQUIRES(queue->read_lock);

/* Get the next element of a node. */
struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,
                                        struct mpsc_queue_node *prev)
    OVS_REQUIRES(queue->read_lock);

#define MPSC_QUEUE_FOR_EACH(node, queue) \
    for (node = mpsc_queue_tail(queue); node != NULL; \
         node = mpsc_queue_next((queue), node))

#define MPSC_QUEUE_FOR_EACH_POP(node, queue) \
    for (node = mpsc_queue_pop(queue); node != NULL; \
         node = mpsc_queue_pop(queue))

/* Producer API. */

void mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node);

#endif /* MPSC_QUEUE_H */