summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/modules/module-combine-sink.c108
1 files changed, 80 insertions, 28 deletions
diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c
index 602fa877f..b6322c6b7 100644
--- a/src/modules/module-combine-sink.c
+++ b/src/modules/module-combine-sink.c
@@ -90,9 +90,27 @@ struct output {
pa_sink_input *sink_input;
bool ignore_state_change;
- pa_asyncmsgq *inq, /* Message queue from the sink thread to this sink input */
- *outq; /* Message queue from this sink input to the sink thread */
- pa_rtpoll_item *inq_rtpoll_item_read, *inq_rtpoll_item_write;
+ /* This message queue is only for POST messages, i.e. the messages that
+ * carry audio data from the sink thread to the output thread. The POST
+ * messages need to be handled in a separate queue, because the queue is
+ * processed not only in the output thread mainloop, but also inside the
+ * sink input pop() callback. Processing other messages (such as
+ * SET_REQUESTED_LATENCY) is not safe inside the pop() callback; at least
+ * one reason why it's not safe is that messages that generate rewind
+ * requests (such as SET_REQUESTED_LATENCY) cause crashes when processed
+ * in the pop() callback. */
+ pa_asyncmsgq *audio_inq;
+
+ /* This message queue is for all other messages than POST from the sink
+ * thread to the output thread (currently "all other messages" means just
+ * the SET_REQUESTED_LATENCY message). */
+ pa_asyncmsgq *control_inq;
+
+ /* Message queue from the output thread to the sink thread. */
+ pa_asyncmsgq *outq;
+
+ pa_rtpoll_item *audio_inq_rtpoll_item_read, *audio_inq_rtpoll_item_write;
+ pa_rtpoll_item *control_inq_rtpoll_item_read, *control_inq_rtpoll_item_write;
pa_rtpoll_item *outq_rtpoll_item_read, *outq_rtpoll_item_write;
pa_memblockq *memblockq;
@@ -352,7 +370,7 @@ finish:
pa_log_debug("Thread shutting down");
}
-/* Called from I/O thread context */
+/* Called from combine sink I/O thread context */
static void render_memblock(struct userdata *u, struct output *o, size_t length) {
pa_assert(u);
pa_assert(o);
@@ -367,7 +385,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
/* Maybe there's some data in the requesting output's queue
* now? */
- while (pa_asyncmsgq_process_one(o->inq) > 0)
+ while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
;
/* Ok, now let's prepare some data if we really have to */
@@ -385,7 +403,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length)
if (j == o)
continue;
- pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
+ pa_asyncmsgq_post(j->audio_inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
}
/* And place it directly into the requesting output's queue */
@@ -403,7 +421,7 @@ static void request_memblock(struct output *o, size_t length) {
/* If another thread already prepared some data we received
* the data over the asyncmsgq, hence let's first process
* it. */
- while (pa_asyncmsgq_process_one(o->inq) > 0)
+ while (pa_asyncmsgq_process_one(o->audio_inq) > 0)
;
/* Check whether we're now readable */
@@ -514,12 +532,19 @@ static void sink_input_attach_cb(pa_sink_input *i) {
pa_assert_se(o = i->userdata);
/* Set up the queue from the sink thread to us */
- pa_assert(!o->inq_rtpoll_item_read && !o->outq_rtpoll_item_write);
+ pa_assert(!o->audio_inq_rtpoll_item_read);
+ pa_assert(!o->control_inq_rtpoll_item_read);
+ pa_assert(!o->outq_rtpoll_item_write);
- o->inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
+ o->audio_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
i->sink->thread_info.rtpoll,
PA_RTPOLL_LATE, /* This one is not that important, since we check for data in _peek() anyway. */
- o->inq);
+ o->audio_inq);
+
+ o->control_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
+ i->sink->thread_info.rtpoll,
+ PA_RTPOLL_NORMAL,
+ o->control_inq);
o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
i->sink->thread_info.rtpoll,
@@ -559,9 +584,14 @@ static void sink_input_detach_cb(pa_sink_input *i) {
* pass any further data to this output */
pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
- if (o->inq_rtpoll_item_read) {
- pa_rtpoll_item_free(o->inq_rtpoll_item_read);
- o->inq_rtpoll_item_read = NULL;
+ if (o->audio_inq_rtpoll_item_read) {
+ pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
+ o->audio_inq_rtpoll_item_read = NULL;
+ }
+
+ if (o->control_inq_rtpoll_item_read) {
+ pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
+ o->control_inq_rtpoll_item_read = NULL;
}
if (o->outq_rtpoll_item_write) {
@@ -756,16 +786,22 @@ static void output_add_within_thread(struct output *o) {
PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o);
- pa_assert(!o->outq_rtpoll_item_read && !o->inq_rtpoll_item_write);
+ pa_assert(!o->outq_rtpoll_item_read);
+ pa_assert(!o->audio_inq_rtpoll_item_write);
+ pa_assert(!o->control_inq_rtpoll_item_write);
o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
o->userdata->rtpoll,
PA_RTPOLL_EARLY-1, /* This item is very important */
o->outq);
- o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
+ o->audio_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
o->userdata->rtpoll,
PA_RTPOLL_EARLY,
- o->inq);
+ o->audio_inq);
+ o->control_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
+ o->userdata->rtpoll,
+ PA_RTPOLL_NORMAL,
+ o->control_inq);
}
/* Called from thread context of the io thread */
@@ -780,9 +816,14 @@ static void output_remove_within_thread(struct output *o) {
o->outq_rtpoll_item_read = NULL;
}
- if (o->inq_rtpoll_item_write) {
- pa_rtpoll_item_free(o->inq_rtpoll_item_write);
- o->inq_rtpoll_item_write = NULL;
+ if (o->audio_inq_rtpoll_item_write) {
+ pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
+ o->audio_inq_rtpoll_item_write = NULL;
+ }
+
+ if (o->control_inq_rtpoll_item_write) {
+ pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
+ o->control_inq_rtpoll_item_write = NULL;
}
}
@@ -803,7 +844,8 @@ static void sink_update_requested_latency(pa_sink *s) {
/* Just hand this one over to all sink_inputs */
PA_LLIST_FOREACH(o, u->thread_info.active_outputs) {
- pa_asyncmsgq_post(o->inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, u->block_usec, NULL, NULL);
+ pa_asyncmsgq_post(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL,
+ u->block_usec, NULL, NULL);
}
}
@@ -977,7 +1019,8 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) {
o = pa_xnew0(struct output, 1);
o->userdata = u;
- o->inq = pa_asyncmsgq_new(0);
+ o->audio_inq = pa_asyncmsgq_new(0);
+ o->control_inq = pa_asyncmsgq_new(0);
o->outq = pa_asyncmsgq_new(0);
o->sink = sink;
o->memblockq = pa_memblockq_new(
@@ -1004,18 +1047,26 @@ static void output_free(struct output *o) {
output_disable(o);
update_description(o->userdata);
- if (o->inq_rtpoll_item_read)
- pa_rtpoll_item_free(o->inq_rtpoll_item_read);
- if (o->inq_rtpoll_item_write)
- pa_rtpoll_item_free(o->inq_rtpoll_item_write);
+ if (o->audio_inq_rtpoll_item_read)
+ pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read);
+ if (o->audio_inq_rtpoll_item_write)
+ pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write);
+
+ if (o->control_inq_rtpoll_item_read)
+ pa_rtpoll_item_free(o->control_inq_rtpoll_item_read);
+ if (o->control_inq_rtpoll_item_write)
+ pa_rtpoll_item_free(o->control_inq_rtpoll_item_write);
if (o->outq_rtpoll_item_read)
pa_rtpoll_item_free(o->outq_rtpoll_item_read);
if (o->outq_rtpoll_item_write)
pa_rtpoll_item_free(o->outq_rtpoll_item_write);
- if (o->inq)
- pa_asyncmsgq_unref(o->inq);
+ if (o->audio_inq)
+ pa_asyncmsgq_unref(o->audio_inq);
+
+ if (o->control_inq)
+ pa_asyncmsgq_unref(o->control_inq);
if (o->outq)
pa_asyncmsgq_unref(o->outq);
@@ -1068,7 +1119,8 @@ static void output_disable(struct output *o) {
/* Finally, drop all queued data */
pa_memblockq_flush_write(o->memblockq, true);
- pa_asyncmsgq_flush(o->inq, false);
+ pa_asyncmsgq_flush(o->audio_inq, false);
+ pa_asyncmsgq_flush(o->control_inq, false);
pa_asyncmsgq_flush(o->outq, false);
}