diff options
-rw-r--r-- | src/modules/module-combine-sink.c | 108 |
1 files changed, 80 insertions, 28 deletions
diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c index 602fa877f..b6322c6b7 100644 --- a/src/modules/module-combine-sink.c +++ b/src/modules/module-combine-sink.c @@ -90,9 +90,27 @@ struct output { pa_sink_input *sink_input; bool ignore_state_change; - pa_asyncmsgq *inq, /* Message queue from the sink thread to this sink input */ - *outq; /* Message queue from this sink input to the sink thread */ - pa_rtpoll_item *inq_rtpoll_item_read, *inq_rtpoll_item_write; + /* This message queue is only for POST messages, i.e. the messages that + * carry audio data from the sink thread to the output thread. The POST + * messages need to be handled in a separate queue, because the queue is + * processed not only in the output thread mainloop, but also inside the + * sink input pop() callback. Processing other messages (such as + * SET_REQUESTED_LATENCY) is not safe inside the pop() callback; at least + * one reason why it's not safe is that messages that generate rewind + * requests (such as SET_REQUESTED_LATENCY) cause crashes when processed + * in the pop() callback. */ + pa_asyncmsgq *audio_inq; + + /* This message queue is for all other messages than POST from the sink + * thread to the output thread (currently "all other messages" means just + * the SET_REQUESTED_LATENCY message). */ + pa_asyncmsgq *control_inq; + + /* Message queue from the output thread to the sink thread. */ + pa_asyncmsgq *outq; + + pa_rtpoll_item *audio_inq_rtpoll_item_read, *audio_inq_rtpoll_item_write; + pa_rtpoll_item *control_inq_rtpoll_item_read, *control_inq_rtpoll_item_write; pa_rtpoll_item *outq_rtpoll_item_read, *outq_rtpoll_item_write; pa_memblockq *memblockq; @@ -352,7 +370,7 @@ finish: pa_log_debug("Thread shutting down"); } -/* Called from I/O thread context */ +/* Called from combine sink I/O thread context */ static void render_memblock(struct userdata *u, struct output *o, size_t length) { pa_assert(u); pa_assert(o); @@ -367,7 +385,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length) /* Maybe there's some data in the requesting output's queue * now? */ - while (pa_asyncmsgq_process_one(o->inq) > 0) + while (pa_asyncmsgq_process_one(o->audio_inq) > 0) ; /* Ok, now let's prepare some data if we really have to */ @@ -385,7 +403,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length) if (j == o) continue; - pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL); + pa_asyncmsgq_post(j->audio_inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL); } /* And place it directly into the requesting output's queue */ @@ -403,7 +421,7 @@ static void request_memblock(struct output *o, size_t length) { /* If another thread already prepared some data we received * the data over the asyncmsgq, hence let's first process * it. */ - while (pa_asyncmsgq_process_one(o->inq) > 0) + while (pa_asyncmsgq_process_one(o->audio_inq) > 0) ; /* Check whether we're now readable */ @@ -514,12 +532,19 @@ static void sink_input_attach_cb(pa_sink_input *i) { pa_assert_se(o = i->userdata); /* Set up the queue from the sink thread to us */ - pa_assert(!o->inq_rtpoll_item_read && !o->outq_rtpoll_item_write); + pa_assert(!o->audio_inq_rtpoll_item_read); + pa_assert(!o->control_inq_rtpoll_item_read); + pa_assert(!o->outq_rtpoll_item_write); - o->inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read( + o->audio_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read( i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, /* This one is not that important, since we check for data in _peek() anyway. */ - o->inq); + o->audio_inq); + + o->control_inq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read( + i->sink->thread_info.rtpoll, + PA_RTPOLL_NORMAL, + o->control_inq); o->outq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write( i->sink->thread_info.rtpoll, @@ -559,9 +584,14 @@ static void sink_input_detach_cb(pa_sink_input *i) { * pass any further data to this output */ pa_asyncmsgq_send(o->userdata->sink->asyncmsgq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_REMOVE_OUTPUT, o, 0, NULL); - if (o->inq_rtpoll_item_read) { - pa_rtpoll_item_free(o->inq_rtpoll_item_read); - o->inq_rtpoll_item_read = NULL; + if (o->audio_inq_rtpoll_item_read) { + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read); + o->audio_inq_rtpoll_item_read = NULL; + } + + if (o->control_inq_rtpoll_item_read) { + pa_rtpoll_item_free(o->control_inq_rtpoll_item_read); + o->control_inq_rtpoll_item_read = NULL; } if (o->outq_rtpoll_item_write) { @@ -756,16 +786,22 @@ static void output_add_within_thread(struct output *o) { PA_LLIST_PREPEND(struct output, o->userdata->thread_info.active_outputs, o); - pa_assert(!o->outq_rtpoll_item_read && !o->inq_rtpoll_item_write); + pa_assert(!o->outq_rtpoll_item_read); + pa_assert(!o->audio_inq_rtpoll_item_write); + pa_assert(!o->control_inq_rtpoll_item_write); o->outq_rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read( o->userdata->rtpoll, PA_RTPOLL_EARLY-1, /* This item is very important */ o->outq); - o->inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write( + o->audio_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write( o->userdata->rtpoll, PA_RTPOLL_EARLY, - o->inq); + o->audio_inq); + o->control_inq_rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write( + o->userdata->rtpoll, + PA_RTPOLL_NORMAL, + o->control_inq); } /* Called from thread context of the io thread */ @@ -780,9 +816,14 @@ static void output_remove_within_thread(struct output *o) { o->outq_rtpoll_item_read = NULL; } - if (o->inq_rtpoll_item_write) { - pa_rtpoll_item_free(o->inq_rtpoll_item_write); - o->inq_rtpoll_item_write = NULL; + if (o->audio_inq_rtpoll_item_write) { + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write); + o->audio_inq_rtpoll_item_write = NULL; + } + + if (o->control_inq_rtpoll_item_write) { + pa_rtpoll_item_free(o->control_inq_rtpoll_item_write); + o->control_inq_rtpoll_item_write = NULL; } } @@ -803,7 +844,8 @@ static void sink_update_requested_latency(pa_sink *s) { /* Just hand this one over to all sink_inputs */ PA_LLIST_FOREACH(o, u->thread_info.active_outputs) { - pa_asyncmsgq_post(o->inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, u->block_usec, NULL, NULL); + pa_asyncmsgq_post(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, NULL, + u->block_usec, NULL, NULL); } } @@ -977,7 +1019,8 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) { o = pa_xnew0(struct output, 1); o->userdata = u; - o->inq = pa_asyncmsgq_new(0); + o->audio_inq = pa_asyncmsgq_new(0); + o->control_inq = pa_asyncmsgq_new(0); o->outq = pa_asyncmsgq_new(0); o->sink = sink; o->memblockq = pa_memblockq_new( @@ -1004,18 +1047,26 @@ static void output_free(struct output *o) { output_disable(o); update_description(o->userdata); - if (o->inq_rtpoll_item_read) - pa_rtpoll_item_free(o->inq_rtpoll_item_read); - if (o->inq_rtpoll_item_write) - pa_rtpoll_item_free(o->inq_rtpoll_item_write); + if (o->audio_inq_rtpoll_item_read) + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_read); + if (o->audio_inq_rtpoll_item_write) + pa_rtpoll_item_free(o->audio_inq_rtpoll_item_write); + + if (o->control_inq_rtpoll_item_read) + pa_rtpoll_item_free(o->control_inq_rtpoll_item_read); + if (o->control_inq_rtpoll_item_write) + pa_rtpoll_item_free(o->control_inq_rtpoll_item_write); if (o->outq_rtpoll_item_read) pa_rtpoll_item_free(o->outq_rtpoll_item_read); if (o->outq_rtpoll_item_write) pa_rtpoll_item_free(o->outq_rtpoll_item_write); - if (o->inq) - pa_asyncmsgq_unref(o->inq); + if (o->audio_inq) + pa_asyncmsgq_unref(o->audio_inq); + + if (o->control_inq) + pa_asyncmsgq_unref(o->control_inq); if (o->outq) pa_asyncmsgq_unref(o->outq); @@ -1068,7 +1119,8 @@ static void output_disable(struct output *o) { /* Finally, drop all queued data */ pa_memblockq_flush_write(o->memblockq, true); - pa_asyncmsgq_flush(o->inq, false); + pa_asyncmsgq_flush(o->audio_inq, false); + pa_asyncmsgq_flush(o->control_inq, false); pa_asyncmsgq_flush(o->outq, false); } |